首页 > 系统相关 >进程间通信组件ZeroMQ详解

进程间通信组件ZeroMQ详解

时间:2024-12-30 12:09:38浏览次数:5  
标签:string url 模式 间通信 详解 msg ZeroMQ public

在一些复杂的项目中,往往会由不同功能的程序组成,且在程序运行期间,各个程序还需要进行互相通信,实现进程间通信的方式有很多种,最常用的就是通过消息中间件,比如RabbitMQ,Kafka,以及ZeroMQ等,而RabbitMQ和Kafka这两款中间件往往都需要独立安装步骤才能使用,ZeroMQ却不需要独立安装部署,而是作为动态库直接在程序中引用即可。今天以一个简单的小例子,简述ZeroMQ的常见用法,仅供学习分享使用,如有不足之处,还请指正。

ZeroMQ (又被称为 ØMQ, 0MQ, or zmq),虽然看起来像是可嵌入的网络组件,实际上却是一款并发框架。ZeroMQ作为一款开源通用消息组件,通过Socket可以将原子消息通过不同协议(进程内,进程间,TCP和广播等)进行传输。基于ZeroMQ,可以由多种模式进行选择,如:fan-out,发布-订阅,任务分发,请求-应答等,并且支持1-N,N-N等多端通信。对于C#开发人员,ZeroMQ有两种方式可供选择,1. NetMQ,提供一个端口给C#;2. clrzmq4通过C#绑定到libzmq。而NetMQ正是ZeroMQ推荐的使用方式

ZeroMQ通信模式

 

ZeroMQ提供了多种通信模式,主要有以下几种:

  • 请求应答(Request-Response)模式,此模式是ZeroMQ所有通信方式中最简单的一种模式,当客户端发出请求时,期望得到应答,且必须得到应答,才算一个完整的通信。
  • 发布订阅(Publish-Subscriber)模式,此模式发送者并不直接发送消息给接收者,而是将要发送的消息进行分类,接收者根据分类只接收自己感兴趣的消息,这就是发布订阅模式。这里提及的消息分类,通常被称为主题(topic)或过滤器(filter)。ZeroMQ通过多种信息来表达主题内容,以字节数组或者字符串的形式表示。
  • 生产者-消费者(Push-Pull)模式,此模式是一个消息分发机制,主要用于任务并发和并行处理场景,正常情况下,一个或者多个生产者发送消息,而一个或多个消费者接收并处理这些消息,这种模式特别适合于需要高效任务分发的场景,如分布式计算,大数据处理等。
  • 路由经销商(Router-Dealer)模式,此种模式是请求回复模式的一种升级,当Router收到消息的时候,会自动在消息的前面添加一帧,用来识别发送端的地址。当发送一个消息的时候,需要先发送一帧对端的地址,然后再发送消息,如果目的地址指向的对端不存在了,这个消息就会被丢弃。对端的地址默认情况下由ZMQ来产生一个唯一标识UUID。DEALER可以任意读写,不需要额外的地址帧,当有多个对端的时候,循环给单个对端发送消息。(注意:不是群发消息,与PUB不同)。
  • 多发布订阅(XPub-XSub),通常情况下,发布订阅模式适用于一个发布者,多个订阅者的场景。如果需要多个消息发布者,那XPub-XSub模式将会比较适用。

本文主要讲解请求应答模式和发布订阅模式,其他通信模式,如果感兴趣可以参考官方文档。

请求应答

请求应答(Request-Response),此模式是ZeroMQ所有通信方式中最简单的一种模式,当客户端发出请求时,期望得到应答,且必须得到应答,才算一个完整的通信。请求应答模式是同步阻塞模式,如果发送消息顺序错误,会抛出异常。正确的请求应答顺序如下:

  1. 请求端(RequestSocket)发送一个消息
  2. 响应端(ResponseSocket)阅读请求消息
  3. 响应端(ResponseSocket)发送响应信息
  4. 请求端(RequestSocket)接收响应端(ResponseSocket)发送的信息。

请求应答模式,主要由RequestSocket和ResponseSocket组成,实现消息的请求和应答。

请求端发送消息之前,需要先进行连接Connect,然后才能发送消息。示例代码如下所示:

public class ZeroMQRequest:IDisposable
{
	public Action<string> Received;
	public Action<string> Sended;
	private string url = string.Empty;

	private RequestSocket request;

	public ZeroMQRequest(string url)
	{
		this.request = new RequestSocket();
		this.url = url;
	}

	public void Connect()
	{
		request.Connect(this.url);
	}

	public void BeginReceive()
	{
		string msg = this.request.ReceiveFrameString();
		Received?.Invoke(msg);
	}

	public void SendMsg(string msg)
	{
		this.request.SendFrame(msg);
		if (Sended != null)
		{
			Sended.Invoke(msg);
		}
	}

	public void Disconnect()
	{
		request.Disconnect(this.url);
	}

	public void Dispose()
	{
		request.Close();
		request.Dispose();
	}
}

响应端接收消息之前,需要先进行绑定(Bind)到对应的网络端口,然后才能接收消息。示例代码如下所示:

public class ZeroMQResponse:IDisposable
{
	public Action<string> Received;
	public Action<string> Sended;
	private string url = string.Empty;
	private ResponseSocket response;

	public ZeroMQResponse(string url)
	{
		this.url = url;
		this.response = new ResponseSocket();
		this.response.Bind(this.url);
	}

	public void BeginReceive()
	{
		Task.Run(() =>
		{
			while (true)
			{
				string msg = this.response.ReceiveFrameString();
				Received?.Invoke(msg);
				//收到回复
				Send("Ok");
			}
		});
		
	}

	public void Send(string msg)
	{
		this.response.SendFrame(msg);
		if (Sended != null)
		{
			Sended.Invoke(msg);
		}
	}

	public void Dispose()
	{
		this.response.Dispose();
	}
}

上述代码是将ReuqestSocket和ResponseSocket进行封装,并通过委托Action公开了接收和发送后响应接口,在使用时进行调用即可。

请求应答模式示例截图如下所示:

由于请求应答模式是阻塞模式,如果没有发送就调用接收方法,或连续调用接收方法,或连续发送(发送没有响应就再次发送),则会抛出异常。

连续两次发送,异常信息如下所示:

连续两次接收,异常信息如下所示:

发布订阅

发布订阅模式,将要发送的信息按照主题(Topic)进行分类,哪个接收端订阅了这个主题,就接收对应的消息,而不是直接发发送给接收者,这样有助于对消息进行分类处理。所以发布订阅模式并非阻塞模式,也不是一对一的请求响应,而是按需分类,异步响应模式。此模式主要有PublisherSocket和SubscriberSocket两个类,分别用于处理消息的发布和订阅。正确的发布订阅顺序,如下所示:

  1. 发送端:定义PubliserSocket对象,并绑定(Bind)到指定端口。然后发送主题和消息。
  2. 接收端:定义SubscriberSocket对象,连接到指定端口,订阅主题,接收指定主题的消息。

消息发布类(PublisherSocket),在消息发送之前,首先绑定一个端口,然后才能发送主题和消息,示例代码如下所示:

public class ZeroMQPublisher : IDisposable
{
	private string url=string.Empty;
	private PublisherSocket publisher;
	public Action<string> Sended;


	public ZeroMQPublisher(string url)
	{
		this.url = url;
		this.publisher = new PublisherSocket();
		this.publisher.Bind(url);
	}

	public void Send(string topic,string msg)
	{
		this.publisher.SendMoreFrame(topic);
		this.publisher.SendFrame(msg);
		if(Sended != null)
		{
			Sended.Invoke($"send msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
		}
	}

	public void Dispose()
	{
		this.publisher.Close();
		this.publisher.Dispose();
	}
}

消息订阅类(SubscriberSocket),在消息接收之前,首先连接端口,订阅主题(Subscribe方法),然后才能进行消息的接收,示例代码如下所示:

public class ZeroMQSubscriber : IDisposable
{
	private string url=string.Empty;
	private SubscriberSocket subscriber;

	public Action<string> Received;
	private bool isRunning = false;

	public ZeroMQSubscriber(string url)
	{
		this.url = url;
		this.subscriber = new SubscriberSocket();
		this.subscriber.Connect(url);
		this.subscriber.Subscribe(string.Empty);
		this.isRunning = true;
	}

	public void BeginReceive()
	{
		Task.Run(() =>
		{
			while(isRunning)
			{
				var topic = this.subscriber.ReceiveFrameString();
				var msg = this.subscriber.ReceiveFrameString();
				if(Received != null)
				{
					Received.Invoke($"received msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
				}
			}
		});
	}

	public void DisConnect()
	{
		isRunning = false;
		this.subscriber.Disconnect(this.url);
	}

	public void Dispose()
	{
		this.isRunning=false;
		this.subscriber.Close();
		this.subscriber?.Dispose();
	}
}

注意,发布订阅模式是单向触发的,即消息发布者,不可以接收消息;消息接收者,也不可以发布消息。接收端在调用Subscribe方法时,如果主题为空,则表示可以订阅任何主题。

发布订阅模式示例截图如下所示:

源码下载

关注老码识途公众号,回复关键字ZeroMQ,即可获取示例源码,如下图所示:

以上就是《进程间通信组件库ZeroMQ详解》的全部内容。

标签:string,url,模式,间通信,详解,msg,ZeroMQ,public
From: https://www.cnblogs.com/hsiang/p/18622806

相关文章

  • 详解权限管理模型:从 DAC 到混合模型的原理、特点与应用场景
    自主访问控制模型(DAC)原理:资源的所有者可以自主地决定哪些用户或组可以访问该资源以及对资源的访问权限,如读、写、执行等。特点:灵活性高,资源所有者对资源有绝对的控制权;但安全性相对较低,容易导致权限滥用。应用场景:适用于小型、相对简单的系统或组织内部,如个人电脑上的文件系统......
  • 【详解】解决Eclipse发布到Tomcat丢失依赖jar包的问题
    目录解决Eclipse发布到Tomcat丢失依赖jar包的问题问题原因解决方案自动化构建考虑结论代码概述代码的基本组成示例代码(Python)解决Eclipse发布到Tomcat丢失依赖jar包的问题在Web开发过程中,使用Eclipse作为IDE并将项目发布到Tomcat服务器上是常见的操作。然而,有时在......
  • 【详解】ElasticSearchQuery查询方式
    目录ElasticsearchQuery查询方式1.MatchQuery(匹配查询)2.TermQuery(精确查询)3.RangeQuery(范围查询)4.BoolQuery(布尔查询)5.其他查询方式结论ElasticsearchQuery查询方式Elasticsearch(ES)是一个基于Lucene的高性能、分布式、开源搜索引擎,提供了多种灵活的查询......
  • Python中指数概率分布函数的绘图详解
    在数据科学和统计学中,指数分布是一种应用广泛的连续概率分布,通常用于建模独立随机事件发生的时间间隔。通过Python,我们可以方便地计算和绘制指数分布的概率密度函数(PDF)。本文将详细介绍指数分布的原理、应用场景,并提供详细的代码示例,展示如何在Python中绘制指数分布的概率密......
  • Java实现拍卖系统详解
    一、项目背景与需求分析随着互联网技术的飞速发展,电子商务领域不断拓展新的业务模式,在线拍卖系统应运而生并逐渐成为一种重要的商业交易方式。在当今数字化的时代,人们越来越倾向于通过网络平台进行各类交易活动,在线拍卖系统能够打破传统拍卖在时间和空间上的限制,使得参与者可以在......
  • xargs命令详解
    xargs是一个强大的命令行工具,用于将标准输入数据转换为命令行参数,并执行一个命令。以下是xargs的一些详细用法和选项:基本用法xargs命令的基本格式如下:command|xargsanother_command这里,command的输出将作为another_command的参数。选项-0或--null:输入项之间用......
  • 按库存生产 (Make-To-Stock, MTS) 的计划策略配置和应用详解
    在SAP系统中,按库存生产(MTS)是一种基于预测安排生产的模式,与客户订单无直接关联。SAP系统为MTS模式预配置了多种计划策略,以满足不同企业的需求。本文将重点介绍计划策略10、11、40和52的配置和应用。MTS模式中的计划策略概述下表列出了四种计划策略的主要参数:......
  • RocketMQ 消息顺序与事务机制详解
    目录一、简介二、RocketMQ架构概述三、RocketMQ消息流转过程四、RocketMQ消息顺序与事务五、RocketMQ高可用性与扩展性六、RocketMQ应用场景七、总结一、简介RocketMQ是一款高吞吐量、高可扩展性的分布式消息中间件,由阿里巴巴开源,并已成为Apache的顶级项目......
  • 一文详解-JavaScript中 es5 原型和 es6-class
    一文详解-JavaScript中es5原型和es6-class原型真的有用吗有不少小伙子应该会有这个感觉大家都在说原型prototype很重要,那为什么我却用不到?原因不外乎这几个:框架重度使用者,我们目前的前端主流业务,几乎都是使用vue,react,微信小程序在开发项目。这些框架封装得太过......
  • CSS系列(42)-- Backdrop Filter详解
    前端技术探索系列:CSSBackdropFilter详解......