第十二章:同步
目录- 第十二章:同步
12.1 简介
在并发编程中,同步的核心任务是协调多个代码段对共享资源的访问,确保数据的一致性和完整性。随着.NET平台对并发编程的支持日益增强,大部分场景已经通过高效的库或框架实现了隐式同步。然而,理解同步的基础仍然是编写健壮代码的关键。
为什么需要同步?
同步通常用来解决以下问题:
- 数据保护:当多个代码段并发运行且共享可变数据时,确保数据的一致性。
- 交互协调:在一个代码段需要通知另一个代码段某个事件或状态变化时,确保信息传递的正确性。
只有代码同时满足以下三个条件,才需要同步机制:
- 并发执行:代码段在不同线程或任务中存在并行运行。
- 共享数据:这些代码段访问同一个数据对象。
- 数据修改:至少有一个代码段会更新(写入)共享数据。
什么时候不需要同步?
并非所有的并发操作都需要同步,以下是常见的例外场景:
- 没有并发执行:如果代码是按顺序运行的,虽然可能是异步的,但只要没有同时执行的情况,就不需要同步。
- 独立数据:每个线程或任务使用独立的数据副本,彼此没有交集。
- 只读数据:即使数据是共享的,但没有代码修改它(例如,不可变类型)。
示例代码说明这一点:
async Task ExampleAsync()
{
int value = 10;
await Task.Delay(1000);
value += 1;
await Task.Delay(1000);
value -= 1;
Console.WriteLine(value);
}
即使是异步代码,但因为是顺序执行,并没有并发访问,同一时间访问value
的线程或任务只有一个,虽然可能不是同一个,所以无需同步。
同步的常见应用
- 保护共享数据
当多个任务并发修改同一变量时,需要通过同步保证数据更新的正确性,以下代码通过 Task.Run 启动了并行任务,但它们共享同一个变量 value。
private int value;
async Task<int> SimpleParallelismAsync()
{
int value = 0;
Task task1 = Task.Run(() => { value = value + 1; });
Task task2 = Task.Run(() => { value = value + 1; });
Task task3 = Task.Run(() => { value = value + 1; });
await Task.WhenAll(task1, task2, task3);
return value;
}
- 线程安全集合
如 ConcurrentDictionary
等线程安全集合内置了同步机制,在并发场景中可以直接使用:
async Task<int> UseThreadSafeCollectionAsync()
{
var dictionary = new ConcurrentDictionary<int, int>();
Task task1 = Task.Run(() => dictionary.TryAdd(1, 10));
Task task2 = Task.Run(() => dictionary.TryAdd(2, 20));
Task task3 = Task.Run(() => dictionary.TryAdd(3, 30));
await Task.WhenAll(task1, task2, task3);
return dictionary.Count; // 始终返回 3
}
- 不可变数据结构
不可变集合(如 ImmutableStack
)通过设计避免了同步问题,因为每次更新都会生成新的集合:
async Task<bool> UseImmutableStackAsync()
{
var stack = ImmutableStack<int>.Empty;
Task task1 = Task.Run(() => stack.Push(1));
Task task2 = Task.Run(() => stack.Push(2));
Task task3 = Task.Run(() => stack.Push(3));
await Task.WhenAll(task1, task2, task3);
return stack.IsEmpty; // 始终返回 true,因为原始 stack 未被修改
}
同步的重要性
同步的核心目标是为多线程环境中的代码提供一致的数据视图。如果缺乏适当的同步,可能导致:
- 数据竞争:多个线程同时修改数据,结果不可预测。
- 死锁:多个线程因不当的锁管理而相互等待,系统无法继续执行。
- 性能瓶颈:过度同步可能导致线程阻塞,降低并发效率。
了解何时需要同步以及如何选择合适的工具,是并发编程的基础。后续小节将进一步探讨常见的同步技术及其在不同场景中的应用。
12.2 原子操作
简介
在多线程环境中,原子操作是不可分割的操作,要么完全执行成功,要么完全不执行,且在执行过程中不会被其他线程中断或观察到中间状态。
原子操作提供了一种轻量级的线程安全方式,特别适用于高性能场景,避免了使用锁引入的上下文切换和性能开销。
.NET 提供了一些内置的原子操作支持,例如通过 Interlocked
类和某些原子性的集合操作,以此来方便地实现线程安全的数据更新。
使用场景
-
计数器或状态管理
在多线程环境下安全地递增、递减或交换值,例如统计请求数、管理资源计数。 -
无锁编程
在高性能并发场景下,减少锁的使用,提高吞吐量。 -
简化共享变量操作
对简单变量执行原子操作,无需引入复杂的锁机制。
代码示例
使用 Interlocked
类
Interlocked
提供线程安全的方法,用于操作简单的整数或引用类型。
1. 线程安全地递增、递减和累加
-
递增 (
Interlocked.Increment
) 和递减 (Interlocked.Decrement
)using System; using System.Threading; using System.Threading.Tasks; class Program { private static int _counter = 0; static async Task Main(string[] args) { // 启动多个并发任务 Task[] tasks = new Task[10]; for (int i = 0; i < tasks.Length; i++) { tasks[i] = Task.Run(() => { for (int j = 0; j < 1000; j++) { Interlocked.Increment(ref _counter); // 线程安全递增 } }); } await Task.WhenAll(tasks); Console.WriteLine($"Final Counter Value: {_counter}"); // 应该为 10000 } }
-
累加 (
Interlocked.Add
)class Program { private static int _sum = 0; static void AddValue(int value) { Interlocked.Add(ref _sum, value); // 原子累加 } static void Main() { Parallel.For(0, 10, i => AddValue(10)); // 并发累加 Console.WriteLine($"Total Sum: {_sum}"); // 应该为 100 } }
2. 线程安全地交换值
- 交换变量值 (
Interlocked.Exchange
)
适用于在并发场景中重置或切换变量值。class Program { private static int _sharedValue = 42; static void UpdateValue(int newValue) { int originalValue = Interlocked.Exchange(ref _sharedValue, newValue); // 原子交换 Console.WriteLine($"Original Value: {originalValue}, New Value: {_sharedValue}"); } static void Main() { Parallel.For(0, 5, i => UpdateValue(i * 10)); // 并发更新 } }
3. 比较并交换值
Interlocked.CompareExchange
只有当当前值等于预期值时,才会更新为新值。private static int _state = 0; public static void SetState(int newState) { int originalState = Interlocked.CompareExchange(ref _state, newState, 0); // 如果 _state 是 0,则设置为 newState if (originalState == 0) { Console.WriteLine("State updated successfully."); } else { Console.WriteLine("State change failed. Current state: " + originalState); } }
4. 无锁栈的简单实现
利用原子操作实现无锁的链表:
public class LockFreeStack<T>
{
private Node? _head;
private class Node
{
public T Value;
public Node? Next;
public Node(T value) => Value = value;
}
public void Push(T value)
{
var newNode = new Node(value);
Node? oldHead;
do
{
oldHead = _head; // 读取当前头节点
newNode.Next = oldHead;
} while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);
}
public bool TryPop(out T? value)
{
Node? oldHead;
do
{
oldHead = _head;
if (oldHead == null)
{
value = default;
return false;
}
} while (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) != oldHead);
value = oldHead.Value;
return true;
}
}
原子操作的特点与限制
-
原子操作仅适用于简单数据类型
Interlocked
支持的类型包括int
、long
和引用类型等简单变量。- 对于复杂的数据结构或多字段,可能需要其他线程安全方法。
-
原子性无法跨多个操作保证
原子操作仅在单次操作上提供保障,如果需要对多个变量或复杂逻辑实现原子性,需要借助锁或其他同步机制。 -
低粒度但高性能
原子操作非常轻量级,尤其适合频繁更新的场景。但对于复杂场景,可能会导致代码难以理解和维护。
最佳实践
-
优先考虑简单场景
- 原子操作适合单一字段的更新,例如计数器或标志位。
- 避免在复杂逻辑中滥用原子操作。
-
使用合适的工具
- 对单变量更新,首选
Interlocked
方法。 - 对集合操作,优先使用
ConcurrentDictionary
、ConcurrentQueue
等线程安全集合。
- 对单变量更新,首选
-
避免数据争用
- 如果多个线程频繁竞争同一个变量,即使使用原子操作,也可能导致性能下降。
- 通过分片或分区减少争用热点(如将计数分布在多个线程私有变量中,最后合并结果)。
-
小心无锁编程的陷阱
- 无锁编程虽然性能高,但逻辑难以验证,容易引入隐藏的竞争条件。
- 在追求无锁性能的同时,确保代码正确性。
通过熟练使用原子操作,开发者可以在复杂的多线程环境中实现高性能且线程安全的代码。
12.3 阻塞锁lock
问题
当多个线程需要安全地访问和修改共享数据时,如何确保数据的一致性和线程安全?阻塞锁是解决这一问题的最简单和常用工具。
解决方案
在.NET中,lock
语句提供了一种简单易用的阻塞锁实现方式。它通过锁定一个引用对象,确保在某一时间点只有一个线程可以进入指定的代码块。
示例代码:
class MyClass
{
// 锁对象,用于保护共享字段 _value
private readonly object _mutex = new object();
private int _value;
public void Increment()
{
lock (_mutex) // 进入锁
{
_value = _value + 1; // 仅允许一个线程修改 _value
} // 退出锁
}
}
在上述代码中,lock
语句使用 _mutex
对象作为锁,确保 _value
的修改是线程安全的。
阻塞锁的原理
lock
是 Monitor
的简化语法糖。Monitor
提供了更细粒度的控制,但使用 lock
更直观且便于维护。
lock
的工作机制:
- 尝试进入锁:线程尝试获取锁对象(即
_mutex
)。 - 阻塞其他线程:在锁被占用时,其他线程会进入等待状态,直到锁释放。
- 自动释放:当控制流离开
lock
块,无论是否发生异常,锁都会被自动释放。
等价的 Monitor
写法:
public void Increment()
{
Monitor.Enter(_mutex);
try
{
_value = _value + 1;
}
finally
{
Monitor.Exit(_mutex);
}
}
实际上,lock语句只是语法糖,编译过后也是通过Monitor
实现
常见陷阱
-
锁嵌套导致死锁
当线程 A 和线程 B 相互等待对方持有的锁时,会导致程序永远卡住。示例:
private readonly object lockA = new object(); private readonly object lockB = new object(); public void ThreadA() { lock (lockA) { // 需要 lockB,但 lockB 被 ThreadB 持有 lock (lockB) { // 执行代码 } } } public void ThreadB() { lock (lockB) { // 需要 lockA,但 lockA 被 ThreadA 持有 lock (lockA) { // 执行代码 } } }
解决方法:始终按一致的顺序获取多个锁。例如,确保线程总是先获取 lockA,再获取 lockB。
-
过度锁定
- 锁定不必要的代码块会降低并发性能,应仅锁定需要保护的核心逻辑。
-
锁的误用
- 避免对公共对象(如
this
)加锁,否则可能引发外部代码的竞争。
- 避免对公共对象(如
最佳实践
-
限制锁的可见性
- 锁对象(如
_mutex
)应为私有字段,且不能暴露给外部,避免潜在的死锁风险。 - 虽然可以锁定任何应用对象,但是避免锁定
this
、Type
实例或string
类型,因为它们可能被外部代码共享。
- 锁对象(如
-
记录锁的用途
- 在代码注释中明确锁的作用范围和保护的数据,便于日后维护。
-
最小化锁内代码
- 锁定范围越小,发生死锁或性能瓶颈的可能性越低。
- 避免在锁定状态下执行阻塞操作(如 I/O 操作)或长时间运行的逻辑。
-
避免锁定状态下调用外部代码
- 锁定时不要调用委托、触发事件或调用虚方法,因为这些代码的行为可能难以预测,甚至引发死锁。
其他锁类型
除了基本的 lock
语句,.NET 提供了多个高级锁类型,例如:
-
Monitor
提供比lock
更细粒度的锁控制(如超时等待等),但通常不需要直接使用。 -
SpinLock
适用于短时间锁定的场景,可以避免线程挂起,但如果锁持有时间较长会导致性能降低。 -
ReaderWriterLockSlim
支持多读单写的场景,对于读多写少的情况可以提高性能。但在多数情况下,lock
已足够简单且高效。
建议:
在绝大多数应用程序中,lock
是最简单且高效的选择,能够满足 99% 的需求。仅在特殊场景下才需要考虑使用其他类型的锁。
12.4 自旋锁SpinLock(新手不推荐)
简介
SpinLock 是一种轻量级的锁,它通过不断轮询的方式(自旋)等待锁的释放,而不是阻塞线程或切换上下文。它的目标是减少线程上下文切换的开销,因此适合锁等待时间非常短的场景,例如高性能的多线程计算。
与阻塞锁(如 lock
或 Monitor
)不同,SpinLock 不会导致线程进入阻塞挂起状态,而是持续占用 CPU 资源尝试获取锁。
这使得 SpinLock 在短时间锁竞争中具有极高的性能,但如果锁竞争时间较长,会导致 CPU 资源的浪费,强烈不建议新手使用。
使用场景
-
高性能计算
用于短时间占用的锁保护,如计数器更新、快速内存操作等。 -
避免线程切换开销
在实时性较高的程序中,减少上下文切换和线程阻塞导致的延迟。 -
多线程并发编程
场景需要非常精细的性能优化,并且可以确保锁争用时间极短。
代码示例
1. SpinLock 基本用法
using System;
using System.Threading;
class Program
{
private static SpinLock _spinLock = new SpinLock();
private static int _counter = 0;
static void IncrementCounter()
{
bool lockTaken = false;
try
{
_spinLock.Enter(ref lockTaken); // 获取锁
_counter++; // 保护的共享资源
}
finally
{
if (lockTaken)
{
_spinLock.Exit(); // 释放锁
}
}
}
static void Main()
{
const int threadCount = 10;
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
threads[i] = new Thread(() =>
{
for (int j = 0; j < 1000; j++)
{
IncrementCounter();
}
});
threads[i].Start();
}
foreach (var thread in threads)
{
thread.Join(); // 等待线程完成
}
Console.WriteLine($"Final Counter Value: {_counter}"); // 输出 10000
}
}
2. 使用 SpinLock 的递归陷阱
SpinLock 默认不支持在同一线程中递归调用 Enter
方法。
static void RecursiveLock(SpinLock spinLock, int depth)
{
if (depth == 0) return;
bool lockTaken = false;
try
{
spinLock.Enter(ref lockTaken); // 获取锁
Console.WriteLine($"Lock taken at depth {depth}");
RecursiveLock(spinLock, depth - 1); // 再次调用导致死锁
}
finally
{
if (lockTaken)
{
spinLock.Exit(); // 释放锁
}
}
}
解决方法:允许线程内重入
SpinLock spinLock = new SpinLock(enableThreadOwnerTracking: true);
但是,启用线程跟踪会增加一些性能开销,应根据需要选择使用。
SpinLock 的特点
-
轻量级与高性能
- SpinLock 避免了线程阻塞带来的上下文切换开销,在短时间锁竞争场景中效率极高。
- 自旋期间线程不会让出 CPU,因此适合短时间操作。
-
可能导致忙等
- 如果锁占用时间较长,SpinLock 会导致其他线程长时间自旋,浪费 CPU 资源。
-
线程跟踪功能
SpinLock
可以选择启用线程所有权跟踪(enableThreadOwnerTracking
)。- 开启后,
SpinLock
能检测同一线程的重复锁请求(支持重入),但会带来额外性能开销。
-
不适合 IO 密集场景
- 在需要等待 IO 或其他慢速操作的场景中,SpinLock 的忙等可能导致严重的性能问题。
最佳实践
-
仅在短时间锁定中使用
- 如果共享资源的锁持有时间较长,应避免使用 SpinLock。
-
谨慎处理递归调用
- 默认情况下 SpinLock 不支持递归调用,启用线程跟踪功能可以解决,但会带来性能开销。
-
释放锁的正确性
- 确保所有获取锁的路径都能正确释放锁,尤其是在异常处理中。推荐在
try...finally
块中管理锁。
- 确保所有获取锁的路径都能正确释放锁,尤其是在异常处理中。推荐在
-
避免死锁风险
- 确保锁获取和释放的顺序始终一致。
- 尽量减少锁内操作的复杂性,避免调用外部方法。
-
考虑 SpinWait 替代
- 如果锁等待逻辑需要更精细的控制,可以使用
SpinWait
,它允许自定义自旋行为并在必要时让出 CPU。
- 如果锁等待逻辑需要更精细的控制,可以使用
SpinLock vs 其他锁
特性 | SpinLock |
lock / Monitor |
Interlocked |
---|---|---|---|
适用场景 | 短时间锁定 | 长时间锁定 | 单变量原子操作 |
性能 | 高(短时间锁定) | 较低(上下文切换开销) | 最高(无锁) |
阻塞 | 无阻塞(自旋) | 阻塞线程 | 无阻塞 |
资源消耗 | 高(占用 CPU) | 中等 | 最低 |
复杂性 | 较高 | 低 | 低 |
递归支持 | 不支持 | 支持 | 不适用 |
12.5 自旋等待SpinWait(新手不推荐)
SpinWait
是 .NET 提供的一种轻量级同步工具,与 SpinLock
类似,SpinWait
也利用了“忙等待”的概念,但它更加灵活,旨在通过自旋和适度的让步(Yielding)来平衡 CPU 使用和上下文切换开销。
核心思想
当一个线程需要等待某个条件满足时,有两种常见的策略:
-
阻塞等待(Blocking):
使用锁或等待句柄(如Monitor.Wait
、AutoResetEvent
等)来挂起线程,直到条件满足。这种方式会导致线程上下文切换,增加额外开销。 -
自旋等待(Spinning):
线程在一个循环中反复检查条件是否满足,同时不挂起线程。自旋的优势在于避免了上下文切换的开销,但会消耗 CPU 资源。
SpinWait
是介于这两者之间的一种策略:
- 它首先通过自旋等待一段时间(消耗 CPU),以避免线程挂起的开销。
- 当自旋次数达到一定阈值后,它会通过调用
Thread.Yield
或短暂睡眠让出 CPU,避免长时间占用 CPU 核心。
基本用法
以下是一个使用 SpinWait
的简单示例:
using System;
using System.Threading;
class SpinWaitExample
{
private static bool _isConditionMet = false;
public static void Main()
{
Thread workerThread = new Thread(() =>
{
Console.WriteLine("Worker: Performing work...");
Thread.Sleep(1000); // 模拟一些工作
_isConditionMet = true; // 设置条件
Console.WriteLine("Worker: Work completed.");
});
workerThread.Start();
// 主线程使用 SpinWait 等待条件满足
SpinWait spinWait = new SpinWait();
while (!_isConditionMet)
{
spinWait.SpinOnce(); // 每次循环调用 SpinOnce
}
Console.WriteLine("Main: Condition met, proceeding with execution.");
}
}
代码说明:
-
SpinWait
实例:
可以创建一个SpinWait
实例来控制自旋行为。 -
SpinOnce
方法:
每次调用SpinOnce
,线程会执行一次自旋操作,随着调用次数的增加,它会逐步调整自旋策略(比如在高自旋次数时调用Thread.Yield
让出时间片)。 -
条件检查:
在循环中反复检查_isConditionMet
,直到条件满足。
SpinWait 的特点
-
自适应自旋策略:
SpinWait
会根据调用SpinOnce
的次数动态调整策略。在初期,它会持续自旋以避免线程上下文切换;当自旋次数较多时,它会尝试让出 CPU(通过Thread.Yield
或短暂的睡眠)。 -
非阻塞:
与阻塞式等待相比,SpinWait
不会直接挂起线程,因此在等待时间较短的情况下性能更高。 -
避免 CPU 过载:
在高自旋次数情况下,通过让出 CPU 避免了长时间占用 CPU 资源。
SpinOnce 的行为
SpinOnce
是 SpinWait
的核心方法,每次调用它时,SpinWait
会决定下一步的行为:
-
短时间自旋:
在前几次调用时,SpinWait
会以忙等待的方式直接占用 CPU 核心。 -
让出 CPU:
当自旋次数达到一定阈值时,它会调用Thread.Yield
将 CPU 时间片让给其他线程。 -
短暂睡眠:
如果自旋次数继续增加,SpinWait
会调用Thread.Sleep(1)
进行短暂的线程休眠。
这种动态调整的特性使得 SpinWait
能在短时间等待中保持高性能,同时在长时间等待中避免 CPU 过载。
SpinWait 的常见属性和方法
-
SpinOnce()
:
执行一次自旋操作,动态调整自旋策略。 -
NextSpinWillYield
(属性):
返回一个布尔值,表示下一次调用SpinOnce
时是否会尝试让出 CPU。SpinWait spinWait = new SpinWait(); while (!condition) { spinWait.SpinOnce(); if (spinWait.NextSpinWillYield)// 判断下次调用spinWait.SpinOnce()是否会让出CPU { Console.WriteLine("Yielding CPU to other threads..."); } }
-
Count
(属性):
返回调用SpinOnce
的次数,可以用来调试或控制自旋行为。
使用场景
-
短时间等待:
如果等待的条件会在很短的时间内满足,SpinWait
可以避免线程挂起,提高性能。示例:线程池中的任务调度器可以使用
SpinWait
来等待任务队列的更新。 -
高性能并发场景:
在高并发场景中,SpinWait
可以用来等待轻量级的标志位或状态切换,而不引入锁的开销。示例:生产者-消费者模型中,消费者可以使用
SpinWait
等待生产者的状态更新。 -
替代
Thread.Sleep
或Thread.Yield
:
在需要微秒级别等待的场景中,SpinWait
比直接使用Thread.Sleep
或Thread.Yield
更高效。
SpinWait vs SpinLock
特性 | SpinWait |
SpinLock |
---|---|---|
目的 | 等待条件满足(循环检查) | 保护临界区(锁机制) |
是否提供锁机制 | 否 | 是 |
动态调整策略 | 是(自适应自旋 + 让出 CPU) | 否(纯自旋) |
使用复杂性 | 较低 | 较高 |
适用场景 | 条件等待 | 临界区保护 |
底层实现
SpinWait
的实现依赖硬件和操作系统的调度支持:
- 在自旋阶段,
SpinWait
会反复执行轻量级的 CPU 指令,避免线程挂起。 - 在达到高自旋次数后,
SpinWait
会调用Thread.Yield
或Thread.Sleep
,让出 CPU 时间片。 - 通过这种方式,
SpinWait
平衡了性能和资源消耗。
12.6 轻量级读写锁ReaderWriterLockSlim
ReaderWriterLockSlim
是 .NET 中的一种高级锁机制,旨在通过区分“读操作”和“写操作”来提高并发性能。它允许多个线程同时执行读操作(共享锁),但在写操作(独占锁)时会阻止其他线程的读写操作。
这种锁的设计理念是:
- 读操作的并发性高: 允许多个线程同时读取共享资源。
- 写操作的独占性强: 写操作需要独占锁,确保线程安全。
相较于传统的 lock
或 Monitor
,ReaderWriterLockSlim
在 读多写少 的场景下提供了更高的性能。
核心特点
-
读写分离:
- 允许多个线程同时读取数据,只要没有线程在写入。
- 如果有线程正在写入,所有其他线程(包括读线程)都会被阻塞。
-
支持递归锁:
同一线程可以多次获取读锁或写锁(递归锁)。 -
升级锁功能:
允许线程从读锁升级为写锁,但需要注意升级锁可能导致死锁问题。 -
高性能:
相较于老式的ReaderWriterLock
,ReaderWriterLockSlim
更轻量、更高效,适合高并发场景。
基本用法
以下是使用 ReaderWriterLockSlim
的典型示例:
using System;
using System.Threading;
class ReaderWriterLockSlimExample
{
private static ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private static int _sharedResource = 0;
public static void Main()
{
Thread writer = new Thread(WriteResource);
Thread reader1 = new Thread(ReadResource);
Thread reader2 = new Thread(ReadResource);
writer.Start();
reader1.Start();
reader2.Start();
writer.Join();
reader1.Join();
reader2.Join();
Console.WriteLine("All operations completed.");
}
private static void WriteResource()
{
_lock.EnterWriteLock(); // 获取写锁
try
{
Console.WriteLine("Writer: Acquired write lock.");
_sharedResource++;
Thread.Sleep(1000); // 模拟写操作
Console.WriteLine($"Writer: Updated shared resource to {_sharedResource}.");
}
finally
{
_lock.ExitWriteLock(); // 释放写锁
Console.WriteLine("Writer: Released write lock.");
}
}
private static void ReadResource()
{
_lock.EnterReadLock(); // 获取读锁
try
{
Console.WriteLine($"Reader: Acquired read lock. Shared resource = {_sharedResource}");
Thread.Sleep(500); // 模拟读操作
}
finally
{
_lock.ExitReadLock(); // 释放读锁
Console.WriteLine("Reader: Released read lock.");
}
}
}
代码说明:
-
EnterWriteLock
和ExitWriteLock
:EnterWriteLock
用于获取写锁,确保只有一个线程可以写入共享资源。- 操作完成后,必须调用
ExitWriteLock
释放锁。
-
EnterReadLock
和ExitReadLock
:EnterReadLock
用于获取读锁,允许多个线程同时读取共享资源。- 操作完成后,必须调用
ExitReadLock
释放锁。
-
读多写少:
示例中,两个读线程可以在写线程完成之前同时读取数据,体现了读写分离的优势。
常用方法
方法/属性 | 描述 |
---|---|
EnterReadLock |
获取读锁,允许多个线程同时读取。 |
ExitReadLock |
释放读锁。 |
EnterWriteLock |
获取写锁,阻止其他线程的读写操作。 |
ExitWriteLock |
释放写锁。 |
EnterUpgradeableReadLock |
获取可升级的读锁,允许从读锁升级为写锁。 |
ExitUpgradeableReadLock |
释放可升级的读锁。 |
IsReadLockHeld |
当前线程是否持有读锁。 |
IsWriteLockHeld |
当前线程是否持有写锁。 |
IsUpgradeableReadLockHeld |
当前线程是否持有可升级的读锁。 |
可升级读锁
什么是可升级读锁?
EnterUpgradeableReadLock
允许线程在持有读锁的同时,有条件地升级为写锁。它适用于以下场景:
- 线程开始时需要读取数据。
- 根据某些条件,决定是否需要修改(写入)数据。
以下是一个可升级读锁的示例:
private static void UpgradeableReadExample()
{
_lock.EnterUpgradeableReadLock(); // 获取可升级读锁
try
{
Console.WriteLine($"Thread: Reading shared resource = {_sharedResource}");
if (_sharedResource == 0) // 满足条件时升级为写锁
{
_lock.EnterWriteLock();
try
{
Console.WriteLine("Thread: Upgraded to write lock.");
_sharedResource = 42;
Console.WriteLine("Thread: Updated shared resource.");
}
finally
{
_lock.ExitWriteLock(); // 释放写锁
}
}
}
finally
{
_lock.ExitUpgradeableReadLock(); // 释放可升级读锁
}
}
注意:
- 只能在单个线程中使用可升级读锁。
- 如果多个线程同时尝试升级读锁到写锁,可能会导致死锁。
- 在可升级读锁中,其他线程仍可以获取读锁,但不能获取写锁。
最佳实践
-
选择合适的锁类型
如果读操作占绝对主导地位,优先选择ReaderWriterLockSlim
;否则,可以考虑更轻量的lock
。 -
合理使用升级读锁
- 仅在确实需要时才升级为写锁。
- 避免过长时间持有升级读锁以减少竞争。
-
减少锁粒度
尽量缩小锁的作用范围,减少持有锁的时间,避免潜在的死锁或性能下降。 -
避免嵌套锁操作
嵌套ReaderWriterLockSlim
的读写锁操作会引发复杂的死锁问题,应尽量避免。 -
释放锁的顺序
- 保证
ExitReadLock
、ExitWriteLock
和ExitUpgradeableReadLock
与其对应的Enter
方法严格匹配。 - 使用
try-finally
确保锁释放。
- 保证
12.7 异步锁
传统的同步锁(如 lock
或 Monitor
)无法直接与 async/await
兼容,因为它们并未为异步操作设计。为此,.NET 提供了 SemaphoreSlim
,并支持异步方法,同时第三方库(如 Nito.AsyncEx)也引入了更优雅的异步锁实现,例如 AsyncLock
。
使用场景
- 在
async/await
场景中,需要保护共享数据免受并发访问。 - 希望避免阻塞线程,并保持程序的高性能和响应性。
- 多个异步任务同时访问一个共享资源时需要协调。
代码示例
使用 SemaphoreSlim
using System;
using System.Threading;
using System.Threading.Tasks;
class MyClass
{
private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
private int _value;
public async Task DelayAndIncrementAsync()
{
await _mutex.WaitAsync();
try
{
int oldValue = _value;
await Task.Delay(TimeSpan.FromSeconds(oldValue)); // 模拟异步操作
_value = oldValue + 1;
}
finally
{
_mutex.Release();
}
}
}
使用 AsyncLock
(来自 Nito.AsyncEx)
using System;
using System.Threading.Tasks;
using Nito.AsyncEx;
class MyClass
{
private readonly AsyncLock _mutex = new AsyncLock();
private int _value;
public async Task DelayAndIncrementAsync()
{
using (await _mutex.LockAsync())
{
int oldValue = _value;
await Task.Delay(TimeSpan.FromSeconds(oldValue)); // 模拟异步操作
_value = oldValue + 1;
}
}
}
背后原理
-
SemaphoreSlim 的异步特性:
SemaphoreSlim.WaitAsync
使用信号量限制同时访问的线程数量。通过异步方式,它不会阻塞调用线程,而是返回一个等待的任务,直到资源可用。Release
方法释放锁,使其他等待中的任务得以运行。
-
AsyncLock 的实现:
AsyncLock
是一个简化的异步锁实现,通常基于SemaphoreSlim
或类似的同步机制封装。- 它通过
LockAsync
提供了更易用的 API,同时内置了IDisposable
模式,让锁的释放更方便和安全。
-
避免阻塞线程:
- 传统同步锁可能阻塞线程,占用线程池资源。而异步锁通过任务调度器让出线程,从而提升系统的整体性能。
12.8 阻塞信号
问题
在多线程编程中,经常需要在线程间发送通知。例如,一个线程需要等待另一个线程完成某些初始化操作,然后再继续执行。这种线程间的信号协调如何实现?
解决方案
.NET 提供了多种跨线程的信号机制,最常见的就是 ManualResetEventSlim
。它是一种手动重置的事件信号,可以在线程间进行同步通知。ManualResetEventSlim
拥有两种状态:
-
有信号状态(Signaled):
表示事件已经触发,所有等待的线程都会被释放。 -
无信号状态(Non-Signaled):
表示事件未触发,任何调用Wait
的线程都会阻塞,直到事件被设置为有信号状态。
基本用法
以下是一段示例代码,展示如何使用 ManualResetEventSlim
在不同线程间发送信号:
using System;
using System.Threading;
class MyClass
{
// 手动重置事件
private readonly ManualResetEventSlim _initialized = new ManualResetEventSlim();
private int _value;
// 等待初始化完成
public int WaitForInitialization()
{
Console.WriteLine("Waiting for initialization...");
_initialized.Wait(); // 阻塞当前线程,直到事件被触发
Console.WriteLine("Initialization complete.");
return _value;
}
// 另一个线程完成初始化并发送信号
public void InitializeFromAnotherThread()
{
Console.WriteLine("Initializing...");
_value = 13; // 设置共享数据
_initialized.Set(); // 触发信号,释放等待的线程
Console.WriteLine("Signal sent.");
}
}
class Program
{
static void Main()
{
var myClass = new MyClass();
// 启动线程等待信号
var waitingThread = new Thread(() =>
{
int value = myClass.WaitForInitialization();
Console.WriteLine($"Value received: {value}");
});
waitingThread.Start();
// 主线程延迟后初始化
Thread.Sleep(2000); // 模拟一些延迟操作
myClass.InitializeFromAnotherThread();
waitingThread.Join(); // 等待线程结束
Console.WriteLine("Main thread exiting.");
}
}
输出示例:
Waiting for initialization...
Initializing...
Signal sent.
Initialization complete.
Value received: 13
Main thread exiting.
代码说明:
-
声明事件:
ManualResetEventSlim _initialized
是一个手动重置事件,用于线程间的信号同步。 -
等待信号:
WaitForInitialization
方法在调用Wait()
时会阻塞当前线程,直到事件变为有信号状态。 -
触发信号:
InitializeFromAnotherThread
方法调用Set()
方法将事件设置为有信号状态,释放所有等待中的线程。 -
线程间通信:
一个线程完成初始化后发送信号,另一个线程接收到信号后继续执行。
ManualResetEventSlim 的核心方法
方法/属性 | 描述 |
---|---|
Wait() |
阻塞当前线程,直到事件进入有信号状态。 |
Set() |
将事件设置为有信号状态,释放所有等待中的线程。 |
Reset() |
将事件重置为无信号状态,阻止后续线程通过 Wait() 。 |
IsSet |
返回事件当前是否处于有信号状态。 |
Wait(TimeSpan) |
带超时的等待。在指定时间内如果事件未触发,返回 false 。 |
与其他同步信号的对比
如果 ManualResetEventSlim 无法满足需求,还可以考虑以下同步信号类型,由于不常用,仅仅给出简单示例:
1. AutoResetEvent
自动重置事件,每次调用 Set()
只释放一个等待的线程,然后自动重置为无信号状态。
适用场景: 需要按顺序逐个释放线程,比如生产者/消费者模型。
代码示例:
using System;
using System.Threading;
class AutoResetEventExample
{
private static AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
static void Main()
{
new Thread(Worker).Start();
Thread.Sleep(1000); // 模拟其他操作
Console.WriteLine("主线程发送信号...");
_autoResetEvent.Set(); // 释放一个等待线程
}
private static void Worker()
{
Console.WriteLine("子线程等待信号...");
_autoResetEvent.WaitOne(); // 等待信号
Console.WriteLine("子线程收到信号,继续执行!");
}
}
2. CountdownEvent
倒计数事件,初始化时设置一个计数器值,每调用一次 Signal()
计数减一,当计数降为 0 时,所有等待线程被释放。
适用场景: 等待多个线程完成任务后再继续执行,比如多线程下载完成后合并结果。
代码示例:
using System;
using System.Threading;
class CountdownEventExample
{
private static CountdownEvent _countdown = new CountdownEvent(3); // 初始计数器为 3
static void Main()
{
for (int i = 0; i < 3; i++)
{
new Thread(Worker).Start(i);
}
Console.WriteLine("主线程等待所有子线程完成...");
_countdown.Wait(); // 等待计数器归零
Console.WriteLine("所有子线程已完成!");
}
private static void Worker(object id)
{
Console.WriteLine($"子线程 {id} 正在执行...");
Thread.Sleep(1000); // 模拟工作
Console.WriteLine($"子线程 {id} 完成!");
_countdown.Signal(); // 计数器减一
}
}
3. Barrier
障碍同步机制,多个线程在每个阶段完成时等待其他线程,所有线程完成该阶段后自动进入下一个阶段。
适用场景: 线程需要分阶段协作,比如并行计算中的阶段性数据处理。
代码示例:
using System;
using System.Threading;
class BarrierExample
{
private static Barrier _barrier = new Barrier(3, (b) =>
{
Console.WriteLine($"所有线程已到达阶段 {b.CurrentPhaseNumber + 1},进入下一阶段...");
});
static void Main()
{
for (int i = 0; i < 3; i++)
{
new Thread(Worker).Start(i);
}
}
private static void Worker(object id)
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine($"线程 {id} 正在完成阶段 {i + 1}...");
Thread.Sleep(1000); // 模拟工作
_barrier.SignalAndWait(); // 等待其他线程到达障碍
}
}
}
总结
ManualResetEventSlim
:通用信号机制,适合跨线程协调,手动控制信号状态。AutoResetEvent
:自动信号机制,每次触发仅释放一个线程,适合逐一释放线程的场景。CountdownEvent
:倒计数信号,适合等待一组线程完成任务。Barrier
:阶段性同步信号,适合线程分阶段协作。
等待多个线程示例
以下示例展示如何使用多个 ManualResetEventSlim
同步多个线程:
using System;
using System.Threading;
class Program
{
static void Main()
{
var events = new ManualResetEventSlim[3]; // 创建三个信号
for (int i = 0; i < events.Length; i++)
{
events[i] = new ManualResetEventSlim(false);
}
for (int i = 0; i < events.Length; i++)
{
int index = i; // 避免闭包问题
new Thread(() =>
{
Console.WriteLine($"Thread {index + 1} starting...");
Thread.Sleep(1000 * (index + 1)); // 模拟工作
Console.WriteLine($"Thread {index + 1} completed.");
events[index].Set(); // 触发对应的信号
}).Start();
}
// 等待所有线程完成
foreach (var e in events)
{
e.Wait();
}
Console.WriteLine("All threads completed. Main thread exiting.");
}
}
输出示例:
Thread 1 starting...
Thread 2 starting...
Thread 3 starting...
Thread 1 completed.
Thread 2 completed.
Thread 3 completed.
All threads completed. Main thread exiting.
ManualResetEventSlim 的注意事项
-
避免信号丢失:
如果一个线程在另一个线程调用Wait()
之前就已经调用了Set()
,等待线程可能会错过信号。这种情况下,应确保信号的触发顺序符合逻辑。 -
线程阻塞:
Wait()
会阻塞线程,因此应避免长时间等待,或者考虑使用带有超时的Wait(TimeSpan)
方法。 -
信号的正确使用:
在某些场景下,如果信号只是用来协调访问共享数据(而不是线程间通知),更合适的方式可能是使用锁(如Monitor
或ReaderWriterLockSlim
)。 -
轻量级场景:
ManualResetEventSlim
适用于轻量级同步。如果需要更复杂的线程协调机制,可以考虑其他同步方式(如AutoResetEvent
或Barrier
)。
12.9 异步信号
异步信号是用于异步代码中的一种信号机制,允许任务在非阻塞的情况下等待某种条件满足。不同于传统的阻塞信号(如 ManualResetEventSlim
),异步信号不会阻塞线程,而是返回一个可等待的任务,当条件满足时继续执行。
在 .NET 中,可以通过 SemaphoreSlim
、TaskCompletionSource<T>
或第三方库(如 Nito.AsyncEx 的 AsyncManualResetEvent
)来实现异步信号。
代码示例
示例 1:使用 SemaphoreSlim
using System;
using System.Threading;
using System.Threading.Tasks;
class AsyncSignalExample
{
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0); // 初始无信号状态
private int _sharedData;
// 等待信号的异步方法
public async Task<int> WaitForSignalAsync()
{
await _signal.WaitAsync(); // 异步等待信号
return _sharedData;
}
// 触发信号的方法
public async Task SetSignalAsync()
{
await Task.Delay(1000); // 模拟异步操作
_sharedData = 42; // 更新共享数据
_signal.Release(); // 触发信号
Console.WriteLine("信号已发送");
}
}
class Program
{
static async Task Main()
{
var example = new AsyncSignalExample();
// 同时启动等待和触发任务
var waitTask = example.WaitForSignalAsync();
var setTask = example.SetSignalAsync();
int result = await waitTask; // 等待信号
await setTask;
Console.WriteLine($"收到信号,数据为:{result}");
}
}
示例 2:使用 TaskCompletionSource<T>
实现单次信号
当一个通知仅需发送一次时,可以使用 TaskCompletionSource<T>
。
using System;
using System.Threading.Tasks;
class AsyncSignalExample
{
private readonly TaskCompletionSource<object> _initialized = new TaskCompletionSource<object>();
private int _value1;
private int _value2;
// 异步等待信号的任务
public async Task<int> WaitForInitializationAsync()
{
await _initialized.Task; // 等待信号
return _value1 + _value2;
}
// 触发信号的方法
public void Initialize()
{
_value1 = 13;
_value2 = 17;
_initialized.TrySetResult(null); // 触发信号
Console.WriteLine("信号已发送");
}
}
class Program
{
static async Task Main()
{
var example = new AsyncSignalExample();
// 启动等待任务
var waitTask = example.WaitForInitializationAsync();
// 模拟异步触发
await Task.Delay(1000);
example.Initialize();
// 获取结果
int result = await waitTask;
Console.WriteLine($"收到信号,计算结果为:{result}");
}
}
示例 3:使用 AsyncManualResetEvent
实现多次信号
对于需要多次设置和重置信号的场景,可以使用 Nito.AsyncEx 提供的 AsyncManualResetEvent
。
using System;
using System.Threading.Tasks;
using Nito.AsyncEx;
class AsyncSignalExample
{
private readonly AsyncManualResetEvent _connected = new AsyncManualResetEvent();
// 异步等待信号的任务
public async Task WaitForConnectedAsync()
{
await _connected.WaitAsync(); // 等待信号
Console.WriteLine("已连接");
}
// 设置或重置信号的方法
public void ConnectedChanged(bool connected)
{
if (connected)
{
_connected.Set(); // 设置信号
Console.WriteLine("信号已设置");
}
else
{
_connected.Reset(); // 重置信号
Console.WriteLine("信号已重置");
}
}
}
class Program
{
static async Task Main()
{
var example = new AsyncSignalExample();
// 启动等待任务
var waitTask = example.WaitForConnectedAsync();
// 模拟信号变化
example.ConnectedChanged(true);
await Task.Delay(1000); // 等待信号响应
example.ConnectedChanged(false);
}
}
背后原理
-
TaskCompletionSource<T>
的实现:TaskCompletionSource<T>
创建了一个Task
,通过TrySetResult
或TrySetException
等方法控制任务的完成状态。- 一旦信号被触发,所有等待任务都会被解除挂起。
-
AsyncManualResetEvent
的机制:- 使用内部的
TaskCompletionSource
来管理信号状态。 Set
方法触发信号,完成等待的任务;Reset
方法创建新的未完成任务,等待新的信号触发。
- 使用内部的
-
SemaphoreSlim
的WaitAsync
本质上是对一个信号量计数器的操作,当计数器大于 0 时,不阻塞线程。 -
与阻塞信号的区别:
- 阻塞信号:线程挂起,直到信号释放。
- 异步信号:线程释放,任务被挂起,资源利用率更高。
常见陷阱
-
不正确的信号状态管理:
- 在需要重复使用信号时,忘记调用
Reset
,可能导致等待的任务无法再次被触发。
- 在需要重复使用信号时,忘记调用
-
误用阻塞操作:
- 在异步代码中使用阻塞信号(如
ManualResetEventSlim
),可能导致线程被不必要地占用,降低性能。
- 在异步代码中使用阻塞信号(如
-
资源泄漏:
- 如果使用
SemaphoreSlim
,忘记调用Release
会导致信号量计数不一致,可能阻止等待任务完成。
- 如果使用
-
误用异步和同步混合操作:
-
示例(问题代码):
_signal.WaitAsync().Wait(); // 同步等待异步信号,可能导致死锁
-
最佳实践
-
根据需求选择合适的信号工具:
- 单次信号:使用
TaskCompletionSource<T>
。 - 多次信号:使用
AsyncManualResetEvent
。
- 单次信号:使用
-
清晰管理信号的生命周期:
- 在
Set
后,若需要再次等待,应调用Reset
明确恢复到初始状态。
- 在
-
避免混用同步和异步:
- 在异步代码中,始终使用异步信号机制,避免混合同步等待。
-
结合超时与取消令牌:
-
在
WaitAsync
中传递CancellationToken
,避免信号等待无限期挂起。 -
示例:
await _signal.WaitAsync(cancellationToken); // await _signal.WaitAsync(TimeSpan.FromSeconds(5));// 或者
-
12.10 节流
问题
在高并发场景中,代码可能会生成大量并发任务或线程,导致以下问题:
- 资源耗尽: 过多的并发操作可能导致 CPU、内存、网络连接等资源被耗尽。
- 性能下降: 由于资源竞争,任务处理效率可能反而降低。
- 系统不稳定: 如果无法有效限制并发数,可能导致应用程序崩溃或响应变慢。
为了解决上述问题,需要引入节流机制,通过限制并发操作的数量,平衡性能与资源消耗。
解决方案
节流机制的核心是限制并发操作的数量。根据代码的并发类型,可以采用不同的节流方法。
以下是常见的节流方案:
- 数据流(Dataflow)
- PLINQ(并行 LINQ)
Parallel
类- 异步代码:
SemaphoreSlim
和Channel
1. 数据流(Dataflow)
数据流(Dataflow)是 .NET 提供的一种用于并发处理的编程模型,它内置了节流机制,可以通过设置 MaxDegreeOfParallelism
来限制并发任务的数量。
示例:TransformBlock 的节流
以下代码使用 TPL 数据流的 TransformBlock
实现节流:
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static async Task Main(string[] args)
{
// 创建一个 TransformBlock,用于执行并发的乘法操作
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10 // 最大并发数为 10
};
var block = new TransformBlock<int, int>(data =>
{
Console.WriteLine($"Processing {data} on Task {Task.CurrentId}");
Task.Delay(100).Wait(); // 模拟耗时操作
return data * 2;
}, options);
// 向块中发送数据
for (int i = 0; i < 20; i++)
{
block.Post(i);
}
block.Complete(); // 表示不再有更多的数据发送到块中
// 获取处理结果
while (await block.OutputAvailableAsync())
{
Console.WriteLine(await block.ReceiveAsync());
}
Console.WriteLine("Processing complete.");
}
}
代码说明:
-
TransformBlock
:
用于将输入映射到输出的异步数据块。 -
ExecutionDataflowBlockOptions
:
设置块的最大并发数为 10,限制同时处理的数据量。 -
节流效果:
即使发送了 20 条数据,只有 10 个任务会同时运行。
2. PLINQ(并行 LINQ)
PLINQ 是 LINQ 的并行版本,支持对集合中的元素进行并行处理。同样可以通过 WithDegreeOfParallelism
限制并发数。
示例:PLINQ 实现节流
using System;
using System.Linq;
class Program
{
static void Main(string[] args)
{
var values = Enumerable.Range(1, 20);
// 使用 PLINQ 对数据进行并行处理,并限制并发数
var results = values
.AsParallel()
.WithDegreeOfParallelism(5) // 最大并发数为 5
.Select(item =>
{
Console.WriteLine($"Processing {item} on Task {Task.CurrentId}");
Task.Delay(100).Wait(); // 模拟耗时操作
return item * 2;
})
.ToList();
Console.WriteLine("Results: " + string.Join(", ", results));
}
}
代码说明:
-
AsParallel
:
将集合转为并行查询。 -
WithDegreeOfParallelism
:
限制并发任务的数量为 5。 -
节流效果:
即使集合有 20 个元素,最多只有 5 个任务会同时执行。
3. Parallel
类
Parallel
类提供了多种方法(如 Parallel.ForEach
)用于并行处理集合中的数据。同样可以通过 ParallelOptions
设置最大并发数。
示例:Parallel.ForEach 的节流
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var matrices = new List<int>(Enumerable.Range(1, 20));
// 使用 Parallel.ForEach 并限制最大并发数
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 4 // 最大并发数为 4
};
Parallel.ForEach(matrices, options, matrix =>
{
Console.WriteLine($"Processing {matrix} on Task {Task.CurrentId}");
Task.Delay(100).Wait(); // 模拟耗时操作
});
Console.WriteLine("Processing complete.");
}
}
代码说明:
-
ParallelOptions
:
设置最大并发数为 4。 -
节流效果:
即使集合有 20 个元素,最多只有 4 个任务会同时执行。
4. 异步代码的节流
在异步编程中,可以使用 SemaphoreSlim
或 Channel
来限制并发任务的数量。这种方法适用于网络请求或 I/O 操作等异步任务。
示例 1:使用 SemaphoreSlim
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var urls = new List<string>
{
"https://example.com",
"https://example.org",
"https://example.net"
};
using var client = new HttpClient();
using var semaphore = new SemaphoreSlim(2); // 最大并发数为 2
var tasks = urls.Select(async url =>
{
await semaphore.WaitAsync(); // 获取信号
try
{
Console.WriteLine($"Downloading {url}...");
string content = await client.GetStringAsync(url);
Console.WriteLine($"Downloaded {url}");
return content;
}
finally
{
semaphore.Release(); // 释放信号
}
});
var results = await Task.WhenAll(tasks);
Console.WriteLine("All downloads complete.");
}
}
代码说明:
-
SemaphoreSlim
:
控制同时执行的异步任务数量。 -
节流效果:
即使有多个 URL 需要下载,最多只有 2 个请求会同时发出。
示例 2:使用 Channel
Channel
是 .NET 提供的高性能生产者-消费者模型,同样可以实现节流。
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var data = new List<int>(Enumerable.Range(1, 20));
var channel = Channel.CreateBounded<int>(5); // 最大并发数为 5
// 写入数据到通道
_ = Task.Run(async () =>
{
foreach (var item in data)
{
await channel.Writer.WriteAsync(item);
Console.WriteLine($"Produced {item}");
}
channel.Writer.Complete();
});
// 从通道中读取数据并处理
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Processing {item} on Task {Task.CurrentId}");
await Task.Delay(100); // 模拟耗时操作
}
Console.WriteLine("Processing complete.");
}
}
代码说明:
-
Channel.CreateBounded
:
创建一个容量为 5 的通道,用于限制同时处理的数据量。 -
节流效果:
生产者可以连续写入数据,但消费者最多只能处理 5 个数据项。
节流的适用场景
-
高并发任务:
限制同时执行的任务数量,防止资源耗尽。 -
网络请求:
控制并发的 HTTP 请求数量,防止服务器过载。 -
CPU 密集型任务:
避免任务过度占用 CPU,确保其他线程也能获得资源。 -
生产者-消费者模型:
限制消费者的处理速度,防止缓冲区过度填充。 -
最佳实践:
- 根据硬件性能设置合理的并发限制。
- 测试实际负载,避免限制过于宽松或过于保守。