首页 > 编程语言 >C#单线程环境下实现每秒百万级调度

C#单线程环境下实现每秒百万级调度

时间:2024-11-19 21:40:07浏览次数:1  
标签:generator C# await Worker children 单线程 go async 每秒

C# 并发控制框架:单线程环境下实现每秒百万级调度 

阅读目录

前言

在工业自动化和机器视觉领域,对实时性、可靠性和效率的要求越来越高。为了满足这些需求,我们开发了一款专为工业自动化运动控制和机器视觉流程开发设计的 C# 并发流程控制框架。

该框架不仅适用于各种工业自动化场景,还能在单线程环境下实现每秒百万次以上的调度频率,从而从容应对涉及成千上万输入输出点数的复杂任务。

并发流程控制框架

本框架提供一种全新的并发流程控制框架,它借鉴了Golang语言中的高效并发模式,并在此基础上进行了必要的功能扩展。框架不仅能够支持自定义的单/多线程调度机制,还允许在主UI线程中进行调度,从而简化了逻辑与用户界面之间的交互。

另外,该框架还集成了高精度定时器、可配置的调度优先级、逻辑停止与暂停等功能,让我们能够更加灵活地管理和控制复杂的自动化流程。

框架优势

  • 相较于传统模型:相对于传统的多线程模型、状态机模型以及类PLC模型,本框架具有更加紧凑清晰的逻辑结构,显著提升了开发效率,并简化了后续的维护与升级过程。
  • 受Go语言启发:框架的设计借鉴了 Go 语言中的高效并发模式,并在此基础上进行了必要的功能扩展,以适应工业自动化领域的具体需求。
  • 灵活的调度机制:支持自定义单线程或多线程调度,同时也可在主 UI 线程中进行调度,便于逻辑与用户界面的交互,增强了用户体验。
  • 丰富的内置功能:内置高精度定时器、可配置的调度优先级、逻辑停止与逻辑暂停功能,确保任务执行的准确性和可控性。
  • 树形多任务调度:采用树形结构管理多任务调度,提高了逻辑的可靠性和系统的整体稳定性。
  • 卓越的性能表现:在单线程环境下,每秒可实现超过一百万次的调度频率,能够从容应对成千上万输入输出点数的复杂场景。
  • 广泛的实践验证:该框架已在多个实际项目中成功应用,证明了其稳定性和可靠性。

框架示例

代码中定义了一系列不同的任务执行模式,展示如何通过不同的调度策略来管理并发任务。

  • 全局变量

static shared_strand strand:全局共享的调度器,用于保证线程安全。

  • 日志记录函数

Log(string msg):记录带有时间戳的日志信息到控制台。

  • 工作任务函数

Worker(string name, int time = 1000):模拟一个简单的任务,该任务会在指定的毫秒数后打印一条消息。

  • 主函数

MainWorker():异步主任务函数,依次调用前面定义的各种任务模式。

Main(string[] args):程序入口点,初始化工作服务、共享调度器,并启动主任务。

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Go;

namespace WorkerFlow
{
    class Program
    {
        static shared_strand strand;

        static void Log(string msg)
        {
            Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")} {msg}");
        }

        static async Task Worker(string name, int time = 1000)
        {
            await generator.sleep(time);
            Log(name);
        }

        //1 A、B、C依次串行
        //A->B->C
        static async Task Worker1()
        {
            await Worker("A");
            await Worker("B");
            await Worker("C");
        }

        //2 A、B、C全部并行,且依赖同一个strand(隐含参数,所有依赖同一个strand的任务都是线程安全的)
        //A
        //B
        //C
        static async Task Worker2()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A"));
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
        }

        //3 A执行完后,B、C再并行
        //  -->B
        //  |
        //A->
        //  |
        //  -->C
        static async Task Worker3()
        {
            await Worker("A");
            generator.children children = new generator.children();
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
        }

        //4 B、C都并行执行完后,再执行A
        //B--
        //  |
        //  -->A
        //  |
        //C--
        static async Task Worker4()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
            await Worker("A");
        }

        //5 B、C任意一个执行完后,再执行A
        //B--
        //  |
        //  >-->A
        //  |
        //C--
        static async Task Worker5()
        {
            generator.children children = new generator.children();
            var B = children.tgo(() => Worker("B", 1000));
            var C = children.tgo(() => Worker("C", 2000));
            var task = await children.wait_any();
            if (task == B)
            {
                Log("B成功");
            }
            else
            {
                Log("C成功");
            }
            await Worker("A");
        }

        //6 等待一个特定任务
        static async Task Worker6()
        {
            generator.children children = new generator.children();
            var A = children.tgo(() => Worker("A"));
            var B = children.tgo(() => Worker("B"));
            await children.wait(A);
        }

        //7 超时等待一个特定任务,然后中止所有任务
        static async Task Worker7()
        {
            generator.children children = new generator.children();
            var A = children.tgo(() => Worker("A", 1000));
            var B = children.tgo(() => Worker("B", 2000));
            if (await children.timed_wait(1500, A))
            {
                Log("成功");
            }
            else
            {
                Log("超时");
            }
            await children.stop();
        }

        //8 超时等待一组任务,然后中止所有任务
        static async Task Worker8()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(() => Worker("B", 2000));
            var tasks = await children.timed_wait_all(1500);
            await children.stop();
            Log($"成功{tasks.Count}个");
        }

        //9 超时等待一组任务,然后中止所有任务,且在中止任务中就地善后处理
        static async Task Worker9()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(async delegate ()
            {
                try
                {
                    await Worker("B", 2000);
                }
                catch (generator.stop_exception)
                {
                    Log("B被中止");
                    await generator.sleep(500);
                    throw;
                }
                catch (System.Exception)
                {
                }
            });
            var task = await children.timed_wait_all(1500);
            await children.stop();
            Log($"成功{task.Count}个");
        }

        //10 嵌套任务
        static async Task Worker10()
        {
            generator.children children = new generator.children();
            children.go(async delegate ()
            {
                generator.children children1 = new generator.children();
                children1.go(() => Worker("A"));
                children1.go(() => Worker("B"));
                await children1.wait_all();
            });
            children.go(async delegate ()
            {
                generator.children children1 = new generator.children();
                children1.go(() => Worker("C"));
                children1.go(() => Worker("D"));
                await children1.wait_all();
            });
            await children.wait_all();
        }

        //11 嵌套中止
        static async Task Worker11()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(async delegate ()
            {
                try
                {
                    generator.children children1 = new generator.children();
                    children1.go(async delegate ()
                    {
                        try
                        {
                            await Worker("B", 2000);
                        }
                        catch (generator.stop_exception)
                        {
                            Log("B被中止1");
                            await generator.sleep(500);
                            throw;
                        }
                        catch (System.Exception)
                        {
                        }
                    });
                    await children1.wait_all();
                }
                catch (generator.stop_exception)
                {
                    Log("B被中止2");
                    throw;
                }
                catch (System.Exception)
                {
                }
            });
            await generator.sleep(1500);
            await children.stop();
        }

        //12 并行执行且等待一组耗时算法
        static async Task Worker12()
        {
            wait_group wg = new wait_group();
            for (int i = 0; i < 2; i++)
            {
                wg.add();
                int idx = i;
                var _ = Task.Run(delegate ()
                {
                    try
                    {
                        Log($"执行算法{idx}");
                    }
                    finally
                    {
                        wg.done();
                    }
                });
            }
            await wg.wait();
            Log("执行算法完成");
        }

        //13 串行执行耗时算法,耗时算法必需放在线程池中执行,否则依赖同一个strand的调度将不能及时
        static async Task Worker13()
        {
            for (int i = 0; i < 2; i++)
            {
                await generator.send_task(() => Log($"执行算法{i}"));
            }
        }

        static async Task MainWorker()
        {
            await Worker1();
            await Worker2();
            await Worker3();
            await Worker4();
            await Worker5();
            await Worker6();
            await Worker7();
            await Worker8();
            await Worker9();
            await Worker10();
            await Worker11();
            await Worker12();
            await Worker13();
        }

        static void Main(string[] args)
        {
            work_service work = new work_service();
            strand = new work_strand(work);
            generator.go(strand, MainWorker);
            work.run();
            Console.ReadKey();
        }
    }
}
复制代码

框架地址

总结

值得一提的是,该框架特别设计用于工业自动化运动控制以及机器视觉流程开发领域,其独特的树形多任务调度机制极大提高了逻辑的可靠性,同时单线程环境下的每秒调度次数可达一百万次以上,足以应对涉及成千上万输入输出点数的应用场景。经过多个项目的实际验证,证明了其稳定性和可靠性,为工业自动化提供了强有力的支持。

通过本文的介绍,希望能为工业自动化领域的开发者提供一个高效、可靠且易于使用的工具。借助这一工具,大家在构建复杂的控制系统时,能够更加轻松地应对并发处理的挑战。也期待您在评论区留言交流,分享您的宝贵经验和建议。

   

标签:generator,C#,await,Worker,children,单线程,go,async,每秒
From: https://www.cnblogs.com/Leo_wl/p/18555669

相关文章

  • python:reflection 反射
    C#面象对象的语言也有类似的反射  reflection便于根据配置文件,方便切换数据库。切换DAL层即可。#encoding:utf-8#版权所有2024©涂聚文有限公司#许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎#描述:#Author:geovindu,GeovinDu涂聚文.#IDE......
  • 嵌入式安卓开发使用LLDB进行断点调试C/C++代码
    GDBorLLDB?较新的安卓NDK已经没有包括gdbserver了,而且安卓官网也说了,后续不会支持gdb了。我自己之前费了很大的功夫,去交叉编译一个gdbserver,但最后用起来一大堆莫名其妙的问题。所以还是使用LLDB吧。获取相应的工具https://developer.android.google.cn/ndk/downloads这里下......
  • CF 1253 题解
    CF1253题解ASinglePush考虑令\(d_i=b_i-a_i\),那么合法当且仅当\(d\)在一个前缀和一个后缀都是\(0\),其余地方值一致并且非负.BSillyMistake注意到能作一次划分的时候立即划分一定更优,因为这样就不会因为潜在的一天两次进入办公室而得不到答案.贪心的模拟即可.......
  • 流和向量(Streams and Vectors)
    在GNURadio的官方教程中,提到了两个重要的块连接方式:流和向量(StreamsandVectors)。具体的章节链接为:......
  • oracle dataguard学习和各版本DG新特性介绍
    oracledataguard学习和各版本DG新特性介绍DataGuard概述:OracleDataGuard是Oracle自带的数据同步功能,可以提供Oracle数据库的冗灾、数据保护、故障恢复等,实现数据库快速切换与灾难性恢复。DataGuard数据同步技术有以下优势:1)Oracle数据库自身内置的功能,与每个Oracle新......
  • 用Oracle的经验优化达梦数据库?我被经验误导了!
    说明:本文只是记录近期学习达梦数据库的一些内容,并没有说达梦数据库有什么坑,希望有些所谓的"友商"不要恶意抹黑,实际上在我接触过的几家国产数据库中,个人认为达梦数据库无论是产品成熟度,还是技术社区、兼容性、运维习惯等都是名列前茅的,至少对曾经的OracleDBA来讲,是比较友好......
  • 2025最新-计算机毕业设计Java基于kubenetes的OpenStack私有云平台部署
    一、项目介绍  基于K8S的opoenstack私有云平台的监测系统通过对Web应用服务器运行情况的分析统计系统的建设以实现服务器运行数据监控与分析功能。私有云平台是web应用正常运行的核心,为了确保这些网站的稳定运行,势必需要做好对网站服务器的监控。做好对服务器运行的各......
  • JAVA反序列化学习-CommonsCollections6(基于ysoserial)
    环境准备JDK1.8(8u421)我以本地的JDK8版本为准、commons-collections(3.x4.x均可这里使用3.2版本)cc3.2:<dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2</version>&l......
  • Docker Swarm 核心概念及详细使用
    DockerSwarm核心概念及详细使用DockerSwarm介绍DockerSwarm是Docker的原生集群管理工具。它的主要作用是将多个Docker主机集成到一个虚拟的Docker主机中,为Docker容器提供集群和调度功能。通过DockerSwarm,您可以轻松地管理多个Docker主机,并能在这些主机上调度......
  • Flask上传服务器,conda环境配置(都怪torch)
    Flask上传服务器,conda环境配置(都怪pytorch)问题:有个任务,将一个flask的服务换到另一个服务器上,之前的服务器一直用国内的镜像源可以直接下载镜像,当这个服务器设置相同的镜像源也下载不了,一直卡在solvingenvironment这一步。找到一个方法,可以直接复制整个虚拟环境到另一个服务器上......