首页 > 其他分享 >事件总线EventBus实现邮件推送

事件总线EventBus实现邮件推送

时间:2024-09-01 21:58:36浏览次数:11  
标签:set string get 总线 eventData EventBus 推送 public channel

基于事件总线EventBus实现邮件推送功能

   有时候,有人给我的网站留了言,但是我必须要打开我的网站(https://www.xiandanplay.com/)才知道,所以我便决定给网站增加一个邮件推送的功能,好让我第一时间知道。于是乎,按照我自己的思路,同时为了去学习了解rabbitmq以及EventBus概念,我便设计了一套邮件推送的功能,这里分享出来,可能方案不是很好,大家不喜勿喷。

什么是事件总线   

    事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。

    关于这个概念,网上有很多讲解的,这里我推荐一个讲的比较好的(事件总线知多少)

什么是RabbitMQ

   RabbitMQ这个就不用说了,想必到家都知道。

粗糙流程图

简单来解释就是:

      1、定义一个事件抽象类

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public abstract class EventData     {         /// <summary>         /// 唯一标识         /// </summary>         public string Unique { getset; }         /// <summary>         /// 是否成功         /// </summary>         public bool Success { getset; }         /// <summary>         /// 结果         /// </summary>         public string Result { getset; }     }

 2、定义一个事件处理抽象类,以及对应的一个队列消息执行的一个记录

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public abstract class EventHandler<T> where T : EventData     {         public async Task Handler(T eventData)         {             await BeginHandler(eventData.Unique);             eventData = await ProcessingHandler(eventData);             if (eventData.Success)                 await FinishHandler(eventData);         }         /// <summary>         ///  开始处理         /// </summary>         /// <param name="unique"></param>         /// <returns></returns>         protected abstract Task BeginHandler(string unique);         /// <summary>         /// 处理中         /// </summary>         /// <param name="eventData"></param>         /// <returns></returns>         protected abstract Task<T> ProcessingHandler(T eventData);         /// <summary>         /// 处理完成         /// </summary>         /// <param name="eventData"></param>         /// <returns></returns>         protected abstract Task FinishHandler(T eventData);     }         [Table("Sys_TaskRecord")]     public class TaskRecord : Entity<long>     {         /// <summary>         /// 任务类型         /// </summary>         public TaskRecordType TaskType { getset; }         /// <summary>         /// 任务状态         /// </summary>         public int TaskStatu { getset; }         /// <summary>         /// 任务值         /// </summary>         public string TaskValue { getset; }         /// <summary>         /// 任务结果         /// </summary>         public string TaskResult { getset; }         /// <summary>         /// 任务开始时间         /// </summary>         public DateTime TaskStartTime { getset; }         /// <summary>         /// 任务完成时间         /// </summary>         public DateTime? TaskFinishTime { getset; }         /// <summary>         /// 任务最后更新时间         /// </summary>         public DateTime? LastUpdateTime { getset; }         /// <summary>         /// 任务名称         /// </summary>         public string TaskName { getset; }         /// <summary>         /// 附加数据         /// </summary>         public string AdditionalData { getset; }     }

   3、定义一个邮件事件消息类,继承自EventData,以及一个邮件处理的Hanler继承自EventHandler

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class EmailEventData:EventData    {        /// <summary>        /// 邮件内容        /// </summary>        public string Body { getset; }        /// <summary>        /// 接收者        /// </summary>        public string Reciver { getset; }    }   public class CreateEmailHandler<T> : Core.EventBus.EventHandler<T> where T : EventData    {        private IEmailService emailService;        private IUnitOfWork unitOfWork;        private ITaskRecordService taskRecordService;        public CreateEmailHandler(IEmailService emailService, IUnitOfWork unitOfWork, ITaskRecordService taskRecordService)        {            this.emailService = emailService;            this.unitOfWork = unitOfWork;            this.taskRecordService = taskRecordService;        }        protected override async Task BeginHandler(string unique)        {            await taskRecordService.UpdateRecordStatu(Convert.ToInt64(unique), (int)MqMessageStatu.Processing);            await unitOfWork.CommitAsync();        }          protected override async Task<T> ProcessingHandler(T eventData)        {            try            {                EmailEventData emailEventData = eventData as EmailEventData;                await emailService.SendEmail(emailEventData.Reciver, emailEventData.Reciver, emailEventData.Body, "[闲蛋]收到一条留言");                eventData.Success = true;            }            catch (Exception ex)            {                await taskRecordService.UpdateRecordFailStatu(Convert.ToInt64(eventData.Unique), (int)MqMessageStatu.Fail,ex.Message);                await unitOfWork.CommitAsync();                eventData.Success = false;            }            return eventData;          }          protected override async Task FinishHandler(T eventData)        {            await taskRecordService.UpdateRecordSuccessStatu(Convert.ToInt64(eventData.Unique), (int)MqMessageStatu.Finish,"");            await unitOfWork.CommitAsync();        }

   4、接着就是如何把事件消息和事件Hanler关联起来,那么我这里思路就是把EmailEventData的类型和CreateEmailHandler的类型先注册到字典里面,这样我就可以根据EmailEventData找到对应的处理程序了,找类型还不够,如何创建实例呢,这里就还需要把CreateEmailHandler注册到DI容器里面,这样就可以根据容器获取对象了,如下

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33   public void AddSub<T, TH>()              where T : EventData              where TH : EventHandler<T>         {             Type eventDataType = typeof(T);             Type handlerType = typeof(TH);             if (!eventhandlers.ContainsKey(typeof(T)))                 eventhandlers.TryAdd(eventDataType, handlerType);             _serviceDescriptors.AddScoped(handlerType);         } -------------------------------------------------------------------------------------------------------------------  public Type FindEventType(string eventName)         {             if (!eventTypes.ContainsKey(eventName))                 throw new ArgumentException(string.Format("eventTypes不存在类名{0}的key", eventName));             return eventTypes[eventName];         } ------------------------------------------------------------------------------------------------------------------------------------------------------------   public object FindHandlerType(Type eventDataType)         {             if (!eventhandlers.ContainsKey(eventDataType))                 throw new ArgumentException(string.Format("eventhandlers不存在类型{0}的key", eventDataType.FullName));             var obj = _buildServiceProvider(_serviceDescriptors).GetService(eventhandlers[eventDataType]);             return obj;         } ----------------------------------------------------------------------------------------------------------------------------------  private static IServiceCollection AddEventBusService(this IServiceCollection services)         {             string exchangeName = ConfigureProvider.configuration.GetSection("EventBusOption:ExchangeName").Value;             services.AddEventBus(Assembly.Load("XianDan.Application").GetTypes())                 .AddSubscribe<EmailEventData, CreateEmailHandler<EmailEventData>>(exchangeName, ExchangeType.Direct, BizKey.EmailQueueName);             return services;         }

   5、发送消息,这里代码简单,就是简单的发送消息,这里用eventData.GetType().Name作为消息的RoutingKey,这样消费这就可以根据这个key调用FindEventType,然后找到对应的处理程序了

1 2 3 4 5 6 7 8 9 using (IModel channel = connection.CreateModel()) {      string routeKey = eventData.GetType().Name;      string message = JsonConvert.SerializeObject(eventData);      byte[] body = Encoding.UTF8.GetBytes(message);      channel.ExchangeDeclare(exchangeName, exchangeType, truefalsenull);      channel.QueueDeclare(queueName, truefalsefalsenull);      channel.BasicPublish(exchangeName, routeKey, null, body); }

  

6、订阅消息,核心的是这一段

  Type eventType = _eventBusManager.FindEventType(eventName);
  var eventData = (T)JsonConvert.DeserializeObject(body, eventType);
  EventHandler<T> eventHandler = _eventBusManager.FindHandlerType(eventType)  as       EventHandler<T>;

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public void Subscribe<T, TH>(string exchangeName, string exchangeType, string queueName)            where T : EventData            where TH : EventHandler<T>        {            try            {                _eventBusManager.AddSub<T, TH>();                IModel channel = connection.CreateModel();                channel.QueueDeclare(queueName, truefalsefalsenull);                channel.ExchangeDeclare(exchangeName, exchangeType, truefalsenull);                channel.QueueBind(queueName, exchangeName, typeof(T).Name, null);                var consumer = new EventingBasicConsumer(channel);                consumer.Received += async (model, ea) =>                {                    string eventName = ea.RoutingKey;                    byte[] resp = ea.Body.ToArray();                    string body = Encoding.UTF8.GetString(resp);                    try                    {                        Type eventType = _eventBusManager.FindEventType(eventName);                        var eventData = (T)JsonConvert.DeserializeObject(body, eventType);                        EventHandler<T> eventHandler = _eventBusManager.FindHandlerType(eventType) as EventHandler<T>;                        await eventHandler.Handler(eventData);                    }                    catch (Exception ex)                    {                        LogUtils.LogError(ex, "EventBusRabbitMQ", ex.Message);                    }                    finally                    {                        channel.BasicAck(ea.DeliveryTag, false);                    }                                     };                channel.BasicConsume(queueName, autoAck: false, consumer: consumer);            }            catch (Exception ex)            {                LogUtils.LogError(ex, "EventBusRabbitMQ.Subscribe", ex.Message);            }          }

    注意,这里我使用的时候有个小坑,就是最开始是用using包裹这个IModel channel = connection.CreateModel();导致最后程序启动后无法收到消息,然后去rabbitmq的管理界面发现没有channel连接,队列也没有消费者,最后发现可能是using执行完后就释放掉了,把using去掉就好了。

   好了,到此,我的思路大概讲完了,现在我的网站留言也可以收到邮件了,那么多测试邮件,哈哈哈哈哈

 

  大家感兴趣的话可以去我的网站(https://www.xiandanplay.com/)踩一踩,互加友链也可以的,谢谢大家,不喜勿喷喽!

标签:set,string,get,总线,eventData,EventBus,推送,public,channel
From: https://www.cnblogs.com/Leo_wl/p/18391772

相关文章

  • Git 中创建一个新项目并推送到远程仓库
    具体流程为1.创建本地项目目录mkdirmy_projectcdmy_project2.初始化git仓库gitinit指令会在当前目录中创建一个”.git”文件夹,用于存储版本控制信息3.添加文件到仓库echo"#MyProject">>README.mdgitadd.创建README.MD文件,并将其添加到git的暂存区......
  • EventSource事件流(允许网页与服务器之间建立一个持久的连接,服务器可以通过这个连接向
     EventSource是JavaScript中用于处理服务器发送事件(Server-SentEvents,SSE)的接口。它允许网页与服务器之间建立一个持久的连接,服务器可以通过这个连接向客户端推送更新。EventSource通常用于需要实时更新数据的场景,比如实时通知、股票价格更新等。 基本用法//创建一......
  • PCIe学习笔记(一)-------1.2 PCIe总线简介
    1,PCIe概览PCIe是第三代外围设备总线,英文缩写为PCIe或者PCIExpress。PCIe是点对点,全双工的差分传输信号总线。点对点互连表示链路上的电气负载有限,从而使发送和接收频率可扩展到更高。PCIe目前成熟的版本有GEN1,GEN2,GEN3,GEN4和GEN5,每一代相较上一代传输速率和传输带宽都有了很大幅......
  • linux总线设备驱动模型
    linux总线设备驱动模型platform平台驱动模型linux自带I2C、SPI、USB等总线。但是在SOC中有些外设是没有总线这个概念的,但是又要使用总线、驱动和设备模型该怎么办呢?为了解决此问题,Linux提出了platform这个虚拟总线,相应的就有platform_driver和platform_devi......
  • .net core下使用事件总线
    .netcore下使用事件总线        随着微服务的火热,DDD(领域驱动设计模式)思想风起云涌,冲击着整个软件生态系统。其中,事件总线那是必须知道的了,于是我便抱着一个学习DDD的心态搭建了一个博客网站,目前该网站正在建设阶段,后续会不断完善,这里我只是讲一下我里面所用到的......
  • 【计算机组成原理】五、中央处理器:2.数据通路、控制器(单总线结构、专用数据通路、硬布
    3.数据通路文章目录3.数据通路3.1单总线结构3.2专用数据通路4.控制器4.1硬布线控制器4.1.1基本结构4.1.2设计步骤==微操作总结==1)分析每个阶段的微操作序列2)安排微操作时序的原则3)电路设计4.1.3特点4.2微程序控制器==微指令包含关系==4.2.1基本结构4.2.2微指令格式4.......
  • 如何将本地项目推送到Git上(以Gitee为例)
    1.创建Gitee仓库首先,你需要在Gitee上创建一个新的仓库:登录Gitee账户。点击右上角的“+”按钮,然后选择“新建仓库”。填写仓库名称和描述信息,选择仓库的可见性(公开或私有),然后点击“创建”。(注意其他的选项不要选择)2.初始化本地项目(如果尚未初始化)如果你已经......
  • FPGA与STM32_FSMC总线通信实验
    在嵌入式系统设计中,FPGA(现场可编程门阵列)与微控制器如STM32的通信是常见的应用场景。STM32通过FSMC(灵活静态存储控制器)接口与FPGA进行数据交换,可以实现高速数据传输和复杂逻辑控制。本文将介绍如何通过FSMC总线实现STM32与FPGA之间的通信。实验目的理解FSMC总线的基本工作......
  • rabbitmq实现用户关系绑定信息推送
    1.MQ三大要点交换机队列Key2.交换机  交换机是消息队列系统中的一个核心组件,主要用于将消息路由到一个或多个队列中。交换机通过不同的路由规则来决定消息的去向。根据不同的类型,交换机可以有不同的路由策略:直连交换机(DirectExchange):根据消息的路由键(RoutingKey......
  • DeviceNet主站转EtherCAT从站总线协议转换网关配置详情
    DeviceNet转EtherCAT如何实现有效连接与通信,这一问题常常让许多人感到困惑不已。现在,就来为大家专门解答这个疑问。远创智控YC-ECT-DNTM型设备有着极为出色的表现,能够成功地解决这个困扰众人的难题。接下来,会为大家详尽地介绍该设备的功能,像智能的网络管理功能以及强大的兼容性......