一 背景
在上一篇中我们介绍了一个关于使用C#中的Queue来定义自己的消费队列,这篇文章我将再次使用Queue来定义另外一种消费队列,这个队列中会使用到System.Threading.Timer
来定义一个10ms的Interval,和上一篇中产生数据一个个消费不同这篇文章中介绍的消费队列中消费定时器时间间隔内的所有待消费项,前面我们还是一样会通过源码来一步步讲述其内部原理,最后会通过几个单元测试来验证对应的使用。
二 源码
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace Pangea.Common.Utility.Buffer
{
public class CustomConsumeQueue<T> : IDisposable
{
#region Fields
public const int INTERVAL_CONSUMING = 10;//单位:ms
// used for dispose
private bool _isDisposed = false;
protected const bool FINALIZE_DISPOSING = false;
protected const bool EXPLICIT_DISPOSING = true;
private Timer _timer;
private Queue<T> _queue;
private readonly object _lockObj = new object();
private readonly Action<IList<T>> _consumerQueueItemsAction;
#if DEBUG
// the counter for working thread numbers in current;
public int _threadCounter = 0;
#endif
#endregion
#region Ctor
public CustomConsumeQueue(Action<IList<T>> consumerQueueItemsAction)
{
_queue = new Queue<T>();
_timer = new Timer(new TimerCallback(BeginTakeQueueItems));
_timer.Change(Timeout.Infinite, Timeout.Infinite);
_consumerQueueItemsAction = consumerQueueItemsAction;
}
~CustomConsumeQueue()
{
Dispose(FINALIZE_DISPOSING);
}
#endregion
#region Methods
#region Public
public bool Add(T item)
{
if (_isDisposed) return false;
lock (_lockObj)
{
_queue.Enqueue(item);
_timer.Change(INTERVAL_CONSUMING, Timeout.Infinite);
}
return true;
}
public int PendingToConsumeCount()
{
lock (_lockObj)
{
return _queue.Count;
}
}
public void Dispose()
{
Dispose(EXPLICIT_DISPOSING);
GC.SuppressFinalize(this);
}
public void Dispose(bool disposingMode)
{
if (_isDisposed) return;
if (disposingMode == EXPLICIT_DISPOSING)
{
// release managed resource whne dispose by explicit
}
_timer?.Dispose();
_timer = null;
_isDisposed = true;
}
#endregion
#region Private
private void BeginTakeQueueItems(object state)
{
ThreadPool.QueueUserWorkItem(state1 =>
{
#if DEBUG
Interlocked.Increment(ref _threadCounter);
#endif
try
{
T[] itemsArray = null;
lock (_lockObj)
{
itemsArray = new T[_queue.Count];
_queue.CopyTo(itemsArray, 0);
_queue.Clear();
if (_isDisposed == false)
{
// may throw a disposed exception
((Timer)state).Change(Timeout.Infinite, Timeout.Infinite);
}
}
Trace.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] Begin into consume procedure,QueueCount:{itemsArray.Length},Time:{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}");
//begin consumer queue items
_consumerQueueItemsAction.Invoke(itemsArray);
}
catch (Exception ex)
{
// TODO Log exception, terminate current thread
}
finally
{
#if DEBUG
Interlocked.Decrement(ref _threadCounter);
#endif
}
});
}
#endregion
#endregion
}
}
2.1 原理讲解
- Add 的过程
在Add的时候除了将待消费项添加到默认的_queue对象中以外就开始设置定时器触发的Interval为INTERVAL_CONSUMING
,这个值默认设置为10ms,也就是当前定时器的触发间隔在添加消费项的时候会默认开启。 - 定时器消费过程
定时器消费的时候会一次性将10ms的间隔内所有消费项一次性取到,然后一次性消费掉,另外需要注意的是,在定时器触发时间回调的过程中需要重新设置定时器的Interval为Timeout.Infinite
,这样就能够将原来的定时器停掉,在下一次再次添加的时候重新设置默认的Interval,这样就能够一次往复交替进行,这个是消费过程的主要逻辑。 - 实现IDispose模式
当前的CustomConsumeQueue显式实现IDispose的接口,代码中也都实现了对_isDisposed字段的判断,这个在使用的过程中需要特别注意。
三 测试
单元测试用例如下:
using NUnit.Framework;
using Pangea.Common.Utility.Buffer;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace ACM.Framework.Test.Modules.Utils
{
public class CustomConsumeQueueTest
{
CustomConsumeQueue<string> _consumingQueue = null;
[SetUp]
public void Setup()
{
CustomConsumeQueue<string> consumeQueue = new CustomConsumeQueue<string>(StartConsuming);
_consumingQueue = consumeQueue;
}
[Test]
public void GeneralWritingTest()
{
int consumedCount = 0;
CustomConsumeQueue<string> consumeQueue = new CustomConsumeQueue<string>(queueItems =>
{
int currentConsumedCount = queueItems.Count;
Console.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] {currentConsumedCount} items has been consumed.");
consumedCount += currentConsumedCount;
});
int producedCount = new Random(Guid.NewGuid().GetHashCode()).Next(1, 100);
AddItem(consumeQueue, producedCount);
int loopTime = 1000;
while (consumedCount != producedCount)
{
Thread.Sleep(100);
loopTime -= 100;
}
Assert.IsTrue(consumeQueue.PendingToConsumeCount() == 0);
Assert.IsTrue(consumeQueue._threadCounter == 0);
Assert.IsTrue(consumedCount == producedCount);
}
[Test]
public void TimerWritingTest()
{
int times = 20;
int paralleCount = 5;
int completedThreadCount = 0;
for (int i = 0; i < paralleCount; i++)
{
int localIndex = i;
ThreadPool.QueueUserWorkItem(obj =>
{
while (times-- > 0)
{
AddItem(_consumingQueue, new Random(Guid.NewGuid().GetHashCode()).Next(1, 100));
Thread.Yield();
}
Interlocked.Increment(ref completedThreadCount);
Trace.WriteLine($"***************Begin run into parralle thread:{localIndex}************************");
});
}
while (completedThreadCount != 5)
{
Thread.Sleep(100);
}
Thread.Sleep(1000);
Assert.IsTrue(_consumingQueue.PendingToConsumeCount() == 0);
Assert.IsTrue(_consumingQueue._threadCounter == 0);
}
[Test]
public void AddAfterDisposeTest()
{
string item1 = GetRandomString();
bool result1 = _consumingQueue.Add(item1);
Assert.IsTrue(result1);
Thread.Sleep(1000); // Console.WriteLine will cost lots of time
Assert.IsTrue(_consumingQueue.PendingToConsumeCount() == 0);
Assert.IsTrue(_consumingQueue._threadCounter == 0, $"[{DateTime.Now:HH-mm-ss fff}] Thread numbers is not zero.");
_consumingQueue.Dispose();
string item2 = GetRandomString();
bool result2 = _consumingQueue.Add(item2);
Assert.IsFalse(result2);
}
private void StartConsuming(IList<string> queueItems)
{
Console.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] {queueItems.Count} items has been consumed.");
}
private void AddItem(CustomConsumeQueue<string> queue, int numbers)
{
IList<string> items = new List<string>();
for (int i = 0; i < numbers; i++)
{
items.Add(GetRandomString());
}
foreach (var item in items)
{
queue.Add(item);
Thread.Sleep(new Random(Guid.NewGuid().GetHashCode()).Next(1, 50));
}
}
private string GetRandomString()
{
return Guid.NewGuid().ToString();
}
}
}
测试的单元用例主要包含下面的几个部分:
- GeneralWritingTest()实现单个线程增加PendingItem,然后实现消费的过程。
- TimerWritingTest()使用5个独立的线程增加PendingItem,在每个线程中随机产生消费项,然后判断消费过程是否正确。
- AddAfterDisposeTest() 主要测试代码实现Dispose的过程,在添加然后消费后中途在显式的Dispose当前的Queue,判断后续是否能够再继续进行添加。