首页 > 其他分享 >.net core使用channel消息队列

.net core使用channel消息队列

时间:2023-06-26 19:22:24浏览次数:48  
标签:core Task heartBeatsChannel 队列 线程 使用 net channel

.net core使用channel消息队列

背景

最近做一个项目,连接了很多设备,需要保存设备的心跳数据,刚开始的做法是直接接收到设备的数据之后进行心跳数据的保存,但是随着设备多了起来,然后设备的使用时长不断的加大,对数据库的压力也比较大,所以想着优化一下。

方案调研

1.使用第三方中间件

常见的使用redis,或者mq,只需要不断的向中间件发送数据即可,redis使用队列,如果是mq直接发送消息即可,使用起来简单方便,但是要引入这些中间件,目前的架构里面没有,需要自己去起服务,维护。

2.使用channel

System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它可以用来做消息队列,进行数据的生产和消费, 公开的 WriterReader api对应消息的生产者和消费者,也让Channel更加的简洁和易用,与Rabbit MQ 等其他队列不同的是,Channel 是进程内的队列

目前就介绍来看非常完美,不需要添加第三方中间件,直接添加现有的模块即可。

代码实现

选择了使用channel来做优化。拿到设备数据之后直接把消息丢入到channel,然后后台使用定时任务或者自己实现hostservice去不断的消费数据。

生产者代码

 public async Task ProduceHeartBeat(string message)
        {
            await channel.Writer.WriteAsync(message);
        }

不断的向里面写入数据即可.

消费者代码

        /// <summary>
        /// timespan时间内消费多少数据
        /// </summary>
        /// <param name="count"></param>
        /// <param name="timeSpan"></param>
        /// <returns></returns>
        public async Task<List<string>> ConsumeHeartBeatAsync(int count,TimeSpan timeSpan)
        {
            var result = new List<string>(count);
            CancellationTokenSource cts = new CancellationTokenSource();
            var cancellationToken = cts.Token;
            cts.CancelAfter(timeSpan);
            int rcount = 0;
            while ( !cancellationToken.IsCancellationRequested && rcount<count)
            {
                //await Task.Delay(2000);
                if (channel.Reader.TryRead(out var number))
                {
                    Console.WriteLine(number);
                    result.Add(number);
                    rcount++;
                }
                else
                {
                    break;
                }
                
            }  
            return result;
        }

里面加入了一个cancellationToken,进行消费的时长限制。在此时长内消费多少条数据,超时直接结束。

这就是基本的代码

后台定时消费数据

public class HeartBeatService : BackgroundService
    {
        private readonly HeartBeatsChannel heartBeatsChannel;

        public HeartBeatService(HeartBeatsChannel heartBeatsChannel)
        {
            this.heartBeatsChannel = heartBeatsChannel;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {

                Task.Factory.StartNew(() =>
                {
                    while (!stoppingToken.IsCancellationRequested)
                    {
                        //阻塞的队列使得一直在同一个线程运行
                        Process(15,heartBeatsChannel).Wait();
                    }

                }, TaskCreationOptions.LongRunning);

                Console.WriteLine("主线程 现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);

                }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }


        /// <summary>
        /// 消费数据
        /// </summary>
        /// <param name="count">一次消费数量</param>
        /// <param name="heartBeatsChannel"></param>
        /// <returns></returns>
        private async Task Process(int count ,HeartBeatsChannel heartBeatsChannel)
        {
            Console.WriteLine("子线程_现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);
            //每次消费三十个
            if (heartBeatsChannel.IsHasContent)
            {
                //int count = 15;
                //进行消费
                await heartBeatsChannel.ConsumeHeartBeatAsync(count, TimeSpan.FromSeconds(3));
            }           
            await Task.Delay(3000);
        }

使用的是BackgroundServic,直接实现要处理的业务逻辑就好了。在这里使用的是TaskCreationOptions.LongRunning,新开一个线程去处理心跳数据。

总结

以上就是主要的实现全过程,完整的代码在github

https://github.com/lackguozi/LearnChannelWebApi

实际上完全可以不用后台去定时消费数据,channel有很多api可以去处理,比如WaitToReadAsync(),但是这里没有使用,主要是不想持续的占数据库资源???总结的话学习了channel的用法,底层似乎使用了deque??只稍微看了下源码,但是看到了许多的lock,这个是必不可少的。还是巨硬轮子造的好 =_=

标签:core,Task,heartBeatsChannel,队列,线程,使用,net,channel
From: https://www.cnblogs.com/guoxiaotian/p/17506536.html

相关文章

  • .NETCore项目在Windows下构建Docker镜像并本地导出分发到CentOS系统下
    在Windows下使用Docker,我们选择DockerDesktop这个软件,非常方便。DockerDesktop介绍及安装DockerDesktop是适用于Mac、Linux或Windows环境的一键安装应用程序,使您能够构建和共享容器化应用程序和微服务。它提供了一个简单的GUI(图形用户界面),允许您直接从机器管理容器、应用程......
  • SQLServer Core 序列号使用CPU限制的处理
    SQLServerCore序列号使用CPU限制的处理背景有客户是SQLSERVER的数据库.说要进行一下压测.这边趁着最后进行一下环境的基础搭建工作.然后在全闪的环境上面搭建了一个Windows2019+SQL2019的环境发现一个挺好的地方.SQLSERVER会提示,如果使用enterprise的序列号的话仅能......
  • Kubernetes添加用户
    kubernetes中有两种用户,一种是serviceaccount,另一种是普通用户ServiceAccount认证从1.24开始,创建serviceaccount的同时不再创建secretapiVersion:v1kind:ServiceAccountmetadata:name:kubepi-usernamespace:kube-system---apiVersion:rbac.authoriz......
  • 编译 CoreML 模型
    本篇文章译自英文文档CompileCoreMLModels作者是JoshuaZ.Zhang,KazutakaMorita,ZhaoWu更多TVM中文文档可访问→ApacheTVM是一个端到端的深度学习编译框架,适用于CPU、GPU和各种机器学习加速芯片。|ApacheTVM中文站本文介绍如何用Relay部署CoreML模型。首先......
  • F5APM第七期Network Access模式配置
    F5APM第七期NetworkAccess模式配置’......
  • 【HMS Core】web端网页应用集成账号服务,请求/oauth2/v3/token返回状态码403
    【问题描述】web端网页应用接入华为账号,请求/oauth2/v3/token返回状态码403请求代码:响应日志:【问题分析】这是由于跨域访问报错了,建议从服务器端调用token接口重试,不要把client_secret暴露到web端【解决方案】服务器端调用token接口参考链接:https://developer.huawei.com/consumer......
  • Taurus .Net Core 微服务开源框架:Admin 插件【1】 - 微服务节点管理
    前言:最近发现NetCore的文章有点少,特来补几篇。上一篇:Taurus.mvc.NetCore微服务开源框架发布V3.1.7:让分布式应用更高效。自上篇之后,期间更新了4个小版本,更新如下:-----V3.1.7.1----------------1、优化:请求头输出【标识主机IP号、进程号】(2023-06-07)2、优化:Gateway负载......
  • Kubernetes编程——通过命令行使用 API
    通过命令行使用API 长话短说,我们将使用以batchAPI组为例来讲cli相关的操作。 首先,需要在终端运行下面的命令:[root@localhost~]#kubectlproxy--port=8089Startingtoserveon127.0.0.1:8089这个命令把kubernetesAPI服务代理到了本地,并处理了有关身......
  • NETCORE - 动态类型 dynamic 取值
    NETCORE-动态类型dynamic取值环境:.NET6+WebApi 控制器传参时,使用dynamic 安装nuget 包 Program.cs 中//添加对象序列化程序为Newtonsoft.Jsonbuilder.Services.AddControllers().AddNewtonsoftJson(options=>{options.SerializerSettings.ContractR......
  • Docker 中的 .NET 异常了怎么抓 Dump
    一:背景1.讲故事有很多朋友跟我说,在Windows上看过你文章知道了怎么抓Crash,CPU爆高,内存暴涨等各种Dump,为什么你没有写在Docker中如何抓的相关文章呢?瞧不上吗?哈哈,在DUMP的分析旅程中,跑在Docker中的.NET占比真的不多,大概10个dump有1-2个是docker中的,市场决定了我......