回调函数 dotnet .net 消费处理系统 生产者-消费者类型
一个简单的消息处理系统,它使用了.NET的System.Threading.Channels
命名空间来创建一个无界的通道(channel),用于在不同的任务之间传递MessageWrapper
对象。
无界限的消息队列(Unbounded Channel)是.NET中System.Threading.Channels
命名空间提供的特性之一,它允许你创建一个通道,用于在生产者和消费者之间传递数据项。
所谓“无界限”,意味着理论上这个队列可以无限增长,不会因为队列满了而拒绝新的消息(当然实际上会受限于系统的内存资源)。
因此,在使用无界队列时需要特别小心,确保有适当的机制来控制队列的增长,避免潜在的内存溢出问题。
using System.Threading.Channels;
public class MessageWrapper
{
public string Message { get; set; }
public DateTime Timestamp { get; set; }
}
public class Program
{
public static async Task Main(string[] args)
{
// 创建消息队列
var messageQueue = Channel.CreateUnbounded<MessageWrapper>();
// 启动消息处理任务
var cts = new CancellationTokenSource();
Task handleMessageTask = ReceiveMessageHandlerTask(messageQueue, cts.Token);
// 模拟发送消息
while (true)
{
string? v = System.Console.ReadLine();
if (!string.IsNullOrEmpty(v))
{
if (v == "q")
{
cts.Cancel();
await handleMessageTask;
break;
}
var messageWrapper = new MessageWrapper
{
Message = $"Message : {v}",
Timestamp = DateTime.Now
};
await messageQueue.Writer.WriteAsync(messageWrapper);
}
}
}
public static async Task HandleMessage(MessageWrapper messageWrapper)
{
Console.WriteLine($"收到message_{messageWrapper.Timestamp}_{messageWrapper.Message}:");
await Task.Delay(100);
}
public static async Task ReceiveMessageHandlerTask(
Channel<MessageWrapper> messageQueue,
CancellationToken cancellationToken
)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
// Console.WriteLine("Waiting for messages...");
var messageWrapper = await messageQueue.Reader.ReadAsync(cancellationToken);
await HandleMessage(messageWrapper);
}
catch (OperationCanceledException)
{
// 如果是取消异常,直接退出循环而不打印错误信息
Console.WriteLine("Cancellation detected. Stopping message handler...");
break;
}
catch (Exception ex)
{
Console.WriteLine($"Error handling message: {ex.Message}");
}
}
}
}
标签:事件处理,Task,messageWrapper,messageQueue,Message,dotnet,Console,net,public
From: https://www.cnblogs.com/zhuoss/p/18686627