首页 > 编程语言 >C#使用Socket实现分布式事件总线,不依赖第三方MQ

C#使用Socket实现分布式事件总线,不依赖第三方MQ

时间:2024-10-29 08:49:53浏览次数:4  
标签:Socket C# MQ 事件 eventClient query new event

使用 Socket 实现的分布式事件总线,支持 CQRS,不依赖第三方 MQ。

CodeWF.EventBus.Socket 是一个轻量级的、基于 Socket 的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。

Command

Command

Query

Query

特性

  • 轻量级:不依赖任何外部 MQ 服务,减少了系统复杂性和依赖。

  • 高性能:基于 Socket 的直接通信,提供低延迟、高吞吐量的消息传递。

  • 灵活性:支持自定义事件类型和消息处理器,易于集成到现有系统中。

  • 可扩展性:支持多客户端连接,适用于分布式系统环境。

通信协议

通过 TCP 协议进行数据交互,协议包结构如下:

0.0.8@2x

安装

通过NuGet包管理器安装CodeWF.EventBus.Socket

Install-Package CodeWF.EventBus.Socket

服务端使用

运行事件服务

在服务端代码中,创建并启动EventServer实例以监听客户端连接和事件:

using CodeWF.EventBus.Socket;

// 创建事件服务器实例
IEventServer eventServer = new EventServer();

// 启动事件服务器,监听指定IP和端口
eventServer.Start("127.0.0.1", 9100);

停止事件服务

当不再需要事件服务时,调用Stop方法以优雅地关闭服务器:

eventServer.Stop();

客户端使用

连接事件服务

在客户端代码中,创建EventClient实例并连接到事件服务器:

using CodeWF.EventBus.Socket;

// 创建事件客户端实例
IEventClient eventClient = new EventClient();

// 连接到事件服务器,使用eventClient.ConnectStatus检查连接状态
eventClient.Connect("127.0.0.1", 9100));

订阅事件

订阅特定类型的事件,并指定事件处理函数:

eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmailCommand);

private void ReceiveNewEmail(NewEmailCommand command)
{
    // 处理新邮件通知
    Console.WriteLine($"收到新邮件,主题是{message.Subject}");
}

发布命令(Command)

发布事件到指定的主题,供已订阅的客户端处理:

// 发布新邮件通知事件
eventClient.Publish("event.email.new", new NewEmailCommand { Subject = "恭喜您中Github一等奖", Content = "我们很开心,您在2024年7月...", SendTime = new DateTime(2024, 7, 27) });

查询(Query)

查询指定主题,需要有接收查询端订阅相同的主题(即生产者),收到请求后,再以相同的主题发布查询结果:

eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);

private void ReceiveEmailQuery(EmailQuery query)
{
    // 执行查询请求,准备查询结果
    var response = new EmailQueryResponse { Emails = EmailManager.QueryEmail(request.Subject) };

    // 以相同的主题,发布查询结果
    if (_eventClient!.Publish("event.email.query", response,
        out var errorMessage))
    {
        Logger.Info($"Response query result: {response}");
    }
    else
    {
        Logger.Error($"Response query failed: {errorMessage}");
    }
}

其他端可使用相同的主题查询(即消费者):

var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
    new EmailQuery() { Subject = "Account" },
    out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
    Logger.Info($"Query event.email.query, result: {response}");
}
else
{
    Logger.Error(
        $"Query event.email.query failed: [{errorMessage}]");
}

取消订阅事件

不再需要接收某类事件时,可以取消订阅:

eventClient.Unsubscribe<NewEmailNotification>("event.email.new", ReceiveNewEmail);

断开事件服务

完成事件处理或需要断开与服务器的连接时,调用Disconnect方法:

eventClient.Disconnect();
Console.WriteLine("断开与事件服务的连接");

注意事项

  • 确保服务端和客户端使用的地址和端口号一致,并且端口未被其他服务占用。
  • 在生产环境中,服务端应配置为监听公共 IP 地址或适当的网络接口。
  • 考虑到网络异常和服务重启等情况,客户端可能需要实现重连逻辑。
  • 根据实际需求,可以扩展EventServerEventClient类以支持更复杂的功能,如消息加密、认证授权等。

标签:Socket,C#,MQ,事件,eventClient,query,new,event
From: https://www.cnblogs.com/lsq6/p/18512082

相关文章

  • PbootCMS 模板默认Sqlite数据库转Mysql数据库教程详解
    下载数据库文件:进入程序目录的data文件夹,找到.db后缀的数据库文件,下载到本地。下载相关工具:sqlitestudio:用于打开管理Sqlite数据库,导出Sql格式文件。SQLITE转MYSQL工具:用于转换数据库。导出Sqlite数据库:打开SqliteStudio,将.db文件拖入程序中。点击左上角导航......
  • 基于BUCK拓扑的数字电源PLECS仿真
    需求输入DC300V输出DC165V,输入范围185V-425V1、理论分析        实现电压变换的拓扑为BUCK电路,对于BUCK的工作原理资料随处可见,此处不再赘述。本文接下来主要介绍其数字控制环路的设计与DSP实现。        根据电源外特性要求,滤波电感大小设计为6.4uH,输出......
  • Anthropic Computer Use for Mac:释放Mac潜力的AI控制工具
    在人工智能技术飞速发展的今天,我们对于计算机的控制和交互方式也在不断进化。最近,一款名为AnthropicComputerUseforMac的工具引起了我的注意。这是一款专为Mac系统设计的AI控制工具,它不仅能够在Mac上原生运行,还能通过macOS命令和工具直接控制系统,提供了一系列令人兴奋的功......
  • PbootCMS 织梦后台左侧菜单空白不显示的解决办法
    权限问题:检查 data 文件夹及其子文件夹是否有写入权限,确保在Linux和Windows系统中都正确设置了权限。对于Linux系统,可以通过命令行使用 chmod-R777data 命令赋予写入权限;对于Windows系统,则需要通过文件属性手动设置。缺少必要的文件夹:如果 /data/cache/, /dat......
  • 基于EasyX图形化编程[C语言]
    EasyX是针对C/C++的图形库,可以帮助使用C/C++语言的程序员快速上手图形和游戏编程。本期介绍如何用EsayX进行图形化编程,代码辅助介绍#include<stdio.h>#include<easyx.h>#include<mmsystem.h>#pragmacomment(lib,"winmm.lib")//包含库文件intmain(){ //一,创建窗......
  • Micropython PICO 随记-使用PIO驱动Syn6288
    开发环境MCU:Pico1(无wifi版)传感器模块:Syn6288使用固件:自编译版本开发环境:MacBookProSonoma14.5开发工具:Thonny4.1.6开发语言:MicroPython1.20.0资料学习StateMachineApi参考官方代码UART通讯协议Syn6288手册代码使用创建两个StateMachine,分别用于发送待......
  • npm 包的命名空间介绍,以及@typescript-eslint/typescript-eslint
    npm包的命名空间是一个重要的概念,用于组织和管理相关的包。通过命名空间,开发者可以避免命名冲突、增强包的可读性和可维护性。以下是关于npm命名空间的详细介绍,并以@typescript-eslint作为示例。1.命名空间的结构命名空间的格式为@scope/package-name:@scope:这是......
  • vs+qt修改本地ico(已有一份ico)
    (此方法只修改文件夹里exe的ico,运行时的ico需要从ui文件中修改)1、有ICO文件2、右键项目,添加ico资源3、选择ico文件,此时项目列表中会出现ico文件4、清楚项目,重新生成项目5、右键rc文件,查看代码6、此时会出现两个ico7、关闭rc文件,找到resource.h文件,打开8、找到IDI_ICON......
  • 【论文分享】HashGAT-VCA:一种结合哈希函数和图注意力网络的矢量元胞自动机模型,用于城
    本文考虑地块内部异质性,提出一个结合哈希函数和图注意力网络(GAT)的矢量元胞自动机(VCA)方法,用于研究城市土地利用变化;并将该模型应用于模拟深圳市2009年至2012年的城市土地利用变化,结果表明,HashGAT-VCA模型的模拟准确性显著优于其他VCA模型。【论文题目】HashGAT-VCA:Avecto......
  • OCPP1.6-1-文档概述
    OCPP1.6文档概述目前OCA官方OCPP1.6的最新版文档是OCPP_1.6_documentation_2019_12其中包含了6个文件和schemas文件夹:序号文档名描述0ocpp-1.6edition2.pdfOpenChargePointProtocol1.6协议内容1ocpp-1.6-errata-sheet.pdfOCPP1.6勘误表2ocpp-......