背景:
实际项目中要使用pyqt/pyside技术,涉及到qt和asyncio的事件循环,并需要使用到qt的信号(Signal)和槽(Slot)机制,从底层了解事件循环的原理使得后续工作更好入手。
事件循环是什么?
事件循环(Event Loop)是一种用于处理和调度异步任务的机制。它通常用于编写异步编程,特别是在处理IO密集型任务时非常有用。事件循环通过不断地检查事件队列中是否有待处理的事件,并按照特定的顺序处理它们。
在事件循环中,程序会不断地轮询事件队列,如果队列中有待处理的事件,事件循环会将它们取出并交给相应的处理器进行处理。处理完一个事件后,事件循环会继续轮询事件队列,直到队列为空或者程序被要求退出。
为什么要使用事件循环(目的)?
确保异步任务的调度和执行,并保证线程安全。
使用python实例来逐步了解事件循环:
可以先了解一下生产者消费者模型能够帮助更好的了解事件循环,使用demo了解事件循环的原理,最后类比qt的信号和槽机制。
先了解生产者消费者模型:生产者、消费者、缓冲区。生产者生产物品放入到缓冲区;若缓冲区为空,消费者就等待;若不为空,消费者从缓冲区取走物品。
此例中:两个生产者进程分别1s生产1个orange,2s生产一个apple,每生产一个就放入队列缓冲区中(队列缓冲区理解为队列即可,先放进去的先消费,后放进去的后消费,类似于排队,先排队的人先被服务);消费者进程只要队列中有orange或apple就消费,没有就阻塞等待。
生产者只需要关注生产,消费者只需要关注消费,两者只需要关注自己要做的事情即可。
# producer_consumer.py
from multiprocessing import Queue, Process
import time
# 生产者进程1生产orange
def producer1(queue: Queue, something):
count = 1
while True:
# 将orange放入队列缓冲区中
queue.put(f"{something}{count}")
print(f"生产{something}{count}")
count += 1
# 等待1s再生产
time.sleep(1)
# 生产者进程2生产apple
def producer2(queue: Queue, something):
count = 1
while True:
# 将apple放入队列缓冲区中
queue.put(f"{something}{count}")
print(f"生产{something}{count}")
count += 1
# 等待2s再生产
time.sleep(2)
# 消费者进程
def consumer(queue: Queue):
while True:
# 从队列中取出一个水果消费,queue.get()会在队列为空时阻塞
something = queue.get()
print(f"消费{something}")
# 等待2s再消费
time.sleep(2)
def main():
queue = Queue()
# 创建生产者进程p1,p2
p1 = Process(target=producer1, args=(queue, "orange"))
p2 = Process(target=producer2, args=(queue, "apple"))
# 创建消费者进程
c1 = Process(target=consumer, args=(queue,))
# 开启三个线程
p1.start()
p2.start()
c1.start()
main()
输出结果如图,消费者按生产者放入队列缓冲区的水果顺序从队列中取出可消费的水果。
事件循环(结合代码理解下述内容):
用简单示例来理解事件循环,事件循环,异步任务,事件队列,分别可以对应理解为消费者,生产者,队列缓冲区
事件循环逻辑:事件循环(主线程)一直循环检测事件队列中是否有待处理事件,如果有,则根据按顺序取出,判断事件类型让对应的事件处理函数去处理;异步任务(子线程)可以是一些会阻塞的io事件(网络请求、耗时的io如等待键盘输入等),本例中使用网络请求和定时事件为例。
大家各干各的事情,事件循环有事儿就处理,没事儿就继续检测事件队列;异步任务就有事件了就放到事件队列里面啥也不管了就干这一件事情。
# eventloop.py
import socket
import threading
import time
import queue
# 循环监听,接收网络命令
def receive_command(q: queue.Queue):
sock = get_socket()
print("Listening for incoming data...")
while True:
try:
# 接收消息
data, addr = sock.recvfrom(1024)
instruction = data.decode("utf-8")
# 将事件需要的事件名以及其他需要的参数包含在event中,方便主线程判断事件类型并分发给对应的事件处理函数处理
event = {"name": "received", "data": instruction}
q.put(event)
except Exception as e:
print(f"Error in receive_thread: {e}")
# 处理接收到的网络请求
def handle_cmd(instruction):
print(f"handle {instruction}")
# 处理定时任务
def handle_timer():
print(f" handle timer")
# 每秒发送一个数,发送10次,time.sleep会造成阻塞,此处阻塞1s
def timer(q: queue.Queue):
for i in range(10):
event = {"name": "timer"}
q.put(event)
time.sleep(1)
# 使用socket绑定当前地址
def get_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 8889))
return sock
# 主线程
def main(q: queue.Queue):
# 创建两个子线程,分别用于接收网络请求和模拟定时接收到消息
task1 = threading.Thread(target=receive_command, args=(q,))
task2 = threading.Thread(target=timer, args=(q,))
# 开启子线程
task1.start()
task2.start()
# 主线程一直循环检测事件队列
while True:
# 事件队列不为空,则取出事件进行处理
if not q.empty():
event = q.get()
# 如果是网络请求,则处理接收到的请求
if event["name"] == "received":
handle_cmd(event["data"])
# 否则处理定时事件
else:
handle_timer()
q = queue.Queue()
main(q)
# controller.py
import socket
发送控制信息
def send_thread(sock):while True:try:终端输入命令发送给客户端
instruction = input()
sock.sendto(instruction.encode("utf-8"), target_address)except Exception as e:print(f"Error in send_thread: {e}")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 8890)) 控制器地址
target_address = ("127.0.0.1", 8889) 客户端地址
send_thread(sock)
controller.py的终端输入如下图:
eventloop.py的终端输出如下图,队列里面来啥处理啥,各自的异步任务也不会阻塞主线程。
类比理解qt的信号和槽机制:
其原理和event loop是一样的,只是将其底层细节(循环检测队列、事件放置到队列中、从队列中取出事件进行分发等)进行了封装实现。
信号(signal)类比于异步任务,槽(slot)函数类比于事件处理函数。一个对象中设置一个信号,另一个对象中设置这个信号的槽函数,如果信号被触发,qt事件循环会自动将信号绑定的槽函数放置到事件队列中等待事件循环的分发执行。很方便的实现两个对象的通信。