首页 > 数据库 >Python使用Redis实现一个简单作业调度系统

Python使用Redis实现一个简单作业调度系统

时间:2023-06-11 17:37:11浏览次数:64  
标签:__ Python REDIS self Redis 调度 CHANNEL message channel


      

概述

Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。

下面是实现上的想法

MyMaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。

MySlave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。

channel CHANNEL_DISPATCH:每个slave节点订阅一个channel,比如“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。

channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。

Master代码

	#!/usr/bin/env python
02	# -*- coding: utf-8 -*-
03	import time
04	import threading
05	import random
06	import redis
07	 
08	 
09	REDIS_HOST = 'localhost'
10	REDIS_PORT = 6379
11	REDIS_DB = 0
12	CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
13	CHANNEL_RESULT = 'CHANNEL_RESULT'
14	 
15	 
16	class MyMaster():
17	    def __init__(self):
18	        pass
19	 
20	    def start(self):
21	        MyServerResultHandleThread().start()
22	        MyServerDispatchThread().start()
23	 
24	 
25	class MyServerDispatchThread(threading.Thread):
26	    def __init__(self):
27	        threading.Thread.__init__(self)
28	 
29	    def run(self):
30	        r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
31	        for i in range(1, 100):
32	            channel = CHANNEL_DISPATCH + '_' +str(random.randint(1, 3))
33	            print("Dispatch job %s to %s" % (str(i), channel))
34	            ret = r.publish(channel, str(i))
35	            if ret == 0:
36	                print("Dispatch job %s failed." % str(i))
37	            time.sleep(5)
38	 
39	 
40	class MyServerResultHandleThread(threading.Thread):
41	    def __init__(self):
42	        threading.Thread.__init__(self)
43	 
44	    def run(self):
45	        r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
46	        p = r.pubsub()
47	        p.subscribe(CHANNEL_RESULT)
48	        for message in p.listen():
49	            if message['type'] != 'message':
50	                continue
51	            print("Received finished job %s" % message['data'])
52	 
53	 
54	if __name__ == "__main__":
55	    MyMaster().start()
56	    time.sleep(10000)

说明

MyMaster类 – master主程序,用来启动dispatch和resulthandler的线程

MyServerDispatchThread类 – 派发作业线程,产生作业并派发到计算节点

MyServerResultHandleThread类 – 作业运行结果处理线程,从channel里获取作业结果并显示

Slave代码

	#!/usr/bin/env python
02	# -*- coding: utf-8 -*-
03	from datetime import datetime
04	import time
05	import threading
06	import random
07	import redis
08	 
09	 
10	REDIS_HOST = 'localhost'
11	REDIS_PORT = 6379
12	REDIS_DB = 0
13	CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
14	CHANNEL_RESULT = 'CHANNEL_RESULT'
15	 
16	 
17	class MySlave():
18	    def __init__(self):
19	        pass
20	 
21	    def start(self):
22	        for i in range(1, 4):
23	            MyJobWorkerThread(CHANNEL_DISPATCH + '_' +str(i)).start()
24	 
25	 
26	class MyJobWorkerThread(threading.Thread):
27	 
28	    def __init__(self, channel):
29	        threading.Thread.__init__(self)
30	        self.channel = channel
31	 
32	    def run(self):
33	        r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
34	        p = r.pubsub()
35	        p.subscribe(self.channel)
36	        for message in p.listen():
37	            if message['type'] != 'message':
38	                continue
39	            print("%s: Received dispatched job %s " %(self.channel, message['data']))
40	            print("%s: Run dispatched job %s " % (self.channel, message['data']))
41	            time.sleep(2)
42	            print("%s: Send finished job %s " % (self.channel, message['data']))
43	            ret = r.publish(CHANNEL_RESULT, message['data'])
44	            if ret == 0:
45	                print("%s: Send finished job %s failed." %(self.channel, message['data']))
46	 
47	 
48	if __name__ == "__main__":
49	    MySlave().start()
50	    time.sleep(10000)

说明

MySlave类 – slave节点主程序,用来启动MyJobWorkerThread的线程

MyJobWorkerThread类 – 从channel里获取派发的作业并将运行结果发送回master

测试

首先运行MySlave来定义派发作业channel。

然后运行MyMaster派发作业并显示执行结果。

标签:__,Python,REDIS,self,Redis,调度,CHANNEL,message,channel
From: https://blog.51cto.com/u_6186189/6458313

相关文章

  • Python内存数据库/引擎(sqlite memlite pydblite)
        1初探在平时的开发工作中,我们可能会有这样的需求:我们希望有一个内存数据库或者数据引擎,用比较Pythonic的方式进行数据库的操作(比如说插入和查询)。举个具体的例子,分别向数据库db中插入两条数据,”a=1,b=1″和“a=1,b=2”,然后想查询a=1的数据可能会使用这样的语句db......
  • Python使用multiprocessing实现一个最简单的分布式作业调度系统
    介绍Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。实现Job首先创建一个Job类,为......
  • python flask 表单处理Flask-WTF
        涉及到的插件和包有Flask-WTF,WTForms。内容有表单的创建使用流程,一些最佳实践,还有在页面显示提示消息的简单方式,配合Flask内置的flash()。Flask的requset对象包含了client端发送过来的所有请求,在request.form中就有POST方法提交过来的表单数据。直接使用这些数据可以......
  • Django使用redis缓存服务器
        redis相信大家都很熟悉了,和memcached一样是一个高性能的key-value数据库,至于什么是缓存服务器,度娘都有很明白的介绍了,我在这里就不一一介绍了。那我们一般什么情况下才会使用缓存服务器呢?可不是什么情况都需要的哦,一般来说是在需要频繁对一个字段读取的时候才会需要将这......
  • python调用浏览器,实现刷网页小程序
       python打开浏览器,可以做简单的刷网页的小程序and其他有想象力的程序。不过仅供学习,勿用非法用途。python的webbrowser模块支持对浏览器进行一些操作主要有以下三个方法:webbrowser.open(url,new=0,autoraise=True)webbrowser.open_new(url)webbrowser.open_n......
  • python中文乱码问题大总结
        在运行这样类似的代码:#!/usr/bin/envpythons="中文"prints最近经常遇到这样的问题:问题一:SyntaxError:Non-ASCIIcharacter'\xe4'infileE:\coding\python\Untitled6.pyonline3,butnoencodingdeclared;seehttp://www.python.org/peps/pep-0263.......
  • Python中http请求方法库汇总
    最近在使用python做接口测试,发现python中http请求方法有许多种,今天抽点时间把相关内容整理,分享给大家,具体内容如下所示:一、python自带库----urllib2python自带库urllib2使用的比较多,简单使用如下:importurllib2response=urllib2.urlopen('http://localhost:8080/jenkins/api/jso......
  • Python爬虫
    目录PythonSpider第一章爬虫入门1.1爬虫概述1.1.1爬虫原理1.1.2爬虫分类1.1.3爬虫应用1.2爬虫流程1.2.1爬取网页1.2.2解析网页1.2.3存储数据1.3爬虫协议1.3.1Robots协议1.3.2robots.txt文件简介1.3.3robots.txt文件详解1.3.4爬虫准则1.4爬虫环境1.4.1原生Python+......
  • 实验6 turtle绘图与python库应用编程体验
    task1_1代码:fromturtleimport*defmove(x,y):'''画笔移动到坐标(x,y)处'''penup()goto(x,y)pendown()defdraw(n,size=100):'''绘制边长为size的正n变形'''foriinrange(n):......
  • Redis学习笔记4-脚本、持久化和集群 Redis学习笔记1-基础命令及数据结构: http://blog.
        Redis学习笔记4-脚本、持久化和集群Redis学习笔记1-基础命令及数据结构:http://blog.guoyb.com/2016/07/21/learn-redis-basic-commands/Redis学习笔记2-事务与过期时间:http://blog.guoyb.com/2016/08/23/learn-redis-adv/Redis学习笔记3-排序与消息通知:http://blog......