首页 > 编程语言 >C#中定义自己的消费队列(下)

C#中定义自己的消费队列(下)

时间:2023-03-12 19:56:23浏览次数:49  
标签:定义 C# consumingQueue queue 队列 int using new public

一 背景

上一篇中我们介绍了一个关于使用C#中的Queue来定义自己的消费队列,这篇文章我将再次使用Queue来定义另外一种消费队列,这个队列中会使用到System.Threading.Timer来定义一个10ms的Interval,和上一篇中产生数据一个个消费不同这篇文章中介绍的消费队列中消费定时器时间间隔内的所有待消费项,前面我们还是一样会通过源码来一步步讲述其内部原理,最后会通过几个单元测试来验证对应的使用。

二 源码

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Pangea.Common.Utility.Buffer
{
    public class CustomConsumeQueue<T> : IDisposable
    {
        #region Fields

        public const int INTERVAL_CONSUMING = 10;//单位:ms

        // used for dispose
        private bool _isDisposed = false;

        protected const bool FINALIZE_DISPOSING = false;
        protected const bool EXPLICIT_DISPOSING = true;

        private Timer _timer;
        private Queue<T> _queue;
        private readonly object _lockObj = new object();
        private readonly Action<IList<T>> _consumerQueueItemsAction;

#if DEBUG

        // the counter for working thread numbers in current;
        public int _threadCounter = 0;

#endif

        #endregion

        #region Ctor

        public CustomConsumeQueue(Action<IList<T>> consumerQueueItemsAction)
        {
            _queue = new Queue<T>();
            _timer = new Timer(new TimerCallback(BeginTakeQueueItems));
            _timer.Change(Timeout.Infinite, Timeout.Infinite);
            _consumerQueueItemsAction = consumerQueueItemsAction;
        }

        ~CustomConsumeQueue()
        {
            Dispose(FINALIZE_DISPOSING);
        }

        #endregion

        #region Methods

        #region Public

        public bool Add(T item)
        {
            if (_isDisposed) return false;

            lock (_lockObj)
            {
                _queue.Enqueue(item);
                _timer.Change(INTERVAL_CONSUMING, Timeout.Infinite);
            }
            return true;
        }

        public int PendingToConsumeCount()
        {
            lock (_lockObj)
            {
                return _queue.Count;
            }
        }

        public void Dispose()
        {
            Dispose(EXPLICIT_DISPOSING);
            GC.SuppressFinalize(this);
        }

        public void Dispose(bool disposingMode)
        {
            if (_isDisposed) return;

            if (disposingMode == EXPLICIT_DISPOSING)
            {
                // release managed resource whne dispose by explicit
            }

            _timer?.Dispose();
            _timer = null;

            _isDisposed = true;
        }

        #endregion

        #region Private

        private void BeginTakeQueueItems(object state)
        {
            ThreadPool.QueueUserWorkItem(state1 =>
            {
#if DEBUG
                Interlocked.Increment(ref _threadCounter);
#endif
                try
                {
                    T[] itemsArray = null;
                    lock (_lockObj)
                    {
                        itemsArray = new T[_queue.Count];
                        _queue.CopyTo(itemsArray, 0);
                        _queue.Clear();

                        if (_isDisposed == false)
                        {
                            // may throw a disposed exception
                            ((Timer)state).Change(Timeout.Infinite, Timeout.Infinite);
                        }
                    }
                    Trace.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] Begin into consume procedure,QueueCount:{itemsArray.Length},Time:{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}");

                    //begin consumer queue items
                    _consumerQueueItemsAction.Invoke(itemsArray);
                }
                catch (Exception ex)
                {
                    // TODO Log exception, terminate current thread
                }
                finally
                {
#if DEBUG
                    Interlocked.Decrement(ref _threadCounter);
#endif
                }
            });
        }

        #endregion

        #endregion
    }
}

2.1 原理讲解

  1. Add 的过程
    在Add的时候除了将待消费项添加到默认的_queue对象中以外就开始设置定时器触发的Interval为INTERVAL_CONSUMING,这个值默认设置为10ms,也就是当前定时器的触发间隔在添加消费项的时候会默认开启。
  2. 定时器消费过程
    定时器消费的时候会一次性将10ms的间隔内所有消费项一次性取到,然后一次性消费掉,另外需要注意的是,在定时器触发时间回调的过程中需要重新设置定时器的IntervalTimeout.Infinite,这样就能够将原来的定时器停掉,在下一次再次添加的时候重新设置默认的Interval,这样就能够一次往复交替进行,这个是消费过程的主要逻辑。
  3. 实现IDispose模式
    当前的CustomConsumeQueue显式实现IDispose的接口,代码中也都实现了对_isDisposed字段的判断,这个在使用的过程中需要特别注意。

三 测试

单元测试用例如下:

using NUnit.Framework;
using Pangea.Common.Utility.Buffer;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace ACM.Framework.Test.Modules.Utils
{
    public class CustomConsumeQueueTest
    {
        CustomConsumeQueue<string> _consumingQueue = null;

        [SetUp]
        public void Setup()
        {
            CustomConsumeQueue<string> consumeQueue = new CustomConsumeQueue<string>(StartConsuming);
            _consumingQueue = consumeQueue;
        }

        [Test]
        public void GeneralWritingTest()
        {
            int consumedCount = 0;
            CustomConsumeQueue<string> consumeQueue = new CustomConsumeQueue<string>(queueItems =>
            {
                int currentConsumedCount = queueItems.Count;
                Console.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] {currentConsumedCount} items has been consumed.");
                consumedCount += currentConsumedCount;
            });

            int producedCount = new Random(Guid.NewGuid().GetHashCode()).Next(1, 100);
            AddItem(consumeQueue, producedCount);

            int loopTime = 1000;
            while (consumedCount != producedCount)
            {
                Thread.Sleep(100);
                loopTime -= 100;
            }

            Assert.IsTrue(consumeQueue.PendingToConsumeCount() == 0);
            Assert.IsTrue(consumeQueue._threadCounter == 0);
            Assert.IsTrue(consumedCount == producedCount);
        }

        [Test]
        public void TimerWritingTest()
        {
            int times = 20;
            int paralleCount = 5;
            int completedThreadCount = 0;
            for (int i = 0; i < paralleCount; i++)
            {
                int localIndex = i;
                ThreadPool.QueueUserWorkItem(obj =>
                {
                    while (times-- > 0)
                    {
                        AddItem(_consumingQueue, new Random(Guid.NewGuid().GetHashCode()).Next(1, 100));
                        Thread.Yield();
                    }
                    Interlocked.Increment(ref completedThreadCount);
                    Trace.WriteLine($"***************Begin run into parralle thread:{localIndex}************************");
                });
            }

            while (completedThreadCount != 5)
            {
                Thread.Sleep(100);
            }

            Thread.Sleep(1000);

            Assert.IsTrue(_consumingQueue.PendingToConsumeCount() == 0);
            Assert.IsTrue(_consumingQueue._threadCounter == 0);
        }

        [Test]
        public void AddAfterDisposeTest()
        {
            string item1 = GetRandomString();
            bool result1 = _consumingQueue.Add(item1);
            Assert.IsTrue(result1);

            Thread.Sleep(1000); // Console.WriteLine will cost lots of time

            Assert.IsTrue(_consumingQueue.PendingToConsumeCount() == 0);
            Assert.IsTrue(_consumingQueue._threadCounter == 0, $"[{DateTime.Now:HH-mm-ss fff}] Thread numbers is not zero.");

            _consumingQueue.Dispose();

            string item2 = GetRandomString();
            bool result2 = _consumingQueue.Add(item2);
            Assert.IsFalse(result2);
        }

        private void StartConsuming(IList<string> queueItems)
        {
            Console.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] {queueItems.Count} items has been consumed.");
        }
        private void AddItem(CustomConsumeQueue<string> queue, int numbers)
        {
            IList<string> items = new List<string>();
            for (int i = 0; i < numbers; i++)
            {
                items.Add(GetRandomString());
            }
            foreach (var item in items)
            {
                queue.Add(item);
                Thread.Sleep(new Random(Guid.NewGuid().GetHashCode()).Next(1, 50));
            }
        }
        private string GetRandomString()
        {
            return Guid.NewGuid().ToString();
        }
    }
}

测试的单元用例主要包含下面的几个部分:

  1. GeneralWritingTest()实现单个线程增加PendingItem,然后实现消费的过程。
  2. TimerWritingTest()使用5个独立的线程增加PendingItem,在每个线程中随机产生消费项,然后判断消费过程是否正确。
  3. AddAfterDisposeTest() 主要测试代码实现Dispose的过程,在添加然后消费后中途在显式的Dispose当前的Queue,判断后续是否能够再继续进行添加。

标签:定义,C#,consumingQueue,queue,队列,int,using,new,public
From: https://www.cnblogs.com/seekdream/p/17208696.html

相关文章

  • python中list的crud
    #list常用的方法list=[1,2,3,4,5,5,4,5]#长度print(len(list))#增list.append(8)list.append(9)print(list,'appent')#删list.pop()#默认删......
  • 题解 ARC111B【Reversible Cards】
    我们将值域中每个数视作一个节点,将每张卡片视作连接两个节点的边,则问题转化为:对于每条边都选择一个端点,使得被选择的节点总数最大。显然每个连通块可以分开处理。设连通块......
  • Ubuntu22.04下解决终端无法打开vscode
    一般来说,有时为了方便会直接在Ubuntu终端登陆root账户获取权限在root下打开vscode会提示无法打开。环境:Ubuntu22.04解决办法:终端输入:vi~/.bashrc添加一行:aliascod......
  • pycharm快捷键
    1、代码编辑快捷键CTRL+ALT+SPACE快速导入任意类CTRL+SHIFT+ENTER代码补全SHIFT+F1查看外部文档CTRL+Q快速查找文档CTRL+P参数信息(在方法中调用......
  • 关于刷Leetcode-剑指offer学习计划-需要关注的题目
    左旋转字符串二维数组中的查找旋转数组的最小数字股票的最大利润青蛙跳台阶把数字翻译成字符串俩个链表的第一个公共节点和为s的俩个数字矩阵中的路径机器人的运......
  • 【题解】CF364D
    题目大意给定集合a,求最大的是大小超过一半的子集的最大公约数的数是什么。题解“超过一半”即想到随机化n次后只有\(\frac{1}{2^n}\)的几率错误,于是随机一个数判断它的......
  • Dapper.Contrib——更加优雅地使用Dapper进行增删改查
    简介Dapper是介于Entityframework与ADO的折中选择。既满足手写查询的高性能需求,又简化了数据库对象映射为内存对象的繁杂工作。Dapper.Contrib是对Dapper的进一步封装,使......
  • Data Analytics
    1DataAnalytics简介DataAnalytics是由DataHunter(北京数猎天下科技有限公司)自主研发的一款企业级业务数据可视化分析产品。其基于探索式分析技术,以最新的数据自服务理念为......
  • C语言之整型提升
    一、什么是整型提升如果用一句话来说明整型提升那就是:只要是char类型和short类型的数据,   并且被使用或者说是只要参与运算,那么它们就会发生整型提升,被转化为普通整型......
  • docker
    docker的安装方法:1、脚本安装方法(最新版)curl-sSLhttps://get.docker.com/|sh2、yum安装3、rpm包安装所有的:https://download.docker.com/centos7.6的:http://dock......