首页 > 其他分享 >基于channel的异步事件总线

基于channel的异步事件总线

时间:2023-09-19 20:59:34浏览次数:39  
标签:异步 总线 private services var public channel 通道

生成者/消费者概念编程模型

通道是生成者/使用者概念编程模型的实现。 在此编程模型中,生成者异步生成数据,使用者异步使用该数据。 换句话说,此模型将数据从一方移交给另一方。 尝试将通道视为任何其他常见的泛型集合类型,例如 List。 主要区别在于,此集合管理同步,并通过工厂创建选项提供各种消耗模型。 这些选项控制通道的行为,例如允许它们存储的元素数,以及达到该限制时会发生什么情况,或者通道是由多个生成者还是多个使用者同时访问

channel简介

channel提供了用于在生成者和使用者之间以异步方式传递数据的一组同步数据结构。

channel(管道)提供了有界通道和无界通道

无界通道

该通道可以同时供任意数量的读取器和编写器使用。 或者,可以通过提供 UnboundedChannelOptions 实例在创建无限制通道时指定非默认行为。 该通道的容量不受限制,并且所有写入均以同步方式执行

有界通道

创建有界通道时,该通道将绑定到最大容量。 达到边界时,默认行为是通道异步阻止生成者,直到空间可用。 可以通过在创建通道时指定选项来配置此行为。 可以使用任何大于零的容量值创建有界通道

模式行为

BoundedChannelFullMode.Wait

这是默认值。 WriteAsync调用 以等待空间可用以完成写入操作。 调用 以 TryWrite 立即返回 false 。

BoundedChannelFullMode.DropNewest

删除并忽略通道中的最新项,以便为要写入的项留出空间。

BoundedChannelFullMode.DropOldest

删除并忽略通道中的最旧项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropWrite 删除要写入的项。

Channel.Writer API

生成者功能在 Channel<TWrite,TRead>.Writer 上公开。 下表详细介绍了生成者 API 和预期行为:

ChannelWriter.Complete

将通道标记为已完成,这意味着不再向该通道写入更多项。

ChannelWriter.TryComplete

尝试将通道标记为已完成,这意味着不会向通道写入更多数据。

ChannelWriter.TryWrite

尝试将指定的项写入到通道。 当与无界通道一起使用时,除非通道的编写器通过 ChannelWriter.Complete 或 ChannelWriter.TryComplete 发出完成信号,否则这将始终返回 true。

ChannelWriter.WaitToWriteAsync

返回一个 ValueTask ,当有空间可以写入项时完成。
ChannelWriter.WriteAsync 以异步方式将项写入到通道

Channel.Reader API

ChannelReader.ReadAllAsync

创建允许从通道中读取所有数据的 IAsyncEnumerable

ChannelReader.ReadAsync

以异步方式从通道中读取项。

ChannelReader.TryPeek

尝试从通道中查看项。

ChannelReader.TryRead

尝试从通道中读取项。

ChannelReader.WaitToReadAsync

返回在 ValueTask 数据可供读取时完成的 。

channel的具体使用

https://learn.microsoft.com/zh-cn/dotnet/core/extensions/channels

基于channel实现事件总线

EventDiscriptorAttribute 特性定义

    [AttributeUsage(AttributeTargets.Class,AllowMultiple = false,Inherited = false)]
    public class EventDiscriptorAttribute:Attribute
    {
       /// <summary>
       /// 事件2名称
       /// </summary>
       public string EventName { get; private set; }
       /// <summary>
       /// channel 容量设置
       /// </summary>
       public int Capacity { get; private set; }  
       /// <summary>
       /// 是否维持一个生产者多个消费者模型
       /// </summary>
       public bool SigleReader { get; private set; }

       public EventDiscriptorAttribute(string eventName, int capacity = 1000, bool sigleReader = true)
        {
            EventName = eventName;
            Capacity = capacity;
            SigleReader = sigleReader;
        }   
    }

定义通道容器

    //通道容器单列注入,在拓展类中初始化
    public class ChannelContainer : IChannelContainer
    {
        public List<EventDiscription> Events { get; private set; }

        private readonly IServiceCollection Services;

        public EventHandlerContainer(IServiceCollection services)
        {
            Events = new List<EventDiscription>();
            Services = services;         
            services.AddSingleton<IEventHandlerContainer>(this);
        }

        private bool Check(Type type)
        {
            var discription = Events.FirstOrDefault(p=>p.EtoType == type);

            return discription is null;
        }
        
        ///订阅并且注入EventHandler
        public void TryAddChannle(Type eto,Type handler)
        {
            if(!Check(eto))
            {
                return;
            }

            Events.Add(new EventDiscription(eto, handler));

            var handlerbaseType = typeof(IEventHandler<>);

            var handlertype = handlerbaseType.MakeGenericType(eto);

            if(Services.Any(P=>P.ServiceType==handlertype))
            {
                return;
            }

            Services.AddTransient(handlertype, handler);
        }

        public void TryAddChannle<TEto, THandler>()
        {
            TryAddChannle(typeof(TEto),typeof(THandler));  
        }

        
        public void TryAddChannle(Type eto)
        {
            if (!Check(eto))
            {
                return;
            }

            Events.Add(new EventDiscription(eto));

            var handlerbaseType = typeof(IEventHandler<>);

            var handlertype = handlerbaseType.MakeGenericType(eto);

            if (Services.Any(P => P.ServiceType == handlertype))
            {
                return;
            }
        }

        public void TryAddChannle<TEto>()
        {
            TryAddChannle(typeof(TEto));
        }

事件管理器

事件管理器通过线程安全字典管理事件通道和事件的触发

可以看到在Subscribe 方法中消费者并不是在订阅后立即执行的而是放到EventTrigger中的定义的异步事件中去

消费者执行最后又.,NET提供的托管任务去执行

 public class EventHandlerManager : IEventHandlerManager,IDisposable 
    {
        private ConcurrentDictionary<string, Channel<string>> Channels;
        private bool IsDiposed = false;

        private readonly IServiceProvider ServiceProvider;

        private readonly CancellationToken _cancellation;

        private readonly IEventHandlerContainer _eventHandlerContainer;

        private readonly ILogger _logger;

        private ConcurrentDictionary<string,EventTrigger> EventTriggers;

        private bool IsInitConsumer = true;

        public EventHandlerManager( IServiceProvider serviceProvider
            , IEventHandlerContainer eventHandlerContainer
            , ILoggerFactory loggerFactory)
        {
            ServiceProvider = serviceProvider;
            _cancellation = CancellationToken.None;
            _eventHandlerContainer = eventHandlerContainer;
            Channels = new ConcurrentDictionary<string, Channel<string>>();
            EventTriggers = new ConcurrentDictionary<string, EventTrigger>();
            _logger = loggerFactory.CreateLogger<IEventHandlerManager>();
        }
        //初始化通信管道
        public async Task CreateChannles()
        {
            var eventDiscriptions = _eventHandlerContainer.Events;

            foreach(var item in eventDiscriptions)
            {
                var attribute = item.EtoType.GetCustomAttributes()
                                            .OfType<EventDiscriptorAttribute>()
                                            .FirstOrDefault();

                if (attribute is null)
                {
                    ThorwEventAttributeNullException.ThorwException();
                }

                var channel = Channels.GetValueOrDefault(attribute.EventName);

                if (channel is not null)
                {
                    return;
                }
                //创建无界通道模型,并且初始化容量大小,当无容量写入后等待写入
                channel = Channel.CreateBounded<string>(
                        new BoundedChannelOptions(attribute.Capacity)
                              {
                                SingleWriter = true,
                                SingleReader = false,
                                AllowSynchronousContinuations = false,
                                FullMode = BoundedChannelFullMode.Wait
                        });

                Channels.TryAdd(attribute.EventName, channel);

                _logger.LogInformation($"创建通信管道{item.EtoType}--{attribute.EventName}");
            }
            await Task.CompletedTask;
        }

        private Channel<string> Check(Type type)
        {
            var attribute = type .GetCustomAttributes()
                                   .OfType<EventDiscriptorAttribute>()
                                   .FirstOrDefault();

            if (attribute is null)
            {
                ThorwEventAttributeNullException.ThorwException();
            }

            var channel = Channels.GetValueOrDefault(attribute.EventName);

            if(channel is null)
            {
                ThrowChannelNullException.ThrowException(attribute.EventName);
            } 

            return channel;
        }

        public void Dispose()
        {
            IsDiposed = true;
            IsInitConsumer = true;
            foreach(var trigger in EventTriggers.Values)
            {
                trigger.Dispose();
            }
            _cancellation.ThrowIfCancellationRequested();
        }

        /// <summary>
        /// 生产者
        /// </summary>
        /// <typeparam name="TEto"></typeparam>
        /// <param name="eto"></param>
        /// <returns></returns>
        public async Task WriteAsync<TEto>(TEto eto) where TEto : class
        {
            var channel = Check(typeof(TEto));
            //由于创建的是有界通道,存在有界通道消息积累超过初始大小所以循环判断是否可以写入消息
            while ( await channel.Writer.WaitToWriteAsync(CancellationToken.None)) 
            {
                var data = JsonConvert.SerializeObject(eto);

                await channel.Writer.WriteAsync(data, _cancellation);
            }          
        }
        /// <summary>
        /// 消费者
        /// </summary>
        /// <returns></returns>
        public void Subscribe<TEto>() where TEto : class
        {
            var attribute = typeof(TEto).GetCustomAttributes()
           .OfType<EventDiscriptorAttribute>()
           .FirstOrDefault();

            if (attribute is null)
            {
                ThorwEventAttributeNullException.ThorwException();
            }

            if (EventTriggers.Keys.Any(p => p == attribute.EventName))
            {
                return;
            }

            Func<Task> func = async () =>
            {
                var scope = ServiceProvider.CreateAsyncScope();

                var channel = Check(typeof(TEto));

                var handler = scope.ServiceProvider.GetRequiredService<IEventHandler<TEto>>();

                var reader = channel.Reader;

                try
                {
                    while (await channel.Reader.WaitToReadAsync())
                    {
                        while (reader.TryRead(out string str))
                        {
                            var data = JsonConvert.DeserializeObject<TEto>(str);

                            _logger.LogInformation(str);

                            await handler.HandelrAsync(data);
                        }
                    }
                }
                catch (Exception e)
                {
                    _logger.LogInformation($"本地事件总线异常{e.Source}--{e.Message}--{e.Data}");
                    throw;
                }
            };

            var trigger = new EventTrigger();
            trigger.Recived(func);

            EventTriggers.TryAdd(attribute.EventName, trigger);
        }

        public Task Trigger()
        {
            //只允许初始化一次消费者
            if (IsInitConsumer)
            {
                foreach (var eventTrigger in EventTriggers)
                {
                    Task.Factory.StartNew(async () =>
                    {
                        await eventTrigger.Value.Trigger();
                    });
                }
            }
            IsInitConsumer = false;
            return Task.CompletedTask;  
        }
    }
}

EventTrigger 定义

    public class EventTrigger:IDisposable
    {
        public event Func<Task>? Event;

        public EventTrigger()
        {

        }

        public void Recived(Func<Task> func)
        {
            if (Event is not null)
            {
                return;
            }
            Event += func;
        }

        public Task Trigger()
        {
            if(Event is null)
            {
                return Task.CompletedTask;  
            }
            return Event();
        }

        public void Dispose()
        {
            if( Event is not null )
            {
                Event = null;
            }
        }
    }

托管任务执行EventHandlerManager Trigger()方法

    public class EventBusBackgroundService : BackgroundService
    {
        private readonly IEventHandlerManager _eventHandlerManager;
        public EventBusBackgroundService(IEventHandlerManager eventHandlerManager) 
        { 
            _eventHandlerManager = eventHandlerManager; 
        }  
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await _eventHandlerManager.Trigger();
        }
    }

拓展类定义

    public  static class EventBusExtensions
    {
        //添加事件总线并且添加channle管道
        public static IServiceCollection AddEventBusAndChannles(this IServiceCollection services,Action<EventHandlerContainer> action)
        {
            services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

            services.AddTransient<ILocalEventBus, LocalEventBus>();
            ///添加托管任务
            services.AddHostedService<EventBusBackgroundService>();

            EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

            action.Invoke(eventHandlerContainer);

            return services;
        }

        //创建通信管道
        public static async Task InitChannles(this IServiceProvider serviceProvider,Action<IEventHandlerManager> action)
        {
            var scope = serviceProvider.CreateAsyncScope(); 

            var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
            
            //初始化通信管道
            await eventhandlerManager.CreateChannles();

            action.Invoke(eventhandlerManager);
        }

        //添加本地事件总线
        public static IServiceCollection AddEventBus(this IServiceCollection services)
        {
            services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

            services.AddTransient<ILocalEventBus, LocalEventBus>();

            services.AddHostedService<EventBusBackgroundService>();

            return services;
        }

        //添加通信管道
        public static IServiceCollection AddChannles(this IServiceCollection services, Action<EventHandlerContainer> action)
        {
            EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

            action.Invoke(eventHandlerContainer);

            return services;
        }
    }
}

使用


    context.Services.AddEventBus();
    //添加通信管道
    context.Services.AddChannles(p =>
    {
        p.TryAddChannle<TestEto>();
    });
    //
    var scope = context.ServiceProvider.CreateScope();

    var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
    //初始化通信管道
    await eventhandlerManager.CreateChannles();
    //订阅事件
    eventhandlerManager.Subscribe<TestEto>();
    //定义EventHandler
    public class TestEventHandler : IEventHandler<TestEto>,ITransientInjection
    {
        private ILogger _logger;
        public TestEventHandler(ILoggerFactory factory)
        {
            _logger = factory.CreateLogger<TestEventHandler>();
        }   
        public Task HandelrAsync(TestEto eto)
        {
            _logger.LogInformation($"{typeof(TestEto).Name}--{eto.Name}--{eto.Description}");
            return Task.CompletedTask;
        }
    }
    
    //构造函数注入
    [HttpGet]
		public async Task TestLocalEventBus()
		{
			TestEto eto = null;

			for(var i = 0; i < 100; i++)
			{
				eto = new TestEto()
				{
					Name ="LocalEventBus" + i.ToString(),
					Description ="wyg"+i.ToString(),
				};
				await _localEventBus.PublichAsync(eto);
			}
		}

总结

作为一个才毕业一年的初级程序员的我来说这次的channel的事件总线的封装还存在着许多不足

1.无法对消息进行持久化的管理

2.没有对消息异常进行处理

3.没有做到像abp那样自动订阅

当然还存在着一些我不知道问题,欢迎各位大佬提出问题指正

源码链接

这里提一嘴(有个小小的请求),有广州的老哥所在的公司目前还招人的话,希望给小弟一个机会(找了快一个月工作了,确实有点难,联系方式vx:wenyg2001411)

标签:异步,总线,private,services,var,public,channel,通道
From: https://www.cnblogs.com/wygbjd/p/17715763.html

相关文章

  • Asyncio 协程异步笔记
    协程&asyncio&异步1.协程(coroutine)协程不是计算机提供,而是程序员人为创造。协程(coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块互相切换运行。例如:deffunc1():print(1)...print(2)deffu......
  • KingbaseES V8R3集群运维案例之---流复制异步同步及全同步模式配置
    案例说明:通过案例描述KingbaseESV8R3集群异步、同步及全同步强一致性配置,本案例为一主二备的架构。适用版本:KingbaseESV8R3集群架构:集群复制配置参数说明:1)sync_flag[kingbase@node101bin]$cat../etc/HAmodule.conf|grep-isync_#1->synchronouscluster,0->async......
  • SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据
    一、背景:    利用ThreadPoolTaskExecutor多线程异步批量插入,提高百万级数据插入效率。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。ThreadPoolTaskExecutor是ThreadPoolExecutor的封装,所以,性能更加优秀,推荐ThreadPoolTaskExecutor。二、具体细节:2.1、配置app......
  • c# 异步 与 Link
    异步操作:"异步"指的是代码执行不按照顺序进行,而是通过使用回调函数、Promise、async/await等机制来实现非阻塞式的执行。在异步执行的情况下,代码不会等待前一段代码执行完成,而是继续执行后续的代码。当异步操作完成后,系统会通知代码进行相应的处理。 采用async/await实现......
  • CompletableFuture 异步多线程D优雅!
    一个示例回顾Future一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度。JDK5新增了Future接口,用于描述一个异步计算的结果。虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get()的方式阻塞调用线程,或者使用轮......
  • Python并发编程——IO模型、阻塞IO、非阻塞IO、多路复用、异步IO、IO模型比较、select
    文章目录每日测验一IO模型介绍二阻塞IO(blockingIO)三非阻塞IO(non-blockingIO)四多路复用IO(IOmultiplexing)五异步IO(AsynchronousI/O)六IO模型比较分析七selectors模块网络并发知识点梳理网络并发知识点梳理每日测验简述死锁现象你用过哪些队列阐述进......
  • 解决SpringBoot Async异步方法获取不到Security Context
     SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);这样设置的话很不安全,不废话,直接上代码,改造一下AsyncConfig就可以了,线程也安全/***@description:线程池的配置*/@ConfigurationpublicclassAsyncConfig{privates......
  • 记录--如何解决异步请求中的返回值问题
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助在Web开发中,异步请求是一个常见的操作。然而,在异步请求中正确地获取返回值却可能会变得棘手。本文将介绍如何解决异步请求中的返回值问题,并提供一种解决方案。一、问题描述在某个Web应用程序中,用户遇到了无......
  • 基于Spring事务的可靠异步调用实践
    SpringTxAsync组件是仓储平台组(WMS6)自主研发的一个专门用于解决可靠异步调用问题的组件。通过使用SpringTxAsync组件,我们成功地解决了在仓储平台(WMS6)中的异步调用需求。经过近二年多的实践并经历了两次618活动以及两次双11活动,该组件已经在我们的所有应用中稳定运行并成功应用于......
  • 同步异步 阻塞非阻塞
    同步异步描述的事任务的提交方式 描述的事一段代码或者函数同步:任务提交后,原地等待任务的返回结果,等待的过程中不做任何事(干等)程序层面上表现出来的感觉就是卡住了例子:importtime deffunc():time.sleep(3)print('helloworld')  if__name++=='__main__......