Linux
多进程tempfile读取异常,不知道原因是什么!!!!
Linux下运行一遍就知道了
from multiprocessing import Process, Pipe from tempfile import TemporaryFile import time class MultiDeal: def deal(self): # input_fn, out_put_fn, pipe, line_range=None # 统计结果用管道输出 # 行范围左闭右开 pass def __init__(self, input_fn, output_fn, proc=4, deal_func=None, retain_merged_file=True): self.input_fn = input_fn # 输入文件名 self.output_fn = output_fn # 输出文件名 self.proc = proc or 2 # 进程数 self.deal_func = deal_func or self.deal # 处理多行数据 self.retain_merged_file = retain_merged_file # 保留合并的文件 # 统计行数 self.line_num = 0 for i in open(input_fn): self.line_num += 1 # 判断是否有必要采用多进程 self.max_needed_line_num = 100 def start(self): if self.line_num <= self.max_needed_line_num or self.proc == 1: # 启用单进程 f = open(self.output_fn, mode='w+') p1, p2 = Pipe() self.deal_func(self.input_fn, f, p2) return p1.recv() else: # 分配多进程任务 one_num = self.line_num // self.proc print(one_num, '行/进程') pipes = [Pipe() for i in range(self.proc)] # 进程通信管道 temp_fs = [TemporaryFile(mode='w+') for i in range(self.proc)] # 临时文件,程序结束即自动删除 for i in range(self.proc): Process( target=self.deal_func, args=(self.input_fn, temp_fs[i], pipes[i][1], (one_num*i, one_num*(i+1) if i != self.proc - 1 else self.line_num)), daemon=True ).start() # 等待结果 data = {} for p in pipes: recv = p[0].recv() for i in recv: data[i] = data.get(i, 0) + recv[i] print('data:', data) # 合并临时文件 print('开始合并文件') if self.retain_merged_file: f = open(self.output_fn, 'w') for tf in temp_fs: tf.seek(0) d = 0 for i in tf: d += 1 f.write(i) print('读了', d, '行') tf.close() # 删除临时文件 return data def generate_test_ww_txt(): open('ww.txt', 'w').write('123\n'*int(10**7/2)) if __name__ == '__main__': fn = 'ww.txt' generate_test_ww_txt() def cal(input_fn, out_put_fn, pipe, line_range=None): print('范围:', line_range) f = open(input_fn) if type(input_fn) is str else input_fn fo = open(out_put_fn, 'w') if type(out_put_fn) is str else out_put_fn msg = {} c = 0 for index, line in enumerate(f): if (line_range and line_range[0] <= index < line_range[1]) or line_range is None: c += 1 if line != '123\n': print('异常行:', line, index) fo.write(line.strip('\n')+'54\n') print('写了', c, '行') msg = {'ccc': 1} pipe.send(msg) t0 = time.time() # cal(fn, 'out1.txt', p2) MultiDeal(fn, 'out3.txt', deal_func=cal, proc=1).start() t1 = time.time() print('单线程用时:', t1 - t0) m = MultiDeal(fn, 'out2.txt', deal_func=cal) m.start() print('多进程用时:', time.time() - t1)
标签:deal,Python,self,num,input,sb,line,fn From: https://www.cnblogs.com/roundfish/p/16892445.html