有的时候咱们需要循环执行业务,如果单以处理过程不是计算密集型,就可以使用多线程并行处理,这样能大幅度提高执行效率
最开始我是想着有没有现成的,结果找了半天没发现有现成的,于是就自己封装了一个,简单测试了一下发现没啥问题
异步并行执行器
/// <summary> /// 异步并行执行器,可以将任务并行执行,提高执行速度 /// </summary> public class AsyncParallelExecutor<T> { /// <summary> /// 执行过程 /// </summary> /// <param name="progressInfo">进度信息</param> /// <param name="arg">参数数据</param> /// <returns>异步执行的任务</returns> public delegate Task ProcessDelegate(string progressInfo, T arg); /// <summary> /// 执行的任务总数量 /// </summary> readonly int _totalCount; readonly object _lock = new object(); readonly Queue<T> queue = new Queue<T>(); readonly int _parallelCount; readonly ProcessDelegate _process; readonly CancellationToken _token; readonly Action<string, Exception> _output; /// <summary> /// 创建一个异步并行执行器 /// </summary> /// <param name="args">任务的参数集合</param> /// <param name="process">对数据进行处理的过程</param> /// <param name="token">取消令牌</param> /// <param name="output">信息输出</param> /// <param name="parallelCount">执行的异步数量</param> public AsyncParallelExecutor(IEnumerable<T> args, ProcessDelegate process, CancellationToken token, Action<string, Exception> output, int parallelCount = 2) { foreach (var arg in args) { queue.Enqueue(arg); } _totalCount = queue.Count; _process = process; _token = token; _output = output; if (_output == null) { _output = (str, ex) => { }; } _parallelCount = parallelCount; _output($"共{_totalCount}个任务,将使用{_parallelCount}个异步任务并行处理", null); } /// <summary> /// 开始执行异步并行任务 /// </summary> public async Task Start() { Task[] tasks = new Task[_parallelCount]; for (int i = 0; i < _parallelCount; i++) { tasks[i] = AsyncTask(i); } await Task.WhenAll(tasks); } /// <summary> /// 异步函数,不会阻塞 /// </summary> Task AsyncTask(int progressID) { return Task.Run(async () => { while (true) { _token.ThrowIfCancellationRequested(); try { var (success, index) = TryGet(out T val); if (success == false) { break; } await _process($"进程{progressID},进度{index}/{_totalCount}", val); } catch (OperationCanceledException) { _token.ThrowIfCancellationRequested(); } catch (Exception ex) { _output(null, ex); } } _output($"进程{progressID} 结束", null); }); } (bool success, int index) TryGet(out T val) { lock (_lock) { if (queue.Count > 0) { val = queue.Dequeue(); return (true, _totalCount - queue.Count); } else { val = default; return (false, _totalCount); } } } }View Code
使用方法
async Task Test11() { CancellationToken token = new CancellationTokenSource().Token; List<string> taskData = new List<string>(); //将任务封装成泛型集合 for (int i = 0; i < 10; i++) { taskData.Add(Guid.NewGuid().ToString()); //模拟任务数据 } //并行启动5个异步线程并行执行任务,原本需要10*3=30秒的时间,现在只需要30/5=6秒 var ape = new AsyncParallelExecutor<string>(taskData, Execute, token, Output, 5); Log.Info("开始执行..."); try { await ape.Start(); } catch (OperationCanceledException) { Log.Info("任务被中途取消"); } Log.Info("全部执行完毕"); } /// <summary> /// 信息输出委托,可以传null /// </summary> void Output(string msg, Exception ex) { if (ex == null) { Log.Info(msg); } else { Log.Info(ex.ToString()); } } /// <summary> /// 模拟处理数据 /// </summary> /// <param name="progressInof">进度信息</param> /// <param name="data">数据</param> /// <returns></returns> async Task Execute(string progressInof, string data) { Log.Info($"{progressInof},开始处理:{data}"); await Task.Delay(3000); //模拟处理毫时业务 Log.Info($"{progressInof},{data}处理完毕"); }View Code
标签:异步,Task,C#,并行执行,token,output,parallelCount From: https://www.cnblogs.com/luludongxu/p/18307024