进程间通信与线程间通信
【一】进程间通信(IPC)
进程间通信(Inter-Process Communication,IPC)是指在不同进程之间进行数据交换和信息传递的机制。在多进程系统中,不同进程可能运行在不同的地址空间,因此需要一些特殊的方法来实现它们之间的通信。
以下是一些常见的进程间通信的方法:
- 管道(Pipe):
- 管道是一种单向通信机制,用于在两个相关的进程之间传递数据。
- 分为匿名管道和命名管道。匿名管道只能用于具有亲缘关系的进程,而命名管道可以用于无关联的进程。
- 消息队列(Message Queue):
- 消息队列是一种通过消息进行通信的机制,进程可以通过消息队列向其他进程发送消息。
- 消息队列是在内核中维护的,进程可以通过消息类型进行选择性接收。
- 共享内存(Shared Memory):
- 共享内存允许多个进程访问同一块内存区域,从而实现数据共享。
- 进程可以将数据写入共享内存区域,其他进程可以读取这些数据。
- 信号量(Semaphore):
- 信号量是一种用于进程间同步和互斥的通信机制,可以用于控制对共享资源的访问。
- 信号量通常用于解决竞争条件和临界区问题。
- 套接字(Socket):
- 套接字是一种在网络编程中常见的进程间通信方式,但也可以用于同一台机器上的进程通信。
- 套接字提供了一种通过网络进行通信的标准接口,支持不同机器、不同进程之间的通信。
- 文件映射(Memory-mapped File):
- 文件映射允许多个进程共享同一文件的内容,通过将文件映射到内存中,多个进程可以直接读写内存来实现通信。
- RPC(Remote Procedure Call):
- RPC 允许一个进程调用另一个进程中的过程或函数,实现远程通信。RPC 被广泛用于分布式系统中。
- 消息传递(Message Passing):
- 消息传递是指进程之间通过直接发送和接收消息来进行通信。这可以通过消息队列、管道等实现。
【1】队列(Queue)
【1.1】队列的本质
队列的实现本质上涉及到管道(或者其他底层的通信机制)和锁的概念,尤其在多线程或多进程环境中。
- 管道(Pipe): 管道是一种通信机制,用于在进程或线程之间传递数据。在队列中,可以使用管道来实现进程或线程之间的通信,确保数据的安全传递。在多进程环境中,
multiprocessing
模块的队列实现就是基于管道的。 - 锁(Lock): 锁是一种同步机制,用于保护共享资源,防止多个线程或进程同时访问导致数据不一致或冲突的问题。在队列中,锁可以用于确保在多个线程或进程同时进行入队和出队操作时的线程安全性。
队列的实现通常会结合这两个概念,以确保数据的有序传递和线程安全。在 Python 中,queue
模块提供了线程安全的队列实现,同时使用了锁来保护队列的操作。
- 锁的目的是保护了数据的安全,但同时一定会带来执行效率降低和消耗时间增加的问题
【1.2】消息队列
- 专业的消息队列能够解决一些特殊场景的问题,很多情况下用不到
消息队列(Message Queue)是一种进程间通信的机制,它允许不同进程之间通过在消息队列中发送和接收消息来进行通信。消息队列通常在操作系统的内核中维护,进程可以通过消息队列发送消息,而其他进程则可以通过接收消息队列中的消息来获取信息。
在消息队列中,消息由发送者进程放入队列,然后由接收者进程从队列中取出。消息队列的特点包括:
- 异步通信: 发送者和接收者进程之间的通信是异步的,即发送者可以继续执行而不必等待接收者的响应。
- 解耦: 消息队列可以解耦发送者和接收者,使它们之间的通信更加灵活和独立。发送者和接收者不需要直接知道对方的存在。
- 缓冲: 消息队列可以用作缓冲区,当发送者产生消息的速度大于接收者处理消息的速度时,消息可以在队列中缓存。
- 可靠性: 消息队列通常提供一些机制来确保消息的可靠传递,例如消息确认、持久化等。
在实际应用中,消息队列广泛用于分布式系统、异步任务处理、事件驱动系统等场景。常见的消息队列系统包括 RabbitMQ、Apache Kafka、ActiveMQ 等。
-
官网【Apache Kafka】
【1.3】进程队列(Process Queue)
进程队列(Process Queue)是在多进程编程中用于进程间通信的一种机制。Python 中的 multiprocessing
模块提供了 Queue
类,它是基于管道和锁实现的,用于在多个进程之间传递数据。
【1.3.1】进程队列的常用办法
- 创建语法
# 导入queue模块
from multiprocessing import Queue
q = Queue(maxsize)
# maxsize :队列中允许最大项数,省略则无大小限制
# 其实不是无限制,但是对我们来说是个非常大的数字SEM_VALUE_MAX = 2147483647
# 这个数字好像没有什么讲究,但是+1,是2的31次方,也就是2147483648有讲究,可以查一下
'''If maxsize is <= 0, the queue size is infinite.'''
- 常用方法
-
q.put
- 用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。
- 如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常
- 如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
-
q.get
- 可以从队列读取并且删除一个元素,同样,get方法有两个可选参数:blocked和timeout。
- 如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
- 如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
-
q.get_nowait()
- 同q.get(False)
-
q.put_nowait()
- 同q.put(False)
-
q.empty()
- 调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
-
q.full()
- 调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
-
q.qsize()
- 返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
-
- 其他方法
q.cancel_join_thread()
- 不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close()
- 关闭队列,防止队列中加入更多数据。
- 调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。
- 如果q被垃圾收集,将调用此方法。
- 关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。
- 例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread()
- 连接队列的后台线程。
- 此方法用于在调用q.close()方法之后,等待所有队列项被消耗。
- 默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
下面线程队列的方法与进程队列的方法基本一致
【1.3.2】代码示例
# 导入queue模块
import queue
q = queue.Queue(4)
for i in range(1, 5):
# 将i放入队列中
q.put(i)
# 查看队列的数据状态,数据已满/无数据
print(q.empty()) # False
print(q.full()) # True
# q.put_nowait(5) # queue.Full # 当队列已满后,将直接抛出异常
print(q) # <queue.Queue object at 0x0000017DDE15A6B0>
print(q.qsize()) # 4
for j in range(1, 5):
res = q.get()
print(res) # 将按照先进先出取得队列中的值 1 2 3 4
# 查看队列的数据状态,数据已满/无数据
print(q.empty()) # True
print(q.full()) # False
# print(q.get()) # 当队列中没有数值后,get将会阻塞,等待队列中的后续数据
# print(q.get(timeout=3)) # 当队列中没有数值后,get将会阻塞,传入timeout参数后,将会在等待timeout秒后抛出异常
# print(q.get_nowait()) # 当队列中没有数值后,get将不会阻塞直接抛出异常
print(q.qsize()) # 0
- 进程间通信
- 一个进程产生消息,称为生产者
- 一个进程获取消息,称为消费者
- 这就是生产者和消费者模型,详细请看本章的【5】生产者和消费者模型
from multiprocessing import Process, Queue
def task_p(q):
# 生产消息
for i in range(10):
print(f"正在生产第{i + 1}个包子")
q.put(f"第{i + 1}个包子")
def task_c(q):
# 获取信息
while True:
res = q.get()
print(f"获取了队列中的{res}")
def main():
# 创建队列对象
q = Queue()
# 主进程
# 创建生产消息的进程
p = Process(target=task_p, args=(q,))
# 启动进程
p.start()
# 创建使用消息的进程
c = Process(target=task_c, args=(q,))
# 启动进程
c.start()
if __name__ == '__main__':
main()
'''
正在生产第1个包子
正在生产第2个包子
正在生产第3个包子
获取了队列中的第1个包子
获取了队列中的第2个包子
正在生产第4个包子
正在生产第5个包子
正在生产第6个包子
正在生产第7个包子
正在生产第8个包子
正在生产第9个包子
正在生产第10个包子
获取了队列中的第3个包子
获取了队列中的第4个包子
获取了队列中的第5个包子
获取了队列中的第6个包子
获取了队列中的第7个包子
获取了队列中的第8个包子
获取了队列中的第9个包子
获取了队列中的第10个包子
...
# 当生产者结束时,消费者将会等待后续的生产
# 详细可以看后续的生产者和消费者模型
'''
# 此处为阉割版,集体的生产者和消费者模型需要平衡生产能力和消费能力,具体请看后续
【2】管道(Pipe)
- 不建议使用
管道是一种半双工的通信方式,允许一个进程向另一个进程发送数据。在 UNIX/Linux 等系统中,管道可以通过 pipe
系统调用创建。在 Windows 系统中,也有类似的管道机制。
管道的特点:
- 单向通信: 管道是单向通信的,即数据在一个方向上流动。
- 半双工: 管道是半双工的,同一时刻只能有一个方向上进行通信。
- 相关进程: 管道通常用于相关进程之间的通信,即它们有一个共同的祖先进程。
管道的应用场景包括但不限于:
- 父子进程通信: 父进程创建子进程后,它们可以通过管道进行通信。
- 并发编程: 在多线程或多进程编程中,不同的线程或进程可以通过管道进行数据交换。
- 进程间协作: 不同的进程可以通过管道协作完成某项任务。
需要注意的是,管道是一种局限性较大的 IPC 机制,适用于特定场景。在更复杂的应用中,可能会选择其他 IPC 机制,如消息队列、共享内存等。
- 在 Python 的
multiprocessing
模块中,也提供了Pipe
类来创建管道
【2.1】常用方法
-
Pipe()
:返回一个元组(conn1,conn2),表示管道两端的连接对象- 必须在产生Process对象之前产生管道
-
conn1.recv()
- 接收conn2.send(obj)发送的对象。
- 如果没有消息可接收,recv方法会一直阻塞。
- 如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
-
conn1.send(obj)
- 通过连接发送对象。obj是与序列化兼容的任意对象
-
conn1.close()
- 关闭连接。如果conn1被垃圾回收,将自动调用此方法
-
conn1.fileno()
- 返回连接使用的整数文件描述符
-
conn1.poll([timeout])
- 如果连接上的数据可用,返回True。
- timeout指定等待的最长时限。
- 如果省略此参数,方法将立即返回结果。
- 如果将timeout射成None,操作将无限期地等待数据到达。
-
conn1.recv_bytes([maxlength])
- 接收c.send_bytes()方法发送的一条完整的字节消息。
- maxlength指定要接收的最大字节数。
- 如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。
- 如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
-
conn.send_bytes(buffer [, offset [, size]])
- 通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。
- 结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收
-
conn1.recv_bytes_into(buffer [, offset])
:- 接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。
- offset指定缓冲区中放置消息处的字节位移。
- 返回值是收到的字节数。
- 如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
- 基于管道实现进程间通信(与队列的方式是类似的,队列就是管道加锁实现的)
【2.2】基本模板
'''父子进程之间的通信'''
import os
from multiprocessing import Pipe, Process
def recv_task(pipe):
l_conn, r_conn = pipe
l_conn.close()
while True:
try:
msg = r_conn.recv()
print(f"来自ppid{os.getppid()}的消息【{msg}】")
except EOFError:
r_conn.close()
break
def send_recv(pipe):
l_conn, r_conn = pipe
r_conn.close()
for i in range(1,5):
l_conn.send(i)
l_conn.close()
if __name__ == '__main__':
pipe = Pipe()
child_process = Process(target=recv_task,args=(pipe,))
child_process.start()
send_recv(pipe)
child_process.join()
print("=====")
# 来自ppid4476的消息【1】
# 来自ppid4476的消息【2】
# 来自ppid4476的消息【3】
# 来自ppid4476的消息【4】
# =====
【2.2.1】注意事项
- 生产者和消费者都没有使用管道的某个端点,就应该将其关闭,
- 如在生产者中关闭管道的右端,在消费者中关闭管道的左端。
- 如果忘记执行这些步骤,程序可能再消费者中的recv()操作上挂起。
- 管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。
- 因此在生产者中关闭管道不会有任何效果,除非消费者中也关闭了相同的管道端点。
【二】线程间通信
-
线程间通信是多线程编程中一个重要的概念,它涉及到不同线程之间的信息传递、同步和协作。
-
线程中通信方法大致有如下三种:
-
threading.Event
:事件event.clear()
:重置event,使得所有该event事件都处于待命状态event.set()
:等待接收event的指令,决定是否阻塞程序执event.wait()
:发送event指令,使所有设置该event事件的线程执行
-
threading.Condition
:条件cond.wait()
:等待指定触发,同时会释放对锁的获取,直到被notify才重新占有琐。- 设置条件满足后执行,与lock一样,可以通过
with cond
实现自动加锁解锁cond.acquire()
:类似lock.acquire()cond.release()
:类似lock.release()
cond.notify()
:发送指定,触发执行
-
queue.Queue
:队列q.put
q.get
-
【1】线程队列(Thread Queue)
- 操作线程队列的方法与进程队列基本一致,写下来会有些许繁琐,我直接偷懒,你看上面的常用方法吧
线程队列的特点包括:
- 线程安全: 线程队列是线程安全的,多个线程可以同时读写队列而不会发生冲突。
- 基于锁实现: 线程队列的实现基于锁,确保数据的安全传递。
- 阻塞与非阻塞:
put
和get
操作默认是阻塞的,可以通过设置block=False
参数实现非阻塞操作。
线程队列适用于在多个线程之间传递数据的场景,是一种常用的线程间通信方式。需要注意的是,在多线程编程中,使用线程队列可以避免竞争条件和数据不一致的问题。
【1.1】线程队列的三种类型
-
Queue
:先进先出 -
LifoQueue
:后进先出(last in first out) -
PriorityQueue
:可以设置优先级
【1.1.1】Queue
:先进先出
'''
Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
'''
from queue import Queue
q1 = Queue()
q1.put(1)
q1.put(2)
q1.put(3)
print(q1.get()) # 1
print(q1.get()) # 2
print(q1.get()) # 3
- 后面两个类,都是继承Queue类,只是多了一些其他的功能
【1.1.2】LifoQueue
:后进先出
'''Variant of Queue that retrieves most recently added entries first.'''
from queue import LifoQueue
q2 = LifoQueue()
q2.put(1)
q2.put(2)
q2.put(3)
print(q2.get()) # 3
print(q2.get()) # 2
print(q2.get()) # 1
【1.1.3】PriorityQueue
:设置优先级
'''
Variant of Queue that retrieves open entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data).
'''
from queue import PriorityQueue
q3 = PriorityQueue()
'''优先级队列的put有些不一样,第一个值为优先级排序的值,数值越小,优先级越高,越先取出'''
# 返回值为元组
q3.put((10, 'a'))
q3.put((20, 'b'))
q3.put((30, 'c'))
q3.put((90, 'd'))
q3.put((50, 'e'))
q3.put((30, 'z'))
print(q3.get()) # (10, 'a')
print(q3.get()) # (20, 'b')
print(q3.get()) # (30, 'c')
print(q3.get()) # (30, 'z')
print(q3.get()) # (50, 'e')
print(q3.get()) # (90, 'd')
【2】事件
【2.1】事件(Event)
事件(Event)是一种线程间通信的同步机制,它提供了一种线程间的触发和等待的机制。在多线程编程中,一个线程通常需要等待另一个线程发生某个事件,或者通知其他线程某个事件已经发生。
事件有两个基本操作:
- 设置(Set): 将事件的状态设置为"已发生",唤醒等待该事件的线程。
- 清除(Clear): 将事件的状态设置为"未发生",使线程等待该事件。
线程可以通过等待事件的发生来进入阻塞状态,直到其他线程将事件设置为"已发生"。一旦事件被设置,所有等待该事件的线程都将被唤醒。
事件的使用场景包括但不限于:
- 线程间的协同工作,其中一个线程需要等待其他线程完成某个任务后再继续执行。
- 用于发信号,通知其他线程某个条件已经满足。
- 控制线程的启动和停止。
需要注意的是,事件是一次性的,即一旦事件被设置,再次等待该事件的线程将无法再次被唤醒。如果需要多次使用的事件,可以考虑使用 threading.Condition
。
【2.2】代码演示
import threading
import time
# 创建事件
event = threading.Event()
def worker():
print(f"Thread {threading.current_thread().name} is waiting for the event.")
event.wait() # 阻塞等待事件发生
print(f"Thread {threading.current_thread().name} has received the event.")
# 创建多个线程
threads = [threading.Thread(target=worker) for i in range(3)]
# 启动线程
for thread in threads:
thread.start()
# 主线程等待一段时间后设置事件
time.sleep(2)
print("Setting the event.")
event.set() # 设置事件
# 等待所有线程完成
for thread in threads:
thread.join()
【2.3】捉迷藏小游戏
'''阻塞任务,知道条件符合就继续执行任务'''
import random
import threading
import time
event = threading.Event()
def hider():
# 老鼠
print(f"{threading.current_thread().name}:我正在躲藏!")
time.sleep(0.1)
print(f"{threading.current_thread().name}:我藏好了!")
event.set()
time.sleep(0.3)
event.wait()
time.sleep(0.1)
print("====游戏结束====")
def seeker():
# 猫
print(f"{threading.current_thread().name}:捉迷藏游戏开始喽!")
event.wait()
print(f"{threading.current_thread().name}:我正在抓捕!")
time.sleep(0.3)
res = bool(random.randint(0, 1))
if res:
print(f"{threading.current_thread().name}:抓到你啦!")
else:
print(f"{threading.current_thread().name}:没有找到你!")
event.set()
if __name__ == '__main__':
hider_one = threading.Thread(target=hider)
seeker_one = threading.Thread(target=seeker)
seeker_one.start()
time.sleep(0.1)
hider_one.start()
hider_one.join()
seeker_one.join()
# Thread-2 (seeker):捉迷藏游戏开始喽!
# Thread-1 (hider):我正在躲藏!
# Thread-1 (hider):我藏好了!
# Thread-2 (seeker):我正在抓捕!
# Thread-2 (seeker):没有找到你!
# ====游戏结束====
【三】生产者和消费者模型
生产者和消费者模型是一种常见的并发编程模型,用于解决多线程或多进程之间的协作问题。在这个模型中,有两类角色:生产者和消费者。
- 生产者(Producer): 负责生成数据或任务,并将其放入共享的数据结构(如队列)中。
- 消费者(Consumer): 负责从共享的数据结构中取出数据或任务,并进行相应的处理。
这种模型的主要目的是实现生产者和消费者之间的解耦,使它们能够独立执行,从而提高系统的效率和可维护性。
- 进程或线程都可以,以下案例没有统一使用哪一种,突发奇想 想用线程了我就用了线程
- 只是一种编程的模型
【1】基础版(消费者能力强,会卡住)
from multiprocessing import Process, Queue
def task_producer(q):
# 生产者
for i in range(10):
print(f"正在生产第{i + 1}个包子")
q.put(f"第{i + 1}个包子")
def task_consumer(q):
# 获取信息
while True:
res = q.get()
print(f"获取了队列中的{res}")
def main():
# 创建队列对象
q = Queue()
# 主进程
# 创建生产消息的进程
producer = Process(target=task_producer, args=(q,))
# 启动进程
producer.start()
# 创建使用消息的进程
consumer = Process(target=task_consumer, args=(q,))
# 启动进程
consumer.start()
if __name__ == '__main__':
main()
【2】设置标志位
【2.1】标志位
- 添加None等特定内容作为标志,当消费者取到标志位时,意味着生产结束
from multiprocessing import Process, Queue
def task_producer(q):
# 生产者
for i in range(10):
print(f"正在生产第{i + 1}个包子")
q.put(f"第{i + 1}个包子")
# 当生产者结束后,放入None告知消费者
q.put(None)
def task_consumer(q):
# 获取信息
while True:
res = q.get()
if not res:
# 当获取到None时,意味着生产结束了
break
print(f"获取了队列中的{res}")
def main():
# 创建队列对象
q = Queue()
# 主进程
# 创建生产消息的进程
producer = Process(target=task_producer, args=(q,))
# 启动进程
producer.start()
'''为进程添加阻塞,当生产完毕后,消费者再启动进程'''
producer.join()
# 创建使用消息的进程
consumer = Process(target=task_consumer, args=(q,))
# 启动进程
consumer.start()
if __name__ == '__main__':
main()
【2.2】JoinableQueue
队列
mutiprocessing
提供了JoinableQueue
队列类- 其中有方法
join()
和task_done()
task_done()
: 用于标记一个任务已经被处理。每次从队列中取出一个项目后,需要调用一次task_done()
。join()
: 阻塞调用,等待队列中的所有项目都被处理。必须在所有的put()
和task_done()
操作完成后调用,以确保主进程在队列中的所有任务都被处理完毕。
- 本质也是标志位,不过不需要通过if判断
'''使用JoinableQueue需要注意join的使用'''
from multiprocessing import Process, JoinableQueue
def task_producer(q):
# 生产者
for i in range(10):
print(f"正在生产第{i + 1}个包子")
q.put(f"第{i + 1}个包子")
def task_consumer(q):
# 获取信息
while True:
res = q.get()
print(f"获取了队列中的{res}")
# 通过使用task_done来告知队列完成了一个任务了
print(f"当前队列中的剩余任务数{q.qsize()}")
q.task_done()
def main():
# 创建队列对象
q = JoinableQueue()
# 主进程
# 创建生产消息的进程
producer = Process(target=task_producer, args=(q,))
# 启动进程
producer.start()
# 创建使用消息的进程
consumer = Process(target=task_consumer, args=(q,), daemon=True)
# 当生产者消费生产完毕后再执行消费者
producer.join()
# 启动进程
consumer.start()
# 设置join,确保队列中的数据已经结束
q.join()
if __name__ == '__main__':
main()
【3】思考
-
目前未设置队列的大小,如果设置了,该如何告知生产者结束生产
-
未设置延迟,模拟实际情况下的延迟,会出现什么变化
-
当出现生产者数量多,消费者数量少,该如何平衡——队列积压
-
当出现消费者数量多,生产者数量少,该如何平衡——队列空闲(大概是这个名词?)
【3.1】尝试
- 调整速率,如果队列积压,生产者的速率降一下
- 如果队列空闲,生产的速率加一下或者消费者速率降一下
- 调整队列大小
【1.0】一直在循环
- 我不太确定这样是否可以解决,算是我目前的水平可以实现的一种
- 但这样其实就是一直在循环,如果消费者走了,我应该怎么结束
'''生产者数量多,消费者数量少'''
import time
from multiprocessing import Process, JoinableQueue
def task_producer(q):
# 生产者
while True:
print(f"正在生产包子")
q.put(f"包子")
def task_consumer(q):
# 获取信息
while True:
time.sleep(2) # 等待生产者启动2秒后再启动,防止还没生产出来5个就拿走了
res = q.get()
print(f"获取了队列中的{res}")
def main():
# 创建队列对象
q = JoinableQueue(5) # 设置缓冲区,最少得有5个内容,其余,当消费者拿走了一个,再生产一个
# 主进程
# 创建生产消息的进程
producer = Process(target=task_producer, args=(q,))
# 创建使用消息的进程
consumer = Process(target=task_consumer, args=(q,), daemon=True)
# 启动进程
producer.start()
# 启动进程
consumer.start()
consumer.join()
if __name__ == '__main__':
main()
【2.0】优化版:做了if条件判断
- 问题:只有队列慢了才开始取数据
import time
from threading import Thread
from queue import Queue
count = 0
def task_producer(q):
# 产生任务
while True:
global count
count += 1
print(f"生产了包子{count}")
q.put(f"包子{count}")
time.sleep(1)
def task_consumer(q):
time.sleep(1)
# 执行任务
for i in range(10):
print(q.get())
def main():
# 主进程
q = Queue(5)
# 当队列中空置了的时候,生产任务
# 当队列中慢了的时候,执行任务
# 如何在消费者走了以后,停止生产
while True:
if not q.qsize():
p = Thread(target=task_producer, args=(q,), daemon=True)
p.start()
time.sleep(0.5)
continue
elif q.full():
c = Thread(target=task_consumer, args=(q,))
c.start()
c.join()
break
if __name__ == '__main__':
main()
print("==========")
【3.0】优化版:多生产者和多消费者,枷锁
- 因为如果线程速度过快,会导致三个人吃了同一个包子,所以加了个锁
import time
from threading import Thread, Lock
from queue import Queue
count = 0
def task_producer(q):
# 产生任务
while True:
global count
count += 1
print(f"生产了包子{count}")
q.put(f"包子{count}")
time.sleep(0.5)
def task_consumer(q, name, lock):
# 执行任务
for i in range(20):
time.sleep(0.1)
# 当休眠时间过久,取出的速度跟不上生产的速度将会出现,生产到了150个,才消费了50多个
# 休眠时间差不多时,将会减少浪费
with lock:
print(f"{name}吃了{q.get()}")
def main():
# 主进程
q = Queue(100)
lock = Lock()
# 当队列中空置了的时候,生产任务
# 当队列中慢了的时候,执行任务
# 如何在消费者走了以后,停止生产
while True:
if not q.qsize():
p = Thread(target=task_producer, args=(q,), daemon=True)
p1 = Thread(target=task_producer, args=(q,), daemon=True)
p2 = Thread(target=task_producer, args=(q,), daemon=True)
p.start()
p1.start()
p2.start()
continue
elif not q.empty():
c = Thread(target=task_consumer, args=(q, 'user', lock))
c1 = Thread(target=task_consumer, args=(q, 'Zz', lock))
c2 = Thread(target=task_consumer, args=(q, 'lea4ning', lock))
c.start()
c1.start()
c2.start()
c.join()
c1.join()
c2.join()
break
if __name__ == '__main__':
main()
print("==========")
标签:__,task,Python,print,间通信,队列,线程,进程
From: https://www.cnblogs.com/Lea4ning/p/17985001