概述
Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。
下面是实现上的想法
MyMaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。
MySlave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。
channel CHANNEL_DISPATCH:每个slave节点订阅一个channel,比如“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。
channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。
Master代码
#!/usr/bin/env python
02 # -*- coding: utf-8 -*-
03 import time
04 import threading
05 import random
06 import redis
07
08
09 REDIS_HOST = 'localhost'
10 REDIS_PORT = 6379
11 REDIS_DB = 0
12 CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
13 CHANNEL_RESULT = 'CHANNEL_RESULT'
14
15
16 class MyMaster():
17 def __init__(self):
18 pass
19
20 def start(self):
21 MyServerResultHandleThread().start()
22 MyServerDispatchThread().start()
23
24
25 class MyServerDispatchThread(threading.Thread):
26 def __init__(self):
27 threading.Thread.__init__(self)
28
29 def run(self):
30 r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
31 for i in range(1, 100):
32 channel = CHANNEL_DISPATCH + '_' +str(random.randint(1, 3))
33 print("Dispatch job %s to %s" % (str(i), channel))
34 ret = r.publish(channel, str(i))
35 if ret == 0:
36 print("Dispatch job %s failed." % str(i))
37 time.sleep(5)
38
39
40 class MyServerResultHandleThread(threading.Thread):
41 def __init__(self):
42 threading.Thread.__init__(self)
43
44 def run(self):
45 r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
46 p = r.pubsub()
47 p.subscribe(CHANNEL_RESULT)
48 for message in p.listen():
49 if message['type'] != 'message':
50 continue
51 print("Received finished job %s" % message['data'])
52
53
54 if __name__ == "__main__":
55 MyMaster().start()
56 time.sleep(10000)
说明
MyMaster类 – master主程序,用来启动dispatch和resulthandler的线程
MyServerDispatchThread类 – 派发作业线程,产生作业并派发到计算节点
MyServerResultHandleThread类 – 作业运行结果处理线程,从channel里获取作业结果并显示
Slave代码
#!/usr/bin/env python
02 # -*- coding: utf-8 -*-
03 from datetime import datetime
04 import time
05 import threading
06 import random
07 import redis
08
09
10 REDIS_HOST = 'localhost'
11 REDIS_PORT = 6379
12 REDIS_DB = 0
13 CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
14 CHANNEL_RESULT = 'CHANNEL_RESULT'
15
16
17 class MySlave():
18 def __init__(self):
19 pass
20
21 def start(self):
22 for i in range(1, 4):
23 MyJobWorkerThread(CHANNEL_DISPATCH + '_' +str(i)).start()
24
25
26 class MyJobWorkerThread(threading.Thread):
27
28 def __init__(self, channel):
29 threading.Thread.__init__(self)
30 self.channel = channel
31
32 def run(self):
33 r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
34 p = r.pubsub()
35 p.subscribe(self.channel)
36 for message in p.listen():
37 if message['type'] != 'message':
38 continue
39 print("%s: Received dispatched job %s " %(self.channel, message['data']))
40 print("%s: Run dispatched job %s " % (self.channel, message['data']))
41 time.sleep(2)
42 print("%s: Send finished job %s " % (self.channel, message['data']))
43 ret = r.publish(CHANNEL_RESULT, message['data'])
44 if ret == 0:
45 print("%s: Send finished job %s failed." %(self.channel, message['data']))
46
47
48 if __name__ == "__main__":
49 MySlave().start()
50 time.sleep(10000)
说明
MySlave类 – slave节点主程序,用来启动MyJobWorkerThread的线程
MyJobWorkerThread类 – 从channel里获取派发的作业并将运行结果发送回master
测试
首先运行MySlave来定义派发作业channel。
然后运行MyMaster派发作业并显示执行结果。
标签:__,Python,REDIS,self,Redis,调度,CHANNEL,message,channel From: https://blog.51cto.com/u_6186189/6458313