首页 > 编程语言 >这是一个基于threading可停止线程的有限容量有限并行度的python任务管理器

这是一个基于threading可停止线程的有限容量有限并行度的python任务管理器

时间:2023-08-29 14:35:31浏览次数:52  
标签:管理器 有限 self pid job 并行度 manager 任务 id

这是一个可停止线程的有限容量有限并行度的任务管理器

基于:GitHub - AlitaIcon/StopableThreadJob: 可停止线程任务管理器

Quick Start

基础调用与效果

import time
import datetime
from loguru import logger

from StopableThreadJob.job_manager import JobManager

if __name__ == '__main__':
    def slow_func( name):
        for i in range(5):
            logger.info(f"{name} -- {datetime.datetime.now()}")
            time.sleep(1)


    job_manager = JobManager()
    # 删除未添加任务
    job_manager.remove_job('2')
    for pid in range(6):
        logger.info(f"添加任务: {pid}")
        job_manager.add_job(target=slow_func, args=(pid,), job_id=f'{pid}')
    time.sleep(1)
    job_manager.start_job()
    # 删除已添加运行中任务
    job_manager.remove_job('1')
    # 删除已添加未运行中任务
    job_manager.remove_job('4')
    time.sleep(5)
    # 删除运行完成任务
    job_manager.remove_job('0')
    job_manager.print_current_job()
    print(job_manager.job_store)
    for i in [0, 1, 2, 4]:
        logger.info(f"添加任务: {i}")
        job_manager.add_job(target=slow_func, args=(i,), job_id=f'{i}')
    job_manager.print_current_job()
    job_manager.start_job()
    time.sleep(6)
    print(job_manager.job_store)
    job_manager.print_current_job()
    time.sleep(30)

文件job_manager

import ctypes
import threading
from loguru import logger


class TerminableThread(threading.Thread):
    """
    a thread that can be stopped by forcing an exception in the execution context
    可以通过在执行上下文中强制异常来停止的线程
    """

    def terminate(self, exception_cls, repeat_sec=2.0):
        if self.is_alive() is False:
            return True
        killer = ThreadKiller(self, exception_cls, repeat_sec=repeat_sec)
        killer.start()


class ThreadKiller(threading.Thread):
    """
    separate thread to kill TerminableThread
    单独的线程来终止可终止线程
    """

    def __init__(self, target_thread, exception_cls, repeat_sec=2.0):
        threading.Thread.__init__(self)
        self.target_thread = target_thread
        self.exception_cls = exception_cls
        self.repeat_sec = repeat_sec
        self.daemon = True

    def run(self):
        """loop raising exception incase it's caught hopefully this breaks us far out"""
        while self.target_thread.is_alive():
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.target_thread.ident),
                                                       ctypes.py_object(self.exception_cls))
            self.target_thread.join(self.repeat_sec)

# 自定义错误类型:
class StopRunningCommand(Exception):
    pass

class JobManagerID:
    """
    任务ID池,用于初始化任务ID列表,
    """
    def __init__(self, pool_size=5):
        self.pid_list = list(range(pool_size))

    def list_move(self):
        # 将pid_list 列表循环左移一位,既列表第一项移动至末尾
        b = self.pid_list[:1][0]
        c = self.pid_list[1:]
        c.append(b)
        self.pid_list = c

# 主要的任务调用对象类
class JobManager:

    def __init__(self, semaphore=2):
        """
        :param semaphore: 任务池中可并行的任务数
        """
        self.job_store = {}
        self.job_lock = threading.RLock()
        self.semaphore = threading.Semaphore(semaphore)

    def add_job(self, job_id, target, *args, **kwargs):
        # 新增指定ID的任务
        def inner_job(*args, **kwargs):
            try:
                self.semaphore.acquire()
                ret = target(*args, **kwargs)
                print(f"{job_id} is finished.")
                return ret
            except StopRunningCommand as e:
                print(f"{job_id} has been stopped.")
            except Exception as e:
                print(f"{job_id} is finished.")
                raise e
            finally:
                if job_id in self.job_store:
                    self.job_store.pop(job_id) # 运行完毕后在job_store中删除任务
                self.semaphore.release()

        with self.job_lock:
            t = TerminableThread(target=inner_job, *args, **kwargs)
            t.daemon = True
            # if job_id in self.job:
            #     self.job[job_id].terminate(StopRunningCommand)
            self.job_store[job_id] = t
        return self.job_store[job_id]

    def remove_job(self, job_id):
        # 删除指定ID的任务
        with self.job_lock:
            if job_id in self.job_store:
                self.job_store[job_id].terminate(StopRunningCommand)

    def start_job(self): 
        # 开始任务池中全部的任务,当任务执行较快时会出现该循环还未结束但已经有任务结束了,
        # 从而导致循环的字典发生变化导致错误
        with self.job_lock:
            for j, t in self.job_store.items():
                if t.is_alive() is False:
                    t.start()
    def start_job_id(self,pid): 
        # 指定id开始执行任务
        with self.job_lock:
            if self.job_store[pid].is_alive() is False:
                self.job_store[pid].start()

    def job_start(self,pid):
        # 返回指定id的任务当前状态,True为正在计算
        return self.job_store[pid].is_alive()
        
    def print_current_job(self):
        # 返回指定任务池中全部的任务的当前状态,True为正在计算
        info = {jid: t.is_alive() for jid, t in self.job_store.items()}
        logger.info(info)

为实现任务运行异步且可并行的效果

定义的方法函数

from StopableThreadJob.job_manager import *
job_manager = JobManager(semaphore=4) 
job_manager_list = JobManagerID(pool_size = 5) 
# pool_size 用于设置任务池容量的大小
# semaphore 用于设置并行度,既任务池中可同时计算的任务数


def job():
    def slow_func(name):
        for i in range(10):
            logger.info(f"{name} -- {datetime.datetime.now()}")
            time.sleep(1)
            
    pid = job_manager_list.pid_list[0]
    if pid in job_manager.job_store:
        job_manager.remove_job(pid)
    job_manager.add_job(target=slow_func, args=(pid,), job_id=pid)
    job_manager.start_job_id(pid)
    job_manager_list.list_move()

  • pool_size 用于设置任务池容量的大小
  • semaphore 用于设置并行度,既任务池中可同时计算的任务数

实现有限的任务池以及有限的并行度的计算模块。

当任务池满的时候新的任务会将旧的任务挤出任务池。

所有的任务都在任务池中排队,根据并行度决定同时计算的数目。

标签:管理器,有限,self,pid,job,并行度,manager,任务,id
From: https://www.cnblogs.com/aimoboshu/p/17664657.html

相关文章

  • 通过修改注册表的方式更改文件夹选项(文件资源管理器选项)
     文件夹选项(在控制面板里面叫做文件资源管理器选项)在注册表中的位置:[HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Explorer\Advanced]在这个地方下面有很多键值,我通过查资料和自己尝试的方式找到了它们所对应的设置项(有一些实在不知道是什么,只有放在那里......
  • 解决方案 | 1分钟快速解决 win10 任务管理器性能不显示GPU?
    1问题环境:win1022h2    2解决方法  win+r输入dxdiag回车,查看下面信息:(1)确认你的Windows10版本号大于1909,如果确认,在任务管理器进程页右键名称一栏,将GPU勾选上即可。如果Windows10版本过旧,更新至1909版本或以上即可。  (2)还是上面图片点击【显示】,确保此......
  • 广州耀海科技有限公司受邀参加“第一届空间、大气、海洋与环境光学(SAME2023)”
    由中国激光杂志社主办,中国科学院上海光学精密机械研究所、中国科学院合肥物质科学研究院安徽光学精密机械研究所、北京空间机电研究所、西安理工大学、浙江大学、南昌航空大学协办的“第一届空间、大气、海洋与环境光学(SAME2023)”学术会议于2023年4月7-9日在上海嘉定召开,来自全国各......
  • 广州耀海科技有限公司受邀参加第五届中国湿地遥感大会
    7月27-29日,由中国科学院烟台海岸带研究所承办的第五届中国湿地遥感大会在烟台顺利召开。本次会议以“湿地遥感与湿地修复”为主题,吸引了来自全国170多家科研院所、高校等从事湿地遥感及相关科学研究的专家、学者、学生,学术期刊的主编、编辑以及企业家代表共500余人出席会议。7月28......
  • python-上下文管理器Context
    1.什么是上下文管理器?上下文管理器是一个对象,他定义了执行with语句时要建立的上下文,上下文管理器处理进去和退出所需运行时上下文执行代码块。简单来说一个上下文管理器至少包含__enter__和__exit__两个方法,python提供了contextlib模块中的contextmanager用作装饰器并配合迭......
  • 番外1.ssh连接管理器
    目录本篇前瞻项目背景ssh连接管理器优点使用方式配置使用方法快速开始注意点使用样例本篇后记本篇前瞻学习完go语言基础的专栏,我们究竟写出怎么样的实用工具呢?我在github上开源的ssh连接管理器就是一个比较好的样例。项目背景这个项目的背景是之前我在上班时连接生产机器时只......
  • linux上SQL Server 配置管理器的使用
    概述我们知道Windows平台上的SQLServer配置管理器是一个图形工具,用于管理与SQLServer关联的服务、配置SQLServer使用的网络协议以及管理SQLServer客户端计算机的网络连接配置。我们还可以使用SQLServer配置管理器来启动、暂停、恢复或停止服务,查看服务属性或更改服务......
  • 网格布局管理器
    AWT布局管理器有五种:流布局管理器(FlowLayout)、网格布局管理器(GridLayout)、边界布局管理器(BorderLayout)、卡片布局管理器(CradLayout)、网格包布局管理器(GridBagLayout)参考:https://www.cnblogs.com/wzy330782/p/5427968.html......
  • RPM命令详解(程序包管理器)
    一:什么是RPM程序包管理器程序包管理器是一种用于管理软件包的工具,它可以方便地安装、升级、卸载和管理软件包。程序包管理器通常包含了一个软件包仓库,其中包含了大量预打包好的软件包供用户选择和安装。RPM(RedHatPackageManager)就是一种常见的程序包管理器,是由RedHa......
  • 中交中南工程局有限公司即将亮相2023国际桥梁与隧道技术大会
    第十一届国际桥梁与隧道技术大会将于9月23日-25日在成都举办,中交中南工程局有限公司将亮相本届大会,展示企业最新科技成果,敬请期待!近日,2023年《财富》世界500强排行榜揭晓,中交集团以1382.70亿美元营业收入排名第63位。中交集团已连续16年荣登该榜单,公司的品牌价值和市场影响力不断增......