首页 > 编程语言 >C# 并发控制框架:单线程环境下实现每秒百万级调度

C# 并发控制框架:单线程环境下实现每秒百万级调度

时间:2024-10-16 11:02:41浏览次数:8  
标签:generator 并发 C# await Worker 单线程 go async children

前言

在工业自动化和机器视觉领域,对实时性、可靠性和效率的要求越来越高。为了满足这些需求,我们开发了一款专为工业自动化运动控制和机器视觉流程开发设计的 C# 并发流程控制框架。

该框架不仅适用于各种工业自动化场景,还能在单线程环境下实现每秒百万次以上的调度频率,从而从容应对涉及成千上万输入输出点数的复杂任务。

并发流程控制框架

本框架提供一种全新的并发流程控制框架,它借鉴了Golang语言中的高效并发模式,并在此基础上进行了必要的功能扩展。框架不仅能够支持自定义的单/多线程调度机制,还允许在主UI线程中进行调度,从而简化了逻辑与用户界面之间的交互。

另外,该框架还集成了高精度定时器、可配置的调度优先级、逻辑停止与暂停等功能,让我们能够更加灵活地管理和控制复杂的自动化流程。

框架优势

  • 相较于传统模型:相对于传统的多线程模型、状态机模型以及类PLC模型,本框架具有更加紧凑清晰的逻辑结构,显著提升了开发效率,并简化了后续的维护与升级过程。
  • 受Go语言启发:框架的设计借鉴了 Go 语言中的高效并发模式,并在此基础上进行了必要的功能扩展,以适应工业自动化领域的具体需求。
  • 灵活的调度机制:支持自定义单线程或多线程调度,同时也可在主 UI 线程中进行调度,便于逻辑与用户界面的交互,增强了用户体验。
  • 丰富的内置功能:内置高精度定时器、可配置的调度优先级、逻辑停止与逻辑暂停功能,确保任务执行的准确性和可控性。
  • 树形多任务调度:采用树形结构管理多任务调度,提高了逻辑的可靠性和系统的整体稳定性。
  • 卓越的性能表现:在单线程环境下,每秒可实现超过一百万次的调度频率,能够从容应对成千上万输入输出点数的复杂场景。
  • 广泛的实践验证:该框架已在多个实际项目中成功应用,证明了其稳定性和可靠性。

框架示例

代码中定义了一系列不同的任务执行模式,展示如何通过不同的调度策略来管理并发任务。

  • 全局变量

static shared_strand strand:全局共享的调度器,用于保证线程安全。

  • 日志记录函数

Log(string msg):记录带有时间戳的日志信息到控制台。

  • 工作任务函数

Worker(string name, int time = 1000):模拟一个简单的任务,该任务会在指定的毫秒数后打印一条消息。

  • 主函数

MainWorker():异步主任务函数,依次调用前面定义的各种任务模式。

Main(string[] args):程序入口点,初始化工作服务、共享调度器,并启动主任务。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Go;

namespace WorkerFlow
{
    class Program
    {
        static shared_strand strand;

        static void Log(string msg)
        {
            Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")} {msg}");
        }

        static async Task Worker(string name, int time = 1000)
        {
            await generator.sleep(time);
            Log(name);
        }

        //1 A、B、C依次串行
        //A->B->C
        static async Task Worker1()
        {
            await Worker("A");
            await Worker("B");
            await Worker("C");
        }

        //2 A、B、C全部并行,且依赖同一个strand(隐含参数,所有依赖同一个strand的任务都是线程安全的)
        //A
        //B
        //C
        static async Task Worker2()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A"));
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
        }

        //3 A执行完后,B、C再并行
        //  -->B
        //  |
        //A->
        //  |
        //  -->C
        static async Task Worker3()
        {
            await Worker("A");
            generator.children children = new generator.children();
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
        }

        //4 B、C都并行执行完后,再执行A
        //B--
        //  |
        //  -->A
        //  |
        //C--
        static async Task Worker4()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
            await Worker("A");
        }

        //5 B、C任意一个执行完后,再执行A
        //B--
        //  |
        //  >-->A
        //  |
        //C--
        static async Task Worker5()
        {
            generator.children children = new generator.children();
            var B = children.tgo(() => Worker("B", 1000));
            var C = children.tgo(() => Worker("C", 2000));
            var task = await children.wait_any();
            if (task == B)
            {
                Log("B成功");
            }
            else
            {
                Log("C成功");
            }
            await Worker("A");
        }

        //6 等待一个特定任务
        static async Task Worker6()
        {
            generator.children children = new generator.children();
            var A = children.tgo(() => Worker("A"));
            var B = children.tgo(() => Worker("B"));
            await children.wait(A);
        }

        //7 超时等待一个特定任务,然后中止所有任务
        static async Task Worker7()
        {
            generator.children children = new generator.children();
            var A = children.tgo(() => Worker("A", 1000));
            var B = children.tgo(() => Worker("B", 2000));
            if (await children.timed_wait(1500, A))
            {
                Log("成功");
            }
            else
            {
                Log("超时");
            }
            await children.stop();
        }

        //8 超时等待一组任务,然后中止所有任务
        static async Task Worker8()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(() => Worker("B", 2000));
            var tasks = await children.timed_wait_all(1500);
            await children.stop();
            Log($"成功{tasks.Count}个");
        }

        //9 超时等待一组任务,然后中止所有任务,且在中止任务中就地善后处理
        static async Task Worker9()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(async delegate ()
            {
                try
                {
                    await Worker("B", 2000);
                }
                catch (generator.stop_exception)
                {
                    Log("B被中止");
                    await generator.sleep(500);
                    throw;
                }
                catch (System.Exception)
                {
                }
            });
            var task = await children.timed_wait_all(1500);
            await children.stop();
            Log($"成功{task.Count}个");
        }

        //10 嵌套任务
        static async Task Worker10()
        {
            generator.children children = new generator.children();
            children.go(async delegate ()
            {
                generator.children children1 = new generator.children();
                children1.go(() => Worker("A"));
                children1.go(() => Worker("B"));
                await children1.wait_all();
            });
            children.go(async delegate ()
            {
                generator.children children1 = new generator.children();
                children1.go(() => Worker("C"));
                children1.go(() => Worker("D"));
                await children1.wait_all();
            });
            await children.wait_all();
        }

        //11 嵌套中止
        static async Task Worker11()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(async delegate ()
            {
                try
                {
                    generator.children children1 = new generator.children();
                    children1.go(async delegate ()
                    {
                        try
                        {
                            await Worker("B", 2000);
                        }
                        catch (generator.stop_exception)
                        {
                            Log("B被中止1");
                            await generator.sleep(500);
                            throw;
                        }
                        catch (System.Exception)
                        {
                        }
                    });
                    await children1.wait_all();
                }
                catch (generator.stop_exception)
                {
                    Log("B被中止2");
                    throw;
                }
                catch (System.Exception)
                {
                }
            });
            await generator.sleep(1500);
            await children.stop();
        }

        //12 并行执行且等待一组耗时算法
        static async Task Worker12()
        {
            wait_group wg = new wait_group();
            for (int i = 0; i < 2; i++)
            {
                wg.add();
                int idx = i;
                var _ = Task.Run(delegate ()
                {
                    try
                    {
                        Log($"执行算法{idx}");
                    }
                    finally
                    {
                        wg.done();
                    }
                });
            }
            await wg.wait();
            Log("执行算法完成");
        }

        //13 串行执行耗时算法,耗时算法必需放在线程池中执行,否则依赖同一个strand的调度将不能及时
        static async Task Worker13()
        {
            for (int i = 0; i < 2; i++)
            {
                await generator.send_task(() => Log($"执行算法{i}"));
            }
        }

        static async Task MainWorker()
        {
            await Worker1();
            await Worker2();
            await Worker3();
            await Worker4();
            await Worker5();
            await Worker6();
            await Worker7();
            await Worker8();
            await Worker9();
            await Worker10();
            await Worker11();
            await Worker12();
            await Worker13();
        }

        static void Main(string[] args)
        {
            work_service work = new work_service();
            strand = new work_strand(work);
            generator.go(strand, MainWorker);
            work.run();
            Console.ReadKey();
        }
    }
}

框架地址

总结

值得一提的是,该框架特别设计用于工业自动化运动控制以及机器视觉流程开发领域,其独特的树形多任务调度机制极大提高了逻辑的可靠性,同时单线程环境下的每秒调度次数可达一百万次以上,足以应对涉及成千上万输入输出点数的应用场景。经过多个项目的实际验证,证明了其稳定性和可靠性,为工业自动化提供了强有力的支持。

通过本文的介绍,希望能为工业自动化领域的开发者提供一个高效、可靠且易于使用的工具。借助这一工具,大家在构建复杂的控制系统时,能够更加轻松地应对并发处理的挑战。也期待您在评论区留言交流,分享您的宝贵经验和建议。

最后

如果你觉得这篇文章对你有帮助,不妨点个赞支持一下!你的支持是我继续分享知识的动力。如果有任何疑问或需要进一步的帮助,欢迎随时留言。

也可以加入微信公众号[DotNet技术匠] 社区,与其他热爱技术的同行一起交流心得,共同成长!优秀是一种习惯,欢迎大家留言学习!

标签:generator,并发,C#,await,Worker,单线程,go,async,children
From: https://www.cnblogs.com/1312mn/p/18460025

相关文章

  • Exchange 2016与国内版O365混合部署(1):过程总览
    Exchange2016与国内版O365混合部署(1):过程总览写在前面:关于Exchange与O365做混合部署,其实网上有很多相关的文章和介绍,笔者Exchange2010和2016都搭过,与2010比,Exchange2016在软件架构设计上做了很多精简和变化,因此安装和后续的配置也会更简单易行,如果你是一个初学者,这个系列可以让......
  • Office365与本地Exchange混合部署之邮件流介绍详情
    Office365与本地Exchange混合部署之邮件流介绍详情Office365与本地Exchange混合部署之邮件流介绍详情我们前面的文章中有介绍了本地Exchange与Office365实现Exchange服务混合部署。然后在配置了混合部署后遇到了一些邮件投递错误及迁移用户的相关错误,都陆续解决了,同时都做了相......
  • 清理Exchange 2013和2016的Log文件(精华)
    清理Exchange2013和2016的Log文件(精华)清理Exchange2013和2016的Log文件【摘要】在你的Exchange2013/2016的环境中,你可能会发现你的系统盘会很快被占用了很多空间,并且如果你不理会它的话,很快你的系统盘剩余空间就会告急了。这是因为Exchange2013/2016默认的日志记录行为导......
  • CSP2024 前集训:csp-s模拟11
    前言T1挂了,后面几道赛时都不那么可做,T2读假题了浪费太多时间,T3没调出来。T4是原,但是整个机房只有一个人当时改了,所以还是没人写,因为T4是原,还加了个T5,也不太可做。T1玩水对于一个点\((i,j)\),若\(s_{i+1,j}=s_{i,j+1}\)则称其为分点,若一个分店后面还有分点或两个分......
  • Exchange2016日志路径
    Exchange2016日志路径Exchange2016日志路径C:\ProgramFiles\Microsoft\ExchangeServer\V15\Logging\下面的日志:               C:\ProgramFiles\Microsoft\ExchangeServer\V15\Logging\RpcHttp              C:......
  • Cisco DNS view
    在配置vpn的路由器上,通过设置网关为dns,实现内外网dns解析分流ipdhcppooldatanetwork192.168.1.0255.255.255.0default-router192.168.1.1dns-server192.168.1.1!!ipname-server114.114.114.114  --定义外网dnsserveripname-server172.16.186.17    ---......
  • CiscoASA FQDN配置
    CiscoASAFQDN配置(ios8.4及以上版本支持)dnsdomain-lookupDMZdnsserver-groupDefaultDNSname-server172.29.12.25name-server172.29.12.23domain-nameeascs.comobjectnetworkBAIDUfqdnv4www.baidu.comobject-groupnetworkWeb_Domainnetwork-objectobjectBAIDU......
  • cisco nexus7000 基本系统配置及OTV
    cisconexus7000基本系统配置1.开启cdpcdpenablecdpformatdevice-idsystem-name默认是对端设备的设备名2.ntp开启普通vdc2下开启ntp同步,先在defaultvdc上打上clockprotocolntpvdc2DC1-N7K-2(config)#ntpserver10.1.1.1use-vrfmannagementDC1-N7K-2(config)#ntpso......
  • HUWEI AC6508 portal认证
    aaa accounting-schemeacc_ningdun  accounting-moderadius  accountingrealtime3 authentication-schemeauth_ningdun  authentication-moderadiusmac-access-profilenamenington mac-authenreauthenticate mac-authentimerreauthenticate-period......
  • CNCC | 超强阵容!CCF-网易雷火联合基金研讨会嘉宾率先揭晓
    CNCC2024将于10月24日至26日在浙江省东阳市横店镇举办,本次大会主题为“发展新质生产力,计算引领未来”。作为大会的重要组成部分,“CNCC|从游戏AI到AOP:虚实融合助推新质生产力”主题分论坛将于10月26日(下周六)13:00至18:00举办。此次分论坛汇聚了众多行业内重量级嘉宾,他们将分......