首页 > 其他分享 >在 .NET Core 中使用 ActionBlock 实现高效率的多步骤数据处理

在 .NET Core 中使用 ActionBlock 实现高效率的多步骤数据处理

时间:2024-12-24 16:44:16浏览次数:3  
标签:Core dataItem 处理 步骤 await TransformBlock NET ActionBlock

目录

一、引言

上一篇博客 分享了使用 Channel 来实现针对大量数据的多线程异步处理,感谢大哥们在评论中提出的宝贵的问题和建议!本篇将分享使用 ActionBlock 如何实现,欢迎在评论区留言讨论。

二、ActionBlock介绍

什么是 ActionBlock?

ActionBlock是 .NET 中 TPL Dataflow 库的一部分,用于处理数据流和并行任务。它提供了一种简单而强大的方式来处理并行任务,并且可以轻松地实现生产者-消费者模式。

ActionBlock 的特点

  • 并行处理:ActionBlock可以配置为并行处理多个任务,从而提高处理效率
  • 异步编程:支持异步编程模型,可以避免阻塞线程,提高应用程序的响应速度和吞吐量
  • 数据流控制:可以通过设置最大并行度和其他选项来控制数据流的处理方式
  • 任务调度:可以用于调度和管理并行任务,确保任务按预期执行

ActionBlock 的使用场景

  • 生产者-消费者模式:可以用于实现生产者-消费者模式,其中生产者将数据发送到ActionBlock,消费者从ActionBlock中读取数据并进行处理
  • 数据流处理:适用于需要处理大量数据并且需要并行处理的场景,例如日志处理、数据转换等
  • 任务调度:可以用于调度和管理并行任务,确保任务按预期执行

ActionBlock 的基本用法

使用ActionBlock非常简单,主要步骤如下:

  1. 创建 ActionBlock:定义一个 ActionBlock,指定要执行的操作和并行选项
  2. 发送数据到 ActionBlock:使用SendAsync方法将数据发送到 ActionBlock
  3. 完成 ActionBlock:在所有数据发送完成后,调用Complete方法通知 ActionBlock 不再接收新的数据
  4. 等待处理完成:使用Completion属性等待所有数据处理完成

以下是一个简单的示例代码,展示了如何使用 ActionBlock:

using System.Threading.Tasks.Dataflow;

var actionBlock = new ActionBlock<int>(async item =>
{
    // 模拟异步处理
    await Task.Delay(100);
    Console.WriteLine($"Processed item: {item}");
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4 // 设置最大并行度
});

// 发送数据到 ActionBlock
for (int i = 0; i < 10; i++)
{
    await actionBlock.SendAsync(i);
}

// 完成 ActionBlock
actionBlock.Complete();
// 等待处理完成
await actionBlock.Completion;

Console.WriteLine("All items processed.");

三、假设场景

假设我们有一组数据需要经过两个步骤的处理。每个数据项都需要进行初步处理,然后进行进一步处理。希望步骤2可以在步骤1产生结果数据后立即开始处理,而不是等待步骤1完全处理完毕。

四、解决方案

使用TransformBlockActionBlock来实现生产者-消费者模式。生产者负责读取数据并将其发送到TransformBlock中,消费者从TransformBlock中读取数据并进行处理。
以下是一个简单的示例代码,演示如何使用TransformBlockActionBlock实现生产者-消费者模式来处理数据:

using System.Threading.Tasks.Dataflow;

var cts = new CancellationTokenSource();
// 假设有一组数据
var dataItems = Enumerable.Range(0, 1000).Select(x => $"data_{x}").ToList();

var processor = new DataProcessor(10, cts.Token);
await processor.ProcessAsync(dataItems);

Console.ReadKey();

/// <summary>
/// 数据处理器
/// </summary>
public class DataProcessor(int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    public async Task ProcessAsync(List<string> dataItems)
    {
        // 创建一个 TransformBlock 用于步骤1的处理,并将结果发送到步骤2的 ActionBlock
        var step1Block = new TransformBlock<string, string>(async dataItem => await Step1(dataItem), new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = cancellationToken
        });
         
        // 创建一个 ActionBlock 用于步骤2的处理
        var step2Block = new ActionBlock<string>(async dataItem =>
        {
            await Step2(dataItem);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = cancellationToken
        });

        // 将 TransformBlock 链接到 ActionBlock
        step1Block.LinkTo(step2Block, new DataflowLinkOptions { PropagateCompletion = true });

        // 启动多个步骤1的任务(生产者)
        foreach (var dataItem in dataItems)
        {
            await step1Block.SendAsync(dataItem, cancellationToken);
        }

        // 完成步骤1的 TransformBlock 的写入
        step1Block.Complete();
        // 等待步骤1的 TransformBlock 处理完成
        await step1Block.Completion;

        // 完成步骤2的 ActionBlock 的写入
        step2Block.Complete();
        // 等待步骤2的 ActionBlock 处理完成
        await step2Block.Completion;
    }

    private async Task<string> Step1(string dataItem)
    {
        // 模拟步骤1的处理(如初步处理数据)
        await Task.Delay(10, cancellationToken);
        Console.WriteLine($"Step1 processed data item: {dataItem}");
        return dataItem;
    }

    private async Task Step2(string dataItem)
    {
        // 模拟步骤2的处理(如进一步处理数据)
        await Task.Delay(10, cancellationToken);
        Console.WriteLine($"Step2 processed data item: {dataItem}");
    }
}

代码解释:

  1. 创建Step1的 TransformBlock:在ProcessAsync方法中,我们首先创建了一个 TransformBlock,用于Step1的处理,TransformBlock 接受一个输入数据项,进行处理后返回一个输出数据项,TransformBlock<string, string>表示输入和输出都是string类型
  2. 创建Step2的 ActionBlock:创建一个 ActionBlock 用于Step2的处理,ActionBlock 接受一个输入数据项并进行处理,但不返回输出数据项。ActionBlock<string>表示输入是string类型
  3. 链接 TransformBlock 和 ActionBlock:将 TransformBlock 链接到 ActionBlock ,以便将Step1的处理结果发送到Step2进行处理,使用LinkTo方法将两个块连接起来,并设置PropagateCompletion为 true,表示当 TransformBlock 完成时,ActionBlock 也会完成
  4. 启动Step1的任务:逐个将数据项发送到 TransformBlock,并等待所有数据处理完成,使用SendAsync方法将数据项发送到 TransformBlock
  5. 等待任务完成:使用Complete方法通知 TransformBlock 不再接收新的数据,并使用Completion属性等待所有数据处理完成。然后完成Step2的 ActionBlock 的写入,并等待Step2的 ActionBlock 处理完成

标签:Core,dataItem,处理,步骤,await,TransformBlock,NET,ActionBlock
From: https://www.cnblogs.com/Tangtang1997/p/18628049

相关文章

  • Windows各个版本对.NET framework各版本的支持情况
    参考:https://mp.weixin.qq.com/s/-Je7dN_k5HyPZdyBuxoomgwindows7.NETFramework4.5是那个不需要KB3063858补丁就能在Windows7上运行的版本。让我澄清并详细说明:.NETFramework在Windows7上的兼容性.NETFramework4.0和4.5:.NETFramework4.0和4.5在Windows7上原生......
  • .NET Core 类型系统(Types System)底层原理浅谈
    C#类型系统C#是一种强类型语言。每个变量和常量都有一个类型,每个求值的表达式也是如此。每个方法声明都为每个输入参数和返回值指定名称、类型和种类(值、引用或输出)。.NET类库定义了内置数值类型和表示各种构造的复杂类型。其中包括文件系统、网络连接、对象的集合和数组以......
  • YOLO11改进-模块-引入空间自适应特征调制网络SAFMN(Spatial Adaptive Feature Modulat
            尽管基于深度学习的解决方案在图像超分辨率(SR)中取得了令人瞩目的重建性能,但这些模型通常较大且架构复杂,使其与许多具有计算和内存限制的低功耗设备不兼容。为了克服这些挑战,我们提出了一种用于高效SR设计的空间自适应特征调制(SAFM)机制。具体来说,SAFM层使用......
  • .NET Freamework 创建windows 服务
    使用.NETFreamework创建windows服务今天有需求需要新写一个windows服务,发现资料找不到了。顺着模板一点一点写,需要对照微软的资料。这里自己重新整理一份,由于不需要使用跨平台,所以我还是使用.NETFramework4.8下的windows服务。微软文档地址如下:如何:创建Windows服务-.......
  • (2-3-01)目标检测与分割:基于PointNet的目标检测与分割+基于Voxel-based的目标检测与分割
    2.3 目标检测与分割LiDAR目标检测与分割是智能驾驶和机器人领域中的重要任务之一,它涉及从激光雷达(LiDAR)扫描数据中提取和识别目标物体。在本节的内容中,将详细讲解常见的LiDAR目标检测与分割算法。2.3.1 基于PointNet的目标检测与分割PointNet算法的发展推动了智能驾驶......
  • docker+kubernetes
    Docker docker 可以在阿里云仓库拉取,需要配置daemon.json dockersearchjava 全文搜索java相关镜像dockerpulljava:8 不输入版本号拉去latest最新版本查询下载dockerimages 看镜像仓库有哪些镜像软件查看具体软件镜像  dockerrun-d-p91:80nginx:......
  • .NET 9 中的 多级缓存 HybridCache
    HybridCache是什么在.NET9中,Microsoft将HybridCache带入了框架体系。HybridCache是一种新的缓存模型,设计用于封装本地缓存和分布式缓存,使用者无需担心选择缓存类型,从而优化性能和维护效率。实际上,HybridCache基于IDistributedCache提供的接口和操作,但增加了一些其他......
  • 【深度学习语义分割】U型网络UNet和双边语义分割网络BiSeNet哪种在滑坡分割中更有优势
    【深度学习语义分割】U型网络UNet和双边语义分割网络BiSeNet哪种在滑坡分割中更有优势?为什么?你会如何选择?【深度学习语义分割】U型网络UNet和双边语义分割网络BiSeNet哪种在滑坡分割中更有优势?为什么?你会如何选择?文章目录【深度学习语义分割】U型网络UNet和双边语义分......
  • .NET 阻止系统睡眠/息屏
    本文介绍Windows系统设备下如何阻止系统睡眠/息屏,以及想看当前阻止睡眠/息屏的应用信息powercfg/requests查看活动列表在播放音乐时,我们会发现设置了系统电源管理-自动睡眠,计划不会生效,这个音频播放操作阻止了系统自动睡眠。但不会阻止息屏,所以Windows下一般屏幕关闭是不影响音......
  • CS305 Computer Networks
    CS305ComputerNetworksProject:RemoteMeetingIntroductionVideomeetingshavebecomeessentialintoday’sdigitallandscape,transforminghowpeopleconnect,collaborate,andcommunicate.Withtheriseofremotework,onlinelearning,andglobalpartne......