首页 > 编程语言 >C# 自定义固定数量线程池

C# 自定义固定数量线程池

时间:2022-11-17 11:46:47浏览次数:41  
标签:task 自定义 C# System 线程 new using public

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace A9AgentApp.Task
{
    public class CustomThread
    {
        #region Variable
        //一个AutoResetEvent实例
        private AutoResetEvent _locks = new AutoResetEvent(false);

        //一个Thread实例
        private Thread _thread;

        // 绑定回调方法,就是外部实际执行的任务
        public WaitCallback _taskAction;

        //定义一个事件用来绑定工作完成后的操作
        public event Action<CustomThread> WorkCompleted;

        /// <summary>
        ///设置线程拥有的Key
        /// </summary>
        public string Key { get; set; }

        /// <summary>
        /// 当前线程消息输出Action
        /// </summary>
        public Action<String> MessageWriterAction;
        #endregion

        //线程需要做的工作
        private void Work()
        {
            while (true)
            {
                //判断信号状态,如果有set那么 _locks.WaitOne()后的程序就继续执行
                _locks.WaitOne();
                // 将当前线程实例传到Action里
                _taskAction(this);
                // Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "workComplete");
                //执行完成事件
                WorkCompleted(this);
            }
        }

        #region event
        //构造函数
        public CustomThread()
        {
            // 初始化线程
            _thread = new Thread(Work);
            _thread.IsBackground = true;
            Key = Guid.NewGuid().ToString();
            //线程开始执行
            _thread.Start();
            Console.WriteLine("Thread:" + _thread.ManagedThreadId + " has been created!");
        }

        //Set开起信号
        public void Active()
        {
            _locks.Set();
        }

        #endregion
    }
    public class CustomFixedThreadPool
    {
        #region Variable
        //创建的线程数
        private int TreadCount = 6;
        //空闲线程队列
        private Queue<CustomThread> _freeThreadQueue;
        //工作线程字典(为什么?)
        private Dictionary<string, CustomThread> _workingDictionary;
        //空闲队列,存放需要被执行的外部函数
        private Queue<WaitCallback> _waitTaskQueue;

        #endregion

        #region Event
        /// <summary>
        /// 自定义线程池的构造函数
        /// </summary>
        /// <param name="MessageWriteActions">线程输出消息方法列表</param>
        public CustomFixedThreadPool(List<Action<String>> MessageWriteActions)
        {
            _workingDictionary = new Dictionary<string, CustomThread>();
            _freeThreadQueue = new Queue<CustomThread>();
            _waitTaskQueue = new Queue<WaitCallback>();

            CustomThread task = null;
            //产生固定数目的线程
            for (int i = 0; i < TreadCount; i++)
            {
                task = new CustomThread();
                //给每一个任务绑定事件
                if (MessageWriteActions.Any())
                {
                    task.MessageWriterAction = MessageWriteActions[i % MessageWriteActions.Count];
                }
                else
                {
                    task.MessageWriterAction = (msg) => { };
                }
                task.WorkCompleted += new Action<CustomThread>(WorkComplete);
                //将每一个新创建的线程放入空闲队列中
                _freeThreadQueue.Enqueue(task);
            }
        }

        //线程任务完成之后的工作
        void WorkComplete(CustomThread obj)
        {
            lock (this)
            {
                //将线程从字典中排除
                _workingDictionary.Remove(obj.Key);
                //将该线程放入空闲队列
                _freeThreadQueue.Enqueue(obj);

                //判断是否等待队列中有任务未完成
                if (_waitTaskQueue.Count > 0)
                {
                    //取出一个任务
                    WaitCallback item = _waitTaskQueue.Dequeue();
                    CustomThread newTask = null;
                    //空闲队列中取出一个线程
                    newTask = _freeThreadQueue.Dequeue();
                    // 线程执行任务
                    newTask._taskAction = item;
                    //把线程放入到工作队列当中
                    _workingDictionary.Add(newTask.Key, newTask);
                    //设置信号量
                    newTask.Active();
                    return;
                }
                else
                {
                    return;
                }
            }
        }

        //添加任务到线程池
        public void AddTaskItem(WaitCallback taskItem)
        {
            lock (this)
            {
                CustomThread task = null;
                //判断空闲队列是否存在线程
                if (_freeThreadQueue.Count > 0)
                {
                    //存在线程,取出一个线程
                    task = _freeThreadQueue.Dequeue();
                    //将该线程放入工作队列
                    _workingDictionary.Add(task.Key, task);
                    //执行传入的任务
                    task._taskAction = taskItem;
                    //设置信号量
                    task.Active();
                    return;
                }
                else
                {
                    //空闲队列中没有空闲线程,就把任务放到等待队列中
                    _waitTaskQueue.Enqueue(taskItem);
                    return;
                }
            }
        }
        #endregion
    }
   
}

  

2.使用方法 (初始化线程池)

 infoLabels = new List<Label>() { this.lbl_info1, this.lbl_info2, this.lbl_info3, this.lbl_info4, this.lbl_info5, this.lbl_info6 };
            List<Action<String>> infoLabelWriters = new List<Action<string>>();
            foreach(Label label in infoLabels)
            {
                infoLabelWriters.Add((msg) => label.CrossThreadRun(()=>label.Text = msg));
            }

            customThreadPool = new CustomFixedThreadPool(infoLabelWriters);

3. 使用方法(完成任务)

for(int i = 0; i < 10; i++)
            {
                customThreadPool.AddTaskItem((t) =>
                {
                    var msgWriter = ((CustomThread)t).MessageWriterAction;
                    for(int j = 0; j < 1000; j++)
                    {
                        msgWriter("正在执行第"+(1+j).ToString()+"/1000 条");
                        Thread.Sleep(10);
                    }
                    msgWriter("执行完成!");
                });
            }

 

4. 辅助类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace A9AgentApp.utils
{
   public static class ControlUtil
    {
        /// <summary>
     /// 跨线程访问控件 在控件上执行委托
     /// </summary>
     /// <param name="ctl">控件</param>
     /// <param name="del">执行的委托</param>
        public static void CrossThreadRun(this Control ctl, ThreadStart process)
        {
            if (process == null) return;
            if (ctl.InvokeRequired) { ctl.Invoke(process, null); }
            else
            {
                process();
            }
        }

        /// <summary>
        /// 跨线程访问控件 在控件上执行委托
        /// </summary>
        /// <param name="ctl">控件</param>
        /// <param name="del">执行的委托</param>
        public static Object CrossThreadCall (this Control ctl, Func<Object> func)
        {
            if (ctl.InvokeRequired) {return  ctl.Invoke(func, null); }
            else
            {
               return func();
            }
        }
    }
}

 

标签:task,自定义,C#,System,线程,new,using,public
From: https://www.cnblogs.com/zhshlimi/p/16898941.html

相关文章

  • JavaScript语法特殊语法和流程控制语句以及练习99乘法表
    JavaScript语法_特殊语法1.语句以;结尾,如果一行只有一条语句则;可以省略(不建议)2.变量的定义使用var关键字,也可以不使用用:定义的变量是局部变量不用:定义对的变量......
  • C/C++学生综合测评系统
    C/C++学生综合测评系统项目十六学生综合测评系统(2周)[目的要求]学生根据所学的程序设计基础原理与程序设计的知识,能够针对-一个小型的程序设计基础管理系统,进行系统的......
  • Veeam Backup & Replication 文件恢复操作
    1、进行备份的文件恢复还原       通过复制的方法将文件恢复出来,然后复制到对应的目录下     ......
  • @EnableFeignClients的使用
    使用feign调用也不报错,也没反应,后来发现是启动没有加@EnableFeignClients注解。 在Springcloud应用中,当我们要使用feign客户端时,一般要做以下三件事情:1.使用注解@En......
  • VScode修改settings.json无法屏蔽flake8报错的解决方案
    许多情况下,我们都希望修改Flake8的报错机制,如屏蔽太长报错等操作。网上的主流方法:修改settings.json(用户或工作区),在其中添加如下代码即可:"python.linting.enabled":......
  • Kubernetes(K8S) 安装Nacos,报 No DataSource set
    原因,数据库为MySQL5.7需要在yaml加上参数mysql.db.param:"characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false&server......
  • mysql-navicat主键递增清零
    在navicat上设置没有用必须使用sql语句清零MySQL自增ID起始值修改方法-52php-博客园(cnblogs.com)mysql自增主键清零方法-木西-Muxy-博客园(cnblogs.com)......
  • nginx location
    location顺序/优先级:(location=)>(location完整路径)>(location^~路径)>(location~,~*正则顺序)>(location部分起始路径)>(/)解释说明(下面会有详解):=......
  • 自学 TypeScript 第二天 编译选项
    前言:昨天我们学习了TS的数据类型,不知道大家回去以后练习没练习,如果你练习了一定会发现一个问题,我们的TS好像和JS不太一样JS写完之后直接就可以放到页面上,就可以用......
  • Zerocash安全性分析
    本文对Zerocash做了简单分析,主要内容参考Zerocash原文。完整文件下载......