ZeroMQ的使用(python)
1. python环境使用
1.1 非源码安装
在安装pyzmq
和libzmq3-devel
之后便可以使用 zeromq 了。需要提前安装好python环境。
(venv-patroni-4.0.3) [fbase@localhost zmq]$ mkdir ~/soft/zmq
(venv-patroni-4.0.3) [fbase@localhost zmq]$ cd ~/soft/zmq
(venv-patroni-4.0.3) [fbase@localhost zmq]$ vi test.py
(venv-patroni-4.0.3) [fbase@localhost zmq]$ vi test1.py
(venv-patroni-4.0.3) [fbase@localhost zmq]$ chmod +x test.py
(venv-patroni-4.0.3) [fbase@localhost zmq]$ chmod +x test1.py
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ll
total 8
-rwxrwxr-x 1 fbase fbase 572 Dec 19 11:31 test1.py
-rwxrwxr-x 1 fbase fbase 466 Dec 19 11:31 test.py
-
test.py:相当于服务器。
# # Hello World server in Python # Binds REP socket to tcp://*:5555 # Expects b"Hello" from client, replies with b"World" # import time import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: # Wait for next request from client message = socket.recv() print(f"Received request: {message}") # Do some 'work' time.sleep(1) # Send reply back to client socket.send(b"World")
服务器创建一个响应类型的套接字,将其绑定到 端口5555,然后等待消息。你也可以看到我们的 配置,我们只是发送字符串。
-
test1.py:相当于客户端。
# # Hello World client in Python # Connects REQ socket to tcp://localhost:5555 # Sends "Hello" to server, expects "World" back # import zmq context = zmq.Context() # Socket to talk to server print("Connecting to hello world server…") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") # Do 10 requests, waiting each time for a response for request in range(10): print(f"Sending request {request} …") socket.send(b"Hello") # Get the reply. message = socket.recv() print(f"Received reply {request} [ {message} ]")
客户端创建请求类型的套接字,连接并开始发送 充分和传播
send
和receive
方法都是阻塞的(默认)。对于接收 这很简单:如果没有消息,则该方法将阻塞。发送它是 更复杂,取决于插座类型。对于请求套接字,如果 达到高水位线或没有对等体被连接,则该方法将阻塞。
1.2 源码安装
1.2.1 c方式
源码安装一开始只有c方式,先用c方式进行测试。
(venv-patroni-4.0.3) [fbase@localhost zmq]$ touch hwserver.c
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ll
total 3164
-rw-rw-r-- 1 fbase fbase 0 Dec 19 14:57 hwserver.c
drwxr-xr-x 12 fbase fbase 4096 Dec 19 14:47 zeromq-4.1.8
-rw-r--r-- 1 fbase fbase 1479856 Dec 19 10:46 zeromq-4.1.8.tar.gz
(venv-patroni-4.0.3) [fbase@localhost zmq]$ vi hwserver.c
(venv-patroni-4.0.3) [fbase@localhost zmq]$ gcc -o hwserver hwserver.c -lzmq
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ll
total 3180
-rwxrwxr-x 1 fbase fbase 8923 Dec 19 14:58 hwserver
-rw-rw-r-- 1 fbase fbase 560 Dec 19 14:57 hwserver.c
drwxr-xr-x 12 fbase fbase 4096 Dec 19 14:47 zeromq-4.1.8
-rw-r--r-- 1 fbase fbase 1479856 Dec 19 10:46 zeromq-4.1.8.tar.gz
运行hwserver,会发现报错,如下所示:
(venv-patroni-4.0.3) [fbase@localhost zmq]$ hwserver
hwserver: error while loading shared libraries: libzmq.so.5: cannot open shared object file: No such file or directory
这个原因是没有找到libzmq.so.5
,这个依赖在/usr/local/lib
中,将其添加到LD_LIBRARY_PATH
中。
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ls /usr/local/include/
zmq.h zmq_utils.h
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ls /usr/local/lib
libzmq.a libzmq.la libzmq.so libzmq.so.5 libzmq.so.5.0.4 pkgconfig
(venv-patroni-4.0.3) [fbase@localhost zmq]$ echo 'export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH' >> ~/.bashrc
(venv-patroni-4.0.3) [fbase@localhost zmq]$ source ~/.bashrc
(venv-patroni-4.0.3) [fbase@localhost zmq]$ echo $LD_LIBRARY_PATH
/usr/local/fbase/13/lib:/usr/local/lib:/usr/local/lib:/usr/local/fbase/13/lib:
(venv-patroni-4.0.3) [fbase@localhost zmq]$ sudo ldconfig
[sudo] password for fbase:
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ldd $(which hwserver)
linux-vdso.so.1 => (0x00007fff33997000)
libzmq.so.5 => /usr/local/lib/libzmq.so.5 (0x00007f61d4a56000)
libc.so.6 => /lib64/libc.so.6 (0x00007f61d468c000)
librt.so.1 => /lib64/librt.so.1 (0x00007f61d4484000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f61d4268000)
libstdc++.so.6 => /lib64/libstdc++.so.6 (0x00007f61d3f5f000)
libm.so.6 => /lib64/libm.so.6 (0x00007f61d3c5d000)
libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x00007f61d3a47000)
/lib64/ld-linux-x86-64.so.2 (0x00007f61d4cc2000)
(venv-patroni-4.0.3) [fbase@localhost zmq]$ hwserver
1.2.2 python方式
编译安装pyzmq
tar -zxvf pyzmq-26.2.0.tar.gz
cd pyzmq-26.2.0
# 编译安装(低版本)
python setup.py configure --zmq=/usr/local #set the zmq install path
# 编译安装高版本
pip install setuptools wheel
pip install .
pip show pyzmq
使用与1.1一致。
2. Zeromq使用介绍
2.1 Bind vs Connect
使用ZeroMQ套接字,谁绑定谁连接都无所谓。在上述 您可能已经注意到,服务器使用Bind,而客户端使用Connect。 这是为什么,有什么区别?
ZeroMQ为每个底层连接创建队列。如果您的套接字连接到 三个对等套接字,那么在幕后有三个消息队列。
使用Bind,您允许对等端连接到您,因此您不知道有多少对等端 将来会有,你不能提前创建队列。 相反,队列是在各个对等点连接到绑定套接字时创建的。
通过Connect,ZeroMQ知道至少会有一个对等点, 因此它可以立即创建单个队列。这适用于所有套接字类型 除了ROUTER,其中队列仅在我们连接到的对等体 承认我们的联系
因此,当向没有对等体的绑定套接字或ROUTER发送消息时, 由于没有实时连接,因此没有队列可用于存储消息。
2.1.1 什么时候应该使用bind和connect?
一般来说,从架构中最稳定的点使用绑定, 使用带有volatile端点的动态组件的connect。对于请求/答复, 服务提供者可能是您绑定和客户端使用的点 connect.就像普通的TCP一样。
如果你不能弄清楚哪些部分更稳定(即点对点), 考虑中间的稳定设备,所有侧都可以连接到该设备。
2.2 高水位线
高水位线是一个硬限制的最大数量的优秀 消息ZeroMQ正在内存中为指定的 socket正在与通信。
如果已达到此限制,则套接字进入异常状态, 根据套接字类型,ZeroMQ将采取适当的操作,例如 阻止或丢弃已发送的消息。请参阅各个插座的说明 有关针对每种套接字类型采取的确切操作的详细信息,请参见下面的。
2.3 消息传递模式
在ZeroMQ的套接字API的牛皮纸包装下, 信息模式ZeroMQ模式是由套接字对实现的, 匹配类型。
内置的核心ZeroMQ模式是:
- Request-reply:将一组客户端连接到一组服务。这 是一种远程过程调用和任务分发模式。
- Pub-sub:将一组发布者连接到一组订阅者。这 是一种数据分布模式。
- Pipeline:它以扇出/扇入模式连接节点, 多个步骤和循环。这是一个并行的任务分配和收集 格局
- Exclusive pair:独占地连接两个套接字。这是一种模式 用于在进程中连接两个线程,不要与“正常”混淆 成对的插座。
还有更多ZeroMQ模式仍处于草案状态:
- Client-server:允许单个ZeroMQ服务器与一个或多个进行对话 ZeroMQ客户端。客户总是先开始谈话,然后 任一对等体可以异步地向另一对等体发送消息。
- Radio-dish:用于单个数据的一对多分发 发布者以扇出方式发送给多个订阅者。
3. 使用
在pyzmq中有一些基本使用,如下所示:
# 创建 ZeroMQ 上下文
context = zmq.Context()
# 创建一个请求-响应套接字
socket = context.socket(zmq.REP)
这两步是基本的使用,获取上下文,和根据需要获取套接字,下面是对各个模型的演示代码和套接字介绍和使用。
3.1 Request-reply请求响应模型
请求-响应模式由http://rfc.zeromq.org/spec:28正式定义。
请求-应答模式应该是最常见的交互模式,如果连接之后,服务器终止,那么客户端也终止,从崩溃的过程中恢复不太容易
因此,做一个可靠的请求-应答模式很复杂。
“请求-响应模型”支持的套接字类型有4种:
- ZMQ_REP
- ZMQ_REQ
- ZMQ_DEALER
- ZMQ_ROUTER
3.1.1 “REQ-REP”套接字类型
- 请求-响应模式用于将请求从ZMQ_REQ客户端发送到一个或多个ZMQ_REP服务,并接收对每个发送的请求的后续答复
- REQ-REP套接字对是步调一致的。它们两者的次序必须有规则,不能同时发送或接收,否则无效果
3.1.1.1 ZMQ_REQ
客户端使用ZMQ_REQ类型的套接字向服务发送请求并从服务接收答复。
此套接字类型仅允许zmq_send(request)和后续zmq_recv(reply)调用交替序列。发送的每个请求都在所有服务中轮流轮询,并且收到的每个答复都与最后发出的请求匹配。
如果没有可用的服务,则套接字上的任何发送操作都应阻塞,直到至少有一项服务可用为止。REQ套接字不会丢弃消息
ZMQ_REQ特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_REP、ZMQ_ROUTER |
方向 | 双向 |
发送/接收模式 | 发送、接收、发送、接收...... |
入网路由策略 | 最后一位(Last peer) |
外发路由策略 | 轮询 |
静音状态下的操作 | 阻塞 |
3.1.1.2 ZMQ_REP
服务使用ZMQ_REP类型的套接字来接收来自客户端的请求并向客户端发送回复。
此套接字类型仅允许zmq_recv(request)和后续zmq_send(reply)调用的交替序列。接收到的每个请求都从所有客户端中公平排队,并且发送的每个回复都路由到发出最后一个请求的客户端。
如果原始请求者不再存在,则答复将被静默丢弃。
ZMQ_REP特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_REQ、ZMQ_DEALER |
方向 | 双向 |
发送/接收模式 | 发送、接收、发送、接收...... |
入网路由策略 | 公平排队 |
外发路由策略 | 最后一位(Last peer) |
3.1.2 代码演示
服务端创建REP套接字,阻塞等待客户端消息的到达,当客户端有消息达到时给客户端回送“World”字符串。
客户端创建REP套接字,向服务端发送字符串“Hello”,然后等待服务端回送消息。
-
服务端代码
server.py
:import zmq import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: # 接收数据 msg = socket.recv() if msg is not None: print(f"msg:{msg.decode('utf-8')}") time.sleep(1) # 发送数据 # msg = socket.recv_string() # reply_msg = "world" # result = socket.send_string(reply_msg) # msg = socket.recv() # reply_msg = b"world" # result = socket.send(reply_msg) reply_msg = "world" result = socket.send(reply_msg.encode("utf-8")) print(f"reply result:{result}")
- 如果消息是字符串,使用
send_string
。 - 如果你使用
send
,确保传入的是字节数据(通过.encode()
转换)。或者发送二进制的数组。
- 如果消息是字符串,使用
-
客户端代码
client.py
:import zmq context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") while True: # 发送数据 # req_msg = "hello" # result = socket.send_string(req_msg) # reply_msg = socket.recv_string() # req_msg = b"hello" # result = socket.send(req_msg) # reply_msg = socket.recv() req_msg = "hello" result = socket.send(req_msg.encode("utf-8")) print(f"request result:{result}") # 接收回复数据数据 reply_msg = socket.recv() if reply_msg is not None: print(f"reply msg:{reply_msg.decode('utf-8')}")
结果展示:
# server
msg:hello
reply result:None
msg:hello
reply result:None
# client
request result:None
reply msg:world
request result:None
reply msg:world
3.2 Exclusive pair推拉模型
管道模式由http://rfc.zeromq.org/spec:30正式定义。
也被称为管道模型;管道模式在有的地方也称为“流水线”模式。
管道模式用于将数据分发到布置在流水线中的节点。数据始终沿流水线向下流动,流水线的每一级都连接到至少一个节点。当流水线级连接到多个节点时,数据在所有连接的节点之间进行轮询。
“管道模式“支持的套接字类型有2种:
- ZMQ_PUSH
- ZMQ_PULL
3.2.1 PUSH-PULL”套接字类型
3.2.1.1 ZMQ_PUSH
管道节点使用类型为ZMQ_PUSH的套接字将消息发送到下游流水线节点。消息循环到所有连接的下游节点。
该套接字类型不支持zmq_msg_recv()等接收数据的函数。
当ZMQ_PUSH套接字由于已达到所有下游节点的高水位线而进入静音状态时,或者如果根本没有下游节点,则套接字上的任何zmq_send()操作都应阻塞,直到静音状态结束或处于至少一个下游节点可用于发送;消息不会被丢弃。
ZMQ_PUSH特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_PUSH |
方向 | 单向 |
发送/接收模式 | 仅发送 |
入网路由策略 | 不适用(N/A) |
外发路由策略 | 轮询 |
静音状态下的操作 | 阻塞 |
3.2.1.2 ZMQ_PULL
管道节点使用ZMQ_PULL类型的套接字从上游管道节点接收消息。
消息从所有连接的上游节点中公平排队。
该套接字类型不支持zmq_msg_send()等发送数据的函数。
ZMQ_PULL特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_PULL |
方向 | 单向 |
发送/接收模式 | 仅接收 |
入网路由策略 | 公平排队 |
外发路由策略 | 不适用(N/A) |
静音状态下的操作 | 阻塞 |
3.2.2 代码演示
演示案例:
一台发生器(taskvent.py),它产生可以并行执行的任务。
工人1(taskwork.py),用于处理任务。
工人2(taskwork-2.py),用于处理任务。
一个接收器(tasksink.py),用于收集工作进程返回的结果。
-
发生器
taskvent.py
:import zmq import time context = zmq.Context() socket = context.socket(zmq.PUSH) socket.connect("tcp://localhost:5555") socket.connect("tcp://localhost:5554") print("Task generator started, sending tasks...") for i in range(20): task = f"Task {i + 1}" print(f"Generating: {task}") socket.send_string(task) time.sleep(1) socket.close() context.term()
-
工人1
taskwork.py
:import zmq import math import time context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://localhost:5555") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5556") print("Worker started, processing tasks...") while True: # 接收发生器的消息 task = receiver.recv_string() print(f"Proccessing 1 : {task}") task_number = int(task.split()[1]) # 给接收器发送消息 result_msg = f"work1: Result of {math.pow(task_number, 2)}" sender.send_string(result_msg) time.sleep(1)
-
工人2
taskwork-2.py
:import zmq import time context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://localhost:5554") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5556") print("Worker2 started, processing tasks...") while True: # 接收发生器的消息 task = receiver.recv_string() print(f"Proccessing 2 : {task}") task_number = int(task.split()[1]) # 给接收器发送消息 result_msg = f"work2: Result of {task_number * 2}" sender.send_string(result_msg) time.sleep(1)
-
接收器
tasksink.py
:import zmq context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://localhost:5556") print("Receiver started, collecting results...") for i in range(20): result = socket.recv_string() print(f"Result: {result}") socket.close() context.term()
结果展示:
# taskvent
Task generator started, sending tasks...
Generating: Task 1
Generating: Task 2
Generating: Task 3
Generating: Task 4
Generating: Task 5
Generating: Task 6
Generating: Task 7
Generating: Task 8
Generating: Task 9
Generating: Task 10
Generating: Task 11
Generating: Task 12
Generating: Task 13
Generating: Task 14
Generating: Task 15
Generating: Task 16
Generating: Task 17
Generating: Task 18
Generating: Task 19
Generating: Task 20
# taskwork
Worker started, processing tasks...
Proccessing 1 : Task 1
Proccessing 1 : Task 3
Proccessing 1 : Task 5
Proccessing 1 : Task 7
Proccessing 1 : Task 9
Proccessing 1 : Task 11
Proccessing 1 : Task 13
Proccessing 1 : Task 15
Proccessing 1 : Task 17
Proccessing 1 : Task 19
# taskwork2
Worker2 started, processing tasks...
Proccessing 2 : Task 2
Proccessing 2 : Task 4
Proccessing 2 : Task 6
Proccessing 2 : Task 8
Proccessing 2 : Task 10
Proccessing 2 : Task 12
Proccessing 2 : Task 14
Proccessing 2 : Task 16
Proccessing 2 : Task 18
Proccessing 2 : Task 20
# tasksink
Receiver started, collecting results...
Result: work2: Result of 4
Result: work2: Result of 8
Result: work2: Result of 12
Result: work2: Result of 16
Result: work1: Result of 1.0
Result: work1: Result of 9.0
Result: work1: Result of 25.0
Result: work1: Result of 49.0
Result: work1: Result of 81.0
Result: work2: Result of 20
Result: work1: Result of 121.0
Result: work2: Result of 24
Result: work1: Result of 169.0
Result: work2: Result of 28
Result: work1: Result of 225.0
Result: work2: Result of 32
Result: work1: Result of 289.0
Result: work2: Result of 36
Result: work1: Result of 361.0
Result: work2: Result of 40
3.3.3 模式总结
- 工人向上连接到发生器,并且向下连接到接收器。这意味着你可以随意添加工人。 因此,发生器和接收器是架构的固定部分,而工人是动态部分。
- 我们必须同步开始同批次的所有工人的启动和运行。这是ZeroMQ中存在的一个相当普遍的疑难杂症,并没有简单的解决办法。connect方法需要一定的时间,所以当一组工人连接到发生器时,第一个成功连接的工人会在这短短的时间得到消息的整个负载,而其他工人仍在进行连接。如果不知何故批次的开始不同步,那么系统就将无法并行运行。
- 发生器的PUSH套接字将任务均匀地分配给工人。这就是所谓的负载均衡。
- 接收器的PULL套接字均匀地收集来自工人的结果。这就是所谓的公平排队。
管道模式也有类似“慢木匠”的现象
- 它导致了对PUSH套接字不能正确地负载均衡的指责。如果你使用的是PUSH和PULL,并且你的某个工人得到比其他工人更多的信息,这是因为PULL套接字已比别人更快地连接,并在其他工人试图连接之前抓取了很多消息。
3.3 Pipeline共享队列/代理模型
3.3.1 “DEALER-ROUTER”套接字类型
3.3.1.1 ZMQ_DEALER
ZMQ_DEALER类型的套接字是用于扩展“请求/应答”套接字的高级模式。
发送消息时:当ZMQ_DEALER套接字由于已达到所有对等点的最高水位而进入静音状态时,或者如果根本没有任何对等点,则套接字上的任何zmq_send()操作都应阻塞,直到静音状态结束或至少一个对等方变得可以发送;消息不会被丢弃。
接收消息时:发送的每条消息都是在所有连接的对等方之间进行轮询,并且收到的每条消息都是从所有连接的对等方进行公平排队的
将ZMQ_DEALER套接字连接到ZMQ_REP套接字时,发送的每个消息都必须包含一个空的消息部分,定界符以及一个或多个主体部分。
ZMQ_DEALER特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_ROUTER、ZMQ_REP、ZMQ_DEALER |
方向 | 双向 |
发送/接收模式 | 无限制 |
入网路由策略 | 公平排队 |
外发路由策略 | 轮询 |
静音状态下的操作 | 阻塞 |
3.3.1.2 ZMQ_ROUTER
ZMQ_ROUTER类型的套接字是用于扩展请求/答复套接字的高级套接字类型。
当收到消息时:ZMQ_ROUTER套接字在将消息传递给应用程序之前,应在消息部分之前包含消息的始发对等方的路由ID。接收到的消息从所有连接的同级之间公平排队。
发送消息时:
- ZMQ_ROUTER套接字应删除消息的第一部分,并使用它来确定消息应路由到的对等方的_routing id _。如果该对等点不再存在或从未存在,则该消息将被静默丢弃。
- 但是,如果ZMQ_ROUTER_MANDATORY套接字选项设置为1,这两种情况下套接字都将失败并显示EHOSTUNREACH。
高水位标记影响:
- 当ZMQ_ROUTER套接字由于达到所有同位体的高水位线而进入静音状态时,发送到该套接字的任何消息都将被丢弃,直到静音状态结束为止。同样,任何路由到已达到单个高水位标记的对等方的消息也将被丢弃。
- 如果ZMQ_ROUTER_MANDATORY套接字选项设置为1,则在两种情况下套接字都应阻塞或返回EAGAIN。
ZMQ_ROUTER_MANDATORY套接字选项:
- 当ZMQ_ROUTER套接字的ZMQ_ROUTER_MANDATORY标志设置为1时,套接字应在接收到来自一个或多个对等方的消息后生成ZMQ_POLLIN事件。
- 同样,当至少一个消息可以发送给一个或多个对等方时,套接字将生成ZMQ_POLLOUT事件。
当ZMQ_REQ套接字连接到ZMQ_ROUTER套接字时,除了始发对等方的路由ID外,每个收到的消息都应包含一个空的定界符消息部分。因此,由应用程序看到的每个接收到的消息的整个结构变为:一个或多个路由ID部分,定界符部分,一个或多个主体部分。将回复发送到ZMQ_REQ套接字时,应用程序必须包括定界符部分。
ZMQ_ROUTER特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_DEALER、ZMQ_REQ、ZMQ_ROUTER |
方向 | 双向 |
发送/接收模式 | 无限制 |
入网路由策略 | 公平排队 |
外发路由策略 | 看上面介绍 |
静音状态下的操作 | 丢弃(见上面介绍) |
3.3.2 代码演示
演示示例:
架构图见3.3.3的方法②。
扩展了上面的“REQ-REP”演示示例:REQ和ROUTER交流,DEALER与REP交流。代理节点从一个套接字读取消息,并将消息转发到其他套接字。
客户端:将REQ套接字连接到代理的ROUTER节点上,向ROUTER节点发送“Hello”,接收到“World”的回复。
代理端:
-
创建一个ROUTER套接字与客户端相连接,创建一个DEALER套接字与服务端相连接
-
ROUTER套接字从客户端接收请求数据,并把请求数据发送给服务端
-
DEALER套接字从服务端接收响应数据,并把响应数据发送给客户端
服务端:将REP套接字连接到代理的DEALER节点上。
-
客户端代码:
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") print("客户端-1已启动,正在发送信息...") while True: # 发送数据 req_msg = "client-1-hello" socket.send_string(req_msg) print(f"clinet-1 req_msg:{req_msg}") # 接收回复数据数据 reply_msg = socket.recv_string() print(f"reply clinet-1 msg:{reply_msg}") time.sleep(1)
-
服务端代码:
import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.connect("tcp://localhost:5556") # 服务端监听正确的端口 print("服务端-1已启动,正在接收信息...") while True: msg = socket.recv_string() # 接收消息 print(f"server-1 msg: {msg}") reply_msg = "server-1 world" socket.send_string(reply_msg) # 回复消息 print(f"reply server-1 reply_msg: {reply_msg}")
-
代理端代码:
import zmq context = zmq.Context() # 创建前端 ROUTER 套接字并绑定到客户端连接地址 frontend = context.socket(zmq.ROUTER) frontend.bind("tcp://*:5555") # 创建后端 DEALER 套接字并绑定到服务端连接地址 backend = context.socket(zmq.DEALER) backend.bind("tcp://*:5556") # 创建 poller 用于同时监听多个套接字 poller = zmq.Poller() poller.register(frontend, zmq.POLLIN) poller.register(backend, zmq.POLLIN) print("代理节点已启动,正在处理信息...") while True: socks = dict(poller.poll()) if frontend in socks: # 从 ROUTER 接收来自客户端的消息(multipart 消息) parts = frontend.recv_multipart() print(f"收到客户端消息: {parts}") # 将完整的 multipart 消息转发给 DEALER backend.send_multipart(parts) if backend in socks: # 从 DEALER 接收来自服务端的回复 reply_parts = backend.recv_multipart() print(f"收到服务端回复: {reply_parts}") # 将回复转发回原始客户端(假设第一个部分是客户端 ID) frontend.send_multipart(reply_parts)
如果不使用
zmq.Poller()
import zmq import time context = zmq.Context() # 创建前端 ROUTER 套接字并绑定到客户端连接地址 frontend = context.socket(zmq.ROUTER) frontend.bind("tcp://*:5555") # 创建后端 DEALER 套接字并绑定到服务端连接地址 backend = context.socket(zmq.DEALER) backend.bind("tcp://*:5556") print("代理节点已启动,正在处理信息...") try: while True: # 尝试从前端接收消息,如果无消息则快速跳过 try: parts = frontend.recv_multipart(flags=zmq.NOBLOCK) print(f"收到客户端消息: {parts}") # 将完整的 multipart 消息转发给后端 backend.send_multipart(parts) except zmq.Again: pass # No message received from frontend, continue # 尝试从后端接收回复,如果无消息则快速跳过 try: reply_parts = backend.recv_multipart(flags=zmq.NOBLOCK) print(f"收到服务端回复: {reply_parts}") # 将回复转发回原始客户端 frontend.send_multipart(reply_parts) except zmq.Again: pass # No message received from backend, continue # 避免CPU占用过高 time.sleep(0.01) # 等待10毫秒再检查 except KeyboardInterrupt: print("中断信号收到,退出程序...") finally: frontend.close() backend.close() context.term()
结果展示:
# client
客户端-1已启动,正在发送信息...
clinet-1 req_msg:client-1-hello
reply clinet-1 msg:server-1 world
clinet-1 req_msg:client-1-hello
reply clinet-1 msg:server-1 world
# proxt
代理节点已启动,正在处理信息...
收到客户端消息: [b'\x00k\x8bEg', b'', b'client-1-hello']
收到服务端回复: [b'\x00k\x8bEg', b'', b'server-1 world']
# server
服务端-1已启动,正在接收信息...
server-1 msg: client-1-hello
reply server-1 reply_msg: server-1 world
server-1 msg: client-1-hello
reply server-1 reply_msg: server-1 world
如果有多个服务器和客户端,可以通过b'\x00k\x8bEg'来进行客户端的选择。
3.3.3 模式总结
共享队列/代理
- 在“REP-REQ”的演示案例中,我们只有一个客户端和一个服务端进行交流。但是实际中,我们通常允许多个客户端与多个服务端之间相互交流。
- 将多个客户端连接到多个服务器的方法有两种:
- 方法①:将每个客户端都连接到多个服务端点。
- 方法②:使用代理。
方法①:
- 原理:一种是将每个客户端套接字连接到多个服务端点。REQ套接字随后会将请求发送到服务端上。比如说一个客户端连接了三个服务端点:A、B、C,之后发送请求R1、R4到服务A上,发送请求R2到服务B上、发送请求R3到服务C上(如下图所示)。
- 对于这种设计来说,服务器属于静态部分,客户端属于动态部分,客户端的增减无所谓,但是服务器的增减确实致命的。假设现在有100个客户端都连接了服务器,如果此时新增了三台服务器,为了让客户端识别这新增的三台服务器,那么就需要将所有的客户端都停止重新配置然后再重新启动。
方法②:
- 我们可以编写一个小型消息排队代理,使我们具备灵活性。
- 原理:该代理绑定到了两个端点,一个用于客户端的前端(ZMQ_ROUTER),另一个用于服务的后端(ZMQ_DEALER)。然后带来使用zmq_poll()来轮询这两个套接字的活动,当有消息时,代理会将消息在两个套接字之间频繁地输送。
- 该代理其实并不显式管理任何队列,其只负责消息的传送,ØMQ会自动将消息在每个套接字上进行排队。
- 使用zmq_poll()配合DEALER-ROUTER:
- 在上面我们使用REQ-REP套接字时,会有一个严格同步的请求-响应对话,就是必须客户端先发送一个请求,然后服务端读取请求并发送应答,最后客户端读取应答,如此反复。如果客户端或服务端尝试破坏这种约束(例如,连续发送两个请求,而没有等待响应),那么将返回一个错误。
- 我们的代理必须是非阻塞的,可以使用zmq_poll()来轮询任何一个套接字上的活动,但我们不能使用REQ-REQ。幸运地是,有两个称为DEALER和ROUTER的套接字,它们能我们可以执行无阻塞的请求-响应。
3.4 Pub-sub发布订阅模型
发布-订阅模式由https://rfc.zeromq.org/spec/29/正式定义
在发布-订阅模式中,有一个发布者用来发送消息,该模式中有很多订阅者会接收发布者发布的消息
“发布-订阅”模型支持的套接字类型有4种:
- ZMQ_PUB
- ZMQ_SUB
- ZMQ_XPUB
- ZMQ_XSUB
3.4.1 “PUB-SUB”套接字类型
- PUB就是发布者,SUB就是订阅者。
3.4.1.1 ZMQ_PUB
发布者使用类型为ZMQ_PUB的套接字来分发数据。发送的消息以扇出方式分发给所有连接的对等方。
在ZMQ_PUB类型的套接字上不能执行zmq_msg_recv()等接收数据的函数。
当ZMQ_PUB套接字由于已达到订阅者的高水位标记而进入静音状态时,将发送给有问题的订阅者的任何消息都将被丢弃,直到静音状态结束为止。
对于该套接字类型,zmq_msg_send()函数将永远不会阻塞。
ZMQ_PUB特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_SUB、ZMQ_XSUB |
方向 | 单向 |
发送/接收模式 | 仅发送 |
入网路由策略 | 不适用(N/A) |
外发路由策略 | 扇出(Fan out) |
静音状态下的操作 | 丢弃 |
3.4.1.2 ZMQ_SUB
订阅者使用ZMQ_SUB类型的套接字来订阅发布者分发的数据。
ZMQ_SUB套接字创建完成之后,ZMQ_SUB套接字未订阅任何消息,请使用zmq_setsockopt()的ZMQ_SUBSCRIBE选项指定要订阅的消息。
在ZMQ_PUB类型的套接字上不能执行zmq_msg_recv()等接收数据的函数。
ZMQ_SUB特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_PUB、ZMQ_XPUB |
方向 | 单向 |
发送/接收模式 | 仅接收 |
入网路由策略 | 公平排队 |
外发路由策略 | 不适用(N/A) |
3.4.2 “XPUB-XSUB”套接字类型
- “XPUB-XSUB”套接字类型与“PUB-SUB”套接字类型相同,也是属于发布-订阅。
- 在“PUB-SUB”中,订阅者通过zmq_connect()向发布者发起订阅;但是“XPUB-XSUB”套接字类型允许订阅者通过发送一条订阅信息到发布者来完成订阅。
3.4.2.1 ZMQ_XPUB
用法与ZMQ_PUB大部分相同。
但是有一点与ZMQ_PUB不同:ZMQ_XPUB(自己)的订阅方可以向自己发送一个订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。也接收不带子/取消订阅前缀的消息,但对订阅状态没有影响。
ZMQ_XPUB特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_SUB、ZMQ_XSUB |
方向 | 单向 |
发送/接收模式 | 发送消息,接收订阅 |
入网路由策略 | 不适用(N/A) |
外发路由策略 | 扇出(Fan out) |
静音状态下的操作 | 丢弃 |
3.4.2.2 ZMQ_XSUB
- 用法与ZMQ_SUB大部分相同。
- 但是有一点与ZMQ_SUB不同:自己可以向发布者发送一条订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。也接收不带子/取消订阅前缀的消息,但对订阅状态没有影响。
ZMQ_XSUB特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_PUB、ZMQ_XPUB |
方向 | 单向 |
发送/接收模式 | 接收消息,发送订阅 |
入网路由策略 | 公平排队 |
外发路由策略 | 不适用(N/A) |
静音状态下的操作 | 丢弃 |
3.4.3 代码演示
3.4.3.1 PUB-SUB
演示示例:
发布者:类似于一个天气更新服务器,向订阅者发送天气更新,内容包括邮政编码、温度、湿度等信息。
订阅者:它监听发布者更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据。
-
发布者:
import zmq import random import time import json context = zmq.Context() publisher = context.socket(zmq.PUB) publisher.bind("tcp://localhost:5555") print("发布者已启动,正在发送天气更新...") # 模拟天气更新的数据 postal_codes = ['10001', '20002', '30003', '40004', '50005'] # 邮政编码列表 while True: # 随机选择一个邮政编码 postal_code = random.choice(postal_codes) # 随机生成温度和湿度 temperature = random.uniform(-10, 40) humidity = random.uniform(30, 90) # 构建消息 message = { "postal_code": postal_code, "temperature": temperature, "humidity": humidity, } message_str = json.dumps(message) # 发送消息 print(f"pub: {message_str}") publisher.send_string(f"{postal_code} {message_str}") time.sleep(1)
-
订阅者-1:
import zmq import json context = zmq.Context() subscriber = context.socket(zmq.SUB) subscriber.connect("tcp://localhost:5555") # 设置订阅过滤器,默认订阅所有消息 postal_code_filter = '10001' # 订阅特定邮政编码的天气信 # 只订阅以指定邮政编码开头的消息 subscriber.setsockopt_string(zmq.SUBSCRIBE, postal_code_filter) print(f"订阅者已启动,正在接收邮政编码为 {postal_code_filter} 的天气更新...") # 接收前 10 条数据 count = 0 while count < 10: # 接收消息 message_str = subscriber.recv_string() print(f"sub-10001: {message_str}") # 解析json字符串 message = json.loads(message_str.split(' ', 1)[1]) # 打印天气信息 print(f"接收到天气更新: 邮政编码: {message['postal_code']}," f"温度: {message['temperature']}°C, 湿度: {message['humidity']}%") count += 1 subscriber.close() context.term()
-
订阅者-2:
import zmq import json context = zmq.Context() subscriber = context.socket(zmq.SUB) subscriber.connect("tcp://localhost:5555") # 设置订阅过滤器,默认订阅所有消息 postal_code_filter = '20002' # 订阅特定邮政编码的天气信 # 只订阅以指定邮政编码开头的消息 subscriber.setsockopt_string(zmq.SUBSCRIBE, postal_code_filter) print(f"订阅者-2已启动,正在接收邮政编码为 {postal_code_filter} 的天气更新...") # 接收前 10 条数据 count = 0 while count < 10: # 接收消息 message_str = subscriber.recv_string() print(f"sub-10002: {message_str}") # 解析json字符串 message = json.loads(message_str.split(' ', 1)[1]) # 打印天气信息 print(f"接收到天气更新: 邮政编码: {message['postal_code']}," f"温度: {message['temperature']}°C, 湿度: {message['humidity']}%") count += 1 subscriber.close() context.term()
结果展示:
# puber
pub: {"postal_code": "20002", "temperature": 13.25265265413962, "humidity": 86.0784125194719}
pub: {"postal_code": "40004", "temperature": 31.73599959814129, "humidity": 52.561033432350925}
pub: {"postal_code": "50005", "temperature": 14.90289689583275, "humidity": 74.72166996686823}
pub: {"postal_code": "20002", "temperature": 9.867402200794743, "humidity": 32.0353602888176}
pub: {"postal_code": "50005", "temperature": 24.332321511064087, "humidity": 74.40272310252055}
pub: {"postal_code": "30003", "temperature": 3.039558928478584, "humidity": 72.99202727412626}
pub: {"postal_code": "10001", "temperature": 7.746891871815958, "humidity": 78.89402815964135}
pub: {"postal_code": "40004", "temperature": 32.4869161115785, "humidity": 55.37044485578605}
pub: {"postal_code": "50005", "temperature": 24.873966694343203, "humidity": 60.45065293887665}
pub: {"postal_code": "40004", "temperature": 29.947344215184877, "humidity": 42.01658424992969}
# sub-1
订阅者-1已启动,正在接收邮政编码为 10001 的天气更新...
sub-10001: 10001 {"postal_code": "10001", "temperature": 7.746891871815958, "humidity": 78.89402815964135}
接收到天气更新: 邮政编码: 10001,温度: 7.746891871815958°C, 湿度: 78.89402815964135%
# sub-2
订阅者-2已启动,正在接收邮政编码为 20002 的天气更新...
sub-10002: 20002 {"postal_code": "20002", "temperature": 13.25265265413962, "humidity": 86.0784125194719}
接收到天气更新: 邮政编码: 20002,温度: 13.25265265413962°C, 湿度: 86.0784125194719%
sub-10002: 20002 {"postal_code": "20002", "temperature": 9.867402200794743, "humidity": 32.0353602888176}
接收到天气更新: 邮政编码: 20002,温度: 9.867402200794743°C, 湿度: 32.0353602888176%
3.4.3.2 XPUB-XSUB
演示示例:
发布者1:类似于一个天气更新服务器,向代理节点发送天气更新,内容包括邮政编码、温度、湿度等信息。
发布者2:累死一个 用户操作记录服务器,向代理节点发送当前用户的操作记录。
代理节点:代理节点接受连接的订阅者(XPUB)和发布者(XSUB)。它将订阅者的请求转发到发布者,并将消息从发布者转发到订阅者。
订阅者1:它监听代理节点更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据。
订阅者2:它监听代理节点更新的数据流,过滤只接收与用户操作记录相关的信息,默认接收接收10条数据。
-
发布者1:
import zmq import random import time import json context = zmq.Context() publisher = context.socket(zmq.PUB) publisher.connect("tcp://localhost:5555") print("发布者-1已启动,正在发送天气更新...") # 模拟天气更新的数据 postal_codes = ['10001', '20002', '30003', '40004', '50005'] # 邮政编码列表 while True: # 随机选择一个邮政编码 postal_code = random.choice(postal_codes) # 随机生成温度和湿度 temperature = random.uniform(-10, 40) humidity = random.uniform(30, 90) # 构建消息 message = { "postal_code": postal_code, "temperature": temperature, "humidity": humidity, } message_str = json.dumps(message) # 发送消息 print(f"pub-1: {message_str}") publisher.send_string(f"{postal_code} {message_str}") time.sleep(1)
-
发布者2:
import zmq import random import time import json context = zmq.Context() publisher = context.socket(zmq.PUB) publisher.connect("tcp://localhost:5555") print("发布者-2已启动,正在发送天气更新...") # 模拟天气更新的数据 userIds = ['001', '002'] # 邮政编码列表 while True: # 随机选择一个邮政编码ss userId = random.choice(userIds) # 当前用户登录和登录时间 action = "login" timestamp = time.time() # 构建消息 message = { "userId": userId, "action": action, "timestamp": timestamp, } message_str = json.dumps(message) # 发送消息 print(f"pub-2: {message_str}") publisher.send_string(f"{userId} {message_str}") time.sleep(1)
-
订阅者1:
import zmq import json context = zmq.Context() subscriber = context.socket(zmq.SUB) subscriber.connect("tcp://localhost:5556") # 设置订阅过滤器,默认订阅所有消息 postal_code_filter = '10001' # 订阅特定邮政编码的天气信 # 只订阅以指定邮政编码开头的消息 subscriber.setsockopt_string(zmq.SUBSCRIBE, postal_code_filter) print(f"订阅者-1已启动,正在接收邮政编码为 {postal_code_filter} 的天气更新...") # 接收前 10 条数据 count = 0 while count < 10: # 接收消息 message_str = subscriber.recv_string() print(f"sub-1-10001: {message_str}") # 解析json字符串 message = json.loads(message_str.split(' ', 1)[1]) # 打印天气信息 print(f"接收到天气更新: 邮政编码: {message['postal_code']}," f"温度: {message['temperature']}°C, 湿度: {message['humidity']}%") count += 1 subscriber.close() context.term()
-
订阅者2:
import zmq import json context = zmq.Context() subscriber = context.socket(zmq.SUB) subscriber.connect("tcp://localhost:5556") # 设置订阅过滤器,默认订阅所有消息 userId = '001' # 只订阅以指定用户Id开头的消息 subscriber.setsockopt_string(zmq.SUBSCRIBE, userId) print(f"订阅者-2已启动,正在接收用户id为 {userId} 的操作行为更新...") # 接收前 10 条数据 count = 0 while count < 10: # 接收消息 message_str = subscriber.recv_string() print(f"sub-2-001: {message_str}") # 解析json字符串 message = json.loads(message_str.split(' ', 1)[1]) # 打印天气信息 print(f"接收到用户行为更新: 用户ID: {message['userId']}," f"操作: {message['action']}, 时间: {message['timestamp']}%") count += 1 subscriber.close() context.term()
-
代理节点:
import zmq import json context = zmq.Context() subscriber = context.socket(zmq.SUB) subscriber.connect("tcp://localhost:5556") # 设置订阅过滤器,默认订阅所有消息 userId = '001' # 只订阅以指定用户Id开头的消息 subscriber.setsockopt_string(zmq.SUBSCRIBE, userId) print(f"订阅者-2已启动,正在接收用户id为 {userId} 的操作行为更新...") # 接收前 10 条数据 count = 0 while count < 10: # 接收消息 message_str = subscriber.recv_string() print(f"sub-2-001: {message_str}") # 解析json字符串 message = json.loads(message_str.split(' ', 1)[1]) # 打印天气信息 print(f"接收到用户行为更新: 用户ID: {message['userId']}," f"操作: {message['action']}, 时间: {message['timestamp']}%") count += 1 subscriber.close() context.term()
结果展示:
# proxt
proxt-xsub-msg: 001 {"userId": "001", "action": "login", "timestamp": 1734625019.16635}
proxt-xpub-msg: 001
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": 7.875253853621338, "humidity": 37.31015663936709}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": 14.432860810309766, "humidity": 60.22298999335305}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": 15.503168013798806, "humidity": 59.75713604714118}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": -7.395234809523146, "humidity": 65.5193954895997}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": -9.75615742629257, "humidity": 74.21744572318175}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": 5.268318532469344, "humidity": 45.62217272071367}
proxt-xpub-msg: 10001
# puber1
pub-1: {"postal_code": "10001", "temperature": 10.813840832122981, "humidity": 85.29650973319102}
pub-1: {"postal_code": "50005", "temperature": 38.40227985356804, "humidity": 50.82447611291441}
pub-1: {"postal_code": "50005", "temperature": 28.751940848644928, "humidity": 68.55122723607448}
pub-1: {"postal_code": "40004", "temperature": 28.696542004302806, "humidity": 58.85859477448625}
pub-1: {"postal_code": "20002", "temperature": -6.89923052809386, "humidity": 61.09597083888137}
pub-1: {"postal_code": "20002", "temperature": 13.419115255532269, "humidity": 52.01525249018736}
pub-1: {"postal_code": "50005", "temperature": 29.04016206865918, "humidity": 58.77557836353199}
pub-1: {"postal_code": "30003", "temperature": 18.47547641106557, "humidity": 41.33830374452378}
pub-1: {"postal_code": "50005", "temperature": 27.766425247968726, "humidity": 77.09619320847364}
pub-1: {"postal_code": "50005", "temperature": 37.632560879484906, "humidity": 78.04897441464384}
pub-1: {"postal_code": "40004", "temperature": 0.9880948271698973, "humidity": 34.36147305497506}
pub-1: {"postal_code": "30003", "temperature": 39.722978064052114, "humidity": 40.02421184794968}
pub-1: {"postal_code": "40004", "temperature": -3.6255678397909357, "humidity": 40.45473079718421}
# puber2
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625077.2781255}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625078.2800937}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625079.2821612}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625080.2840903}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625081.286704}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625082.2886486}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625083.2907314}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625084.2925353}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625085.2943888}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625086.296313}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625087.298361}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625088.3002558}
# suber-1
sub-1-10001: 10001 {"postal_code": "10001", "temperature": -7.395234809523146, "humidity": 65.5193954895997}
接收到天气更新: 邮政编码: 10001,温度: -7.395234809523146°C, 湿度: 65.5193954895997%
sub-1-10001: 10001 {"postal_code": "10001", "temperature": -9.75615742629257, "humidity": 74.21744572318175}
接收到天气更新: 邮政编码: 10001,温度: -9.75615742629257°C, 湿度: 74.21744572318175%
sub-1-10001: 10001 {"postal_code": "10001", "temperature": 5.268318532469344, "humidity": 45.62217272071367}
接收到天气更新: 邮政编码: 10001,温度: 5.268318532469344°C, 湿度: 45.62217272071367%
# suber-2
sub-2-001: 001 {"userId": "001", "action": "login", "timestamp": 1734625012.1531057}
接收到用户行为更新: 用户ID: 001,操作: login, 时间: 1734625012.1531057%
sub-2-001: 001 {"userId": "001", "action": "login", "timestamp": 1734625014.1569405}
接收到用户行为更新: 用户ID: 001,操作: login, 时间: 1734625014.1569405%
sub-2-001: 001 {"userId": "001", "action": "login", "timestamp": 1734625015.1587276}
接收到用户行为更新: 用户ID: 001,操作: login, 时间: 1734625015.1587276%
3.4.4 模式总结
关于“订阅”的说明(对于SUB、SUBX端来说)
- 当你使用一个SUB、XSUB套接字时(订阅方)必须使用zmq_setsockopt()和SUBSCRIBE设置一个订阅(例如上面的客户端的代码所示)。如果你创建了一个SUB套接字,但是没有设置任何订阅,那么就不会得到任何消息。
- 订阅者可以设置许多的订阅,它们被累加在一起。也就是说,如果某个更新匹配任何订阅,那么订阅者都会接收到它。
- 订阅者也可以取消特定的订阅。
- 订阅经常但不一定是一个可打印的字符串。
- 要了解这是如何工作的,请参见zmq_setsockopt()和下面的演示案例。
慢木匠”症状
-
如果发布者开启之后再启动订阅者,那么在订阅者启动的这段时间内,发布者发送的消息订阅者就接收不到了。
-
“满木匠”症状:即使先启动订阅者,再启动发布者,订阅者也可能错过发布者发送的消息:因此当订阅者连接到发布者时(这需要的时间很短,但非0),发布者可能已经将消息发送出去了。
-
“满木匠”症状案例:
- ZeroMQ在后台执行异步I/O。假设你有两个节点执行次操作,顺序如下:
- 订阅者接连到一个端点,并接收和处理消息。
- 发布者绑定到一个端点,并立即发送10000条消息。
- 订阅者很可能不会收到任何东西(在设置了正确的过滤器的情况下)。
- 建立TCP连接包含会花几毫秒的握手,这取决于你的网络和节点间的跳数。在这段时间里,ZeroMQ可以发送很多消息。
- ZeroMQ在后台执行异步I/O。假设你有两个节点执行次操作,顺序如下:
-
解决方法:
- 之后介绍如何来同步发布者和订阅者,这样就不会启动数据发布,直到订阅者真正连接并准备就绪。
- 有一个简单(愚蠢)的方式来延迟发布,就是休眠。但是,在实际中不建议这么做。
- 同步的替代办法是简单地假定发布的数据流是无限的,它没有起点也没有终点。人们还假设订阅者不关心它启动前发生了什么事情。
关于发布-订阅模式的几个要点:
- 订阅者不可以使用zmq_msg_send()等发送消息的函数,发布者不可以使用zmq_msg_recv()等接收消息的函数。
- 一个订阅者可以连接到多个发布者,每次使用一个connect调用。那么数据将交错到达(“公平排队”)。
- 如果一个发布者没有连接的订阅者,那么它简单地丢弃所有消息。
- 如果你使用的是TCP并且订阅者是慢速的,那么消息将在发布方排队。我们将在后面的文章中介绍如何通过使用“高水位线”来针对这种情况保护发布者。
- 从ZeroMQ v3.x开始,在使用连接的协议(tcp或ipc)时,过滤发生在发布方。使用epgm协议,过滤发生在订阅方。但在ZeroMQ v2.x版本中,所有过滤都发生在订阅方。
3.5 独家对模式
独家对模式(Exclusive pair)用于将一个对等点精确地连接到另一个对等点。此模式用于跨inproc传输的线程间通信。
互斥对模式由http://rfc.zeromq.org/spec:31正式定义。
“独家对模式”支持的套接字类型只有1种:
- ZMQ_PAIR
3.5.1 “PAIR”套接字类型
3.5.1.1 ZMQ_PAIR
ZMQ_PAIR类型的套接字只能一次连接到单个对等方。对通过ZMQ_PAIR套接字发送的消息不执行消息路由或筛选。
当ZMQ_PAIR套接字由于已达到连接对等方的高水位线而进入静音状态时,或者如果没有连接任何对等方,则套接字上的任何zmq_send()操作都应阻塞,直到对等方可用于发送;消息不会被丢弃。
适用协议:
- ZMQ_PAIR套接字设计用于通过nproc传输进行线程间通信,并且不实现自动重新连接等功能。
- 尽管ZMQ_PAIR套接字可用于inproc以外的其他传输协议,但是它们无法自动重新连接,并且当以前存在任何连接(包括关闭状态的连接)时,新的传入连接将被终止,这使得它们在大多数情况下不适合TCP。
ZMQ_PAIR特性摘要 | |
---|---|
兼容的对等套接字 | ZMQ_PAIR |
方向 | 双向 |
发送/接收模式 | 无限制 |
入网路由策略 | 不适用(N/A) |
外发路由策略 | 不适用(N/A) |
静音状态下的操作 | 阻塞 |
3.3.2 代码演示
演示示例:
PAIR3:主线程中的PAIR套接字,等待PAIR2发来通知消息
PAIR2:在主线程中调用pthread_create()创建线程,在线程的回调函数中创建PAIR2套接字,该套接字等待PAIR1发来通知消息
PAIR1:PAIR2所在的线程再调用一次pthread_create(),在线程的回调函数中创建PAIR1套接字,PAIR1会向PAIR2发送消息
整体的流程就是:PAIR1发送消息给PAIR2,PAIR2接收到PAIR1的消息之后再发送消息给PAIR3,PAIR3接收到PAIR2的消息之后退出程序
案例分析
这是使用ØMQ进行多线程编程的一个经典模式:
- 两个线程通过inproc通信,使用的是共享的上下文。
- 父线程创建一个套接字,将其绑定到一个inproc端点,然后启动子线程,将上下文传递给它。
- 子线程创建第二个套接字,将它连接到该inproc端点,然后发信号告诉父线程,它已准备就绪。
使用这种模式的多线程代码是不可扩展到进程的。如果你使用inproc和套接字对,你就正在构建一个紧耦合的应用程序,也就是说,其中你的线程在结构是相互依存的,只有在低延迟真的很重要的时候才这样做。另一种设计模式是一个松耦合的应用程序,其中的线程有自己的上下文并通过ipc或tcp通信。你可以轻松地将松耦合的线程分解为单独的进程。
代码示例:
import zmq
import time
import threading
def step3():
context = zmq.Context()
socket3 = context.socket(zmq.PAIR)
socket3.bind("tcp://*:5553")
print("PAIR3: Waiting for message from PAIR2...")
# 等待 PAIR2 发来的消息
message = socket3.recv_string()
print(f"PAIR3 received message: {message}")
def step2():
context = zmq.Context()
# 创建两个套接字,一个用于接收,另一个用于发送
socket2_recv = context.socket(zmq.PAIR)
socket2_send = context.socket(zmq.PAIR)
socket2_recv.bind("tcp://*:5552") # 用于接收消息
socket2_send.connect("tcp://localhost:5553") # 用于发送消息
print("PAIR2: Waiting for message from PAIR1...")
# 等待 PAIR1 发来的消息
message = socket2_recv.recv_string()
print(f"PAIR2 received message: {message}")
# 向 PAIR3 发送消息
print("PAIR2: Sending message to PAIR3...")
socket2_send.send_string("Hello from PAIR2!")
def step1():
context = zmq.Context()
socket1 = context.socket(zmq.PAIR)
socket1.bind("tcp://*:5551")
print("PAIR1: Sending message to PAIR2...")
# 向 PAIR2 发送消息
socket1.connect("tcp://localhost:5552")
socket1.send_string("Hello from PAIR1!")
if __name__ == '__main__':
# 创建并启动线程 2 (PAIR1)
thread2 = threading.Thread(target=step1)
thread2.start()
# 创建并启动线程 1 (PAIR2)
thread1 = threading.Thread(target=step2)
thread1.start()
# 主线程中的 PAIR3
step3()
# 等待线程 1 和线程 2 完成
thread1.join()
thread2.join()
结果展示:
PAIR1: Sending message to PAIR2...
PAIR2: Waiting for message from PAIR1...
PAIR3: Waiting for message from PAIR2...
PAIR2 received message: Hello from PAIR1!
PAIR2: Sending message to PAIR3...
PAIR3 received message: Hello from PAIR2!
3.3.3 模式总结
PAIR套接字应用场景:协调线程
在编写多线程应用程序时,会遇到如何“协调线程”的问题,例如一个线程状态发生改变时同时另一个线程:
- 如果使用以往的多线程程序,你可能会使用信号量或互斥等技术。
- 但是在ØMQ中,你可以使用ZMQ_PAIR套接字来进行线程间的通信。
为什么选择的是PAIR?
此处使用的是PAIR套接字,其他套接字组合也能够完成上面相同的工作,但是其他台套接字都有副作用,可能会干扰信令:
你可以让发送者使用PUSH并让接收者使用PULL,但是PUSH会把消息发送给所有存在的接收者,假设你启动了2个接收者,那么就会“丢失”一半的信号。PAIR具有拒绝多个连接,两个连接的组成的对是独占的。
你可以让发送者使用DEALER并让接收者使用ROUTER,但是ROUTER将你的信息包装在一个“封包”中,这意味着你的大小为0的信号变成了一个多部分消息。如果你不关心数据并把任何东西都当做一个有效的信号,并且如果你不会不止一次地从套接字上读取,这并不重要。但是,如果你决定要发送实际数据时,你会突然发现ROUTER为你提供了“错误”的消息。DEALER也分发传出消息,这带来与PUSH相同的风险。
你可以让发送者使用PUB,而让接收者使用SUB,这将完全按照你发送它们的原样正确地传递你的消息,而且PUB不像PUSH或DEALER那样分发消息。但是,你需要用空订阅配置订阅者,这比较麻烦,更糟的是,PUB-SUB链接的可靠性是与实践相关的,并且如果在PUB套接字发送消息时,SUB套接字正在连接,信息就有可能会丢失。
综合以上的原因,使得PAIR成为线程对之间协调的最佳选择。
4. 使用总结
4.1 bind和connect的区别
connect
:
- 用于客户端套接字,主动连接到一个已经开放并监听端口的服务器套接字。
- 客户端可以使用
connect
方法连接多个远程服务(例如连接多个服务实例),但只会连接到服务器的某个端口。 - 连接后,客户端就能发送或接收消息。
bind
:
- 用于服务端套接字,绑定到本地的一个端口,等待客户端连接。
- 服务器端使用
bind
来打开一个端口,允许多个客户端通过这个端口与其通信。 - 每个
bind
调用会使套接字监听一个端口,可以让多个客户端连接到该端口。
具体的应用场景
connect
: 用于客户端或主动请求的一方。一个套接字可以连接到多个远程地址。例如,在一个PUSH
套接字的情况下,你可以通过多次调用connect
来将任务发送到多个PULL
套接字,这样可以实现负载均衡。bind
: 用于服务端或被动接收请求的一方。通常每个bind
调用会启动一个监听端口。例如,一个PULL
套接字在服务端可能会调用bind
来绑定一个本地端口,等待PUSH
套接字的任务。
4.2 接收和发送字符串的区别
接收和发送字符串的话可以采用msg = b'str'的方式转换为字节串,就可以使用send
和recv
函数来接收发送。
如果要发送字符串,需要使用send_string
和recv_string
函数。
如果对字符串进行了编码操作,那么需要在接收字符串之后也进行解码操作。
4.3 订阅者和发布者的绑定
订阅者和发布者的绑定依赖于消息前缀。
对于发布者,如果要对不同的订阅者发布不同的消息,需要在消息前缀中与订阅者指定的过滤器相同。
对于订阅者,如要只订阅自己感兴趣的消息,需要使用setsockopt_string来过滤掉自己需要的前缀消息。
# pub
publisher.send_string(f"{postal_code} {message_str}")
# sub
subscriber.setsockopt_string(zmq.SUBSCRIBE, postal_code_filter)
4.4 同一个socket接收和发送消息出现问题
当同一个socket既接收消息有往另外一个端口发送消息,发现发送消息出现问题。
示例代码:
def step2():
context = zmq.Context()
socket2 = context.socket(zmq.PAIR)
socket2.bind("tcp://*:5552")
print("PAIR2: Waiting for message from PAIR1...")
# 等待 PAIR1 发来的消息
message = socket2.recv_string()
print(f"PAIR2 received message: {message}")
# 向 PAIR3 发送消息
socket2.connect("tcp://localhost:5553")
print("PAIR2: Sending message to PAIR3...")
socket2.send_string("Hello from PAIR2!")
根本原因在于你使用了一个套接字 (socket2
) 既接收消息又发送消息,这样会导致 PAIR2
发送消息给 PAIR3
时出现问题。zmq.PAIR
套接字是全双工的,但当一个套接字同时用于接收和发送消息时,你需要确保消息的发送和接收是通过不同的操作来进行的,而在你的代码中,socket2
在接收消息时也被用来发送消息,这会导致程序的行为不如预期。
解决方案:
你需要为接收和发送分别创建不同的套接字,这样可以避免套接字同时进行接收和发送操作。
def step2():
context = zmq.Context()
# 创建两个套接字,一个用于接收,另一个用于发送
socket2_recv = context.socket(zmq.PAIR)
socket2_send = context.socket(zmq.PAIR)
socket2_recv.bind("tcp://*:5552") # 用于接收消息
socket2_send.connect("tcp://localhost:5553") # 用于发送消息
print("PAIR2: Waiting for message from PAIR1...")
# 等待 PAIR1 发来的消息
message = socket2_recv.recv_string()
print(f"PAIR2 received message: {message}")
# 向 PAIR3 发送消息
print("PAIR2: Sending message to PAIR3...")
socket2_send.send_string("Hello from PAIR2!")
4.4 send不同方式的区别
在 ZeroMQ 中,Python 环境下的 send
方法有多种不同的用法,主要区别在于发送消息时的一些参数设置。这些参数控制消息的发送方式,包括消息的格式、是否阻塞、是否需要分割等。以下是常见的几种 send
方法和它们的区别:
1. send_string()
send_string()
是 zmq
的专用方法,用于发送字符串消息。这会自动将 Python 字符串编码为字节流。
send_string("Hello, ZeroMQ!")
特点:
- 适用于发送简单的字符串消息。
- 自动将字符串转换为字节流(默认编码为 UTF-8)。
- 无需显式地进行字节编码。
2. send_bytes()
send_bytes()
用于发送原始字节数据。如果你有字节类型的数据,直接使用 send_bytes()
会更加高效。
send_bytes(b"Hello, ZeroMQ!")
特点:
- 适用于发送原始的字节数据。
- 如果你已经有字节数据(例如文件内容、图片等),使用
send_bytes()
可以避免额外的编码操作。 - 需要手动确保数据是字节类型(
bytes
类型)。
3. send()
send()
是 ZeroMQ 中的通用发送方法,支持发送任意类型的数据(如字节、字符串、对象等)。如果你希望发送非字符串或字节数据,可以使用这个方法。
send(b"Hello, ZeroMQ!")
或者,你也可以指定 flags
参数来控制消息的行为(例如 zmq.DONTWAIT
)。
特点:
- 支持发送不同类型的数据(如字节数据、字符串等)。
- 需要确保传入的是字节数据(
bytes
类型),否则可能会引发错误。 - 可以通过
flags
参数指定一些特定的行为,例如是否阻塞,是否是最后一部分消息(在消息分割时很有用)。
4. send_multipart()
send_multipart()
用于发送多个消息部分。在 ZeroMQ 中,消息是按部分(part)进行传输的,send_multipart()
允许你分多部分发送一个消息。例如,可以先发送一个头部,然后发送正文。
send_multipart([b"Header", b"Body", b"Footer"])
特点:
- 适用于发送由多个部分组成的消息。
- 每个部分可以是字节数据或字符串。
- 消息的接收端需要用
recv_multipart()
来接收这些部分。
5. send_json()
send_json()
是 ZeroMQ 提供的一个方便的功能,专门用于发送 JSON 数据。它会将 Python 的字典(或其他支持的对象)自动编码为 JSON 格式,并将其发送。
send_json({"key": "value"})
特点:
- 专门用于发送 JSON 数据,Python 对象会被自动转换为 JSON 格式。
- 使用此方法时无需手动编码 JSON 数据,方便快捷。
6. send()
with flags
send()
方法还支持额外的 flags
参数,用于控制消息的发送行为。这些标志通常用于细粒度的消息控制,例如非阻塞发送、分段发送等。
send(b"Hello, ZeroMQ!", zmq.DONTWAIT)
常见的 flags
:
zmq.DONTWAIT
:非阻塞发送,如果消息不能立即发送,会抛出异常。zmq.SNDMORE
:在分段发送消息时,标志“更多部分将在后面发送”。zmq.NOBLOCK
:类似于DONTWAIT
,用于非阻塞发送。
这些标志通常与分布式应用场景中的消息分割和流控制有关。
总结:
send_string()
:专门发送字符串,自动处理编码。send_bytes()
:用于发送原始字节数据。send()
:通用的发送方法,适用于发送字节流数据。send_multipart()
:用于发送由多个部分组成的消息。send_json()
:用于发送 JSON 格式的数据,自动处理 Python 对象到 JSON 的转换。flags
参数:控制消息发送的行为,例如非阻塞发送、分段发送等。
选择哪种发送方式取决于你要发送的数据类型和具体的需求,例如是否需要分割消息,是否需要 JSON 格式,或者是否要控制消息发送的行为。
标签:zmq,socket,python,message,发送,使用,ZeroMQ,ZMQ,接字 From: https://www.cnblogs.com/zreo2home/p/18628575