线程
.NET 的最近版本在线程、并行、并发和异步等方面做出了巨大的改进,例如 ThreadPool 的完全重写(在 .NET 6 和 .NET 7 中),异步方法基础设施的完全重写(在 .NET Core 2.1 中),ConcurrentQueue
ThreadStatic
.NET 运行时使得将数据与线程关联起来变得很容易,这通常被称为线程本地存储(TLS)。实现这一点的最常见方式是用 [ThreadStatic] 属性注解一个静态字段(另一个用于更高级用途的是通过 ThreadLocal
private static int s_onePerProcess;
[ThreadStatic]
private static int t_onePerThread;
历史上,访问这样一个 [ThreadStatic] 字段需要一个非内联的 JIT 辅助方法(例如 CORINFO_HELP_GETSHARED_NONGCTHREADSTATIC_BASE_NOCTOR),但现在有了 dotnet/runtime#82973 和 dotnet/runtime#85619,那个辅助方法的常见和快速路径可以被内联到调用者中。我们可以通过一个简单的基准测试来看到这一点,该基准测试只是增加了一个存储在 [ThreadStatic] 中的 int。
// dotnet run -c Release -f net7.0 --filter "*" --runtimes net7.0 net8.0
// dotnet run -c Release -f net7.0 --filter "*" --runtimes nativeaot7.0 nativeaot8.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);
[HideColumns("Error", "StdDev", "Median", "RatioSD")]
public partial class Tests
{
[ThreadStatic]
private static int t_value;
[Benchmark]
public int Increment() => ++t_value;
}
方法 | 运行时 | 平均值 | 比率 |
---|---|---|---|
Increment | .NET 7.0 | 8.492 ns | 1.00 |
Increment | .NET 8.0 | 1.453 ns | 0.17 |
[ThreadStatic] 同样通过 dotnet/runtime#84566 和 dotnet/runtime#87148 为 Native AOT 优化:
方法 | 运行时 | 平均值 | 比率 |
---|---|---|---|
Increment | NativeAOT 7.0 | 2.305 ns | 1.00 |
Increment | NativeAOT 8.0 | 1.325 ns | 0.57 |
ThreadPool
让我们试验一下。创建一个新的控制台应用程序,并在 .csproj 中添加 <PublishAot>true</PublishAot>
。然后将程序的全部内容设为这样:
// dotnet run -c Release -f net8.0
Task.Run(() => Console.WriteLine(Environment.StackTrace)).Wait();
这个想法是看看在 ThreadPool 线程上运行的工作项的堆栈跟踪。现在运行它,你应该会看到类似这样的内容:
at System.Environment.get_StackTrace()
at Program.<>c.<<Main>$>b__0_0()
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
这里重要的部分是最后一行:我们看到我们是从 PortableThreadPool 被调用的,这是自 .NET 6 以来在所有操作系统上使用的托管线程池实现。现在,不是直接运行,让我们发布为 Native AOT 并运行结果应用程序(对于我们正在寻找的特定事情,这部分应该在 Windows 上完成)。
dotnet publish -c Release -r win-x64
D:\examples\tmpapp\bin\Release\net8.0\win-x64\publish\tmpapp.exe
现在,我们看到这个:
at System.Environment.get_StackTrace() + 0x21
at Program.<>c.<<Main>$>b__0_0() + 0x9
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread, ExecutionContext, ContextCallback, Object) + 0x3d
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task&, Thread) + 0xcc
at System.Threading.ThreadPoolWorkQueue.Dispatch() + 0x289
at System.Threading.WindowsThreadPool.DispatchCallback(IntPtr, IntPtr, IntPtr) + 0x45
再次注意最后一行:“WindowsThreadPool”。在 Windows 上发布的 Native AOT 应用程序历来都使用包装 Windows 线程池的 ThreadPool 实现。工作项队列和调度代码都与线程池相同,但线程管理本身是委托给 Windows 池的。现在在 .NET 8 中,通过 dotnet/runtime#85373,Windows 上的项目可以选择使用任一池;Native AOT 应用程序可以选择使用线程池,其他应用程序可以选择使用 Windows 池。选择加入或退出很简单:在 .csproj 中的 <PropertyGroup/>
中,添加 <UseWindowsThreadPool>false</UseWindowsThreadPool>
以在 Native AOT 应用程序中选择退出,反之,在其他应用程序中使用 true 以选择加入。当使用此 MSBuild 开关时,在 Native AOT 应用程序中,不使用的任何池都可以自动被剪除。为了实验,也可以设置 DOTNET_ThreadPool_UseWindowsThreadPool 环境变量为 0 或 1,分别显式选择退出或加入。
目前还没有硬性规定哪个池可能更好;这个选项已经添加,以便开发者进行实验。我们已经看到,与线程池相比,Windows 池在更大的机器上的 I/O 扩展性不太好。然而,如果应用程序的其他地方已经大量使用了 Windows 线程池,那么整合到同一个池中可以减少过度订阅。此外,如果线程池线程经常被阻塞,Windows 线程池对这种阻塞有更多的信息,可能可以更有效地处理这些情况。我们可以通过一个简单的例子来看这一点。编译这段代码:
// dotnet run -c Release -f net8.0
using System.Diagnostics;
var sw = Stopwatch.StartNew();
var barrier = new Barrier(Environment.ProcessorCount * 2 + 1);
for (int i = 0; i < barrier.ParticipantCount; i++)
{
ThreadPool.QueueUserWorkItem(id =>
{
Console.WriteLine($"{sw.Elapsed}: {id}");
barrier.SignalAndWait();
}, i);
}
barrier.SignalAndWait();
Console.WriteLine($"Done: {sw.Elapsed}");
这是一个复杂的重现,它创建了一堆工作项,所有这些工作项都会阻塞,直到所有的工作项都被处理完毕:基本上,它接收线程池提供的每一个线程,并且永远不会归还(直到程序退出)。当我在我的机器上运行这个程序,其中 Environment.ProcessorCount 是 12,我得到的输出如下:
00:00:00.0038906: 0
00:00:00.0038911: 1
00:00:00.0042401: 4
00:00:00.0054198: 9
00:00:00.0047249: 6
00:00:00.0040724: 3
00:00:00.0044894: 5
00:00:00.0052228: 8
00:00:00.0049638: 7
00:00:00.0056831: 10
00:00:00.0039327: 2
00:00:00.0057127: 11
00:00:01.0265278: 12
00:00:01.5325809: 13
00:00:02.0471848: 14
00:00:02.5628161: 15
00:00:03.5805581: 16
00:00:04.5960218: 17
00:00:05.1087192: 18
00:00:06.1142907: 19
00:00:07.1331915: 20
00:00:07.6467355: 21
00:00:08.1614072: 22
00:00:08.6749720: 23
00:00:08.6763938: 24
Done: 00:00:08.6768608
线程池速注入了 Environment.ProcessorCount 个线程,但在此之后,它只会每秒注入一到两个额外的线程。现在,设置 DOTNET_ThreadPool_UseWindowsThreadPool 为 1,然后再试一次:
00:00:00.0034909: 3
00:00:00.0036281: 4
00:00:00.0032404: 0
00:00:00.0032727: 1
00:00:00.0032703: 2
00:00:00.0447256: 5
00:00:00.0449398: 6
00:00:00.0451899: 7
00:00:00.0454245: 8
00:00:00.0456907: 9
00:00:00.0459155: 10
00:00:00.0461399: 11
00:00:00.0463612: 12
00:00:00.0465538: 13
00:00:00.0467497: 14
00:00:00.0469477: 15
00:00:00.0471055: 16
00:00:00.0472961: 17
00:00:00.0474888: 18
00:00:00.0477131: 19
00:00:00.0478795: 20
00:00:00.0480844: 21
00:00:00.0482900: 22
00:00:00.0485110: 23
00:00:00.0486981: 24
Done: 00:00:00.0498603
Windows 池在这里注入线程的速度更快。这是好还是坏,取决于你的场景。如果你发现自己为你的应用程序设置了一个非常高的最小线程池线程数,你可能会想尝试这个选项。
Tasks
即使在之前的版本中对 async/await 进行了所有的改进,这个版本中的 async 方法仍然开销更低,无论它们是同步完成还是异步完成。
当一个 async Task/Task
// dotnet run -c Release -f net7.0 --filter "*" --runtimes net7.0 net8.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);
[HideColumns("Error", "StdDev", "Median", "RatioSD")]
[MemoryDiagnoser(displayGenColumns: false)]
public class Tests
{
[Benchmark] public async Task<TimeSpan> ZeroTimeSpan() => TimeSpan.Zero;
[Benchmark] public async Task<DateTime> MinDateTime() => DateTime.MinValue;
[Benchmark] public async Task<Guid> EmptyGuid() => Guid.Empty;
[Benchmark] public async Task<DayOfWeek> Sunday() => DayOfWeek.Sunday;
[Benchmark] public async Task<decimal> ZeroDecimal() => 0m;
[Benchmark] public async Task<double> ZeroDouble() => 0;
[Benchmark] public async Task<float> ZeroFloat() => 0;
[Benchmark] public async Task<Half> ZeroHalf() => (Half)0f;
[Benchmark] public async Task<(int, int)> ZeroZeroValueTuple() => (0, 0);
}
Method | Runtime | Mean | Ratio | Allocated | Alloc Ratio |
---|---|---|---|---|---|
ZeroTimeSpan | .NET 7.0 | 31.327 ns | 1.00 | 72 B | 1.00 |
ZeroTimeSpan | .NET 8.0 | 8.851 ns | 0.28 | – | 0.00 |
MinDateTime | .NET 7.0 | 31.457 ns | 1.00 | 72 B | 1.00 |
MinDateTime | .NET 8.0 | 8.277 ns | 0.26 | – | 0.00 |
EmptyGuid | .NET 7.0 | 32.233 ns | 1.00 | 80 B | 1.00 |
EmptyGuid | .NET 8.0 | 9.013 ns | 0.28 | – | 0.00 |
Sunday | .NET 7.0 | 30.907 ns | 1.00 | 72 B | 1.00 |
Sunday | .NET 8.0 | 8.235 ns | 0.27 | – | 0.00 |
ZeroDecimal | .NET 7.0 | 33.109 ns | 1.00 | 80 B | 1.00 |
ZeroDecimal | .NET 8.0 | 13.110 ns | 0.40 | – | 0.00 |
ZeroDouble | .NET 7.0 | 30.863 ns | 1.00 | 72 B | 1.00 |
ZeroDouble | .NET 8.0 | 8.568 ns | 0.28 | – | 0.00 |
ZeroFloat | .NET 7.0 | 31.025 ns | 1.00 | 72 B | 1.00 |
ZeroFloat | .NET 8.0 | 8.531 ns | 0.28 | – | 0.00 |
ZeroHalf | .NET 7.0 | 33.906 ns | 1.00 | 72 B | 1.00 |
ZeroHalf | .NET 8.0 | 9.008 ns | 0.27 | – | 0.00 |
ZeroZeroValueTuple | .NET 7.0 | 33.339 ns | 1.00 | 72 B | 1.00 |
ZeroZeroValueTuple | .NET 8.0 | 11.274 ns | 0.34 | – | 0.00 |
这些更改帮助一些异步方法在同步完成时变得紧凑。其他的更改帮助几乎所有的异步方法在异步完成时变得紧凑。当一个异步方法第一次暂停,假设它返回的是 Task/Task
- 一个用于保存 C# 编译器生成的 TStateMachine 状态机结构。
- 一个用于缓存指向 MoveNext 的 Action 委托。
- 一个用于存储 ExecutionContext,以便流向下一个 MoveNext 调用。
如果我们可以减少所需的字段,我们可以通过分配更小的对象而不是更大的对象,使每个异步方法的成本降低。这正是 dotnet/runtime#83696 和 dotnet/runtime#83737 所完成的,它们一起从每个这样的异步方法任务的大小中削减了 16 字节(在 64 位进程中)。如何实现呢?
C# 语言允许任何东西都可以被等待,只要它遵循正确的模式,暴露一个返回具有正确形状的类型的 GetAwaiter() 方法。该模式包括一组接受 Action 委托的 “OnCompleted” 方法,使异步方法构建器能够向等待器提供一个继续操作,这样当等待的操作完成时,它可以调用 Action 来恢复方法的处理。因此,AsyncStateMachineBox 类型上有一个字段,用于缓存一个懒加载创建的指向其 MoveNext 方法的 Action 委托;该 Action 在第一次需要它的暂停等待期间创建,然后可以用于所有后续的等待,这样 Action 在异步方法的生命周期内最多分配一次,无论调用暂停多少次。然而,如果状态机等待的东西不是已知的等待器,那么只需要委托;运行时有快速路径,避免在等待所有内置等待器时需要该 Action。有趣的是,Task 本身有一个用于存储委托的字段,而该字段只在创建 Task 来调用委托时使用(例如,Task.Run,ContinueWith 等)。由于今天分配的大多数任务都来自异步方法,这意味着大多数任务都有一个浪费的字段。我们发现我们可以将这个基础字段用于这个缓存的 MoveNext Action,使得这个字段对几乎所有的任务都相关,并允许我们删除状态机箱上的额外 Action 字段。
在基础 Task 上还有另一个在异步方法中未使用的现有字段:状态对象字段。当你使用 StartNew 或 ContinueWith 方法创建一个 Task 时,你可以提供一个对象状态,然后将其传递给 Task 的委托。然而,在异步方法中,这个字段就在那里,未被使用,孤独,被遗忘,悲伤。因此,我们可以将 ExecutionContext 存储在这个现有的状态字段中(小心不要让它通过通常暴露对象状态的 Task 的 AsyncState 属性暴露出来)。
我们可以通过一个简单的基准测试来看到去掉这两个字段的效果:
// dotnet run -c Release -f net7.0 --filter "*" --runtimes net7.0 net8.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);
[HideColumns("Error", "StdDev", "Median", "RatioSD")]
[MemoryDiagnoser(displayGenColumns: false)]
public class Tests
{
[Benchmark]
public async Task YieldOnce() => await Task.Yield();
}
Method | Runtime | Mean | Ratio | Allocated | Alloc Ratio |
---|---|---|---|---|---|
YieldOnce | .NET 7.0 | 918.6 ns | 1.00 | 112 B | 1.00 |
YieldOnce | .NET 8.0 | 865.8 ns | 0.94 | 96 B | 0.86 |
正如我们预测的,减少了16字节。
异步方法的开销也以其他方式减少。例如,dotnet/runtime#82181 缩小了用作自定义 IValueTaskSource/IValueTaskSource
[Flags]
public enum ConfigureAwaitOptions
{
None = 0,
ContinueOnCapturedContext = 1,
SuppressThrowing = 2,
ForceYielding = 4,
}
ContinueOnCapturedContext 你知道;这就是今天的 ConfigureAwait(true)。ForceYielding 是在各种情况下不时出现的东西,但本质上你正在等待某件事,而不是在你等待它的时候如果它已经完成了就同步地继续,你实际上希望系统假装它没有完成,即使它已经完成了。然后,而不是同步地继续,延续总是会从调用者异步地运行。这在各种方式上都可以作为优化。考虑一下在 .NET 7 的 SocketsHttpHandler 的 HTTP/2 实现中的这段代码:
private void DisableHttp2Connection(Http2Connection connection)
{
_ = Task.Run(async () => // fire-and-forget
{
bool usable = await connection.WaitForAvailableStreamsAsync().ConfigureAwait(false);
... // other stuff
};
}
在 .NET 8 中使用 ForceYielding,代码现在是:
private void DisableHttp2Connection(Http2Connection connection)
{
_ = DisableHttp2ConnectionAsync(connection); // fire-and-forget
async Task DisableHttp2ConnectionAsync(Http2Connection connection)
{
bool usable = await connection.WaitForAvailableStreamsAsync().ConfigureAwait(ConfigureAwaitOptions.ForceYielding);
.... // other stuff
}
}
我们没有单独的 Task.Run,而是在 WaitForAvailableStreamsAsync 返回的任务的 await 上搭了个便车(我们知道它会快速返回任务给我们),确保在调用 DisableHttp2Connection 之后的工作不会同步运行。或者想象一下你的代码是这样做的:
return Task.Run(WorkAsync);
static async Task WorkAsync()
{
while (...) await Something();
}
这是使用 Task.Run 来排队一个异步方法的调用。这个异步方法导致一个任务被分配,加上 Task.Run 导致一个任务被分配,加上需要将一个工作项排队到线程池,所以至少有三个分配。现在,这个相同的功能可以写成:
return WorkAsync();
static async Task WorkAsync()
{
await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);
while (...) await Something();
}
而不是三个分配,我们最终只有一个:异步任务的分配。这是因为在之前的版本中引入的所有优化中,状态机箱对象也将被排队到线程池。
然而,这个支持中最有价值的添加可能是 SuppressThrowing。它的作用就像它听起来的那样:当你等待一个任务完成失败或取消,这样通常 await 会传播异常,它不会。所以,例如,在 System.Text.Json 中,我们之前有这样的代码:
// Exceptions should only be propagated by the resuming converter
try
{
await state.PendingTask.ConfigureAwait(false);
}
catch { }
现在我们有这样的代码:
// Exceptions should only be propagated by the resuming converter
await state.PendingTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
或者在 SemaphoreSlim 中,我们有这样的代码:
await new ConfiguredNoThrowAwaiter<bool>(asyncWaiter.WaitAsync(TimeSpan.FromMilliseconds(millisecondsTimeout), cancellationToken));
if (cancellationToken.IsCancellationRequested)
{
// If we might be running as part of a cancellation callback, force the completion to be asynchronous.
await TaskScheduler.Default;
}
private readonly struct ConfiguredNoThrowAwaiter<T> : ICriticalNotifyCompletion, IStateMachineBoxAwareAwaiter
{
private readonly Task<T> _task;
public ConfiguredNoThrowAwaiter(Task<T> task) => _task = task;
public ConfiguredNoThrowAwaiter<T> GetAwaiter() => this;
public bool IsCompleted => _task.IsCompleted;
public void GetResult() => _task.MarkExceptionsAsHandled();
public void OnCompleted(Action continuation) => TaskAwaiter.OnCompletedInternal(_task, continuation, continueOnCapturedContext: false, flowExecutionContext: true);
public void UnsafeOnCompleted(Action continuation) => TaskAwaiter.OnCompletedInternal(_task, continuation, continueOnCapturedContext: false, flowExecutionContext: false);
public void AwaitUnsafeOnCompleted(IAsyncStateMachineBox box) => TaskAwaiter.UnsafeOnCompletedInternal(_task, box, continueOnCapturedContext: false);
}
internal readonly struct TaskSchedulerAwaiter : ICriticalNotifyCompletion
{
private readonly TaskScheduler _scheduler;
public TaskSchedulerAwaiter(TaskScheduler scheduler) => _scheduler = scheduler;
public bool IsCompleted => false;
public void GetResult() { }
public void OnCompleted(Action continuation) => Task.Factory.StartNew(continuation, CancellationToken.None, TaskCreationOptions.DenyChildAttach, _scheduler);
public void UnsafeOnCompleted(Action continuation)
{
if (ReferenceEquals(_scheduler, Default))
{
ThreadPool.UnsafeQueueUserWorkItem(s => s(), continuation, preferLocal: true);
}
else
{
OnCompleted(continuation);
}
}
}
现在我们只有这样的代码:
await ((Task)asyncWaiter.WaitAsync(TimeSpan.FromMilliseconds(millisecondsTimeout), cancellationToken)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
if (cancellationToken.IsCancellationRequested)
{
// If we might be running as part of a cancellation callback, force the completion to be asynchronous.
await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);
}
值得注意的是里面的 (Task) 强制类型转换。WaitAsync 返回一个 Task
上面的 SemaphoreSlim 示例也使用了新的 ConfigureAwaitOptions 来替换在 .NET 8 中添加的之前的优化。dotnet/runtime#83294 在 ConfiguredNoThrowAwaiter
// dotnet run -c Release -f net7.0 --filter "*" --runtimes net7.0 net8.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);
[HideColumns("Error", "StdDev", "Median", "RatioSD")]
[MemoryDiagnoser(displayGenColumns: false)]
public class Tests
{
private readonly CancellationToken _token = new CancellationTokenSource().Token;
private readonly SemaphoreSlim _sem = new SemaphoreSlim(0);
private readonly Task[] _tasks = new Task[100];
[Benchmark]
public Task WaitAsync()
{
for (int i = 0; i < _tasks.Length; i++)
{
_tasks[i] = _sem.WaitAsync(_token);
}
_sem.Release(_tasks.Length);
return Task.WhenAll(_tasks);
}
}
Method | Runtime | Mean | Ratio | Allocated | Alloc Ratio |
---|---|---|---|---|---|
WaitAsync | .NET 7.0 | 85.48 us | 1.00 | 44.64 KB | 1.00 |
WaitAsync | .NET 8.0 | 69.37 us | 0.82 | 36.02 KB | 0.81 |
Task 上的其他操作也有其他改进。dotnet/runtime#81065 从 Task.WhenAll 中移除了一个防御性的 Task[] 分配。它之前做了一个防御性的复制,这样它就可以在复制上验证是否有任何元素是 null(一个复制,因为另一个线程可能错误地并发地将元素置为 null);这是在面对多线程误用的情况下为参数验证付出的大代价。相反,该方法仍然会验证输入中是否有 null,如果一个 null 因为输入集合被错误地并发地与 WhenAll 的同步调用同时变异而滑过,那么它在那个时候就会忽略 null。在做这些改变的时候,PR 也特别处理了 List
// dotnet run -c Release -f net7.0 --filter "*" --runtimes net7.0 net8.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using System.Collections.ObjectModel;
using System.Runtime.CompilerServices;
BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);
[HideColumns("Error", "StdDev", "Median", "RatioSD")]
[MemoryDiagnoser(displayGenColumns: false)]
public class Tests
{
[Benchmark]
public void WhenAll_Array()
{
AsyncTaskMethodBuilder atmb1 = AsyncTaskMethodBuilder.Create();
AsyncTaskMethodBuilder atmb2 = AsyncTaskMethodBuilder.Create();
Task whenAll = Task.WhenAll(atmb1.Task, atmb2.Task);
atmb1.SetResult();
atmb2.SetResult();
whenAll.Wait();
}
[Benchmark]
public void WhenAll_List()
{
AsyncTaskMethodBuilder atmb1 = AsyncTaskMethodBuilder.Create();
AsyncTaskMethodBuilder atmb2 = AsyncTaskMethodBuilder.Create();
Task whenAll = Task.WhenAll(new List<Task>(2) { atmb1.Task, atmb2.Task });
atmb1.SetResult();
atmb2.SetResult();
whenAll.Wait();
}
[Benchmark]
public void WhenAll_Collection()
{
AsyncTaskMethodBuilder atmb1 = AsyncTaskMethodBuilder.Create();
AsyncTaskMethodBuilder atmb2 = AsyncTaskMethodBuilder.Create();
Task whenAll = Task.WhenAll(new ReadOnlyCollection<Task>(new[] { atmb1.Task, atmb2.Task }));
atmb1.SetResult();
atmb2.SetResult();
whenAll.Wait();
}
[Benchmark]
public void WhenAll_Enumerable()
{
AsyncTaskMethodBuilder atmb1 = AsyncTaskMethodBuilder.Create();
AsyncTaskMethodBuilder atmb2 = AsyncTaskMethodBuilder.Create();
var q = new Queue<Task>(2);
q.Enqueue(atmb1.Task);
q.Enqueue(atmb2.Task);
Task whenAll = Task.WhenAll(q);
atmb1.SetResult();
atmb2.SetResult();
whenAll.Wait();
}
}
Method | Runtime | Mean | Ratio | Allocated | Alloc Ratio |
---|---|---|---|---|---|
WhenAll_Array | .NET 7.0 | 210.8 ns | 1.00 | 304 B | 1.00 |
WhenAll_Array | .NET 8.0 | 160.9 ns | 0.76 | 264 B | 0.87 |
WhenAll_List | .NET 7.0 | 296.4 ns | 1.00 | 376 B | 1.00 |
WhenAll_List | .NET 8.0 | 185.5 ns | 0.63 | 296 B | 0.79 |
WhenAll_Collection | .NET 7.0 | 271.3 ns | 1.00 | 360 B | 1.00 |
WhenAll_Collection | .NET 8.0 | 199.7 ns | 0.74 | 328 B | 0.91 |
WhenAll_Enumerable | .NET 7.0 | 328.2 ns | 1.00 | 472 B | 1.00 |
WhenAll_Enumerable | .NET 8.0 | 230.0 ns | 0.70 | 432 B | 0.92 |
泛型 WhenAny 也在 dotnet/runtime#88154 的一部分中得到了改进,它从一个额外的继续中移除了一个任务分配,这是一个实现细节。这是我最喜欢的 PR 类型之一:它不仅提高了性能,还使代码更清晰,代码量也更少。
// dotnet run -c Release -f net7.0 --filter "*" --runtimes net7.0 net8.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using System.Runtime.CompilerServices;
BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);
[HideColumns("Error", "StdDev", "Median", "RatioSD")]
[MemoryDiagnoser(displayGenColumns: false)]
public class Tests
{
[Benchmark]
public Task<Task<int>> WhenAnyGeneric_ListNotCompleted()
{
AsyncTaskMethodBuilder<int> atmb1 = default;
AsyncTaskMethodBuilder<int> atmb2 = default;
AsyncTaskMethodBuilder<int> atmb3 = default;
Task<Task<int>> wa = Task.WhenAny(new List<Task<int>>() { atmb1.Task, atmb2.Task, atmb3.Task });
atmb3.SetResult(42);
return wa;
}
}
Method | Runtime | Mean | Ratio | Allocated | Alloc Ratio |
---|---|---|---|---|---|
WhenAnyGeneric_ListNotCompleted | .NET 7.0 | 555.0 ns | 1.00 | 704 B | 1.00 |
WhenAnyGeneric_ListNotCompleted | .NET 8.0 | 260.3 ns | 0.47 | 504 B | 0.72 |
关于任务的最后一个例子,虽然这个例子有点不同,因为它特别是关于提高测试性能(和测试可靠性)。假设你有一个像这样的方法:
public static async Task LogAfterDelay(Action<string, TimeSpan> log)
{
long startingTimestamp = Stopwatch.GetTimestamp();
await Task.Delay(TimeSpan.FromSeconds(30));
log("Completed", Stopwatch.GetElapsedTime(startingTimestamp));
}
这个方法的目的是等待30秒,然后记录一个完成消息以及方法观察到的过去的时间。这显然是你在真实应用中会找到的功能的简化,但你可以从中推断出你可能写过的代码。你如何测试这呢?也许你已经写了像这样的测试:
[Fact]
public async Task LogAfterDelay_Success_CompletesAfterThirtySeconds()
{
TimeSpan ts = default;
Stopwatch sw = Stopwatch.StartNew();
await LogAfterDelay((message, time) => ts = time);
sw.Stop();
Assert.InRange(ts, TimeSpan.FromSeconds(30), TimeSpan.MaxValue);
Assert.InRange(sw.Elapsed, TimeSpan.FromSeconds(30), TimeSpan.MaxValue);
}
这验证了方法在其日志中包含了至少30秒的值,也验证了至少过去了30秒。问题是什么?从性能的角度来看,问题是这个测试必须等待30秒!这对于本来可以近乎瞬间完成的东西来说,是大量的开销。现在想象一下延迟更长,比如10分钟,或者我们有一堆测试都需要做同样的事情。这使得良好和彻底的测试变得无法承受。
为了解决这类情况,许多开发者引入了他们自己的时间流动的抽象。现在在 .NET 8 中,这已经不再需要了。从 dotnet/runtime#83604 开始,核心库包括 System.TimeProvider。这个抽象基类抽象了时间的流动,有获取当前 UTC 时间、获取当前本地时间、获取当前时区、获取高频时间戳和创建计时器(反过来返回新的 System.Threading.ITimer,支持改变计时器的滴答间隔)的成员。然后像 Task.Delay 和 CancellationTokenSource 的构造函数这样的核心库成员有新的接受 TimeProvider 的重载,并使用它进行时间相关的功能,而不是硬编码到 DateTime.UtcNow、Stopwatch 或 System.Threading.Timer。有了这个,我们可以重写我们之前的方法:
public static async Task LogAfterDelay(Action<string, TimeSpan> log, TimeProvider provider)
{
long startingTimestamp = provider.GetTimestamp();
await Task.Delay(TimeSpan.FromSeconds(30), provider);
log("Completed", provider.GetElapsedTime(startingTimestamp));
}
它已经增加了接受 TimeProvider 参数的功能,虽然在使用依赖注入(DI)机制的系统中,它可能只是从 DI 中获取一个 TimeProvider 单例。然后它使用 provider 上的对应成员,而不是使用 Stopwatch.GetTimestamp 或 Stopwatch.GetElapsedTime,而不是使用只接受持续时间的 Task.Delay 重载,它使用也接受 TimeProvider 的重载。在生产中使用时,可以传递 TimeProvider.System,这是基于系统时钟实现的(如果不提供 TimeProvider,你会得到的就是这个),但在测试中,可以传递一个自定义实例,一个手动控制观察到的时间流动的实例。在 Microsoft.Extensions.TimeProvider.Testing NuGet 包中就存在这样一个自定义的 TimeProvider:FakeTimeProvider。下面是一个使用它和我们的 LogAfterDelay 方法的例子:
// dotnet run -c Release -f net8.0 --filter "*"
using Microsoft.Extensions.Time.Testing;
using System.Diagnostics;
Stopwatch sw = Stopwatch.StartNew();
var fake = new FakeTimeProvider();
Task t = LogAfterDelay((message, time) => Console.WriteLine($"{message}: {time}"), fake);
fake.Advance(TimeSpan.FromSeconds(29));
Console.WriteLine(t.IsCompleted);
fake.Advance(TimeSpan.FromSeconds(1));
Console.WriteLine(t.IsCompleted);
Console.WriteLine($"Actual execution time: {sw.Elapsed}");
static async Task LogAfterDelay(Action<string, TimeSpan> log, TimeProvider provider)
{
long startingTimestamp = provider.GetTimestamp();
await Task.Delay(TimeSpan.FromSeconds(30), provider);
log("Completed", provider.GetElapsedTime(startingTimestamp));
}
当我运行这个时,它输出了以下内容:
False
Completed: 00:00:30
True
Actual execution time: 00:00:00.0119943
换句话说,在手动推进时间29秒后,操作还没有完成。然后我们手动推进了一秒钟,操作完成了。它报告说过去了30秒,但实际上,整个操作只花了实际墙钟时间的0.01秒。
有了这个,让我们移动到 Parallel...
Parallel
.NET 6 在 Parallel 中引入了新的异步方法,形式为 Parallel.ForEachAsync。在它的引入之后,我们开始收到对于 for 循环的等价物的请求,所以现在在 .NET 8 中,通过 dotnet/runtime#84804,这个类获得了一组 Parallel.ForAsync 方法。这些以前可以通过传入一个从像 Enumerable.Range 这样的方法创建的 IEnumerable
await Parallel.ForEachAsync(Enumerable.Range(0, 1_000), async i =>
{
...
});
但现在你可以更简单、更便宜地实现同样的功能:
await Parallel.ForAsync(0, 1_000, async i =>
{
...
});
这最终会更便宜,因为你不需要分配可枚举的/枚举器,而且多个工作器试图剥离下一个迭代的同步可以以一种更不昂贵的方式完成,一个 Interlocked 而不是使用像 SemaphoreSlim 这样的异步锁。
// dotnet run -c Release -f net8.0 --filter "*"
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);
[HideColumns("Error", "StdDev", "Median", "RatioSD")]
[MemoryDiagnoser(displayGenColumns: false)]
public class Tests
{
[Benchmark(Baseline = true)]
public Task ForEachAsync() => Parallel.ForEachAsync(Enumerable.Range(0, 1_000_000), (i, ct) => ValueTask.CompletedTask);
[Benchmark]
public Task ForAsync() => Parallel.ForAsync(0, 1_000_000, (i, ct) => ValueTask.CompletedTask);
}
方法 | 平均 | 比率 | 分配 | 分配比率 |
---|---|---|---|---|
ForEachAsync | 589.5 ms | 1.00 | 87925272 B | 1.000 |
ForAsync | 147.5 ms | 0.25 | 792 B | 0.000 |
这里的分配列特别明显,也有点误导。为什么 ForEachAsync 在分配方面这么糟糕?这是因为同步机制。这里的测试代理没有执行任何工作,所以所有的时间都花在了源上。在 Parallel.ForAsync 的情况下,获取下一个值是一个单独的 Interlocked 指令。在 Parallel.ForEachAsync 的情况下,它是一个 WaitAsync,而在很多竞争下,许多 WaitAsync 调用将异步完成,导致分配。在一个真实的工作负载中,其中的主体代理正在执行真实的工作,同步或异步,那么同步的影响就会小得多。这里我把调用改为了一个简单的 Task.Delay,延迟1ms(并且也显著降低了迭代次数):
// dotnet run -c Release -f net8.0 --filter "*"
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);
[HideColumns("Error", "StdDev", "Median", "RatioSD")]
[MemoryDiagnoser(displayGenColumns: false)]
public class Tests
{
[Benchmark(Baseline = true)]
public Task ForEachAsync() => Parallel.ForEachAsync(Enumerable.Range(0, 100), async (i, ct) => await Task.Delay(1));
[Benchmark]
public Task ForAsync() => Parallel.ForAsync(0, 100, async (i, ct) => await Task.Delay(1));
}
GitHub Copilot: 和这两种方法实际上是一样的:
方法 | 平均 | 比率 | 分配 | 分配比率 |
---|---|---|---|---|
ForEachAsync | 89.39 ms | 1.00 | 27.96 KB | 1.00 |
ForAsync | 89.44 ms | 1.00 | 27.84 KB | 1.00 |
有趣的是,这个 Parallel.ForAsync 方法也是核心库中第一个基于 .NET 7 中引入的泛型数学接口的公共方法之一:
public static Task ForAsync<T>(T fromInclusive, T toExclusive, Func<T, CancellationToken, ValueTask> body)
where T : notnull, IBinaryInteger<T>
.net7 线程的性能优化部分
Threading 是一种横切关注点,影响着每一个应用程序,因此线程空间的变化可能会产生广泛的影响。这个版本看到了 ThreadPool 本身的两个非常重大的变化;dotnet/runtime#64834 将 "IO pool" 完全切换到使用完全托管的实现(尽管之前工作池已完全切换到托管模式,但之前的 IO 池仍然使用原生代码),而 dotnet/runtime#71864 同样将计时器实现从基于原生的切换为完全基于托管代码。这两个变化可能会影响性能,而且前者已经在更大规模硬件上进行了演示,但在很大程度上,这并不是它们的主要目标。相反,其他 PRs 一直专注于提高吞吐量。
特别值得一提的是 dotnet/runtime#69386。ThreadPool 拥有一个“全局队列”,任何线程都可以将工作排入其中,然后池中的每个线程都有自己的“本地队列”(任何线程都可以从中出列,但只有拥有它的线程才能将工作排入其中)。当工作线程需要另一个要处理的工作时,首先检查自己的本地队列,然后检查全局队列,仅当两个地方都找不到工作时,它才会去检查所有其他线程的本地队列,以查看是否可以帮助减轻它们的负载。随着机器的核心数量和线程数量不断增加,对这些共享队列(特别是全局队列)的争用也越来越多。该 PR 针对这样规模更大的机器进行了处理,一旦机器达到一定阈值(目前是32个处理器),就引入了额外的全局队列。这有助于将访问分配到多个队列中,从而减少争用。
另一个是 dotnet/runtime#57885。为了协调线程,当工作项被入队和出队时,池会向其线程发出请求,让它们知道有可用的工作要做。然而,这常常导致过度订阅,即当系统未满载时,会有更多的线程争抢尝试获取工作项。这反过来会表现为吞吐量下降。这个改变彻底改变了如何请求线程,这样一次只请求一个额外的线程,当该线程出队其第一个工作项后,如果还有剩余的工作,它可以请求一个额外的线程,然后那个线程可以请求一个额外的线程,依此类推。这是我们性能测试套件中的一个性能测试(我已经将其简化,去掉了测试中的一堆配置选项,但它仍然准确地是其中的一个配置)。乍一看你可能会想,“嘿,这是一个关于 ArrayPool 的性能测试,为什么它会出现在一个关于线程的讨论中?”你是对的,这是一个专注于 ArrayPool 的性能测试。然而,如前所述,线程影响一切,在这种情况下,那个在中间的 await Task.Yield() 导致此方法的剩余部分被排队到 ThreadPool 中执行。并且,由于测试的结构,执行“真实的工作”与线程池线程争抢获取下一个任务的 CPU 周期竞争,当移动到 .NET 7 时,它显示出可衡量的改进。
private readonly byte[][] _nestedArrays = new byte[8][];
private const int Iterations = 100_000;
private static byte IterateAll(byte[] arr)
{
byte ret = default;
foreach (byte item in arr) ret = item;
return ret;
}
[Benchmark(OperationsPerInvoke = Iterations)]
public async Task MultipleSerial()
{
for (int i = 0; i < Iterations; i++)
{
for (int j = 0; j < _nestedArrays.Length; j++)
{
_nestedArrays[j] = ArrayPool<byte>.Shared.Rent(4096);
_nestedArrays[j].AsSpan().Clear();
}
await Task.Yield();
for (int j = _nestedArrays.Length - 1; j >= 0; j--)
{
IterateAll(_nestedArrays[j]);
ArrayPool<byte>.Shared.Return(_nestedArrays[j]);
}
}
}
方法 | 运行时 | 平均值 | 比率 |
---|---|---|---|
MultipleSerial | .NET 6.0 | 14.340 us | 1.00 |
MultipleSerial | .NET 7.0 | 9.262 us | 0.65 |
ThreadPool 之外也有一些改进。一个显著的变化是在 dotnet/runtime#68790 中处理 AsyncLocal
除了 ThreadPool 外,其他地方也有所改进。一个值得注意的变化是在处理 AsyncLocal
private AsyncLocal<int> asyncLocal1 = new AsyncLocal<int>();
private AsyncLocal<int> asyncLocal2 = new AsyncLocal<int>();
private AsyncLocal<int> asyncLocal3 = new AsyncLocal<int>();
private AsyncLocal<int> asyncLocal4 = new AsyncLocal<int>();
[Benchmark(OperationsPerInvoke = 4000)]
public void Update()
{
for (int i = 0; i < 1000; i++)
{
asyncLocal1.Value++;
asyncLocal2.Value++;
asyncLocal3.Value++;
asyncLocal4.Value++;
}
}
方法 | 运行时 | 平均值 | 比率 | 代码大小 | 分配 | 分配比率 |
---|---|---|---|---|---|---|
Update | .NET 6.0 | 61.96 ns | 1.00 | 1,272 B | 176 B | 1.00 |
Update | .NET 7.0 | 61.92 ns | 1.00 | 1,832 B | 144 B | 0.82 |
另一个有价值的修复是在 dotnet/runtime#70165 中对锁定的处理。这个特定的改进有点难以用 benchmarkdotnet 来演示,所以只需尝试运行这个程序,先在 .NET 6 上运行,然后在 .NET 7 上运行:
using System.Diagnostics;
var rwl = new ReaderWriterLockSlim();
var tasks = new Task[100];
int count = 0;
DateTime end = DateTime.UtcNow + TimeSpan.FromSeconds(10);
while (DateTime.UtcNow < end)
{
for (int i = 0; i < 100; ++i)
{
tasks[i] = Task.Run(() =>
{
var sw = Stopwatch.StartNew();
rwl.EnterReadLock();
rwl.ExitReadLock();
sw.Stop();
if (sw.ElapsedMilliseconds >= 10)
{
Console.WriteLine(Interlocked.Increment(ref count));
}
});
}
Task.WaitAll(tasks);
}
这个程序简单地启动了100个任务,每个任务都进入和退出一个读写锁,等待它们全部完成,然后再重复这个过程,持续10秒。它还计时进入和退出锁所需的时间,并在必须等待至少15毫秒时写入警告。当我在 .NET 6 上运行这个程序时,我得到了大约100次进入/退出锁需要 >= 10 毫秒的情况。在 .NET 7 上,我得到了0次。为什么会有这种差异呢?ReaderWriterLockSlim 的实现有自己的旋转循环实现,这个旋转循环试图在旋转时混合各种操作,范围从调用 Thread.SpinWait 到 Thread.Sleep(0) 到 Thread.Sleep(1)。问题在于 Thread.Sleep(1)。这表示“让这个线程睡眠1毫秒”;然而,操作系统对这种时间有最终的决定权,而在 Windows 上,默认的睡眠时间会接近15毫秒(在 Linux 上稍低但仍然相当高)。因此,每次锁的争用足够强烈以至于强制它调用 Thread.Sleep(1),我们就会至少延迟15毫秒,如果不是更多。上述 PR 通过消除对 Thread.Sleep(1) 的使用来解决这个问题。
最后要提到的与线程相关的变化是:dotnet/runtime#68639。这个是特定于 Windows 的。Windows 有处理器组的概念,每个处理器组可以有多达64个核心,且默认情况下,当一个进程运行时,它被分配一个特定的处理器组,并且只能使用该组中的核心。在 .NET 7 中,运行时将其默认值翻转,以便默认情况下尽可能使用所有处理器组。
net6 线程部分的改进
我们来谈谈线程,从 ThreadPool 开始。
有时候,性能优化是关于消除不必要的工作,或者做出优化常见情况而稍微降低小众情况的权衡,或者利用新的低级功能来更快地做某事,或者其他许多事情。但有时,性能优化是关于找到帮助糟糕但常见的代码变得稍微不那么糟糕的方法。
线程池的工作很简单:运行工作项。为了做到这一点,线程池在其核心需要两件事:一个待处理的工作队列,和一组处理它们的线程。我们可以轻易地编写一个功能性的,简单的线程池:
static class SimpleThreadPool
{
private static BlockingCollection<Action> s_work = new();
public static void QueueUserWorkItem(Action action) => s_work.Add(action);
static SimpleThreadPool()
{
for (int i = 0; i < Environment.ProcessorCount; i++)
new Thread(() =>
{
while (true) s_work.Take()();
}) { IsBackground = true }.Start();
}
}
嗯,这是一个功能性的线程池。但是...并不是一个很好的线程池。一个好的线程池最难的部分在于线程的管理,特别是在任何给定的时间点确定应该有多少线程在服务工作队列。线程太多,你可能会让系统停滞不前,因为所有的线程都在争夺系统的资源,通过上下文切换增加了巨大的开销,并且由于缓存抖动而相互干扰。线程太少,你可能会让系统停滞不前,因为工作项没有被快速处理,或者更糟糕的是,正在运行的工作项被阻塞等待其他工作项运行,但没有足够的额外线程来运行它们。.NET ThreadPool 有多种机制来确定在任何时间点应该有多少线程在运行。首先,它有一个饥饿检测机制。这个机制是一个相当直接的门,每秒触发一次或两次,检查是否有任何进展在从池的队列中移除项目:如果没有进展,意味着没有被出队,池假设系统是饥饿的并注入一个额外的线程。其次,它有一个爬山算法,这个算法通过操纵可用的线程数量,不断寻求最大化工作项的吞吐量;每完成 N 个工作项后,它评估增加或减少一个线程到/从循环中是否有助于或损害工作项的吞吐量,从而使其适应系统当前的需求。然而,爬山机制有一个弱点:为了正确地完成它的工作,工作项需要完成...如果工作项没有完成,比如说,池中的所有线程都被阻塞,爬山就暂时无用,注入额外线程的唯一机制就是饥饿机制,这个机制(按设计)相当慢。
这种情况可能会出现在一个系统被“同步阻塞异步”工作淹没的时候,这个术语是用来指启动异步工作然后同步阻塞等待它完成的;在常见的情况下,这样的反模式最终会阻塞一个线程池线程,这个线程依赖于另一个线程池线程做工作以便解除第一个线程的阻塞,这可能很快导致所有的线程池线程都被阻塞,直到注入足够的线程使每个人都能向前进展。这样的“同步阻塞异步”代码,通常表现为调用一个异步方法然后阻塞等待返回的任务(例如 int i = GetValueAsync().Result)在生产代码中通常被认为是不可接受的,这些代码意味着要可扩展,但有时候它是无法避免的,例如你被迫实现一个同步的接口,而你手头上唯一可以用来实现的功能只能通过异步方法来暴露。
我们可以通过一个糟糕的复现来看到这个影响:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
var tcs = new TaskCompletionSource();
var tasks = new List<Task>();
for (int i = 0; i < Environment.ProcessorCount * 4; i++)
{
int id = i;
tasks.Add(Task.Run(() =>
{
Console.WriteLine($"{DateTime.UtcNow:MM:ss.ff}: {id}");
tcs.Task.Wait();
}));
}
tasks.Add(Task.Run(() => tcs.SetResult()));
var sw = Stopwatch.StartNew();
Task.WaitAll(tasks.ToArray());
Console.WriteLine($"Done: {sw.Elapsed}");
这将一堆工作项排队到线程池,所有这些工作项都阻塞等待一个任务完成,但是那个任务不会完成,直到最后一个排队的工作项完成它以解锁所有其他的工作项。因此,我们最终阻塞了池中的每一个线程,等待线程池检测到饥饿并注入另一个线程,然后复现就会尽职尽责地阻塞它,如此反复,直到最后有足够的线程,每个排队的工作项都可以并发运行。在 .NET Framework 4.8 和 .NET 5 上,上述复现在我的12逻辑核心的机器上需要大约32秒才能完成。你可以在这里看到输出;注意每个工作项上的时间戳,你可以看到在非常快速地增加到与核心数量相等的线程数量后,它然后非常慢地引入额外的线程。
07:54.51: 4
07:54.51: 8
07:54.51: 1
07:54.51: 5
07:54.51: 9
07:54.51: 0
07:54.51: 10
07:54.51: 2
07:54.51: 11
07:54.51: 3
07:54.51: 6
07:54.51: 7
07:55.52: 12
07:56.52: 13
07:57.53: 14
07:58.52: 15
07:59.52: 16
07:00.02: 17
07:01.02: 18
07:01.52: 19
07:02.51: 20
07:03.52: 21
07:04.52: 22
07:05.03: 23
07:06.02: 24
07:07.03: 25
07:08.01: 26
07:09.03: 27
07:10.02: 28
07:11.02: 29
07:11.52: 30
07:12.52: 31
07:13.52: 32
07:14.02: 33
07:15.02: 34
07:15.53: 35
07:16.51: 36
07:17.02: 37
07:18.02: 38
07:18.52: 39
07:19.52: 40
07:20.52: 41
07:21.52: 42
07:22.55: 43
07:23.52: 44
07:24.53: 45
07:25.52: 46
07:26.02: 47
Done: 00:00:32.5128769
我很高兴地说,对于.NET 6,这种情况有所改善。这并不是让你开始编写更多的同步阻塞异步代码,而是承认有时这是无法避免的,特别是在现有的应用程序可能无法一次性转移到异步模型,可能有一些遗留组件等情况下。dotnet/runtime#53471教会了线程池我们在这些情况下看到的最常见的阻塞形式,即等待一个尚未完成的任务。作为回应,只要阻塞持续,线程池就会变得更加积极地增加其目标线程数,然后在阻塞结束后立即再次降低目标数。在.NET 6上再次运行相同的控制台应用程序,我们可以看到大约32秒的时间缩短到大约1.5秒,线程池对阻塞的反应更快地注入线程。
07:53.39: 5
07:53.39: 7
07:53.39: 6
07:53.39: 8
07:53.39: 9
07:53.39: 10
07:53.39: 1
07:53.39: 0
07:53.39: 4
07:53.39: 2
07:53.39: 3
07:53.47: 12
07:53.47: 11
07:53.47: 13
07:53.47: 14
07:53.47: 15
07:53.47: 22
07:53.47: 16
07:53.47: 17
07:53.47: 18
07:53.47: 19
07:53.47: 21
07:53.47: 20
07:53.50: 23
07:53.53: 24
07:53.56: 25
07:53.59: 26
07:53.63: 27
07:53.66: 28
07:53.69: 29
07:53.72: 30
07:53.75: 31
07:53.78: 32
07:53.81: 33
07:53.84: 34
07:53.91: 35
07:53.97: 36
07:54.03: 37
07:54.10: 38
07:54.16: 39
07:54.22: 40
07:54.28: 41
07:54.35: 42
07:54.41: 43
07:54.47: 44
07:54.54: 45
07:54.60: 46
07:54.68: 47
Done: 00:00:01.3649530
有趣的是,这个改进是由.NET 6中另一个大的线程池相关改变更容易实现的:现在的实现完全是用C#。在.NET的之前版本中,线程池的核心调度例程是在托管代码中,但所有关于线程管理的逻辑都仍然在运行时的本地中。所有这些逻辑之前已经被移植到C#中,以支持CoreRT和mono,但它并没有被用于coreclr。从.NET 6和dotnet/runtime#43841开始,它现在在所有地方都被使用。这应该使得进一步的改进和优化更容易,并在未来的版本中使池有更多的进步。
从线程池移开,.NET/runtime#55295 是一个有趣的改进。在多线程代码中,您经常会遇到一些情况,无论是直接使用低锁算法,还是间接使用并发原语(如锁和信号量),都会有忙于等待某事发生的情况。基于这样一个观念:操作系统中阻塞等待某事发生对于较长的等待是非常高效的,但在等待操作的开始和结束时会带来非同寻常的开销;如果你等待的事情很可能很快就会发生,你可能最好直接循环尝试再次发生,或者在非常短暂的暂停后尝试。我在那里使用的“PAUSE”一词并非偶然,因为x86指令集包括了“PAUSE”指令,它告诉处理器代码正在执行忙等待,并帮助其进行相应的优化。然而,“PAUSE”指令所产生的延迟在不同的处理器架构上可能差异很大,例如,在英特尔 Core i5 上可能仅需 9 个周期,在 AMD Ryzen 7 上可能需要 65 个周期,在英特尔 Core i7 上可能需要 140 个周期。这使得调整使用 spin 循环编写的更高层次代码的行为变得具有挑战性,因为运行时中的核心代码和核心库中的关键并发相关类型确实如此。为了应对这种差异并提供一致的暂停视角,之前的 .NET 发行版尝试在启动时测量暂停的持续时间,然后使用这些指标在对角线上正常化使用多少暂停。然而,这种方法有几个缺点。尽管在启动路径的主要线程上没有进行测量,但它仍然为每个进程贡献了毫秒级的 CPU 时间,这些时间叠加在每天发生的数百万或数十亿次 .NET 进程调用上。此外,该测量仅对进程执行一次,但由于多种原因,进程寿命期间的开销实际上可能会发生变化,例如,如果虚拟机被暂停并从一台物理机移动到另一台。为了克服这个问题,上述 PR 改变了方案。与其在启动时一次测量较长时间,不如定期进行短暂测量,并据此刷新对暂停时间认识的刷新。这应该会导致 CPU 利用率的整体下降,以及更准确地了解这些暂停的成本,从而使依赖它的应用程序和服务的行为更加一致。
让我们继续谈谈 Task,在这里有很多的改进。一个值得注意且早该改变的是使 Task.FromResult
当然,任务与C#中的异步方法紧密相连,值得看一下C# 10和.NET 6中一个小而重要的特性,这可能会直接或间接影响很多.NET代码。这需要一些背景知识。当C#编译器去实现一个带有签名async SomeTaskLikeType的异步方法时,它会咨询SomeTaskLikeType来看应该使用什么“构建器”来帮助实现该方法。例如,ValueTask带有[AsyncMethodBuilder(typeof(AsyncValueTaskMethodBuilder))]属性,因此任何异步ValueTask方法都会导致编译器使用AsyncValueTaskMethodBuilder作为该方法的构建器。如果我们编译一个简单的异步方法,我们可以看到这一点:
public static async ValueTask ExampleAsync() { }
for which the compiler produces approximately the following as the implementation of ExampleAsync:
public static ValueTask ExampleAsync()
{
<ExampleAsync>d__0 stateMachine = default;
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
这种构建器类型在生成的代码中被用来创建构建器实例(通过一个静态的 Create 方法),访问构建的任务(通过一个 Task 实例属性),完成那个构建的任务(通过 SetResult 和 SetException 实例方法),以及处理与那个构建的任务相关的状态管理,当一个 await 产生(通过 AwaitOnCompleted 和 UnsafeAwaitOnCompleted 实例方法)。由于有四种类型内置在核心库中,它们被设计为用作异步方法的返回类型(Task,Task
然而,这种模型的一个缺点是,选择哪个构建器与从异步方法返回的类型的定义有关。所以,如果你想定义你的异步方法返回 Task,Task
ValueTask
我们之前看到的 [AsyncMethodBuilder] 属性现在可以放在方法上,除了类型之外,感谢 dotnet/roslyn#54033;当一个异步方法被 [AsyncMethodBuilder(typeof(SomeBuilderType))] 属性标记时,C# 编译器将会优先选择那个构建器而不是默认的。并且,伴随着 C# 10 语言/编译器特性,.NET 6 包含了两种新的构建器类型,PoolingAsyncValueTaskMethodBuilder 和 PoolingAsyncValueTaskMethodBuilder
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
public static async ValueTask ExampleAsync() { }
now the compiler generates:
public static ValueTask ExampleAsync()
{
<ExampleAsync>d__0 stateMachine = default;
stateMachine.<>t__builder = PoolingAsyncValueTaskMethodBuilder.Create();
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
这意味着ExampleAsync现在可能使用池化的对象来支持返回的ValueTask实例。我们可以通过一个简单的基准测试来看到这一点:
const int Iters = 100_000;
[Benchmark(OperationsPerInvoke = Iters, Baseline = true)]
public async Task WithoutPooling()
{
for (int i = 0; i < Iters; i++)
await YieldAsync();
async ValueTask YieldAsync() => await Task.Yield();
}
[Benchmark(OperationsPerInvoke = Iters)]
public async Task WithPooling()
{
for (int i = 0; i < Iters; i++)
await YieldAsync();
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
async ValueTask YieldAsync() => await Task.Yield();
}
Method | Mean | Ratio | Allocated |
---|---|---|---|
WithoutPooling | 763.9ns | 1.00 | 112B |
WithPooling | 781.9ns | 1.02 | – |
注意每次调用的分配从112字节降到0。那么,为什么不直接将这种行为设为 AsyncValueTaskMethodBuilder 和 AsyncValueTaskMethodBuilder 的默认行为呢?有两个原因。首先,它确实创建了一个功能差异。任务比 ValueTasks 更有能力,支持并发使用,多个等待者,和同步阻塞。如果消费代码,例如,正在执行:
ValueTask vt = SomeMethodAsync();
await vt;
await vt;
当 ValueTask 由 Task 支持时,这将“正常工作”,但是当启用池化时,可能会以多种方式和不同的严重程度失败。代码分析规则 CA2012 旨在帮助避免此类代码,但仅此一项是不足以防止此类中断的。其次,如你从上面的基准测试中可以看到,虽然池化避免了分配,但它带来了一点额外的开销。这里没有显示的是,维护池本身(每个异步方法都维护一个池)在内存和工作集中的额外开销。这里还有一些可能的开销没有显示出来,这些是任何类型的池化的常见陷阱。例如,GC 优化为使 gen0 收集非常快,它可以做到这一点的一种方式是不需要扫描 gen1 或 gen2 作为 gen0 GC 的一部分。但是,如果有来自 gen1 或 gen2 的 gen0 对象的引用,那么它确实需要扫描这些世代的部分(这就是为什么将引用存储到字段中涉及“GC 写屏障”,以查看是否将对 gen0 对象的引用存储到来自更高世代的一个中)。由于池化的整个目的是保持对象长时间存在,这些对象可能最终会在这些更高的世代中,它们存储的任何引用可能最终会使 GC 更昂贵;这很容易在这些状态机中出现,因为在方法中使用的每个参数和局部变量可能都需要被跟踪。因此,从性能的角度来看,最好只在可能重要并且性能测试证明它能够朝正确方向推动指针的地方使用这种能力。当然,我们可以看到,除了节省分配外,还有一些场景实际上确实提高了吞吐量,这通常是人们在测量分配减少(即减少分配以减少在垃圾收集中花费的时间)时真正关注的改进点。
private const int Concurrency = 256;
private const int Iters = 100_000;
[Benchmark(Baseline = true)]
public Task NonPooling()
{
return Task.WhenAll(from i in Enumerable.Range(0, Concurrency)
select Task.Run(async delegate
{
for (int i = 0; i < Iters; i++)
await A().ConfigureAwait(false);
}));
static async ValueTask A() => await B().ConfigureAwait(false);
static async ValueTask B() => await C().ConfigureAwait(false);
static async ValueTask C() => await D().ConfigureAwait(false);
static async ValueTask D() => await Task.Yield();
}
[Benchmark]
public Task Pooling()
{
return Task.WhenAll(from i in Enumerable.Range(0, Concurrency)
select Task.Run(async delegate
{
for (int i = 0; i < Iters; i++)
await A().ConfigureAwait(false);
}));
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
static async ValueTask A() => await B().ConfigureAwait(false);
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
static async ValueTask B() => await C().ConfigureAwait(false);
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
static async ValueTask C() => await D().ConfigureAwait(false);
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
static async ValueTask D() => await Task.Yield();
}
Method | Mean | Ratio | Allocated |
---|---|---|---|
NonPooling | 3.271s | 1.00 | 11,800,058 KB |
Pooling | 2.896s | 0.88 | 214KB |
除了这些新的构建器,.NET 6中还引入了其他与任务相关的新API。Task.WaitAsync在dotnet/runtime#48842中被引入,它提供了一个优化的实现,用于创建一个新的任务,该任务将在前一个任务完成或指定的超时时间已过或指定的CancellationToken已请求取消时完成。这对于替换一个相当常见的模式非常有用(不幸的是,开发者经常做错),开发者希望等待一个任务完成,但是有超时和/或取消。例如,这样:
Task t = ...;
using (var cts = new CancellationTokenSource())
{
if (await Task.WhenAny(Task.Delay(timeout, cts.Token), t) != t)
{
throw new TimeoutException();
}
cts.Cancel();
await t;
}
can now be replaced with just this:
Task t = ...;
await t.WaitAsync(timeout);
并且速度更快,开销更小。一个很好的例子来自 dotnet/runtime#55262,它使用新的 Task.WaitAsync 替换了存在于 SemaphoreSlim.WaitAsync 内部的类似实现,使得后者现在更易于维护,速度更快,分配更少。
private SemaphoreSlim _sem = new SemaphoreSlim(0, 1);
private CancellationTokenSource _cts = new CancellationTokenSource();
[Benchmark]
public Task WithCancellationToken()
{
Task t = _sem.WaitAsync(_cts.Token);
_sem.Release();
return t;
}
[Benchmark]
public Task WithTimeout()
{
Task t = _sem.WaitAsync(TimeSpan.FromMinutes(1));
_sem.Release();
return t;
}
[Benchmark]
public Task WithCancellationTokenAndTimeout()
{
Task t = _sem.WaitAsync(TimeSpan.FromMinutes(1), _cts.Token);
_sem.Release();
return t;
}
Method | Runtime | Mean | Ratio | Allocated |
---|---|---|---|---|
WithCancellationToken | .NET Framework 4.8 | 2.993us | 1.00 | 1,263B |
WithCancellationToken | .NET Core 3.1 | 1.327us | 0.44 | 536B |
WithCancellationToken | .NET 5.0 | 1.337us | 0.45 | 496B |
WithCancellationToken | .NET 6.0 | 1.056us | 0.35 | 448B |
WithTimeout | .NET Framework 4.8 | 3.267us | 1.00 | 1,304B |
WithTimeout | .NET Core 3.1 | 1.768us | 0.54 | 1,064B |
WithTimeout | .NET 5.0 | 1.769us | 0.54 | 1,056B |
WithTimeout | .NET 6.0 | 1.086us | 0.33 | 544B |
WithCancellationTokenAndTimeout | .NET Framework 4.8 | 3.838us | 1.00 | 1,409B |
WithCancellationTokenAndTimeout | .NET Core 3.1 | 1.901us | 0.50 | 1,080B |
WithCancellationTokenAndTimeout | .NET 5.0 | 1.929us | 0.50 | 1,072B |
WithCancellationTokenAndTimeout | .NET 6.0 | 1.186us | 0.31 | 544B |
.NET 6 还看到了长期以来要求添加的 Parallel.ForEachAsync (dotnet/runtime#46943),它使得异步枚举 IEnumerable
关于 CancellationToken,.NET 6 中的取消支持也有了性能改进,包括对现有功能和新 API 的改进,这些新 API 使应用程序能够做得更好。一个有趣的改进是 dotnet/runtime#48251,这是一个很好的例子,说明了人们如何为一个场景设计、实现和优化,只是发现它做出了错误的权衡。当 CancellationToken 和 CancellationTokenSource 在 .NET Framework 4.0 中引入时,当时的预期是,主要的使用场景将是许多线程并行地从同一个 CancellationToken 中注册和注销。这导致了一个非常整洁(但复杂)的无锁实现,涉及到相当多的分配和开销。如果你实际上是从许多并行线程的同一个令牌中注册和注销,那么这个实现非常高效,结果是良好的吞吐量。但是,如果你没有这样做,你就会为一些没有提供相应利益的东西付出很多开销。而且,幸运的是,现在几乎从来没有这种情况。更常见的是,CancellationToken 被串行使用,通常一次性注册多个,但这些注册大部分都是作为串行执行流的一部分添加的,而不是全部并发添加的。这个 PR 认识到了这个现实,并将实现恢复到一个更简单、更轻量、更快的版本,这个版本对绝大多数的使用场景表现更好(尽管如果它实际上被多个线程同时猛击,会有所损失)。
private CancellationTokenSource _source = new CancellationTokenSource();
[Benchmark]
public void CreateTokenDispose()
{
using (var cts = new CancellationTokenSource())
_ = cts.Token;
}
[Benchmark]
public void CreateRegisterDispose()
{
using (var cts = new CancellationTokenSource())
cts.Token.Register(s => { }, null).Dispose();
}
[Benchmark]
public void CreateLinkedTokenDispose()
{
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(_source.Token))
_ = cts.Token;
}
[Benchmark(OperationsPerInvoke = 1_000_000)]
public void CreateManyRegisterDispose()
{
using (var cts = new CancellationTokenSource())
{
CancellationToken ct = cts.Token;
for (int i = 0; i < 1_000_000; i++)
ct.Register(s => { }, null).Dispose();
}
}
[Benchmark(OperationsPerInvoke = 1_000_000)]
public void CreateManyRegisterMultipleDispose()
{
using (var cts = new CancellationTokenSource())
{
CancellationToken ct = cts.Token;
for (int i = 0; i < 1_000_000; i++)
{
var ctr1 = ct.Register(s => { }, null);
var ctr2 = ct.Register(s => { }, null);
var ctr3 = ct.Register(s => { }, null);
var ctr4 = ct.Register(s => { }, null);
var ctr5 = ct.Register(s => { }, null);
ctr5.Dispose();
ctr4.Dispose();
ctr3.Dispose();
ctr2.Dispose();
ctr1.Dispose();
}
}
}
Method | Runtime | Mean | Ratio | Allocated |
---|---|---|---|---|
CreateTokenDispose | .NET Framework 4.8 | 10.236 ns | 1.00 | 72 B |
CreateTokenDispose | .NET Core 3.1 | 6.934 ns | 0.68 | 64 B |
CreateTokenDispose | .NET 5.0 | 7.268 ns | 0.71 | 64 B |
CreateTokenDispose | .NET 6.0 | 6.200 ns | 0.61 | 48 B |
CreateRegisterDispose | .NET Framework 4.8 | 144.218 ns | 1.00 | 385 B |
CreateRegisterDispose | .NET Core 3.1 | 79.392 ns | 0.55 | 352 B |
CreateRegisterDispose | .NET 5.0 | 79.431 ns | 0.55 | 352 B |
CreateRegisterDispose | .NET 6.0 | 56.715 ns | 0.39 | 192 B |
CreateLinkedTokenDispose | .NET Framework 4.8 | 103.622 ns | 1.00 | 209 B |
CreateLinkedTokenDispose | .NET Core 3.1 | 61.944 ns | 0.60 | 112 B |
CreateLinkedTokenDispose | .NET 5.0 | 53.526 ns | 0.52 | 80 B |
CreateLinkedTokenDispose | .NET 6.0 | 38.631 ns | 0.37 | 64 B |
CreateManyRegisterDispose | .NET Framework 4.8 | 87.713 ns | 1.00 | 56 B |
CreateManyRegisterDispose | .NET Core 3.1 | 43.491 ns | 0.50 | – |
CreateManyRegisterDispose | .NET 5.0 | 41.124 ns | 0.47 | – |
CreateManyRegisterDispose | .NET 6.0 | 35.437 ns | 0.40 | – |
CreateManyRegisterMultipleDispose | .NET Framework 4.8 | 439.874 ns | 1.00 | 281 B |
CreateManyRegisterMultipleDispose | .NET Core 3.1 | 234.367 ns | 0.53 | – |
CreateManyRegisterMultipleDispose | .NET 5.0 | 229.483 ns | 0.52 | – |
CreateManyRegisterMultipleDispose | .NET 6.0 | 192.213 ns | 0.44 | – |
CancellationToken 还有新的 API 来帮助提高性能。dotnet/runtime#43114 添加了 Register 和 Unregister 的新重载,它们接受一个 Action<object, CancellationToken> 委托,而不是 Action
标签:00,Task,07,Thread,--,Improvements,线程,NET,public From: https://www.cnblogs.com/yahle/p/Performance_Improvements_in_NET_8_Thread.html