首页 > 其他分享 >简单写一个eventbus

简单写一个eventbus

时间:2024-04-15 12:36:06浏览次数:26  
标签:Task 一个 IntegrationEvent IIntegrationEventHandler handler 简单 eventbus public

前言

闲暇之余,简单写一个eventbus。

正文

什么是eventbus?

eventbus 是一个开源的发布订阅模式的框架,用于简化程序间不同组件的通信。
它允许不同组件间松耦合通信,组件之间不通过直接引用的方式,而是事件的方式进行消息传递。

下面进行代码演示:

首先是发布订阅,那么就应该有发布方法和订阅方法,因为是消息传递,那么就应该还有启动消费消息的方法。

public interface IEventBus : IDisposable
{
    Task Publish<T>(T @event) where T : IntegrationEvent;

    Task Subscribe<T>(IIntegrationEventHandler<T> handler)
        where T : IntegrationEvent;

    Task StartConsume();
}

大体我们要实现上面的功能。

然后我们可以定义事件的基础信息:

public class IntegrationEvent
{
    public Guid Id { get; set; }

    public DateTime OccurredOn { get; set; }

    public IntegrationEvent()
    {
        Id = Guid.NewGuid();
        OccurredOn = DateTime.Now;
    }
}

比如说要有唯一的id,同时要有事件发生的时间。

订阅的话,那么需要指定处理的对象。

public interface IIntegrationEventHandler
{
}

public interface IIntegrationEventHandler<in TIntegrationEvent> :
    IIntegrationEventHandler where TIntegrationEvent : IntegrationEvent
{
    Task Handler(TIntegrationEvent @event);
}

处理对象设计也很简单,就是需要创建一个有能够处理IntegrationEvent的对象即可。

这里很多人会疑惑,为什么很多框架的泛型接口类,往往会创建一个非泛型的接口。

这个其实是为了进一步抽象,方便做集合处理,下面将会介绍到。

然后就可以写一个内存型的eventbus。

public class InMemoryEventBus : IDisposable
{
    private Dictionary<string, List<IIntegrationEventHandler>>
        _dictionary = new Dictionary<string, List<IIntegrationEventHandler>>();

    public async Task Publish<T>(T @event) where T : IntegrationEvent
    {
        var fullName = @event.GetType().FullName;
        if (fullName == null)
        {
            return;
        }

        var handlers = _dictionary[fullName];

        foreach (var integrationEventHandler in handlers)
        {
            if (integrationEventHandler is IIntegrationEventHandler<T> handler)
            {
                await handler.Handler(@event);
            }
        }
    }

    public async Task Subscribe<T>(IIntegrationEventHandler<T> handler)
        where T : IntegrationEvent
    {
        var fullname = typeof(T).FullName;
        if (fullname == null)
        {
            return;
        }

        if (_dictionary.ContainsKey(fullname))
        {
            var handlers = _dictionary[fullname];
            handlers.Add(handler);
        }
        else
        {
            _dictionary.Add(fullname, new List<IIntegrationEventHandler>()
            {
                handler
            });
        }
    }

    public void Dispose()
    {
        // 移除相关连接等
    }
}

里面实现了eventbus的基本功能。可以看到上面的_dictionary,里面就是IIntegrationEventHandler,
所以泛型接口会继承一个非泛型的接口,是为了进一步抽象声明,对一些集合处理是很方便的。

然后这里为什么没有直接继承Ieventbus呢? 而是实现eventbus的功能。

因为Ieventbus 其实是面向用户的,继承ieventbus只是一个门面,相当于适配器。

而InMemoryEventBus 是为了实现功能。

可以理解为InMemoryEventBus 是我们电脑主板、cpu等,然后我们只需要一个实现其接口的组件,从而和外部连接。

而不是整个内核系统和外部直连,那么我们可以使用InMemoryEventBusClient 作为这个组件。

public class InMemoryEventBusClient : IEventBus
{
    private readonly InMemoryEventBus _eventBus;
    
    public InMemoryEventBusClient()
    {
        _eventBus = new InMemoryEventBus();
    }

    public void Dispose()
    {
        _eventBus.Dispose();
    }

    public async Task Publish<T>(T @event) where T : IntegrationEvent
    {
        await _eventBus.Publish(@event);
    }

    public async Task Subscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent
    {
        await _eventBus.Subscribe(@handler);
    }

    public Task StartConsume()
    {
        // 运行相关的消费
        return Task.CompletedTask;
    }
}

InMemoryEventBusClient 负责实现外部接口,InMemoryEventBus 负责实现功能。

从而达到解耦的目的。

同样的例子还有polly,这个框架应该很出名了,其中他里面就有很多衍生的组件,都是调用内核来适配其他框架定义的接口。

上面可以看到StartConsume什么都没有做,其功能被Publish给融合了。

只要publish就消费了。

如果我们扩展kafka的话,那么consume其实就是拉取数据然后消费,publish其实就是推向kafka,中间就是序列号和反序列话的过程。

eventbus 完善篇后续再补。

标签:Task,一个,IntegrationEvent,IIntegrationEventHandler,handler,简单,eventbus,public
From: https://www.cnblogs.com/aoximin/p/18068563

相关文章

  • python 实现简单的web功能
    BaseHTTPRequestHandler介绍这是一个以TCPServer为基础开发的模块,可以在请求外层添加http协议报文,发送http协议。基于BaseHTTPServer的HttpServer的处理流程:1.HTTPServer绑定对应的应答类(BaseHTTPRequestHandler)http_server=HTTPServer((’’,int(port)),ServerHTTP)2.监......
  • C / C++ 文件简单混编 + 轻量级日志系统使用
    在项目工程里面,不一定全部是c或者c++文件,有时候是混合一起,这个时候如果使用makefile编译的话,就要考虑兼容两种类型的文件编译了;实战经验如下:根据自己的风格制作响应的打印和日志记录,makefile编写如下:CC=gccCPP=g++#文件夹路径ROOTPATH=.INCLUDE=-I./cfg/inc-I$(......
  • 使用OpenCV来实现读取一个目录下的所有图像,然后将它们调整大小为1920x1080像素,并保存
    使用OpenCV来实现读取一个目录下的所有图像,然后将它们调整大小为1920x1080像素,并保存的步骤如下:安装OpenCV库:如果你还没有安装OpenCV库,可以通过pip安装:pipinstallopencv-python编写Python脚本:importosimportcv2defresize_images_in_directory(source_dir,target......
  • gitee基于webhooks实现前端简单自动化部署
    1.为什么采用自动化部署简而言之,程序员优秀传统:懒=>高级生产力.基于gitee进行的自动化部署,服务器环境为Ubuntu基于webhooks进行的自动化部署更加轻快便捷2.部署步骤1).服务器购买可以购买阿里云抢占式服务器进行实验,花费应该在一大洋以内,或者直接购买一年低配服务......
  • 搭建个人图书馆!一个简单的在线个人书库
    大家好,我是Java陈序员。今天,给大家介绍一个在线的个人图书管理系统,支持在线阅读。关注微信公众号:【Java陈序员】,获取开源项目分享、AI副业分享、超200本经典计算机电子书籍等。项目介绍talebook——一个基于Calibre的简单的个人图书管理系统,支持在线阅读。友情提醒:个......
  • ABP -Vnext框架一步一步入门落地教程——使用ABP -Vnext创建一个WEBAPI接口(二)
    人生需要指引,而复制是成功最快的方式,兄弟们让我们发车吧————代码大牛ljy开发主题:何谓开发应用服务端在官方开发教程这一段的内容叫做开发应用服务端,作为现在前后端分离的开发模式来说,一个应用就分为前端页面框架和后端API,页面框架调用WEBAPI实现业务就完事了。所以咱们今天......
  • 简单智能手机原型设计
    实验一一、实验题目:原型设计二、实验目的:掌握产品原型设计方法和相应工具使用。三、实验要求与过程(1)对比分析墨刀、Axure、Mockplus等原型设计工具的各自的适用领域及优缺点。1、墨刀适用领域:墨刀主要适用于移动应用、小程序等交互设计,尤其擅长快速原型设计和高保真度的交......
  • 一行return 写一个递归函数! 20240414
    defmake_anonymous_factorial():returnlambdan:1ifn==0elsereduce(lambdax,y:x*y,range(1,n+1))这个函数make_anonymous_factorial的目的是创建并返回一个匿名函数(也称为lambda函数),该匿名函数能够计算一个给定非负整数n的阶乘。下面是对这个函数的详细......
  • 简单了解前端性能监控
    作为一名开发来讲,以下场景你有没有遇到:点击这个按钮怎么没反应了页面为什么白了怎么一直正在加载很多用户说图片加载不出来......那么有一款性能监控产品太重要了,但是性能相关的东西实在太多了。那么从一个熟悉又陌生的api开始,performance。1.什么是performancemdn上是......
  • 1个表A多个字段a,b,关联另一个表B的一个字段c,并取出B表的d字段,如何写sql
    方式有两种,第一种如下:A数据表中多个字段对应B数据表的ID,现在要把B表的其他字段一起查询出来一、数据表: 1、SPEED_DETECTION_ROAD一、数据表: 1、SPEED_DETECTION_ROAD 它的START_POINT_ID和END_POINT_ID字段对应下面表的ID,2、SECTION_INFO: 二、sql语句SELE......