为了更好的阅读体验,请点击这里
本文大部分内容翻译自 Chapter 1 - Basics,原因是之前翻译的版本太老了,不得不亲自披挂上阵拿机器翻译一下。只截取了部分自己可能用得到的,所以如果有看不太懂的地方,去翻一下原网页吧。QWQ
附赠 libzmq 的 api 接口函数说明 一份。
一、基础函数
-
int zmq_recv (void *socket, void *buf, size_t len, int flags);
zmq_recv()
函数应从 socket 参数引用的套接字接收消息,并将其存储在 buf 参数引用的缓冲区中。任何超过 len 参数指定长度的字节都将被截断。如果指定套接字上没有可用消息,则 zmq_recv() 函数将阻塞,直到请求得到满足。 flags 参数是下面定义的标志的组合: 如果 len 为零,则 buf 参数可以为 null。 -
int zmq_send (void *socket, void *buf, size_t len, int flags);
zmq_send()
函数应排队从 buf 和 len 参数引用的缓冲区创建的消息。 -
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
zmq_setsockopt()
函数应将 option_name 参数指定的选项设置为 option_value 参数指向的值,用于 socket 参数指向的 ØMQ 套接字。option_len 参数是以字节为单位的选项值的大小。对于采用“字符串”类型值的选项,提供的字节数据应该不包含零字节,或者以单个零字节结尾(终止 ASCII NUL 字符)。 -
void *zmq_ctx_new ();
创建新的 ZeroMQ context。如果成功的话,这个函数会为新创建的上下文返回一个 opaque 句柄。否则返回NULL
。 -
void *zmq_socket (void *context, int type);
这个函数使用具体的上下文创建一个 zmq 套接字并为新创建的套接字返回一个 opaque 句柄。type
参数指定了套接字的类型,这决定了在套接字上的对话语义。
新创建的套接字开始是未绑定的,同时不与任何端点有联系。为了做到一个消息跟着套接字流动这样的操作,必须用zmq_connect()
连接至少一个端点,或者是至少一个端点必须被使用zmq_bind()
创建为了传入的连接。 -
int zmq_bind (void *socket, const char *endpoint);
在一个套接字上接受传入的连接。这个函数绑定套接字到一个局部的端点然后在这个端点上接受传入的连接。
endpoint
参数是一个包含了transport://
跟随着一个地址
的字符串。transport
指定了是用什么协议。地址
决定了绑定到哪个地址。
该函数支持以下协议:- tcp
- ipc
- inproc
- pgm, epgm
- vmci
-
int zmq_connect (void *socket, const char *endpoint);
从套接字上创建发出的连接。这个函数把套接字连接到端点上然后在那个端点上接受传入的连接。
endpoint
参数是一个包含了transport://
跟随着一个地址
的字符串。transport
指定了是用什么协议。地址
决定了连接到哪个地址。
该函数支持的协议同上。
由于其他语言的字符串格式不像 C 的格式是 str\0
而是 str
的,因此需要手动添加最后的终止符。因此原教程编写了比较方便使用的方法。原文中建立了一个规则,即 ZeroMQ 字符串是指定长度的,并且在没有尾随 null 的情况下在线发送。在最简单的情况下(我们将在示例中这样做),一个 ZeroMQ 字符串巧妙地映射到一个 ZeroMQ 消息帧。
在 C 中,接收 ZeroMQ 字符串并将其作为有效的 C 字符串传递给应用程序:
// Receive ZeroMQ string from socket and convert into C string
// Chops string at 255 chars, if it's longer
static char *s_recv (void *socket) {
char buffer [256];
int size = zmq_recv (socket, buffer, 255, 0);
if (size == -1)
return NULL;
if (size > 255)
size = 255;
buffer [size] = \0;
/* use strndup(buffer, sizeof(buffer)-1) in *nix */
return strdup (buffer);
}
这形成了一个方便的辅助函数,本着让我们可以重用有利可图的东西的精神,让我们编写一个类似的 s_send
函数,以正确的 ZeroMQ 格式发送字符串,并将其打包到一个我们可以重用的头文件中。
结果是 zhelpers.h
,这个头文件可以让我们可以用 C 编写更甜美和更短的 ZeroMQ 应用程序。这是一个相当长的源代码,并且只能在 C 中使用,所以请 在闲暇时阅读它。
二、几种协议
1. request-reply
从一个 Hello World 示例开始。我们将制作一个客户端和一个服务器。客户端向服务器发送“Hello”,服务器回复“World”。这是 C 中的服务器,它在端口 5555 上打开一个 ZeroMQ 套接字,读取其上的请求,并用“World”回复每个请求:
// Hello World server
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
int main (void)
{
// Socket to talk to clients
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);
while (1) {
char buffer [10];
zmq_recv (responder, buffer, 10, 0);
printf ("Received Hello\n");
sleep (1); // Do some 'work'
zmq_send (responder, "World", 5, 0);
}
return 0;
}
// Hello World client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main (void)
{
printf ("Connecting to hello world server...\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
char buffer [10];
printf ("Sending Hello %d...\n", request_nbr);
zmq_send (requester, "Hello", 5, 0);
zmq_recv (requester, buffer, 10, 0);
printf ("Received World %d\n", request_nbr);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}
如果您终止服务器 (Ctrl-C) 并重新启动它,客户端将无法正常恢复。从崩溃的进程中恢复并不是那么容易。
2. pub-sub
一个推送天气更新的示例,其中包含邮政编码、温度和相对湿度。我们将生成随机值,就像真实的气象站一样。
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
#include "zhelpers.h"
int main (void)
{
// Prepare our context and publisher
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
assert (rc == 0);
// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = randof (100000);
temperature = randof (215) - 80;
relhumidity = randof (50) + 10;
// Send message to all subscribers
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_ctx_destroy (context);
return 0;
}
这种更新流没有开始也没有结束,就像一个永无止境的广播。
这是客户端应用程序,它侦听更新流并获取与指定邮政编码有关的任何内容,默认情况下是纽约市,因为这是开始任何冒险的好地方:
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
#include "zhelpers.h"
int main (int argc, char *argv [])
{
// Socket to talk to server
printf ("Collecting updates from weather server...\n");
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");
assert (rc == 0);
// Subscribe to zipcode, default is NYC, 10001
const char *filter = (argc > 1)? argv [1]: "10001 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
filter, strlen (filter));
assert (rc == 0);
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
char *string = s_recv (subscriber);
int zipcode, temperature, relhumidity;
sscanf (string, "%d %d %d",
&zipcode, &temperature, &relhumidity);
total_temp += temperature;
free (string);
}
printf ("Average temperature for zipcode '%s' was %dF\n",
filter, (int) (total_temp / update_nbr));
zmq_close (subscriber);
zmq_ctx_destroy (context);
return 0;
}
请注意,当您使用 SUB 套接字时,您必须使用 zmq_setsockopt()
和 ZMQ_SUBSCRIBE
设置订阅,如此代码所示。如果您不设置任何订阅,您将不会收到任何消息。这是初学者常犯的错误。订阅者可以设置多个订阅,这些订阅被加在一起。也就是说,如果更新匹配任何订阅,订阅者就会收到它。订户也可以取消特定的订阅。订阅通常(但不总是)是一个可打印的字符串。请参阅 zmq_setsockopt()
以了解其工作原理。
PUB-SUB 套接字对是异步的。客户端在一个循环中执行 zmq_recv()
(或者一次,如果这就是它仅需要调用一次的话)。尝试向 SUB 套接字发送消息将导致错误。同样,该服务器端会根据需要经常执行 zmq_send()
,但不得在 PUB 套接字上执行 zmq_recv()
。
理论上,对于 ZeroMQ 套接字,哪一端连接和哪一端绑定并不重要。但是,在实践中存在未记录(原文:undocumented)的差异,文章稍后会谈到。现在,除非您的网络设计使接下来的操作不可能,请绑定 PUB 并连接 SUB。
关于 PUB-SUB 套接字,还有一件更重要的事情需要了解:您无法准确知道订阅者何时开始接收消息。即使你启动一个订阅者,等待一段时间,然后再启动发布者,订阅者也总是会错过发布者发送的第一条消息。这是因为当订阅者连接到发布者时(需要很短但非零的时间),发布者可能已经在发送消息了。
这种“反应迟钝”的症状经常影响到足够多的人,因此我们将对其进行详细解释。请记住,ZeroMQ 执行异步 I/O,即在后台执行。假设您有两个节点按以下顺序执行此操作:
- 订阅者连接到端点并接收和计算消息。
- 发布者绑定到端点并立即发送 1,000 条消息。
那么订阅者很可能不会收到任何东西。你会眨眼(blink),检查你是否设置了正确的过滤器,然后再试一次,订阅者仍然不会收到任何东西。
建立 TCP 连接涉及往返握手,这需要几毫秒,具体取决于您的网络和对等点之间的跃点数。在那个时候,ZeroMQ 可以发送很多消息。为了便于讨论,假设建立连接需要 5 毫秒,并且同一链路每秒可以处理 1M 消息。在订阅者连接到发布者的 5 毫秒内,发布者仅需 1 毫秒即可发送这些 1K 消息。
在 第 2 章 - 套接字和模式中,我们将解释如何同步发布者和订阅者,以便在订阅者真正连接并准备好之前您不会开始发布数据。有一个简单而愚蠢的方法可以延迟发布者,那就是休眠(sleep)。但是,不要在实际应用程序中这样做,因为它非常脆弱,而且不优雅且缓慢。使用睡眠向自己证明发生了什么,然后等待这章内容 第 2 章 - 套接字和模式,看看如何正确地做到这一点。
同步的替代方法是简单地假设发布的数据流是无限的,没有起点也没有终点。还假设订户不关心在它启动之前发生了什么。这就是我们构建天气客户端示例的方式。
因此,客户端订阅了它选择的邮政编码并收集了该邮政编码的 100 个更新。如果邮政编码是随机分布的,这意味着来自服务器的大约一千万次更新。您可以启动客户端,然后启动服务器,客户端将继续工作。您可以根据需要随时停止和重新启动服务器,客户端将继续工作。当客户端收集到它的一百个更新时,它计算平均值,打印它,然后退出。
关于发布-订阅(pub-sub)模式的一些要点:
- 订阅者可以连接到多个发布者,每次使用一个连接调用。然后数据将到达并交错(“公平排队”),这样就没有一个发布者会淹没其他发布者。
- 如果发布者没有连接的订阅者,那么它将简单地丢弃所有消息。
- 如果您使用的是 TCP 而订阅者速度很慢,则消息将在发布者上排队。稍后我们将研究如何使用“高水位线”来保护发布者免受这种情况的影响。
- 从 ZeroMQ v3.x 开始,当使用连接协议(
tcp:@< *>@*
或ipc:@<
>@
)时,过滤发生在发布者端。使用epgm:@<//>@
协议,过滤发生在订阅者端。在 ZeroMQ v2.x 中,所有过滤都发生在订阅者端。
3. Divide and Conquer(并行计算)
注意,Ventilator 是呼吸机,worker 是工人,sink 是水槽,我不太会翻译这三个词,因此均使用英文代替。下文中如果出现了这三种中文翻译证明是机翻忘删了233
Ventilator 生成 100 个任务,每个任务都有一条消息告诉 worker 休眠一定毫秒数:
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
#include "zhelpers.h"
int main (void)
{
void *context = zmq_ctx_new ();
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers...\n");
// The first message is "0" and signals start of batch
s_send (sink, "0");
// Initialize random number generator
srandom ((unsigned) time (NULL));
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
}
printf ("Total expected cost: %d msec\n", total_msec);
zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
这是 worker 的应用程序。它收到一条消息,休眠该秒数,然后发出已完成的信号:
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
#include "zhelpers.h"
int main (void)
{
// Socket to receive messages on
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Process tasks forever
while (1) {
char *string = s_recv (receiver);
printf ("%s.", string); // Show progress
fflush (stdout);
s_sleep (atoi (string)); // Do the work
free (string);
s_send (sender, ""); // Send results to sink
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
下面是 sink 的应用程序。它收集了 100 个任务,然后计算整个处理所花费的时间,因此如果有多个任务,我们可以确认这些 worker 确实在并行运行:
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
#include "zhelpers.h"
int main (void)
{
// Prepare our context and socket
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Wait for start of batch
char *string = s_recv (receiver);
free (string);
// Start our clock now
int64_t start_time = s_clock ();
// Process 100 confirmations
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if (task_nbr % 10 == 0)
printf (":");
else
printf (".");
fflush (stdout);
}
// Calculate and report duration of batch
printf ("Total elapsed time: %d msec\n",
(int) (s_clock () - start_time));
zmq_close (receiver);
zmq_ctx_destroy (context);
return 0;
}
让我们更详细地看一下这段代码的某些方面:
- workers 将上游连接到 ventilator,将下游连接到 sink。这意味着您可以任意添加 worker。如果 workers 绑定到他们的端点,您将需要(a)更多端点(b)每次添加 worker 时修改 ventilator 和/或 sink。我们说呼吸机和水槽是我们架构的稳定部分,而工人是它的动态部分。
- 我们必须将批处理的开始与所有正在运行的 worker 同步。这是 ZeroMQ 中一个相当常见的陷阱,没有简单的解决方案。
zmq_connect
方法需要一定的时间。因此,当一组 workers 连接到 ventilator 时,第一个成功连接的 worker 将在短时间内收到大量消息,而其他 worker 也在连接。如果您不以某种方式同步批处理的最开头,系统将根本不会并行运行。您可以尝试取消 ventilator 中的等待,看看会发生什么。 - ventilator 的 PUSH 套接字将任务平均分配给 workers(假设他们在当前批次开始分发之前都已连接)。这称为负载平衡,我们将再次更详细地讨论它。
- sink 的 PULL 套接字平均地收集来自 workers 的结果。这称为公平排队。
管道模式还表现出“慢连接”综合症,导致人们指责 PUSH 套接字没有正确地进行负载平衡。如果您正在使用 PUSH 和 PULL,并且您的一个工作人员收到的消息比其他工作人员多得多,那是因为 PULL 套接字的加入速度比其他套接字快,并且在其他工作人员设法连接之前获取了很多消息。如果你想要适当的负载平衡,你可能想看看 第 3 章 - 高级请求-回复模式中的负载平衡模式。
三、使用 ZeroMQ 编程
获得正确的上下文(context)
ZeroMQ 应用程序总是从创建上下文开始,然后使用它来创建套接字。在 C 中,它是zmq_ctx_new()
调用。您应该在您的流程中创建和使用一个上下文。从技术上讲,上下文是单个进程中所有套接字的容器,并充当进程内
套接字的传输,这是在一个进程中连接线程的最快方式。如果在运行时一个进程有两个上下文,它们就像单独的 ZeroMQ 实例。如果这正是你想要的,好的,但除此之外请记住:
在进程开始时调用 zmq_ctx_new()
一次,在结束时调用 zmq_ctx_destroy()
一次。
如果您正在使用 fork()
系统调用,请在 fork 之后,然后在子进程代码的开头执行 zmq_ctx_new()
。一般来说,您会在子进程中做有趣的(ZeroMQ)事情,而在父进程中做无聊的流程管理。
干净地退出
使用 C 时,您必须在使用完对象后小心地释放它们,否则会导致内存泄漏、应用程序不稳定以及通常的恶果。
内存泄漏是一回事,但 ZeroMQ 对于退出应用程序的方式非常挑剔。原因是技术性的和痛苦的,但结果是如果你让任何套接字保持打开状态,zmq_ctx_destroy()
函数将永远挂起(hang)。并且即使您关闭所有套接字,且如果有待定(pending)的连接或发送,zmq_ctx_destroy()
默认情况下也会永远等待,除非您在关闭它们之前在这些套接字上设置 LINGER 为零。
我们需要担心的 ZeroMQ 对象是消息、套接字和上下文。幸运的是它非常简单,至少在简单的程序中是这样:
- 尽可能使用
zmq_send()
和zmq_recv()
,因为它避免了使用zmq_msg_t
对象的需要。 - 如果您确实使用
zmq_msg_recv()
,请始终在完成后立即通过调用zmq_msg_close()
释放接收到的消息。 - 如果您要打开和关闭大量套接字,这可能表明您需要重新设计您的应用程序。在某些情况下,除非您销毁上下文,否则套接字句柄不会被释放。
- 退出程序时,关闭套接字,然后调用
zmq_ctx_destroy()
。这破坏了上下文(destroy context)。
至少对于 C 开发是这样。在具有自动对象销毁功能的语言中,套接字和上下文将在您离开作用域时被销毁。如果您使用异常,则必须在类似于“最终”块的地方进行清理,这与任何资源一样。
如果您正在进行多线程工作,它会变得比这更复杂。首先,不要尝试从多个线程使用同一个套接字。请不要解释为什么您认为这会非常有趣,只是请不要这样做。接下来,您需要关闭每个有正在进行的请求的套接字。正确的方法是设置一个较低的 LINGER 值(1 秒),然后关闭套接字。如果您的语言绑定在您销毁上下文时没有自动为您执行此操作,我建议您发送一个补丁。
最后,破坏上下文。这将导致附加线程(即共享相同上下文)中的任何阻塞接收或轮询或发送返回错误。捕获该错误,然后设置 linger on,关闭该线程中的套接字,然后退出。不要两次破坏相同的上下文。主线程中的 zmq_ctx_destroy
将阻塞,直到它知道的所有套接字都安全关闭。
瞧!它非常复杂和痛苦,以至于任何称职的语言绑定作者都会自动执行此操作并使套接字关闭舞蹈变得不必要。
四、为什么需要 ZeroMQ
ZeroMQ:一个高效、可嵌入的库,可以解决应用程序在网络上变得非常灵活所需的大部分问题,而且成本不高。
具体来说:
- 它在后台线程中异步处理 I/O。它们使用无锁数据结构与应用程序线程通信,因此并发 ZeroMQ 应用程序不需要锁、信号量或其他等待状态。
- 组件可以动态地来来去去,ZeroMQ 会自动重新连接。这意味着您可以按任何顺序启动组件。您可以创建“面向服务的架构”(SOA),其中服务可以随时加入和离开网络。
- 它会在需要时自动对消息进行排队。它智能地执行此操作,在排队之前将消息推送到尽可能靠近接收方的位置。
- 它有处理过满队列(称为“高水位线”)的方法。当队列已满时,ZeroMQ 会自动阻止发送者或丢弃消息,具体取决于您正在执行的消息传递类型(所谓的“模式”)。
- 它允许您的应用程序通过任意传输相互通信:TCP、多播、进程内、进程间。您无需更改代码即可使用不同的传输方式。
- 它使用取决于消息传递模式的不同策略来安全地处理慢速/阻塞的读取器。
- 它允许您使用各种模式(例如请求-回复和发布-订阅)来路由消息。这些模式是您创建拓扑、网络结构的方式。
- 它允许您创建代理以通过单个调用排队、转发或捕获消息。代理可以降低网络互连的复杂性。
- 它使用在线路上的简单框架,完全按照发送时的方式传递整个消息。如果您写了一条 10k 的消息,您将收到一条 10k 的消息。
- 它不会对消息强加任何格式。它们是从零到千兆字节大的 blob。当您想要表示数据时,您可以选择其他一些产品,例如 msgpack、Google 的协议缓冲区等。
- 它通过在有意义的情况下自动重试来智能地处理网络错误。
- 它可以减少您的碳排放。用更少的 CPU 做更多的事情意味着你的盒子使用更少的电力,你可以让你的旧盒子使用更长时间。Al Gore 会喜欢 ZeroMQ。
实际上 ZeroMQ 做的远不止这些。它对您开发具有网络功能的应用程序的方式具有颠覆性的影响。从表面上看,它是一个受套接字启发的 API,您可以在其上执行 zmq_recv()
和 zmq_send()
。但是消息处理很快成为中心循环,您的应用程序很快就会分解为一组消息处理任务。它优雅而自然。它可以扩展:这些任务中的每一个都映射到一个节点,并且节点通过任意传输相互通信。一个进程中的两个节点(节点是一个线程)、一个盒子上的两个节点(节点是一个进程)或一个网络中的两个节点(节点是一个盒子)——它们都是一样的,没有应用程序代码更改。