首页 > 编程语言 >ZeroMQ的使用(python)

ZeroMQ的使用(python)

时间:2024-12-24 19:54:33浏览次数:3  
标签:zmq socket python message 发送 使用 ZeroMQ ZMQ 接字

ZeroMQ的使用(python)

1. python环境使用

1.1 非源码安装

在安装pyzmqlibzmq3-devel之后便可以使用 zeromq 了。需要提前安装好python环境。

(venv-patroni-4.0.3) [fbase@localhost zmq]$ mkdir ~/soft/zmq
(venv-patroni-4.0.3) [fbase@localhost zmq]$ cd ~/soft/zmq
(venv-patroni-4.0.3) [fbase@localhost zmq]$ vi test.py 
(venv-patroni-4.0.3) [fbase@localhost zmq]$ vi test1.py 
(venv-patroni-4.0.3) [fbase@localhost zmq]$ chmod +x test.py 
(venv-patroni-4.0.3) [fbase@localhost zmq]$ chmod +x test1.py 
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ll
total 8
-rwxrwxr-x 1 fbase fbase 572 Dec 19 11:31 test1.py
-rwxrwxr-x 1 fbase fbase 466 Dec 19 11:31 test.py
  • test.py:相当于服务器。

    #
    #   Hello World server in Python
    #   Binds REP socket to tcp://*:5555
    #   Expects b"Hello" from client, replies with b"World"
    #
    
    import time
    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    
    while True:
        #  Wait for next request from client
        message = socket.recv()
        print(f"Received request: {message}")
    
        #  Do some 'work'
        time.sleep(1)
    
        #  Send reply back to client
        socket.send(b"World")
    

    服务器创建一个响应类型的套接字,将其绑定到 端口5555,然后等待消息。你也可以看到我们的 配置,我们只是发送字符串。

  • test1.py:相当于客户端。

    #
    #   Hello World client in Python
    #   Connects REQ socket to tcp://localhost:5555
    #   Sends "Hello" to server, expects "World" back
    #
    
    import zmq
    
    context = zmq.Context()
    
    #  Socket to talk to server
    print("Connecting to hello world server…")
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    
    #  Do 10 requests, waiting each time for a response
    for request in range(10):
        print(f"Sending request {request} …")
        socket.send(b"Hello")
    
        #  Get the reply.
        message = socket.recv()
        print(f"Received reply {request} [ {message} ]")
    

    客户端创建请求类型的套接字,连接并开始发送 充分和传播

    sendreceive方法都是阻塞的(默认)。对于接收 这很简单:如果没有消息,则该方法将阻塞。发送它是 更复杂,取决于插座类型。对于请求套接字,如果 达到高水位线或没有对等体被连接,则该方法将阻塞。

1.2 源码安装

1.2.1 c方式

源码安装一开始只有c方式,先用c方式进行测试。

(venv-patroni-4.0.3) [fbase@localhost zmq]$ touch hwserver.c
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ll
total 3164
-rw-rw-r--  1 fbase fbase       0 Dec 19 14:57 hwserver.c
drwxr-xr-x 12 fbase fbase    4096 Dec 19 14:47 zeromq-4.1.8
-rw-r--r--  1 fbase fbase 1479856 Dec 19 10:46 zeromq-4.1.8.tar.gz
(venv-patroni-4.0.3) [fbase@localhost zmq]$ vi hwserver.c 
(venv-patroni-4.0.3) [fbase@localhost zmq]$ gcc -o hwserver hwserver.c -lzmq
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ll
total 3180
-rwxrwxr-x  1 fbase fbase    8923 Dec 19 14:58 hwserver
-rw-rw-r--  1 fbase fbase     560 Dec 19 14:57 hwserver.c
drwxr-xr-x 12 fbase fbase    4096 Dec 19 14:47 zeromq-4.1.8
-rw-r--r--  1 fbase fbase 1479856 Dec 19 10:46 zeromq-4.1.8.tar.gz

运行hwserver,会发现报错,如下所示:

(venv-patroni-4.0.3) [fbase@localhost zmq]$ hwserver
hwserver: error while loading shared libraries: libzmq.so.5: cannot open shared object file: No such file or directory

这个原因是没有找到libzmq.so.5,这个依赖在/usr/local/lib中,将其添加到LD_LIBRARY_PATH中。

(venv-patroni-4.0.3) [fbase@localhost zmq]$ ls /usr/local/include/
zmq.h  zmq_utils.h
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ls /usr/local/lib
libzmq.a  libzmq.la  libzmq.so  libzmq.so.5  libzmq.so.5.0.4  pkgconfig
(venv-patroni-4.0.3) [fbase@localhost zmq]$ echo 'export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH' >> ~/.bashrc
(venv-patroni-4.0.3) [fbase@localhost zmq]$ source ~/.bashrc
(venv-patroni-4.0.3) [fbase@localhost zmq]$ echo $LD_LIBRARY_PATH
/usr/local/fbase/13/lib:/usr/local/lib:/usr/local/lib:/usr/local/fbase/13/lib:
(venv-patroni-4.0.3) [fbase@localhost zmq]$ sudo ldconfig
[sudo] password for fbase: 
(venv-patroni-4.0.3) [fbase@localhost zmq]$ ldd $(which hwserver)
	linux-vdso.so.1 =>  (0x00007fff33997000)
	libzmq.so.5 => /usr/local/lib/libzmq.so.5 (0x00007f61d4a56000)
	libc.so.6 => /lib64/libc.so.6 (0x00007f61d468c000)
	librt.so.1 => /lib64/librt.so.1 (0x00007f61d4484000)
	libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f61d4268000)
	libstdc++.so.6 => /lib64/libstdc++.so.6 (0x00007f61d3f5f000)
	libm.so.6 => /lib64/libm.so.6 (0x00007f61d3c5d000)
	libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x00007f61d3a47000)
	/lib64/ld-linux-x86-64.so.2 (0x00007f61d4cc2000)
(venv-patroni-4.0.3) [fbase@localhost zmq]$ hwserver 

1.2.2 python方式

编译安装pyzmq

tar -zxvf pyzmq-26.2.0.tar.gz
cd pyzmq-26.2.0
# 编译安装(低版本)
python setup.py configure --zmq=/usr/local #set the zmq install path
# 编译安装高版本
pip install setuptools wheel
pip install .
pip show pyzmq

使用与1.1一致。

2. Zeromq使用介绍

2.1 Bind vs Connect

使用ZeroMQ套接字,谁绑定谁连接都无所谓。在上述 您可能已经注意到,服务器使用Bind,而客户端使用Connect。 这是为什么,有什么区别?

ZeroMQ为每个底层连接创建队列。如果您的套接字连接到 三个对等套接字,那么在幕后有三个消息队列。

使用Bind,您允许对等端连接到您,因此您不知道有多少对等端 将来会有,你不能提前创建队列。 相反,队列是在各个对等点连接到绑定套接字时创建的。

通过Connect,ZeroMQ知道至少会有一个对等点, 因此它可以立即创建单个队列。这适用于所有套接字类型 除了ROUTER,其中队列仅在我们连接到的对等体 承认我们的联系

因此,当向没有对等体的绑定套接字或ROUTER发送消息时, 由于没有实时连接,因此没有队列可用于存储消息。

2.1.1 什么时候应该使用bind和connect?

一般来说,从架构中最稳定的点使用绑定, 使用带有volatile端点的动态组件的connect。对于请求/答复, 服务提供者可能是您绑定和客户端使用的点 connect.就像普通的TCP一样。

如果你不能弄清楚哪些部分更稳定(即点对点), 考虑中间的稳定设备,所有侧都可以连接到该设备。

2.2 高水位线

高水位线是一个硬限制的最大数量的优秀 消息ZeroMQ正在内存中为指定的 socket正在与通信。

如果已达到此限制,则套接字进入异常状态, 根据套接字类型,ZeroMQ将采取适当的操作,例如 阻止或丢弃已发送的消息。请参阅各个插座的说明 有关针对每种套接字类型采取的确切操作的详细信息,请参见下面的。

2.3 消息传递模式

在ZeroMQ的套接字API的牛皮纸包装下, 信息模式ZeroMQ模式是由套接字对实现的, 匹配类型。

内置的核心ZeroMQ模式是:

  • Request-reply:将一组客户端连接到一组服务。这 是一种远程过程调用和任务分发模式。
  • Pub-sub:将一组发布者连接到一组订阅者。这 是一种数据分布模式。
  • Pipeline:它以扇出/扇入模式连接节点, 多个步骤和循环。这是一个并行的任务分配和收集 格局
  • Exclusive pair:独占地连接两个套接字。这是一种模式 用于在进程中连接两个线程,不要与“正常”混淆 成对的插座。

还有更多ZeroMQ模式仍处于草案状态:

  • Client-server:允许单个ZeroMQ服务器与一个或多个进行对话 ZeroMQ客户端。客户总是先开始谈话,然后 任一对等体可以异步地向另一对等体发送消息。
  • Radio-dish:用于单个数据的一对多分发 发布者以扇出方式发送给多个订阅者。

3. 使用

在pyzmq中有一些基本使用,如下所示:

# 创建 ZeroMQ 上下文
context = zmq.Context()
# 创建一个请求-响应套接字
socket = context.socket(zmq.REP)

这两步是基本的使用,获取上下文,和根据需要获取套接字,下面是对各个模型的演示代码和套接字介绍和使用。

3.1 Request-reply请求响应模型

请求-响应模式由http://rfc.zeromq.org/spec:28正式定义。

请求-应答模式应该是最常见的交互模式,如果连接之后,服务器终止,那么客户端也终止,从崩溃的过程中恢复不太容易
因此,做一个可靠的请求-应答模式很复杂。

“请求-响应模型”支持的套接字类型有4种:

  • ZMQ_REP
  • ZMQ_REQ
  • ZMQ_DEALER
  • ZMQ_ROUTER

3.1.1 “REQ-REP”套接字类型

  • 请求-响应模式用于将请求从ZMQ_REQ客户端发送到一个或多个ZMQ_REP服务,并接收对每个发送的请求的后续答复
  • REQ-REP套接字对是步调一致的。它们两者的次序必须有规则,不能同时发送或接收,否则无效果
3.1.1.1 ZMQ_REQ

客户端使用ZMQ_REQ类型的套接字向服务发送请求并从服务接收答复。

此套接字类型仅允许zmq_send(request)和后续zmq_recv(reply)调用交替序列。发送的每个请求都在所有服务中轮流轮询,并且收到的每个答复都与最后发出的请求匹配。

如果没有可用的服务,则套接字上的任何发送操作都应阻塞,直到至少有一项服务可用为止。REQ套接字不会丢弃消息

ZMQ_REQ特性摘要
兼容的对等套接字 ZMQ_REP、ZMQ_ROUTER
方向 双向
发送/接收模式 发送、接收、发送、接收......
入网路由策略 最后一位(Last peer)
外发路由策略 轮询
静音状态下的操作 阻塞
3.1.1.2 ZMQ_REP

服务使用ZMQ_REP类型的套接字来接收来自客户端的请求并向客户端发送回复。

此套接字类型仅允许zmq_recv(request)和后续zmq_send(reply)调用的交替序列。接收到的每个请求都从所有客户端中公平排队,并且发送的每个回复都路由到发出最后一个请求的客户端。

如果原始请求者不再存在,则答复将被静默丢弃。

ZMQ_REP特性摘要
兼容的对等套接字 ZMQ_REQ、ZMQ_DEALER
方向 双向
发送/接收模式 发送、接收、发送、接收......
入网路由策略 公平排队
外发路由策略 最后一位(Last peer)

3.1.2 代码演示

服务端创建REP套接字,阻塞等待客户端消息的到达,当客户端有消息达到时给客户端回送“World”字符串。

客户端创建REP套接字,向服务端发送字符串“Hello”,然后等待服务端回送消息。

  • 服务端代码server.py

    import zmq
    import time
    
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    
    while True:
        # 接收数据
        msg = socket.recv()
        if msg is not None:
            print(f"msg:{msg.decode('utf-8')}")
    
        time.sleep(1)
    
        # 发送数据
        # msg = socket.recv_string()
        # reply_msg = "world"
        # result = socket.send_string(reply_msg)
    
        # msg = socket.recv()
        # reply_msg = b"world"
        # result = socket.send(reply_msg)
    
        reply_msg = "world"
        result = socket.send(reply_msg.encode("utf-8"))
        print(f"reply result:{result}")
    
    • 如果消息是字符串,使用 send_string
    • 如果你使用 send,确保传入的是字节数据(通过 .encode() 转换)。或者发送二进制的数组。
  • 客户端代码client.py

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    
    while True:
        # 发送数据
        # req_msg = "hello"
        # result = socket.send_string(req_msg)
        # reply_msg = socket.recv_string()
    
        # req_msg = b"hello"
        # result = socket.send(req_msg)
        # reply_msg = socket.recv()
    
        req_msg = "hello"
        result = socket.send(req_msg.encode("utf-8"))
        print(f"request result:{result}")
    
        # 接收回复数据数据
        reply_msg = socket.recv()
        if reply_msg is not None:
            print(f"reply msg:{reply_msg.decode('utf-8')}")
    

结果展示:

# server
msg:hello
reply result:None
msg:hello
reply result:None

# client
request result:None
reply msg:world
request result:None
reply msg:world

3.2 Exclusive pair推拉模型

管道模式由http://rfc.zeromq.org/spec:30正式定义。

也被称为管道模型;管道模式在有的地方也称为“流水线”模式。

管道模式用于将数据分发到布置在流水线中的节点。数据始终沿流水线向下流动,流水线的每一级都连接到至少一个节点。当流水线级连接到多个节点时,数据在所有连接的节点之间进行轮询。

“管道模式“支持的套接字类型有2种:

  • ZMQ_PUSH
  • ZMQ_PULL

3.2.1 PUSH-PULL”套接字类型

3.2.1.1 ZMQ_PUSH

管道节点使用类型为ZMQ_PUSH的套接字将消息发送到下游流水线节点。消息循环到所有连接的下游节点。

该套接字类型不支持zmq_msg_recv()等接收数据的函数。

当ZMQ_PUSH套接字由于已达到所有下游节点的高水位线而进入静音状态时,或者如果根本没有下游节点,则套接字上的任何zmq_send()操作都应阻塞,直到静音状态结束或处于至少一个下游节点可用于发送;消息不会被丢弃。

ZMQ_PUSH特性摘要
兼容的对等套接字 ZMQ_PUSH
方向 单向
发送/接收模式 仅发送
入网路由策略 不适用(N/A)
外发路由策略 轮询
静音状态下的操作 阻塞
3.2.1.2 ZMQ_PULL

管道节点使用ZMQ_PULL类型的套接字从上游管道节点接收消息。

消息从所有连接的上游节点中公平排队。

该套接字类型不支持zmq_msg_send()等发送数据的函数。

ZMQ_PULL特性摘要
兼容的对等套接字 ZMQ_PULL
方向 单向
发送/接收模式 仅接收
入网路由策略 公平排队
外发路由策略 不适用(N/A)
静音状态下的操作 阻塞

3.2.2 代码演示

演示案例:

一台发生器(taskvent.py),它产生可以并行执行的任务。

工人1(taskwork.py),用于处理任务。

工人2(taskwork-2.py),用于处理任务。

一个接收器(tasksink.py),用于收集工作进程返回的结果。

  • 发生器taskvent.py

    import zmq
    import time
    
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://localhost:5555")
    socket.connect("tcp://localhost:5554")
    
    print("Task generator started, sending tasks...")
    for i in range(20):
        task = f"Task {i + 1}"
        print(f"Generating: {task}")
        socket.send_string(task)
        time.sleep(1)
    
    socket.close()
    context.term()
    
  • 工人1taskwork.py

    import zmq
    import math
    import time
    
    context = zmq.Context()
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://localhost:5555")
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5556")
    
    print("Worker started, processing tasks...")
    
    while True:
        # 接收发生器的消息
        task = receiver.recv_string()
        print(f"Proccessing 1 : {task}")
        task_number = int(task.split()[1])
    
        # 给接收器发送消息
        result_msg = f"work1: Result of {math.pow(task_number, 2)}"
        sender.send_string(result_msg)
        time.sleep(1)
    
  • 工人2taskwork-2.py

    import zmq
    import time
    
    context = zmq.Context()
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://localhost:5554")
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5556")
    
    print("Worker2 started, processing tasks...")
    
    while True:
        # 接收发生器的消息
        task = receiver.recv_string()
        print(f"Proccessing 2 : {task}")
        task_number = int(task.split()[1])
    
        # 给接收器发送消息
        result_msg = f"work2: Result of {task_number * 2}"
        sender.send_string(result_msg)
        time.sleep(1)
    
  • 接收器tasksink.py

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.bind("tcp://localhost:5556")
    
    print("Receiver started, collecting results...")
    for i in range(20):
        result = socket.recv_string()
        print(f"Result: {result}")
    
    socket.close()
    context.term()
    

结果展示:

# taskvent
Task generator started, sending tasks...
Generating: Task 1
Generating: Task 2
Generating: Task 3
Generating: Task 4
Generating: Task 5
Generating: Task 6
Generating: Task 7
Generating: Task 8
Generating: Task 9
Generating: Task 10
Generating: Task 11
Generating: Task 12
Generating: Task 13
Generating: Task 14
Generating: Task 15
Generating: Task 16
Generating: Task 17
Generating: Task 18
Generating: Task 19
Generating: Task 20

# taskwork
Worker started, processing tasks...
Proccessing 1 : Task 1
Proccessing 1 : Task 3
Proccessing 1 : Task 5
Proccessing 1 : Task 7
Proccessing 1 : Task 9
Proccessing 1 : Task 11
Proccessing 1 : Task 13
Proccessing 1 : Task 15
Proccessing 1 : Task 17
Proccessing 1 : Task 19
    
# taskwork2
Worker2 started, processing tasks...
Proccessing 2 : Task 2
Proccessing 2 : Task 4
Proccessing 2 : Task 6
Proccessing 2 : Task 8
Proccessing 2 : Task 10
Proccessing 2 : Task 12
Proccessing 2 : Task 14
Proccessing 2 : Task 16
Proccessing 2 : Task 18
Proccessing 2 : Task 20

# tasksink
Receiver started, collecting results...
Result: work2: Result of 4
Result: work2: Result of 8
Result: work2: Result of 12
Result: work2: Result of 16
Result: work1: Result of 1.0
Result: work1: Result of 9.0
Result: work1: Result of 25.0
Result: work1: Result of 49.0
Result: work1: Result of 81.0
Result: work2: Result of 20
Result: work1: Result of 121.0
Result: work2: Result of 24
Result: work1: Result of 169.0
Result: work2: Result of 28
Result: work1: Result of 225.0
Result: work2: Result of 32
Result: work1: Result of 289.0
Result: work2: Result of 36
Result: work1: Result of 361.0
Result: work2: Result of 40

3.3.3 模式总结

  • 工人向上连接到发生器,并且向下连接到接收器。这意味着你可以随意添加工人。 因此,发生器和接收器是架构的固定部分,而工人是动态部分。
  • 我们必须同步开始同批次的所有工人的启动和运行。这是ZeroMQ中存在的一个相当普遍的疑难杂症,并没有简单的解决办法。connect方法需要一定的时间,所以当一组工人连接到发生器时,第一个成功连接的工人会在这短短的时间得到消息的整个负载,而其他工人仍在进行连接。如果不知何故批次的开始不同步,那么系统就将无法并行运行。
  • 发生器的PUSH套接字将任务均匀地分配给工人。这就是所谓的负载均衡。
  • 接收器的PULL套接字均匀地收集来自工人的结果。这就是所谓的公平排队。

管道模式也有类似“慢木匠”的现象

  • 它导致了对PUSH套接字不能正确地负载均衡的指责。如果你使用的是PUSH和PULL,并且你的某个工人得到比其他工人更多的信息,这是因为PULL套接字已比别人更快地连接,并在其他工人试图连接之前抓取了很多消息。

3.3 Pipeline共享队列/代理模型

3.3.1 “DEALER-ROUTER”套接字类型

3.3.1.1 ZMQ_DEALER

ZMQ_DEALER类型的套接字是用于扩展“请求/应答”套接字的高级模式。

发送消息时:当ZMQ_DEALER套接字由于已达到所有对等点的最高水位而进入静音状态时,或者如果根本没有任何对等点,则套接字上的任何zmq_send()操作都应阻塞,直到静音状态结束或至少一个对等方变得可以发送;消息不会被丢弃。

接收消息时:发送的每条消息都是在所有连接的对等方之间进行轮询,并且收到的每条消息都是从所有连接的对等方进行公平排队的
将ZMQ_DEALER套接字连接到ZMQ_REP套接字时,发送的每个消息都必须包含一个空的消息部分,定界符以及一个或多个主体部分。

ZMQ_DEALER特性摘要
兼容的对等套接字 ZMQ_ROUTER、ZMQ_REP、ZMQ_DEALER
方向 双向
发送/接收模式 无限制
入网路由策略 公平排队
外发路由策略 轮询
静音状态下的操作 阻塞
3.3.1.2 ZMQ_ROUTER

ZMQ_ROUTER类型的套接字是用于扩展请求/答复套接字的高级套接字类型。

当收到消息时:ZMQ_ROUTER套接字在将消息传递给应用程序之前,应在消息部分之前包含消息的始发对等方的路由ID。接收到的消息从所有连接的同级之间公平排队。

发送消息时:

  • ZMQ_ROUTER套接字应删除消息的第一部分,并使用它来确定消息应路由到的对等方的_routing id _。如果该对等点不再存在或从未存在,则该消息将被静默丢弃。
  • 但是,如果ZMQ_ROUTER_MANDATORY套接字选项设置为1,这两种情况下套接字都将失败并显示EHOSTUNREACH。

高水位标记影响:

  • 当ZMQ_ROUTER套接字由于达到所有同位体的高水位线而进入静音状态时,发送到该套接字的任何消息都将被丢弃,直到静音状态结束为止。同样,任何路由到已达到单个高水位标记的对等方的消息也将被丢弃。
  • 如果ZMQ_ROUTER_MANDATORY套接字选项设置为1,则在两种情况下套接字都应阻塞或返回EAGAIN。

ZMQ_ROUTER_MANDATORY套接字选项:

  • 当ZMQ_ROUTER套接字的ZMQ_ROUTER_MANDATORY标志设置为1时,套接字应在接收到来自一个或多个对等方的消息后生成ZMQ_POLLIN事件。
  • 同样,当至少一个消息可以发送给一个或多个对等方时,套接字将生成ZMQ_POLLOUT事件。

当ZMQ_REQ套接字连接到ZMQ_ROUTER套接字时,除了始发对等方的路由ID外,每个收到的消息都应包含一个空的定界符消息部分。因此,由应用程序看到的每个接收到的消息的整个结构变为:一个或多个路由ID部分,定界符部分,一个或多个主体部分。将回复发送到ZMQ_REQ套接字时,应用程序必须包括定界符部分。

ZMQ_ROUTER特性摘要
兼容的对等套接字 ZMQ_DEALER、ZMQ_REQ、ZMQ_ROUTER
方向 双向
发送/接收模式 无限制
入网路由策略 公平排队
外发路由策略 看上面介绍
静音状态下的操作 丢弃(见上面介绍)

3.3.2 代码演示

演示示例:

架构图见3.3.3的方法②。

扩展了上面的“REQ-REP”演示示例:REQ和ROUTER交流,DEALER与REP交流。代理节点从一个套接字读取消息,并将消息转发到其他套接字。

客户端:将REQ套接字连接到代理的ROUTER节点上,向ROUTER节点发送“Hello”,接收到“World”的回复。

代理端:

  1. 创建一个ROUTER套接字与客户端相连接,创建一个DEALER套接字与服务端相连接

  2. ROUTER套接字从客户端接收请求数据,并把请求数据发送给服务端

  3. DEALER套接字从服务端接收响应数据,并把响应数据发送给客户端

服务端:将REP套接字连接到代理的DEALER节点上。

  • 客户端代码:

    import zmq
    import time
    
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    
    print("客户端-1已启动,正在发送信息...")
    while True:
        # 发送数据
        req_msg = "client-1-hello"
        socket.send_string(req_msg)
        print(f"clinet-1 req_msg:{req_msg}")
    
        # 接收回复数据数据
        reply_msg = socket.recv_string()
        print(f"reply clinet-1 msg:{reply_msg}")
    
        time.sleep(1)
    
  • 服务端代码:

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:5556")  # 服务端监听正确的端口
    
    print("服务端-1已启动,正在接收信息...")
    while True:
        msg = socket.recv_string()  # 接收消息
        print(f"server-1 msg: {msg}")
    
        reply_msg = "server-1 world"
        socket.send_string(reply_msg)  # 回复消息
        print(f"reply server-1 reply_msg: {reply_msg}")
    
  • 代理端代码:

    import zmq
    
    context = zmq.Context()
    
    # 创建前端 ROUTER 套接字并绑定到客户端连接地址
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")
    
    # 创建后端 DEALER 套接字并绑定到服务端连接地址
    backend = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5556")
    
    # 创建 poller 用于同时监听多个套接字
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    
    print("代理节点已启动,正在处理信息...")
    
    while True:
        socks = dict(poller.poll())
    
        if frontend in socks:
            # 从 ROUTER 接收来自客户端的消息(multipart 消息)
            parts = frontend.recv_multipart()
            print(f"收到客户端消息: {parts}")
    
            # 将完整的 multipart 消息转发给 DEALER
            backend.send_multipart(parts)
    
        if backend in socks:
            # 从 DEALER 接收来自服务端的回复
            reply_parts = backend.recv_multipart()
            print(f"收到服务端回复: {reply_parts}")
    
            # 将回复转发回原始客户端(假设第一个部分是客户端 ID)
            frontend.send_multipart(reply_parts)
    

    如果不使用zmq.Poller()

    import zmq
    import time
    
    context = zmq.Context()
    
    # 创建前端 ROUTER 套接字并绑定到客户端连接地址
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")
    
    # 创建后端 DEALER 套接字并绑定到服务端连接地址
    backend = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5556")
    
    print("代理节点已启动,正在处理信息...")
    
    try:
        while True:
            # 尝试从前端接收消息,如果无消息则快速跳过
            try:
                parts = frontend.recv_multipart(flags=zmq.NOBLOCK)
                print(f"收到客户端消息: {parts}")
                # 将完整的 multipart 消息转发给后端
                backend.send_multipart(parts)
            except zmq.Again:
                pass  # No message received from frontend, continue
    
            # 尝试从后端接收回复,如果无消息则快速跳过
            try:
                reply_parts = backend.recv_multipart(flags=zmq.NOBLOCK)
                print(f"收到服务端回复: {reply_parts}")
                # 将回复转发回原始客户端
                frontend.send_multipart(reply_parts)
            except zmq.Again:
                pass  # No message received from backend, continue
    
            # 避免CPU占用过高
            time.sleep(0.01)  # 等待10毫秒再检查
    
    except KeyboardInterrupt:
        print("中断信号收到,退出程序...")
    finally:
        frontend.close()
        backend.close()
        context.term()
    

结果展示:

# client
客户端-1已启动,正在发送信息...
clinet-1 req_msg:client-1-hello
reply clinet-1 msg:server-1 world
clinet-1 req_msg:client-1-hello
reply clinet-1 msg:server-1 world

# proxt
代理节点已启动,正在处理信息...
收到客户端消息: [b'\x00k\x8bEg', b'', b'client-1-hello']
收到服务端回复: [b'\x00k\x8bEg', b'', b'server-1 world']

# server
服务端-1已启动,正在接收信息...
server-1 msg: client-1-hello
reply server-1 reply_msg: server-1 world
server-1 msg: client-1-hello
reply server-1 reply_msg: server-1 world

如果有多个服务器和客户端,可以通过b'\x00k\x8bEg'来进行客户端的选择。

3.3.3 模式总结

共享队列/代理

  • 在“REP-REQ”的演示案例中,我们只有一个客户端和一个服务端进行交流。但是实际中,我们通常允许多个客户端与多个服务端之间相互交流。
  • 将多个客户端连接到多个服务器的方法有两种:
    • 方法①:将每个客户端都连接到多个服务端点。
    • 方法②:使用代理。

方法①:

  • 原理:一种是将每个客户端套接字连接到多个服务端点。REQ套接字随后会将请求发送到服务端上。比如说一个客户端连接了三个服务端点:A、B、C,之后发送请求R1、R4到服务A上,发送请求R2到服务B上、发送请求R3到服务C上(如下图所示)。
  • 对于这种设计来说,服务器属于静态部分,客户端属于动态部分,客户端的增减无所谓,但是服务器的增减确实致命的。假设现在有100个客户端都连接了服务器,如果此时新增了三台服务器,为了让客户端识别这新增的三台服务器,那么就需要将所有的客户端都停止重新配置然后再重新启动。

方法②:

  • 我们可以编写一个小型消息排队代理,使我们具备灵活性。
  • 原理:该代理绑定到了两个端点,一个用于客户端的前端(ZMQ_ROUTER),另一个用于服务的后端(ZMQ_DEALER)。然后带来使用zmq_poll()来轮询这两个套接字的活动,当有消息时,代理会将消息在两个套接字之间频繁地输送。
  • 该代理其实并不显式管理任何队列,其只负责消息的传送,ØMQ会自动将消息在每个套接字上进行排队。
  • 使用zmq_poll()配合DEALER-ROUTER:
    • 在上面我们使用REQ-REP套接字时,会有一个严格同步的请求-响应对话,就是必须客户端先发送一个请求,然后服务端读取请求并发送应答,最后客户端读取应答,如此反复。如果客户端或服务端尝试破坏这种约束(例如,连续发送两个请求,而没有等待响应),那么将返回一个错误。
    • 我们的代理必须是非阻塞的,可以使用zmq_poll()来轮询任何一个套接字上的活动,但我们不能使用REQ-REQ。幸运地是,有两个称为DEALER和ROUTER的套接字,它们能我们可以执行无阻塞的请求-响应。

3.4 Pub-sub发布订阅模型

发布-订阅模式由https://rfc.zeromq.org/spec/29/正式定义

在发布-订阅模式中,有一个发布者用来发送消息,该模式中有很多订阅者会接收发布者发布的消息

“发布-订阅”模型支持的套接字类型有4种:

  • ZMQ_PUB
  • ZMQ_SUB
  • ZMQ_XPUB
  • ZMQ_XSUB

3.4.1 “PUB-SUB”套接字类型

  • PUB就是发布者,SUB就是订阅者。
3.4.1.1 ZMQ_PUB

发布者使用类型为ZMQ_PUB的套接字来分发数据。发送的消息以扇出方式分发给所有连接的对等方。

在ZMQ_PUB类型的套接字上不能执行zmq_msg_recv()等接收数据的函数。

当ZMQ_PUB套接字由于已达到订阅者的高水位标记而进入静音状态时,将发送给有问题的订阅者的任何消息都将被丢弃,直到静音状态结束为止。

对于该套接字类型,zmq_msg_send()函数将永远不会阻塞。

ZMQ_PUB特性摘要
兼容的对等套接字 ZMQ_SUB、ZMQ_XSUB
方向 单向
发送/接收模式 仅发送
入网路由策略 不适用(N/A)
外发路由策略 扇出(Fan out)
静音状态下的操作 丢弃
3.4.1.2 ZMQ_SUB

订阅者使用ZMQ_SUB类型的套接字来订阅发布者分发的数据。

ZMQ_SUB套接字创建完成之后,ZMQ_SUB套接字未订阅任何消息,请使用zmq_setsockopt()的ZMQ_SUBSCRIBE选项指定要订阅的消息。

在ZMQ_PUB类型的套接字上不能执行zmq_msg_recv()等接收数据的函数。

ZMQ_SUB特性摘要
兼容的对等套接字 ZMQ_PUB、ZMQ_XPUB
方向 单向
发送/接收模式 仅接收
入网路由策略 公平排队
外发路由策略 不适用(N/A)

3.4.2 “XPUB-XSUB”套接字类型

  • “XPUB-XSUB”套接字类型与“PUB-SUB”套接字类型相同,也是属于发布-订阅
  • 在“PUB-SUB”中,订阅者通过zmq_connect()向发布者发起订阅;但是“XPUB-XSUB”套接字类型允许订阅者通过发送一条订阅信息到发布者来完成订阅
3.4.2.1 ZMQ_XPUB

用法与ZMQ_PUB大部分相同。

但是有一点与ZMQ_PUB不同:ZMQ_XPUB(自己)的订阅方可以向自己发送一个订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。也接收不带子/取消订阅前缀的消息,但对订阅状态没有影响。

ZMQ_XPUB特性摘要
兼容的对等套接字 ZMQ_SUB、ZMQ_XSUB
方向 单向
发送/接收模式 发送消息,接收订阅
入网路由策略 不适用(N/A)
外发路由策略 扇出(Fan out)
静音状态下的操作 丢弃
3.4.2.2 ZMQ_XSUB
  • 用法与ZMQ_SUB大部分相同。
  • 但是有一点与ZMQ_SUB不同:自己可以向发布者发送一条订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。也接收不带子/取消订阅前缀的消息,但对订阅状态没有影响。
ZMQ_XSUB特性摘要
兼容的对等套接字 ZMQ_PUB、ZMQ_XPUB
方向 单向
发送/接收模式 接收消息,发送订阅
入网路由策略 公平排队
外发路由策略 不适用(N/A)
静音状态下的操作 丢弃

3.4.3 代码演示

3.4.3.1 PUB-SUB

演示示例:

发布者:类似于一个天气更新服务器,向订阅者发送天气更新,内容包括邮政编码、温度、湿度等信息。

订阅者:它监听发布者更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据。

  • 发布者:

    import zmq
    import random
    import time
    import json
    
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://localhost:5555")
    
    print("发布者已启动,正在发送天气更新...")
    # 模拟天气更新的数据
    postal_codes = ['10001', '20002', '30003', '40004', '50005']  # 邮政编码列表
    while True:
        # 随机选择一个邮政编码
        postal_code = random.choice(postal_codes)
    
        # 随机生成温度和湿度
        temperature = random.uniform(-10, 40)
        humidity = random.uniform(30, 90)
    
        # 构建消息
        message = {
            "postal_code": postal_code,
            "temperature": temperature,
            "humidity": humidity,
        }
        message_str = json.dumps(message)
    
        # 发送消息
        print(f"pub: {message_str}")
        publisher.send_string(f"{postal_code} {message_str}")
        time.sleep(1)
    
  • 订阅者-1:

    import zmq
    import json
    
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5555")
    
    # 设置订阅过滤器,默认订阅所有消息
    postal_code_filter = '10001'  # 订阅特定邮政编码的天气信
    # 只订阅以指定邮政编码开头的消息
    subscriber.setsockopt_string(zmq.SUBSCRIBE, postal_code_filter)
    
    print(f"订阅者已启动,正在接收邮政编码为 {postal_code_filter} 的天气更新...")
    # 接收前 10 条数据
    count = 0
    while count < 10:
        # 接收消息
        message_str = subscriber.recv_string()
        print(f"sub-10001: {message_str}")
    
        # 解析json字符串
        message = json.loads(message_str.split(' ', 1)[1])
    
        # 打印天气信息
        print(f"接收到天气更新: 邮政编码: {message['postal_code']},"
              f"温度: {message['temperature']}°C, 湿度: {message['humidity']}%")
    
        count += 1
    
    subscriber.close()
    context.term()
    
  • 订阅者-2:

    import zmq
    import json
    
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5555")
    
    # 设置订阅过滤器,默认订阅所有消息
    postal_code_filter = '20002'  # 订阅特定邮政编码的天气信
    # 只订阅以指定邮政编码开头的消息
    subscriber.setsockopt_string(zmq.SUBSCRIBE, postal_code_filter)
    
    print(f"订阅者-2已启动,正在接收邮政编码为 {postal_code_filter} 的天气更新...")
    # 接收前 10 条数据
    count = 0
    while count < 10:
        # 接收消息
        message_str = subscriber.recv_string()
        print(f"sub-10002: {message_str}")
    
        # 解析json字符串
        message = json.loads(message_str.split(' ', 1)[1])
    
        # 打印天气信息
        print(f"接收到天气更新: 邮政编码: {message['postal_code']},"
              f"温度: {message['temperature']}°C, 湿度: {message['humidity']}%")
    
        count += 1
    
    subscriber.close()
    context.term()
    

结果展示:

# puber
pub: {"postal_code": "20002", "temperature": 13.25265265413962, "humidity": 86.0784125194719}
pub: {"postal_code": "40004", "temperature": 31.73599959814129, "humidity": 52.561033432350925}
pub: {"postal_code": "50005", "temperature": 14.90289689583275, "humidity": 74.72166996686823}
pub: {"postal_code": "20002", "temperature": 9.867402200794743, "humidity": 32.0353602888176}
pub: {"postal_code": "50005", "temperature": 24.332321511064087, "humidity": 74.40272310252055}
pub: {"postal_code": "30003", "temperature": 3.039558928478584, "humidity": 72.99202727412626}
pub: {"postal_code": "10001", "temperature": 7.746891871815958, "humidity": 78.89402815964135}
pub: {"postal_code": "40004", "temperature": 32.4869161115785, "humidity": 55.37044485578605}
pub: {"postal_code": "50005", "temperature": 24.873966694343203, "humidity": 60.45065293887665}
pub: {"postal_code": "40004", "temperature": 29.947344215184877, "humidity": 42.01658424992969}

# sub-1
订阅者-1已启动,正在接收邮政编码为 10001 的天气更新...
sub-10001: 10001 {"postal_code": "10001", "temperature": 7.746891871815958, "humidity": 78.89402815964135}
接收到天气更新: 邮政编码: 10001,温度: 7.746891871815958°C, 湿度: 78.89402815964135%

# sub-2
订阅者-2已启动,正在接收邮政编码为 20002 的天气更新...
sub-10002: 20002 {"postal_code": "20002", "temperature": 13.25265265413962, "humidity": 86.0784125194719}
接收到天气更新: 邮政编码: 20002,温度: 13.25265265413962°C, 湿度: 86.0784125194719%
sub-10002: 20002 {"postal_code": "20002", "temperature": 9.867402200794743, "humidity": 32.0353602888176}
接收到天气更新: 邮政编码: 20002,温度: 9.867402200794743°C, 湿度: 32.0353602888176%
3.4.3.2 XPUB-XSUB

演示示例:

发布者1:类似于一个天气更新服务器,向代理节点发送天气更新,内容包括邮政编码、温度、湿度等信息。

发布者2:累死一个 用户操作记录服务器,向代理节点发送当前用户的操作记录。

代理节点:代理节点接受连接的订阅者(XPUB)和发布者(XSUB)。它将订阅者的请求转发到发布者,并将消息从发布者转发到订阅者。

订阅者1:它监听代理节点更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据。

订阅者2:它监听代理节点更新的数据流,过滤只接收与用户操作记录相关的信息,默认接收接收10条数据。

  • 发布者1:

    import zmq
    import random
    import time
    import json
    
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.connect("tcp://localhost:5555")
    
    print("发布者-1已启动,正在发送天气更新...")
    # 模拟天气更新的数据
    postal_codes = ['10001', '20002', '30003', '40004', '50005']  # 邮政编码列表
    while True:
        # 随机选择一个邮政编码
        postal_code = random.choice(postal_codes)
    
        # 随机生成温度和湿度
        temperature = random.uniform(-10, 40)
        humidity = random.uniform(30, 90)
    
        # 构建消息
        message = {
            "postal_code": postal_code,
            "temperature": temperature,
            "humidity": humidity,
        }
        message_str = json.dumps(message)
    
        # 发送消息
        print(f"pub-1: {message_str}")
        publisher.send_string(f"{postal_code} {message_str}")
        time.sleep(1)
    
  • 发布者2:

    import zmq
    import random
    import time
    import json
    
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.connect("tcp://localhost:5555")
    
    print("发布者-2已启动,正在发送天气更新...")
    # 模拟天气更新的数据
    userIds = ['001', '002']  # 邮政编码列表
    while True:
        # 随机选择一个邮政编码ss
        userId = random.choice(userIds)
    
        # 当前用户登录和登录时间
        action = "login"
        timestamp = time.time()
    
        # 构建消息
        message = {
            "userId": userId,
            "action": action,
            "timestamp": timestamp,
        }
        message_str = json.dumps(message)
    
        # 发送消息
        print(f"pub-2: {message_str}")
        publisher.send_string(f"{userId} {message_str}")
        time.sleep(1)
    
  • 订阅者1:

    import zmq
    import json
    
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    
    # 设置订阅过滤器,默认订阅所有消息
    postal_code_filter = '10001'  # 订阅特定邮政编码的天气信
    # 只订阅以指定邮政编码开头的消息
    subscriber.setsockopt_string(zmq.SUBSCRIBE, postal_code_filter)
    
    print(f"订阅者-1已启动,正在接收邮政编码为 {postal_code_filter} 的天气更新...")
    # 接收前 10 条数据
    count = 0
    while count < 10:
        # 接收消息
        message_str = subscriber.recv_string()
        print(f"sub-1-10001: {message_str}")
    
        # 解析json字符串
        message = json.loads(message_str.split(' ', 1)[1])
    
        # 打印天气信息
        print(f"接收到天气更新: 邮政编码: {message['postal_code']},"
              f"温度: {message['temperature']}°C, 湿度: {message['humidity']}%")
    
        count += 1
    
    subscriber.close()
    context.term()
    
  • 订阅者2:

    import zmq
    import json
    
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    
    # 设置订阅过滤器,默认订阅所有消息
    userId = '001'
    # 只订阅以指定用户Id开头的消息
    subscriber.setsockopt_string(zmq.SUBSCRIBE, userId)
    
    print(f"订阅者-2已启动,正在接收用户id为 {userId} 的操作行为更新...")
    # 接收前 10 条数据
    count = 0
    while count < 10:
        # 接收消息
        message_str = subscriber.recv_string()
        print(f"sub-2-001: {message_str}")
    
        # 解析json字符串
        message = json.loads(message_str.split(' ', 1)[1])
    
        # 打印天气信息
        print(f"接收到用户行为更新: 用户ID: {message['userId']},"
              f"操作: {message['action']}, 时间: {message['timestamp']}%")
    
        count += 1
    
    subscriber.close()
    context.term()
    
  • 代理节点:

    import zmq
    import json
    
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    
    # 设置订阅过滤器,默认订阅所有消息
    userId = '001'
    # 只订阅以指定用户Id开头的消息
    subscriber.setsockopt_string(zmq.SUBSCRIBE, userId)
    
    print(f"订阅者-2已启动,正在接收用户id为 {userId} 的操作行为更新...")
    # 接收前 10 条数据
    count = 0
    while count < 10:
        # 接收消息
        message_str = subscriber.recv_string()
        print(f"sub-2-001: {message_str}")
    
        # 解析json字符串
        message = json.loads(message_str.split(' ', 1)[1])
    
        # 打印天气信息
        print(f"接收到用户行为更新: 用户ID: {message['userId']},"
              f"操作: {message['action']}, 时间: {message['timestamp']}%")
    
        count += 1
    
    subscriber.close()
    context.term()
    

结果展示:

# proxt
proxt-xsub-msg: 001 {"userId": "001", "action": "login", "timestamp": 1734625019.16635}
proxt-xpub-msg:  001
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": 7.875253853621338, "humidity": 37.31015663936709}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": 14.432860810309766, "humidity": 60.22298999335305}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": 15.503168013798806, "humidity": 59.75713604714118}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": -7.395234809523146, "humidity": 65.5193954895997}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": -9.75615742629257, "humidity": 74.21744572318175}
proxt-xsub-msg: 10001 {"postal_code": "10001", "temperature": 5.268318532469344, "humidity": 45.62217272071367}
proxt-xpub-msg:  10001

# puber1

pub-1: {"postal_code": "10001", "temperature": 10.813840832122981, "humidity": 85.29650973319102}
pub-1: {"postal_code": "50005", "temperature": 38.40227985356804, "humidity": 50.82447611291441}
pub-1: {"postal_code": "50005", "temperature": 28.751940848644928, "humidity": 68.55122723607448}
pub-1: {"postal_code": "40004", "temperature": 28.696542004302806, "humidity": 58.85859477448625}
pub-1: {"postal_code": "20002", "temperature": -6.89923052809386, "humidity": 61.09597083888137}
pub-1: {"postal_code": "20002", "temperature": 13.419115255532269, "humidity": 52.01525249018736}
pub-1: {"postal_code": "50005", "temperature": 29.04016206865918, "humidity": 58.77557836353199}
pub-1: {"postal_code": "30003", "temperature": 18.47547641106557, "humidity": 41.33830374452378}
pub-1: {"postal_code": "50005", "temperature": 27.766425247968726, "humidity": 77.09619320847364}
pub-1: {"postal_code": "50005", "temperature": 37.632560879484906, "humidity": 78.04897441464384}
pub-1: {"postal_code": "40004", "temperature": 0.9880948271698973, "humidity": 34.36147305497506}
pub-1: {"postal_code": "30003", "temperature": 39.722978064052114, "humidity": 40.02421184794968}
pub-1: {"postal_code": "40004", "temperature": -3.6255678397909357, "humidity": 40.45473079718421}

# puber2
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625077.2781255}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625078.2800937}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625079.2821612}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625080.2840903}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625081.286704}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625082.2886486}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625083.2907314}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625084.2925353}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625085.2943888}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625086.296313}
pub-2: {"userId": "001", "action": "login", "timestamp": 1734625087.298361}
pub-2: {"userId": "002", "action": "login", "timestamp": 1734625088.3002558}

# suber-1
sub-1-10001: 10001 {"postal_code": "10001", "temperature": -7.395234809523146, "humidity": 65.5193954895997}
接收到天气更新: 邮政编码: 10001,温度: -7.395234809523146°C, 湿度: 65.5193954895997%
sub-1-10001: 10001 {"postal_code": "10001", "temperature": -9.75615742629257, "humidity": 74.21744572318175}
接收到天气更新: 邮政编码: 10001,温度: -9.75615742629257°C, 湿度: 74.21744572318175%
sub-1-10001: 10001 {"postal_code": "10001", "temperature": 5.268318532469344, "humidity": 45.62217272071367}
接收到天气更新: 邮政编码: 10001,温度: 5.268318532469344°C, 湿度: 45.62217272071367%

# suber-2
sub-2-001: 001 {"userId": "001", "action": "login", "timestamp": 1734625012.1531057}
接收到用户行为更新: 用户ID: 001,操作: login, 时间: 1734625012.1531057%
sub-2-001: 001 {"userId": "001", "action": "login", "timestamp": 1734625014.1569405}
接收到用户行为更新: 用户ID: 001,操作: login, 时间: 1734625014.1569405%
sub-2-001: 001 {"userId": "001", "action": "login", "timestamp": 1734625015.1587276}
接收到用户行为更新: 用户ID: 001,操作: login, 时间: 1734625015.1587276%

3.4.4 模式总结

关于“订阅”的说明(对于SUB、SUBX端来说)

  • 当你使用一个SUB、XSUB套接字时(订阅方)必须使用zmq_setsockopt()和SUBSCRIBE设置一个订阅(例如上面的客户端的代码所示)。如果你创建了一个SUB套接字,但是没有设置任何订阅,那么就不会得到任何消息。
  • 订阅者可以设置许多的订阅,它们被累加在一起。也就是说,如果某个更新匹配任何订阅,那么订阅者都会接收到它。
  • 订阅者也可以取消特定的订阅。
  • 订阅经常但不一定是一个可打印的字符串。
  • 要了解这是如何工作的,请参见zmq_setsockopt()和下面的演示案例。

慢木匠”症状

  • 如果发布者开启之后再启动订阅者,那么在订阅者启动的这段时间内,发布者发送的消息订阅者就接收不到了。

  • “满木匠”症状:即使先启动订阅者,再启动发布者,订阅者也可能错过发布者发送的消息:因此当订阅者连接到发布者时(这需要的时间很短,但非0),发布者可能已经将消息发送出去了。

  • “满木匠”症状案例:

    • ZeroMQ在后台执行异步I/O。假设你有两个节点执行次操作,顺序如下:
      • 订阅者接连到一个端点,并接收和处理消息。
      • 发布者绑定到一个端点,并立即发送10000条消息。
    • 订阅者很可能不会收到任何东西(在设置了正确的过滤器的情况下)。
    • 建立TCP连接包含会花几毫秒的握手,这取决于你的网络和节点间的跳数。在这段时间里,ZeroMQ可以发送很多消息。
  • 解决方法:

    • 之后介绍如何来同步发布者和订阅者,这样就不会启动数据发布,直到订阅者真正连接并准备就绪。
    • 有一个简单(愚蠢)的方式来延迟发布,就是休眠。但是,在实际中不建议这么做。
    • 同步的替代办法是简单地假定发布的数据流是无限的,它没有起点也没有终点。人们还假设订阅者不关心它启动前发生了什么事情。

关于发布-订阅模式的几个要点:

  • 订阅者不可以使用zmq_msg_send()等发送消息的函数,发布者不可以使用zmq_msg_recv()等接收消息的函数。
  • 一个订阅者可以连接到多个发布者,每次使用一个connect调用。那么数据将交错到达(“公平排队”)。
  • 如果一个发布者没有连接的订阅者,那么它简单地丢弃所有消息。
  • 如果你使用的是TCP并且订阅者是慢速的,那么消息将在发布方排队。我们将在后面的文章中介绍如何通过使用“高水位线”来针对这种情况保护发布者。
  • 从ZeroMQ v3.x开始,在使用连接的协议(tcp或ipc)时,过滤发生在发布方。使用epgm协议,过滤发生在订阅方。但在ZeroMQ v2.x版本中,所有过滤都发生在订阅方。

3.5 独家对模式

独家对模式(Exclusive pair)用于将一个对等点精确地连接到另一个对等点。此模式用于跨inproc传输的线程间通信。

互斥对模式由http://rfc.zeromq.org/spec:31正式定义。

“独家对模式”支持的套接字类型只有1种:

  • ZMQ_PAIR

3.5.1 “PAIR”套接字类型

3.5.1.1 ZMQ_PAIR

ZMQ_PAIR类型的套接字只能一次连接到单个对等方。对通过ZMQ_PAIR套接字发送的消息不执行消息路由或筛选。

当ZMQ_PAIR套接字由于已达到连接对等方的高水位线而进入静音状态时,或者如果没有连接任何对等方,则套接字上的任何zmq_send()操作都应阻塞,直到对等方可用于发送;消息不会被丢弃。

适用协议:

  • ZMQ_PAIR套接字设计用于通过nproc传输进行线程间通信,并且不实现自动重新连接等功能。
  • 尽管ZMQ_PAIR套接字可用于inproc以外的其他传输协议,但是它们无法自动重新连接,并且当以前存在任何连接(包括关闭状态的连接)时,新的传入连接将被终止,这使得它们在大多数情况下不适合TCP。
ZMQ_PAIR特性摘要
兼容的对等套接字 ZMQ_PAIR
方向 双向
发送/接收模式 无限制
入网路由策略 不适用(N/A)
外发路由策略 不适用(N/A)
静音状态下的操作 阻塞

3.3.2 代码演示

演示示例:

PAIR3:主线程中的PAIR套接字,等待PAIR2发来通知消息

PAIR2:在主线程中调用pthread_create()创建线程,在线程的回调函数中创建PAIR2套接字,该套接字等待PAIR1发来通知消息

PAIR1:PAIR2所在的线程再调用一次pthread_create(),在线程的回调函数中创建PAIR1套接字,PAIR1会向PAIR2发送消息

整体的流程就是:PAIR1发送消息给PAIR2,PAIR2接收到PAIR1的消息之后再发送消息给PAIR3,PAIR3接收到PAIR2的消息之后退出程序

案例分析
这是使用ØMQ进行多线程编程的一个经典模式:

  1. 两个线程通过inproc通信,使用的是共享的上下文。
  2. 父线程创建一个套接字,将其绑定到一个inproc端点,然后启动子线程,将上下文传递给它。
  3. 子线程创建第二个套接字,将它连接到该inproc端点,然后发信号告诉父线程,它已准备就绪。

使用这种模式的多线程代码是不可扩展到进程的。如果你使用inproc和套接字对,你就正在构建一个紧耦合的应用程序,也就是说,其中你的线程在结构是相互依存的,只有在低延迟真的很重要的时候才这样做。另一种设计模式是一个松耦合的应用程序,其中的线程有自己的上下文并通过ipc或tcp通信。你可以轻松地将松耦合的线程分解为单独的进程。

代码示例:

import zmq
import time
import threading


def step3():
    context = zmq.Context()
    socket3 = context.socket(zmq.PAIR)
    socket3.bind("tcp://*:5553")
    print("PAIR3: Waiting for message from PAIR2...")

    # 等待 PAIR2 发来的消息
    message = socket3.recv_string()
    print(f"PAIR3 received message: {message}")


def step2():
    context = zmq.Context()

    # 创建两个套接字,一个用于接收,另一个用于发送
    socket2_recv = context.socket(zmq.PAIR)
    socket2_send = context.socket(zmq.PAIR)

    socket2_recv.bind("tcp://*:5552")  # 用于接收消息
    socket2_send.connect("tcp://localhost:5553")  # 用于发送消息

    print("PAIR2: Waiting for message from PAIR1...")

    # 等待 PAIR1 发来的消息
    message = socket2_recv.recv_string()
    print(f"PAIR2 received message: {message}")

    # 向 PAIR3 发送消息
    print("PAIR2: Sending message to PAIR3...")
    socket2_send.send_string("Hello from PAIR2!")


def step1():
    context = zmq.Context()
    socket1 = context.socket(zmq.PAIR)
    socket1.bind("tcp://*:5551")
    print("PAIR1: Sending message to PAIR2...")

    # 向 PAIR2 发送消息
    socket1.connect("tcp://localhost:5552")
    socket1.send_string("Hello from PAIR1!")


if __name__ == '__main__':
    # 创建并启动线程 2 (PAIR1)
    thread2 = threading.Thread(target=step1)
    thread2.start()

    # 创建并启动线程 1 (PAIR2)
    thread1 = threading.Thread(target=step2)
    thread1.start()

    # 主线程中的 PAIR3
    step3()

    # 等待线程 1 和线程 2 完成
    thread1.join()
    thread2.join()

结果展示:

PAIR1: Sending message to PAIR2...
PAIR2: Waiting for message from PAIR1...
PAIR3: Waiting for message from PAIR2...
PAIR2 received message: Hello from PAIR1!
PAIR2: Sending message to PAIR3...
PAIR3 received message: Hello from PAIR2!

3.3.3 模式总结

PAIR套接字应用场景:协调线程

在编写多线程应用程序时,会遇到如何“协调线程”的问题,例如一个线程状态发生改变时同时另一个线程:

  • 如果使用以往的多线程程序,你可能会使用信号量或互斥等技术。
  • 但是在ØMQ中,你可以使用ZMQ_PAIR套接字来进行线程间的通信。

为什么选择的是PAIR?

  • 此处使用的是PAIR套接字,其他套接字组合也能够完成上面相同的工作,但是其他台套接字都有副作用,可能会干扰信令:

    • 你可以让发送者使用PUSH并让接收者使用PULL,但是PUSH会把消息发送给所有存在的接收者,假设你启动了2个接收者,那么就会“丢失”一半的信号。PAIR具有拒绝多个连接,两个连接的组成的对是独占的。

    • 你可以让发送者使用DEALER并让接收者使用ROUTER,但是ROUTER将你的信息包装在一个“封包”中,这意味着你的大小为0的信号变成了一个多部分消息。如果你不关心数据并把任何东西都当做一个有效的信号,并且如果你不会不止一次地从套接字上读取,这并不重要。但是,如果你决定要发送实际数据时,你会突然发现ROUTER为你提供了“错误”的消息。DEALER也分发传出消息,这带来与PUSH相同的风险。

    • 你可以让发送者使用PUB,而让接收者使用SUB,这将完全按照你发送它们的原样正确地传递你的消息,而且PUB不像PUSH或DEALER那样分发消息。但是,你需要用空订阅配置订阅者,这比较麻烦,更糟的是,PUB-SUB链接的可靠性是与实践相关的,并且如果在PUB套接字发送消息时,SUB套接字正在连接,信息就有可能会丢失。

综合以上的原因,使得PAIR成为线程对之间协调的最佳选择。

4. 使用总结

4.1 bind和connect的区别

connect:

  • 用于客户端套接字,主动连接到一个已经开放并监听端口的服务器套接字。
  • 客户端可以使用 connect 方法连接多个远程服务(例如连接多个服务实例),但只会连接到服务器的某个端口。
  • 连接后,客户端就能发送或接收消息。

bind:

  • 用于服务端套接字,绑定到本地的一个端口,等待客户端连接。
  • 服务器端使用 bind 来打开一个端口,允许多个客户端通过这个端口与其通信。
  • 每个 bind 调用会使套接字监听一个端口,可以让多个客户端连接到该端口。

具体的应用场景

  • connect: 用于客户端或主动请求的一方。一个套接字可以连接到多个远程地址。例如,在一个 PUSH 套接字的情况下,你可以通过多次调用 connect 来将任务发送到多个 PULL 套接字,这样可以实现负载均衡。
  • bind: 用于服务端或被动接收请求的一方。通常每个 bind 调用会启动一个监听端口。例如,一个 PULL 套接字在服务端可能会调用 bind 来绑定一个本地端口,等待 PUSH 套接字的任务。

4.2 接收和发送字符串的区别

接收和发送字符串的话可以采用msg = b'str'的方式转换为字节串,就可以使用sendrecv函数来接收发送。

如果要发送字符串,需要使用send_stringrecv_string函数。

如果对字符串进行了编码操作,那么需要在接收字符串之后也进行解码操作。

4.3 订阅者和发布者的绑定

订阅者和发布者的绑定依赖于消息前缀。

对于发布者,如果要对不同的订阅者发布不同的消息,需要在消息前缀中与订阅者指定的过滤器相同。

对于订阅者,如要只订阅自己感兴趣的消息,需要使用setsockopt_string来过滤掉自己需要的前缀消息。

# pub
publisher.send_string(f"{postal_code} {message_str}")

# sub
subscriber.setsockopt_string(zmq.SUBSCRIBE, postal_code_filter)

4.4 同一个socket接收和发送消息出现问题

当同一个socket既接收消息有往另外一个端口发送消息,发现发送消息出现问题。
示例代码:

def step2():
    context = zmq.Context()
    socket2 = context.socket(zmq.PAIR)
    socket2.bind("tcp://*:5552")
    print("PAIR2: Waiting for message from PAIR1...")

    # 等待 PAIR1 发来的消息
    message = socket2.recv_string()
    print(f"PAIR2 received message: {message}")

    # 向 PAIR3 发送消息
    socket2.connect("tcp://localhost:5553")
    print("PAIR2: Sending message to PAIR3...")
    socket2.send_string("Hello from PAIR2!")

根本原因在于你使用了一个套接字 (socket2) 既接收消息又发送消息,这样会导致 PAIR2 发送消息给 PAIR3 时出现问题。zmq.PAIR 套接字是全双工的,但当一个套接字同时用于接收和发送消息时,你需要确保消息的发送和接收是通过不同的操作来进行的,而在你的代码中,socket2 在接收消息时也被用来发送消息,这会导致程序的行为不如预期。

解决方案:

你需要为接收和发送分别创建不同的套接字,这样可以避免套接字同时进行接收和发送操作。

def step2():
    context = zmq.Context()
    
    # 创建两个套接字,一个用于接收,另一个用于发送
    socket2_recv = context.socket(zmq.PAIR)
    socket2_send = context.socket(zmq.PAIR)
    
    socket2_recv.bind("tcp://*:5552")  # 用于接收消息
    socket2_send.connect("tcp://localhost:5553")  # 用于发送消息
    
    print("PAIR2: Waiting for message from PAIR1...")

    # 等待 PAIR1 发来的消息
    message = socket2_recv.recv_string()
    print(f"PAIR2 received message: {message}")

    # 向 PAIR3 发送消息
    print("PAIR2: Sending message to PAIR3...")
    socket2_send.send_string("Hello from PAIR2!")

4.4 send不同方式的区别

在 ZeroMQ 中,Python 环境下的 send 方法有多种不同的用法,主要区别在于发送消息时的一些参数设置。这些参数控制消息的发送方式,包括消息的格式、是否阻塞、是否需要分割等。以下是常见的几种 send 方法和它们的区别:

1. send_string()

send_string()zmq 的专用方法,用于发送字符串消息。这会自动将 Python 字符串编码为字节流。

send_string("Hello, ZeroMQ!")

特点:

  • 适用于发送简单的字符串消息。
  • 自动将字符串转换为字节流(默认编码为 UTF-8)。
  • 无需显式地进行字节编码。

2. send_bytes()

send_bytes() 用于发送原始字节数据。如果你有字节类型的数据,直接使用 send_bytes() 会更加高效。

send_bytes(b"Hello, ZeroMQ!")

特点:

  • 适用于发送原始的字节数据。
  • 如果你已经有字节数据(例如文件内容、图片等),使用 send_bytes() 可以避免额外的编码操作。
  • 需要手动确保数据是字节类型(bytes 类型)。

3. send()

send() 是 ZeroMQ 中的通用发送方法,支持发送任意类型的数据(如字节、字符串、对象等)。如果你希望发送非字符串或字节数据,可以使用这个方法。

send(b"Hello, ZeroMQ!")

或者,你也可以指定 flags 参数来控制消息的行为(例如 zmq.DONTWAIT)。

特点:

  • 支持发送不同类型的数据(如字节数据、字符串等)。
  • 需要确保传入的是字节数据(bytes 类型),否则可能会引发错误。
  • 可以通过 flags 参数指定一些特定的行为,例如是否阻塞,是否是最后一部分消息(在消息分割时很有用)。

4. send_multipart()

send_multipart() 用于发送多个消息部分。在 ZeroMQ 中,消息是按部分(part)进行传输的,send_multipart() 允许你分多部分发送一个消息。例如,可以先发送一个头部,然后发送正文。

send_multipart([b"Header", b"Body", b"Footer"])

特点:

  • 适用于发送由多个部分组成的消息。
  • 每个部分可以是字节数据或字符串。
  • 消息的接收端需要用 recv_multipart() 来接收这些部分。

5. send_json()

send_json() 是 ZeroMQ 提供的一个方便的功能,专门用于发送 JSON 数据。它会将 Python 的字典(或其他支持的对象)自动编码为 JSON 格式,并将其发送。

send_json({"key": "value"})

特点:

  • 专门用于发送 JSON 数据,Python 对象会被自动转换为 JSON 格式。
  • 使用此方法时无需手动编码 JSON 数据,方便快捷。

6. send() with flags

send() 方法还支持额外的 flags 参数,用于控制消息的发送行为。这些标志通常用于细粒度的消息控制,例如非阻塞发送、分段发送等。

send(b"Hello, ZeroMQ!", zmq.DONTWAIT)

常见的 flags

  • zmq.DONTWAIT:非阻塞发送,如果消息不能立即发送,会抛出异常。
  • zmq.SNDMORE:在分段发送消息时,标志“更多部分将在后面发送”。
  • zmq.NOBLOCK:类似于 DONTWAIT,用于非阻塞发送。

这些标志通常与分布式应用场景中的消息分割和流控制有关。

总结:

  • send_string():专门发送字符串,自动处理编码。
  • send_bytes():用于发送原始字节数据。
  • send():通用的发送方法,适用于发送字节流数据。
  • send_multipart():用于发送由多个部分组成的消息。
  • send_json():用于发送 JSON 格式的数据,自动处理 Python 对象到 JSON 的转换。
  • flags 参数:控制消息发送的行为,例如非阻塞发送、分段发送等。

选择哪种发送方式取决于你要发送的数据类型和具体的需求,例如是否需要分割消息,是否需要 JSON 格式,或者是否要控制消息发送的行为。

标签:zmq,socket,python,message,发送,使用,ZeroMQ,ZMQ,接字
From: https://www.cnblogs.com/zreo2home/p/18628575

相关文章

  • python毕设 闲鱼交易平台程序+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于闲鱼交易平台的研究,现有研究主要以其商业模式和用户行为为主,专门针对用Python构建闲鱼交易平台的研究较少。随着互联网的迅速发展......
  • C#中使用gRPC(二)
    这一节,我们实际操作和体验一下再C#中使用gPrc我用的开发环境是VS2022和.Net6的版本。创建好项目以后,ASP.NETCore会自动帮我们创建一个greet.proto的文件和一个服务调整一下greet.proto文件,假设我们的sayHello方法传入一个对象,有年龄,姓名,住址和电话这四个字段,其中电话是一个字......
  • C#中使用gRPC(一)
    在使用gRpc之前,需要先了解一点基础知识。1.ProtocolBuffers具体参考https://protobuf.dev/:简答来说就是Google开发的一种数据描述的格式。和具体的开发语言无关,根据这个东西,你能定义你方法的参数,返回值类型。首先创建一个xxx.proto文件.例如sayHello.protosyntax=......
  • 适合中小数量用户使用,苹果手机免越狱投屏中控尽享高清、流畅的体验!
    在如今的生活中,投屏功能已经逐渐普及,无论是在会议中还是家庭聚会中,投屏都能为我们带来更便捷的展示和分享体验。而当我们想要将手机中的视频、照片等内容投屏到大屏幕时,是否又为繁琐的操作步骤而感到烦恼呢?今天,我将为大家介绍一款能够轻松实现苹果手机免越狱投屏中控的功能,让你......
  • 如何使用WGAN-GP生成一维滚动轴承振动数据样本。以西储大学(CWRU)数据集为例,提供一个基
    使用WGAN-GP生成一维滚动轴承振动数据样本。以西储大学(CWRU)数据集为例,提供一个基于训练好的权重参数文件进行测试的代码。WGAN-GP-1D轴承振动数据样本生成方法,西储大学数据集为例,可替换自己的数据。代码注释清楚,包含训练过程的代码train_gan和基于训练好的权重参数文件......
  • 如何使用 基于连续小波变换时频图的CNN轴承故障诊断模型 Python、jupyter notebook,实
    基于连续小波变换时频图的CNN轴承故障诊断模型Python、jupyternotebook使用基于连续小波变换(ContinuousWaveletTransform,CWT)生成的时频图来构建一个卷积神经网络(CNN)模型进行滚动轴承故障诊断。以下是详细的步骤和代码示例。步骤概述数据集准备特征提取(CWT时频图)......
  • 禅道安装与使用全解析:开启高效项目管理之旅
    禅道是一款国产的开源项目管理软件,它融合了项目管理、任务管理、缺陷管理、测试管理等多种功能,广泛应用于互联网软件研发等众多领域。一、功能特点项目管理禅道可以对项目进行全生命周期的管理。从项目的创建、规划,到执行、监控和收尾,每个阶段都有相应的功能支持。例如,在项目......
  • Bugzilla 安装及使用指南:助力软件缺陷精准管控
    Bugzilla是一个开源的缺陷跟踪系统,在软件开发和测试过程中被广泛使用。一、主要功能缺陷提交开发团队成员、测试人员或者用户可以通过Bugzilla提交软件中发现的问题。提交时需要详细描述缺陷的症状,比如软件在执行某个特定操作时出现的错误提示信息、软件崩溃的具体场景等。例......
  • Jira 安装与使用全攻略:开启高效项目管理之旅
    一、基本介绍所属公司及背景:Jira是由澳大利亚的Atlassian公司开发的。Atlassian是一家知名的软件公司,专注于为团队提供协作和生产力工具。Jira最初是为软件开发团队设计的,用于管理软件项目中的问题(如缺陷、任务、新功能请求等),随着时间的推移,它的应用范围不断扩展,被许多不同行业......
  • 使用Python实现问答机器人,掌握OpenAI接口使用
    streamapifromopenaiimportOpenAIclient=OpenAI(#Thisisthedefaultandcanbeomittedapi_key="sk-T1SC0pSurmOOhsdGu3P9WnHv5pDEhaz6GeMyENMfnsuKOQs7",base_url="https://api.openai-proxy.com/v1")re_stream=client.......