首页 > 编程语言 >Python使用multiprocessing实现一个最简单的分布式作业调度系统

Python使用multiprocessing实现一个最简单的分布式作业调度系统

时间:2023-06-11 17:35:00浏览次数:47  
标签:Run Python Job Dispatch queue job Finished multiprocessing 分布式


介绍

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。

想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。

实现

Job

首先创建一个Job类,为了测试简单,只包含一个job id属性,将来可以封装一些作业状态,作业命令,执行用户等属性。

job.py


#!/usr/bin/env python
2	# -*- coding: utf-8 -*-
3	 
4	class Job:
5	    def __init__(self, job_id):
6	        self.job_id = job_id


Master

Master用来派发作业和显示运行完成的作业信息

master.py

#!/usr/bin/env python
02	# -*- coding: utf-8 -*-
03	 
04	from Queue import Queue
05	from multiprocessing.managers import BaseManager
06	from job import Job
07	 
08	 
09	class Master:
10	 
11	    def __init__(self):
12	        # 派发出去的作业队列
13	        self.dispatched_job_queue = Queue()
14	        # 完成的作业队列
15	        self.finished_job_queue = Queue()
16	 
17	    def get_dispatched_job_queue(self):
18	        return self.dispatched_job_queue
19	 
20	    def get_finished_job_queue(self):
21	        return self.finished_job_queue
22	 
23	    def start(self):
24	        # 把派发作业队列和完成作业队列注册到网络上
25	        BaseManager.register('get_dispatched_job_queue',callable=self.get_dispatched_job_queue)
26	        BaseManager.register('get_finished_job_queue',callable=self.get_finished_job_queue)
27	 
28	        # 监听端口和启动服务
29	        manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs')
30	        manager.start()
31	 
32	        # 使用上面注册的方法获取队列
33	        dispatched_jobs = manager.get_dispatched_job_queue()
34	        finished_jobs = manager.get_finished_job_queue()
35	 
36	        # 这里一次派发10个作业,等到10个作业都运行完后,继续再派发10个作业
37	        job_id = 0
38	        while True:
39	            for i in range(0, 10):
40	                job_id = job_id + 1
41	                job = Job(job_id)
42	                print('Dispatch job: %s' % job.job_id)
43	                dispatched_jobs.put(job)
44	 
45	            while not dispatched_jobs.empty():
46	                job = finished_jobs.get(60)
47	                print('Finished Job: %s' % job.job_id)
48	 
49	        manager.shutdown()
50	 
51	if __name__ == "__main__":
52	    master = Master()
53	    master.start()

Slave

Slave用来运行master派发的作业并将结果返回

slave.py


#!/usr/bin/env python
02	# -*- coding: utf-8 -*-
03	 
04	import time
05	from Queue import Queue
06	from multiprocessing.managers import BaseManager
07	from job import Job
08	 
09	 
10	class Slave:
11	 
12	    def __init__(self):
13	        # 派发出去的作业队列
14	        self.dispatched_job_queue = Queue()
15	        # 完成的作业队列
16	        self.finished_job_queue = Queue()
17	 
18	    def start(self):
19	        # 把派发作业队列和完成作业队列注册到网络上
20	        BaseManager.register('get_dispatched_job_queue')
21	        BaseManager.register('get_finished_job_queue')
22	 
23	        # 连接master
24	        server = '127.0.0.1'
25	        print('Connect to server %s...' % server)
26	        manager = BaseManager(address=(server, 8888), authkey='jobs')
27	        manager.connect()
28	 
29	        # 使用上面注册的方法获取队列
30	        dispatched_jobs = manager.get_dispatched_job_queue()
31	        finished_jobs = manager.get_finished_job_queue()
32	 
33	        # 运行作业并返回结果,这里只是模拟作业运行,所以返回的是接收到的作业
34	        while True:
35	            job = dispatched_jobs.get(timeout=1)
36	            print('Run job: %s ' % job.job_id)
37	            time.sleep(1)
38	            finished_jobs.put(job)
39	 
40	if __name__ == "__main__":
41	    slave = Slave()
42	    slave.start()


测试

分别打开三个linux终端,第一个终端运行master,第二个和第三个终端用了运行slave,运行结果如下

master


$ python master.py
02	Dispatch job: 1
03	Dispatch job: 2
04	Dispatch job: 3
05	Dispatch job: 4
06	Dispatch job: 5
07	Dispatch job: 6
08	Dispatch job: 7
09	Dispatch job: 8
10	Dispatch job: 9
11	Dispatch job: 10
12	Finished Job: 1
13	Finished Job: 2
14	Finished Job: 3
15	Finished Job: 4
16	Finished Job: 5
17	Finished Job: 6
18	Finished Job: 7
19	Finished Job: 8
20	Finished Job: 9
21	Dispatch job: 11
22	Dispatch job: 12
23	Dispatch job: 13
24	Dispatch job: 14
25	Dispatch job: 15
26	Dispatch job: 16
27	Dispatch job: 17
28	Dispatch job: 18
29	Dispatch job: 19
30	Dispatch job: 20
31	Finished Job: 10
32	Finished Job: 11
33	Finished Job: 12
34	Finished Job: 13
35	Finished Job: 14
36	Finished Job: 15
37	Finished Job: 16
38	Finished Job: 17
39	Finished Job: 18
40	Dispatch job: 21
41	Dispatch job: 22
42	Dispatch job: 23
43	Dispatch job: 24
44	Dispatch job: 25
45	Dispatch job: 26
46	Dispatch job: 27
47	Dispatch job: 28
48	Dispatch job: 29
49	Dispatch job: 30
————————————————


slave1

	$ python slave.py
02	Connect to server 127.0.0.1...
03	Run job: 1
04	Run job: 2
05	Run job: 3
06	Run job: 5
07	Run job: 7
08	Run job: 9
09	Run job: 11
10	Run job: 13
11	Run job: 15
12	Run job: 17
13	Run job: 19
14	Run job: 21
15	Run job: 23

slave2

$ python slave.py
02	Connect to server 127.0.0.1...
03	Run job: 4
04	Run job: 6
05	Run job: 8
06	Run job: 10
07	Run job: 12
08	Run job: 14
09	Run job: 16
10	Run job: 18
11	Run job: 20
12	Run job: 22
13	Run job: 24


标签:Run,Python,Job,Dispatch,queue,job,Finished,multiprocessing,分布式
From: https://blog.51cto.com/u_6186189/6458359

相关文章

  • python flask 表单处理Flask-WTF
        涉及到的插件和包有Flask-WTF,WTForms。内容有表单的创建使用流程,一些最佳实践,还有在页面显示提示消息的简单方式,配合Flask内置的flash()。Flask的requset对象包含了client端发送过来的所有请求,在request.form中就有POST方法提交过来的表单数据。直接使用这些数据可以......
  • python调用浏览器,实现刷网页小程序
       python打开浏览器,可以做简单的刷网页的小程序and其他有想象力的程序。不过仅供学习,勿用非法用途。python的webbrowser模块支持对浏览器进行一些操作主要有以下三个方法:webbrowser.open(url,new=0,autoraise=True)webbrowser.open_new(url)webbrowser.open_n......
  • python中文乱码问题大总结
        在运行这样类似的代码:#!/usr/bin/envpythons="中文"prints最近经常遇到这样的问题:问题一:SyntaxError:Non-ASCIIcharacter'\xe4'infileE:\coding\python\Untitled6.pyonline3,butnoencodingdeclared;seehttp://www.python.org/peps/pep-0263.......
  • Python中http请求方法库汇总
    最近在使用python做接口测试,发现python中http请求方法有许多种,今天抽点时间把相关内容整理,分享给大家,具体内容如下所示:一、python自带库----urllib2python自带库urllib2使用的比较多,简单使用如下:importurllib2response=urllib2.urlopen('http://localhost:8080/jenkins/api/jso......
  • Python爬虫
    目录PythonSpider第一章爬虫入门1.1爬虫概述1.1.1爬虫原理1.1.2爬虫分类1.1.3爬虫应用1.2爬虫流程1.2.1爬取网页1.2.2解析网页1.2.3存储数据1.3爬虫协议1.3.1Robots协议1.3.2robots.txt文件简介1.3.3robots.txt文件详解1.3.4爬虫准则1.4爬虫环境1.4.1原生Python+......
  • 实验6 turtle绘图与python库应用编程体验
    task1_1代码:fromturtleimport*defmove(x,y):'''画笔移动到坐标(x,y)处'''penup()goto(x,y)pendown()defdraw(n,size=100):'''绘制边长为size的正n变形'''foriinrange(n):......
  • Python中的logging模块
    官方文档基本用法下面的代码展示了logging最基本的用法。#-*-coding:utf-8-*-importloggingimportsys#获取logger实例,如果参数为空则返回rootloggerlogger=logging.getLogger("AppName")#指定logger输出格式formatter=logging.Formatter('%(ascti......
  • Centos 7.4+ 通过anaconda 安装Python3.10
    做记录,在centos里安装3.10版本时,老是报错ssl。或者一些其他问题,做个记录吧。大概用了2天才弄好,主业不是运维所以不太了解在https://www.anaconda.com/官网下载安装,此处自己根据系统、根据版本,自己安装下载地址:https://www.anaconda.com/download#downloads安装好后condai......
  • 实验6 turtle绘图与python库应用编程体验
    实验任务1task1_1.py程序源码:1fromturtleimport*23defmove(x,y):#画笔移动到坐标(x,y)处4penup()5goto(x,y)6pendown()78defdraw(n,size=100):#绘制边长为size的正n变形9foriinrange(n):10forward(size)11......
  • python: Decorators
      #装饰器defprintpy(func):definner_func():func()print("hellopython!GeovinDu")returninner_func#@装饰器@printpydefprinthello():print("helloworld!")#调用printhello()'''De......