如果希望在检测到文件时动态创建进程而不预先指定进程数,并确保写文件时不发生冲突,可以使用队列和锁的机制。以下是一个改进的方案:
python
from multiprocessing import Process, Queue, Lock
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
import sys
class FileProcessor(Process):
def __init__(self, queue, lock):
super().__init__()
self.queue = queue
self.lock = lock
def run(self):
while True:
path = self.queue.get()
if path is None: # 接收到结束信号
self.queue.task_done()
break
# 假设的文件处理逻辑
result = f"Processed content of {path}"
with self.lock:
with open(f"{path}.processed", "w") as f:
f.write(result)
self.queue.task_done()
class MyHandler(FileSystemEventHandler):
def __init__(self, queue):
self.queue = queue
def on_created(self, event):
if event.is_directory:
return
self.queue.put(event.src_path)
def main():
manager = Manager()
queue = manager.Queue()
lock = manager.Lock()
observer = Observer()
observer.schedule(MyHandler(queue), path='./watchdir', recursive=False)
observer.start()
try:
while True:
# 动态创建进程,但需考虑上限,这里简单示例,实际情况可能需要更复杂的管理逻辑
if queue.qsize() > 0 and len(active_processes) < MAX_PROCESSES:
p = FileProcessor(queue, lock)
p.start()
active_processes.append(p)
time.sleep(1) # 防止过于频繁的检查,可根据需要调整
except KeyboardInterrupt:
observer.stop()
print("Stopping...")
# 清理和结束进程
for _ in active_processes:
queue.put(None) # 发送结束信号
observer.join()
for p in active_processes:
p.join()
if __name__ == "__main__":
MAX_PROCESSES = 5 # 最大同时运行的进程数,根据实际情况调整
active_processes = [] # 用于追踪活动的进程
main()
我们定义了一个FileProcessor类继承自Process,这样每个处理任务都在一个独立的进程中运行。在MyHandler检测到文件创建时,将文件路径加入队列。主程序会检查队列大小以及当前活跃进程的数量,如果队列中有待处理的任务且活跃进程数未达到预设的最大值(例如MAX_PROCESSES),则创建新的FileProcessor进程。每个FileProcessor在完成任务或接收到结束信号后会自动结束,同时使用锁来保护写文件操作,防止冲突。
请注意,虽然这个方案允许动态创建进程,但实际应用中应当考虑设置合理的最大进程数限制,以避免因资源耗尽导致的系统不稳定。此外,还需要考虑如何优雅地管理和结束这些动态创建的进程,尤其是在程序退出时。