首页 > 系统相关 >python watchdog检测到文件产生,动态创建进程,不指定进程数去处理,处理过程中需要写文件,同时产生多个文件的话会同时创建多个进程去处理,怎么保存写文件不冲突

python watchdog检测到文件产生,动态创建进程,不指定进程数去处理,处理过程中需要写文件,同时产生多个文件的话会同时创建多个进程去处理,怎么保存写文件不冲突

时间:2024-06-14 18:29:59浏览次数:30  
标签:__ 文件 数去 self queue lock 进程 path

如果希望在检测到文件时动态创建进程而不预先指定进程数,并确保写文件时不发生冲突,可以使用队列和锁的机制。以下是一个改进的方案:

python

from multiprocessing import Process, Queue, Lock
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
import sys

class FileProcessor(Process):
    def __init__(self, queue, lock):
        super().__init__()
        self.queue = queue
        self.lock = lock

    def run(self):
        while True:
            path = self.queue.get()
            if path is None:  # 接收到结束信号
                self.queue.task_done()
                break
            
            # 假设的文件处理逻辑
            result = f"Processed content of {path}"
            
            with self.lock:
                with open(f"{path}.processed", "w") as f:
                    f.write(result)
            self.queue.task_done()

class MyHandler(FileSystemEventHandler):
    def __init__(self, queue):
        self.queue = queue

    def on_created(self, event):
        if event.is_directory:
            return
        self.queue.put(event.src_path)

def main():
    manager = Manager()
    queue = manager.Queue()
    lock = manager.Lock()
    
    observer = Observer()
    observer.schedule(MyHandler(queue), path='./watchdir', recursive=False)
    observer.start()
    
    try:
        while True:
            # 动态创建进程,但需考虑上限,这里简单示例,实际情况可能需要更复杂的管理逻辑
            if queue.qsize() > 0 and len(active_processes) < MAX_PROCESSES:
                p = FileProcessor(queue, lock)
                p.start()
                active_processes.append(p)
            time.sleep(1)  # 防止过于频繁的检查,可根据需要调整
    except KeyboardInterrupt:
        observer.stop()
        print("Stopping...")
    
    # 清理和结束进程
    for _ in active_processes:
        queue.put(None)  # 发送结束信号
    observer.join()
    for p in active_processes:
        p.join()

if __name__ == "__main__":
    MAX_PROCESSES = 5  # 最大同时运行的进程数,根据实际情况调整
    active_processes = []  # 用于追踪活动的进程
    main()

我们定义了一个FileProcessor类继承自Process,这样每个处理任务都在一个独立的进程中运行。在MyHandler检测到文件创建时,将文件路径加入队列。主程序会检查队列大小以及当前活跃进程的数量,如果队列中有待处理的任务且活跃进程数未达到预设的最大值(例如MAX_PROCESSES),则创建新的FileProcessor进程。每个FileProcessor在完成任务或接收到结束信号后会自动结束,同时使用锁来保护写文件操作,防止冲突。
请注意,虽然这个方案允许动态创建进程,但实际应用中应当考虑设置合理的最大进程数限制,以避免因资源耗尽导致的系统不稳定。此外,还需要考虑如何优雅地管理和结束这些动态创建的进程,尤其是在程序退出时。

标签:__,文件,数去,self,queue,lock,进程,path
From: https://blog.csdn.net/zengliguang/article/details/139610739

相关文章

  • 如何将文件从 PC 传输到 iPhone [完整指南]
    iPhone设备以其出色的功能和便携性受到用户青睐。为了确保数据安全或方便查看,将文件从PC备份到iPhone是一个明智的选择。以下是五种将文件从PC传输到iPhone的方法。 方法1-使用CoolmusteriOSAssistant传输文件CoolmusteriOSAssistant是一款强大的手机......
  • Go黑帽子|文件搜索和数据库矿工
    文件搜索filepath.Walk遍历目录,regexp.MustCompile来匹配关键字packagemainimport( "fmt" "log" "os" "path/filepath" "regexp")varregexexs=[]*regexp.Regexp{ regexp.MustCompile(`(?i)user`), regexp.MustCompil......
  • Allegro光绘Gerber文件、IPC网表、坐标文件、装配PDF文件导出打包
    Allegro光绘Gerber文件、IPC网表、坐标文件、装配PDF文件导出打包一、Gerber文件层叠与参数设置二、装配图文件设置导出三、光绘参数设置四、Gerber孔符图、钻孔表及钻孔文件输出五、输出Gerber文件六、输出IPC网表七、导出坐标文件八、文件打包一、Gerber文件层叠与......
  • Angular 集成 StreamSaver 大文件下载
    应用场景:实现目标:在网页端实现大文件(文件大小>=2G)断点续传实际方案:发送多次请求,每次请求一部分文件数据,然后通过续写将文件数据全部写入.难点:无法实现文件续写,最后采用 StreamSaver来解决这个问题. 1.首先从github将 StreamSaver拉取下来.Strea......
  • 在Linux中,如何停止正在运行的进程?
    在Linux中,停止正在运行的进程可以采取多种方法,具体取决于你希望如何控制进程以及进程的当前状态。以下是一些常用的方法:1.使用kill命令kill命令是最常用的停止进程的方法。你需要知道进程的进程ID(PID)。发送SIGTERM信号(默认):killPID这会给进程发送一个终止信号(SIGTERM),......
  • 在Linux中,如何搜索文件?
    在Linux中,搜索文件是一项常见的任务,有多种工具可以用来搜索系统中的文件。以下是一些常用的命令和方法:1.find命令find是最强大的文件搜索命令之一。它可以在指定目录及其子目录下搜索符合条件的文件。基本用法:find[搜索路径][搜索条件]-exec命令\;示例:搜索/......
  • 批量解压tar文件并删除压缩包
    有些时候,我们需要解压多个压缩包,而使用压缩工具,只能一个一个的操作,十分浪费时间,这篇博客中,我使用了python书写了一个代码,能够对一个文件夹下的多个tar格式的压缩包进行解压,同时以压缩包的名字保存压缩内容,并删除压缩包以节约空间。importosimporttarfiledefextract_to_n......
  • 前端大文件分断上传
     functionupload(){constfileInput=document.getElementById('fileInput');constfile=fileInput.files[0];constchunkSize=1024*1024;//每个分片的大小,这里设置为1MBconsttotalChunks=Math.ceil(file.size/chunkSize);//总分片数letcurrentCh......
  • 进程还在,JSF接口不干活了,这你敢信?
    1、问题背景:应用在配合R2m升级redis版本的过程中,上游反馈调用接口报错,RpcException:[Bizthreadpoolofproviderhasbeenexhausted],通过监控系统和日志系统定位到现象只出现在一两个节点,并持续出现。第一时间通过JSF将有问题的节点下线,保留现场,业务恢复。报错日志如下:24-03-......
  • Ubuntu的文件权限介绍
    Linux系统是一个多用户系统,每个用户都会创建自己的文件。为了防止其他人擅自改动他人的文件,需要拥有一套完善的文件保护机制。在Linux系统中,这种保护机制就是文件的访问权限。文件的访问权限决定了谁可以访问和如何访问特定的文件。为了便于读者理解后面的内容,下面首先介绍一......