多进程实现简易版的抢票工具
实现:多个进程共享同一文件,把文件当数据库,用多个进程模拟多个人执行抢票任务
关键词:多进程,锁
1) 多进程
import json
import os
import time
from multiprocessing import Process, Lock, set_start_method
# Location of the JSON file used as the shared ticket store; it is resolved
# against the process's working directory at the moment this line runs.
FILE_PATH = os.path.join(os.getcwd(), 'data.json')
class BuyTickets:
    """Ticket-buying simulation that uses the JSON file at ``FILE_PATH`` as its store.

    The file holds a JSON object whose ``count`` entry is the number of
    tickets left.
    """

    def search(self, name):
        """Print the number of remaining tickets as seen by ``name``.

        Raises:
            KeyError: if the stored object has no ``count`` entry.
        """
        with open(FILE_PATH, 'r', encoding='utf-8') as f:
            dic = json.load(f)
        # Artificial delay; the file is already closed while we wait.
        time.sleep(1)
        print(f"{name}用户查看剩余余票数为:{dic['count']}")

    def get(self, name):
        """Try to buy one ticket for ``name`` and print the outcome.

        Reads the store, and if at least one ticket is left, writes the
        decremented count back. This is a read-modify-write on a shared
        file, so concurrent callers must serialize their calls (``task``
        does this with its lock).
        """
        with open(FILE_PATH, 'r', encoding='utf-8') as f_read:
            dic = json.load(f_read)

        # A missing or null "count" is treated as sold out; comparing the
        # None returned by a bare .get() with 0 would raise TypeError.
        remaining = dic.get('count') or 0
        if remaining <= 0:
            print('没票了')
            return

        dic['count'] = remaining - 1
        # Artificial delay between reading and writing the store.
        time.sleep(1)
        with open(FILE_PATH, 'w', encoding='utf-8') as f_write:
            json.dump(dic, f_write)
        print(f'{name}购票成功!')
        print(f'剩余票数为:{dic["count"]}')

    def task(self, name, lock):
        """Run one buyer: look up the remaining tickets, then buy under ``lock``.

        Args:
            name: label of the buyer, used in the printed messages.
            lock: context-manager lock shared by all buyers; it is held for
                the whole of ``get`` so purchases do not interleave.
        """
        self.search(name)
        with lock:
            self.get(name)
if __name__ == '__main__':
    # Launch ten buyer processes that all share one lock and one
    # BuyTickets instance.
    set_start_method('fork')
    buyer = BuyTickets()
    ticket_lock = Lock()
    for idx in range(10):
        worker = Process(
            target=buyer.task,
            args=("路人%s" % idx, ticket_lock),
        )
        worker.start()
2) 进程池
from multiprocessing import Pool,Manager
class BuyTickets:
    """Ticket-buying simulation that uses the JSON file at ``FILE_PATH`` as its store.

    The file holds a JSON object whose ``count`` entry is the number of
    tickets left.
    """

    def search(self, name):
        """Print the number of remaining tickets as seen by ``name``.

        Raises:
            KeyError: if the stored object has no ``count`` entry.
        """
        with open(FILE_PATH, 'r', encoding='utf-8') as f:
            dic = json.load(f)
        # Artificial delay; the file is already closed while we wait.
        time.sleep(1)
        print(f"{name}用户查看剩余余票数为:{dic['count']}")

    def get(self, name):
        """Try to buy one ticket for ``name`` and print the outcome.

        Reads the store, and if at least one ticket is left, writes the
        decremented count back. This is a read-modify-write on a shared
        file, so concurrent callers must serialize their calls (``task``
        does this with its lock).
        """
        with open(FILE_PATH, 'r', encoding='utf-8') as f_read:
            dic = json.load(f_read)

        # A missing or null "count" is treated as sold out; comparing the
        # None returned by a bare .get() with 0 would raise TypeError.
        remaining = dic.get('count') or 0
        if remaining <= 0:
            print('没票了')
            return

        dic['count'] = remaining - 1
        # Artificial delay between reading and writing the store.
        time.sleep(1)
        with open(FILE_PATH, 'w', encoding='utf-8') as f_write:
            json.dump(dic, f_write)
        print(f'{name}购票成功!')
        print(f'剩余票数为:{dic["count"]}')

    def task(self, name, lock):
        """Run one buyer: look up the remaining tickets, then buy under ``lock``.

        Args:
            name: label of the buyer, used in the printed messages.
            lock: context-manager lock shared by all buyers; it is held for
                the whole of ``get`` so purchases do not interleave.
        """
        self.search(name)
        with lock:
            self.get(name)
if __name__ == '__main__':
    # The start method has to be chosen BEFORE the pool is created: a pool
    # built first starts its workers under the current default and is not
    # affected by a later set_start_method call. force=True allows
    # overriding a start method that was already fixed earlier in the process.
    set_start_method('forkserver', force=True)
    _buy = BuyTickets()
    # The manager hands out a lock proxy that can be sent to pool workers as
    # a task argument; the with-block shuts the manager down once all tasks
    # have finished.
    with Manager() as manager:
        mutex = manager.Lock()
        p = Pool(5)
        results = [
            p.apply_async(_buy.task, args=("路人%s" % i, mutex))
            for i in range(10)
        ]
        p.close()
        p.join()
        # apply_async keeps a worker's exception inside its result object;
        # get() re-raises it here instead of letting it vanish silently.
        for res in results:
            res.get()
标签:__,name,self,dic,print,json,进程,multiprocessing
From: https://www.cnblogs.com/hikk/p/16741991.html