首页 > 其他分享 >回调函数 事件处理 dotnet .net 消费处理系统 生产者-消费者类型

回调函数 事件处理 dotnet .net 消费处理系统 生产者-消费者类型

时间:2025-01-23 09:02:25浏览次数:1  
标签:事件处理 Task messageWrapper messageQueue Message dotnet Console net public

回调函数 dotnet .net 消费处理系统 生产者-消费者类型

一个简单的消息处理系统,它使用了.NET的System.Threading.Channels命名空间来创建一个无界的通道(channel),用于在不同的任务之间传递MessageWrapper对象。

无界限的消息队列(Unbounded Channel)是.NET中System.Threading.Channels命名空间提供的特性之一,它允许你创建一个通道,用于在生产者和消费者之间传递数据项。

所谓“无界限”,意味着理论上这个队列可以无限增长,不会因为队列满了而拒绝新的消息(当然实际上会受限于系统的内存资源)。

因此,在使用无界队列时需要特别小心,确保有适当的机制来控制队列的增长,避免潜在的内存溢出问题。

using System.Threading.Channels;

public class MessageWrapper
{
    public string Message { get; set; }
    public DateTime Timestamp { get; set; }
}

public class Program
{
    public static async Task Main(string[] args)
    {
        // 创建消息队列
        var messageQueue = Channel.CreateUnbounded<MessageWrapper>();

        // 启动消息处理任务
        var cts = new CancellationTokenSource();
        Task handleMessageTask = ReceiveMessageHandlerTask(messageQueue, cts.Token);

        // 模拟发送消息
        while (true)
        {
            string? v = System.Console.ReadLine();
            if (!string.IsNullOrEmpty(v))
            {
                if (v == "q")
                {
                    cts.Cancel();
                    await handleMessageTask;
                    break;
                }
                var messageWrapper = new MessageWrapper
                {
                    Message = $"Message : {v}",
                    Timestamp = DateTime.Now
                };
                await messageQueue.Writer.WriteAsync(messageWrapper);
            }
        }
    }

    public static async Task HandleMessage(MessageWrapper messageWrapper)
    {
        Console.WriteLine($"收到message_{messageWrapper.Timestamp}_{messageWrapper.Message}:");
        await Task.Delay(100);
    }

    public static async Task ReceiveMessageHandlerTask(
        Channel<MessageWrapper> messageQueue,
        CancellationToken cancellationToken
    )
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                // Console.WriteLine("Waiting for messages...");
                var messageWrapper = await messageQueue.Reader.ReadAsync(cancellationToken);
                await HandleMessage(messageWrapper);
            }
            catch (OperationCanceledException)
            {
                // 如果是取消异常,直接退出循环而不打印错误信息
                Console.WriteLine("Cancellation detected. Stopping message handler...");
                break;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error handling message: {ex.Message}");
            }
        }
    }
}

标签:事件处理,Task,messageWrapper,messageQueue,Message,dotnet,Console,net,public
From: https://www.cnblogs.com/zhuoss/p/18686627

相关文章

  • .NET9 中替换Swagger使用Scalar
    .NET9没有Swagger怎么办?前言在.NET9中,在创建WebAPI项目时,Swagger的使用与.NET8略有不同。.NET9不再内置Swagger,而是生成OpenApi标准的Json文件。如果想在.NET9中使用Swagger,需要手动安装,并配置Swagger。在.NET9中使用Swagger安装包首先安装Nuget包Install-PackageSwashbu......
  • 华为eNSP-telnet配置
            Telnet是一个用于远程登录的协议,它允许用户通过网络连接到远程主机,并在本地计算机上执行远程主机上的命令。telnet基于传输层之上的应用层协议。现在用两台路由器进行telnet配置,我们在R2上开启telnet服务,用R1远程登陆R2。R1配置[R1]intg0/0/0[R1-Gigabi......
  • TensorFlow迁移学习DenseNet121预测10-monkey-species
     In [1]:fromtensorflowimportkerasimporttensorflowastfimportnumpyasnpimportpandasaspdfromscipyimportndimageimportmatplotlib.pyplotasplt In [14]:densenet121=keras.applications.DenseNet121(include_top=Fal......
  • Springboot3整合Netty进行消息传递
    1.服务端1.1字符消息NettyServer/***@authorliu.wenxuan1*@Description:netty服务端处理字符消息解码器问题不能同时处理文件和字符*/publicclassNettyServer{privatestaticfinalintPORT=8080;publicstaticvoidmain(String[]args)th......
  • dotnet CultureInfo遇到欧洲如俄文小数点是逗号想转点的解决办法
    如题,当CultureInfo是俄文(ru-RU)时,浮点数中的点是用逗号表达的,如1.1会显示成1,1,造成很多的麻烦,当然如果全系统中全部采纳逗号作为浮点也没问题,只要用户接受就可以,但有时需要继续用点号,那么解决办法如下。1.修改DefaultThreadCurrentCulture我们知道CultureInfo.CurrentCulture静......
  • Windows 环境下 Docker Desktop + Kubernetes 部署项目指南
    Windows环境下DockerDesktop+Kubernetes部署项目指南一、环境准备二、安装与配置Kubernetes安装windows版的docker启动kubernetes安装windows版的kubectl工具下载k8s-for-docker-desktop启动KubernetesDashboard二、在Kubernetes上部署项目创建一个......
  • .NET 9 new features-C#13新的锁类型和语义
    C#13中,引入了新的锁类型和语义,主要用于增强多线程编程中的同步机制。传统上,C#使用lock关键字与任意的object实例配合,实现线程间的互斥访问。然而,这种方式可能存在性能瓶颈和潜在的死锁风险。为此,C#13在.NET9中引入了新的锁类型System.Threading.Lock,提供更高效和安......
  • TensorFlow迁移学习Resnet50预测10-monkey-species
     In [15]:fromtensorflowimportkerasimporttensorflowastfimportnumpyasnpimportpandasaspdfromscipyimportndimageimportmatplotlib.pyplotasplt In [2]:resnet50=keras.applications.ResNet50(include_top=False,po......
  • .net core 的 swagger 分组简单使用
    1.Programm中添加builder.Services.AddSwaggerGen(c=>{c.SwaggerDoc("v1",newOpenApiInfo{Title="BarcodeAPI",Version="v1"});c.SwaggerDoc("WMS",newOpenApiInfo{Title="W......
  • TensorFlow迁移学习Resnet50预测10-monkey-species
     In [15]:fromtensorflowimportkerasimporttensorflowastfimportnumpyasnpimportpandasaspdfromscipyimportndimageimportmatplotlib.pyplotasplt In [2]:resnet50=keras.applications.ResNet50(include_top=False,po......