首页 > 其他分享 >.net 到底行不行!2000 人在线的客服系统真实屏录演示(附技术详解)

.net 到底行不行!2000 人在线的客服系统真实屏录演示(附技术详解)

时间:2024-09-25 13:22:56浏览次数:1  
标签:bytesConsumed buffer segments 缓冲区 2000 var net segment 屏录

业余时间用 .net 写了一个免费的在线客服系统:升讯威在线客服与营销系统。

时常有朋友问我性能方面的问题,正好有一个真实客户,在线的访客数量达到了 2000 人。在争得客户同意后,我录了一个视频。

升讯威在线客服系统可以在极低配置的服务器环境下,轻松应对这种情况,依然可以做到:

消息毫秒级送达,操作毫秒级响应

性能

以官方在线使用环境为例,每日处理 HTTPS 请求数大于 16 万次, PV 请求大于 25 万 次的情况下,服务端主程序内存占用小于 300MB,服务器 CPU 占用小于 5%。

每日处理 HTTPS 请求数大于 16 万次:

每日处理 PV 请求大于 25 万 次:

服务端主程序内存占用小于 300MB:

服务器 CPU (Intel Xeon Platinum 8163 / 4 核 2.5 GHz) 占用稳定约 5%:

安全性

  • 访客端使用 https 与 wss 安全连接,数据全程加密传输。
  • 客服端数据报文使用 AES 加密传输。(Advanced Encryption Standard,美国联邦政府区块加密标准)。
  • 可以 100% 私有化部署在您的自有服务器。

拦截的报文中消息以密文传输:


实现效果

客服端

访客端


怎么做到的?技术详解

使用NetworkStream的TCP服务器

在Pipelines之前用.NET编写的典型代码如下所示:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // 在buffer中处理一行消息
    ProcessLine(buffer);
}

此代码可能在本地测试时正确工作,但它有几个潜在错误:

一次ReadAsync调用可能没有收到整个消息(行尾)。
它忽略了stream.ReadAsync()返回值中实际填充到buffer中的数据量。(译者注:即不一定将buffer填充满)
一次ReadAsync调用不能处理多条消息。
这些是读取流数据时常见的一些缺陷。为了解决这个问题,我们需要做一些改变:

我们需要缓冲传入的数据,直到找到新的行。
我们需要解析缓冲区中返回的所有行

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
        if (bytesRead == 0)
        {
            // EOF 已经到末尾
            break;
        }
        // 跟踪已缓冲的字节数
        bytesBuffered += bytesRead;

        var linePosition = -1;

        do
        {
            // 在缓冲数据中查找找一个行末尾
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // 根据偏移量计算一行的长度
                var lineLength = linePosition - bytesConsumed;

                // 处理这一行
                ProcessLine(buffer, bytesConsumed, lineLength);

                // 移动bytesConsumed为了跳过我们已经处理掉的行 (包括\n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

这一次,这可能适用于本地开发,但一行可能大于1KiB(1024字节)。我们需要调整输入缓冲区的大小,直到找到新行。

因此,我们可以在堆上分配缓冲区去处理更长的一行。我们从客户端解析较长的一行时,可以通过使用ArrayPool避免重复分配缓冲区来改进这一点。

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // 在buffer中计算中剩余的字节数
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // 将buffer size翻倍 并且将之前缓冲的数据复制到新的缓冲区
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // 将旧的buffer丢回池中
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF 末尾
            break;
        }

        // 跟踪已缓冲的字节数
        bytesBuffered += bytesRead;

        do
        {
            // 在缓冲数据中查找找一个行末尾
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // 根据偏移量计算一行的长度
                var lineLength = linePosition - bytesConsumed;

                // 处理这一行
                ProcessLine(buffer, bytesConsumed, lineLength);

                // 移动bytesConsumed为了跳过我们已经处理掉的行 (包括\n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

这段代码有效,但现在我们正在重新调整缓冲区大小,从而产生更多缓冲区副本。它将使用更多内存,因为根据代码在处理一行行后不会缩缓冲区的大小。为避免这种情况,我们可以存储缓冲区序列,而不是每次超过1KiB大小时调整大小。

此外,我们不会增长1KiB的 缓冲区,直到它完全为空。这意味着我们最终传递给ReadAsync越来越小的缓冲区,这将导致对操作系统的更多调用。

为了缓解这种情况,我们将在现有缓冲区中剩余少于512个字节时分配一个新缓冲区:

public class BufferSegment
{
    public byte[] Buffer { get; set; }
    public int Count { get; set; }

    public int Remaining => Buffer.Length - Count;
}

async Task ProcessLinesAsync(NetworkStream stream)
{
    const int minimumBufferSize = 512;

    var segments = new List<BufferSegment>();
    var bytesConsumed = 0;
    var bytesConsumedBufferIndex = 0;
    var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };

    segments.Add(segment);

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer
        if (segment.Remaining < minimumBufferSize)
        {
            // Allocate a new segment
            segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
            segments.Add(segment);
        }

        var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining);
        if (bytesRead == 0)
        {
            break;
        }

        // Keep track of the amount of buffered bytes
        segment.Count += bytesRead;

        while (true)
        {
            // Look for a EOL in the list of segments
            var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'\n', bytesConsumedBufferIndex, bytesConsumed);

            if (segmentIndex >= 0)
            {
                // Process the line
                ProcessLine(segments, segmentIndex, segmentOffset);

                bytesConsumedBufferIndex = segmentOffset;
                bytesConsumed = segmentOffset + 1;
            }
            else
            {
                break;
            }
        }

        // Drop fully consumed segments from the list so we don't look at them again
        for (var i = bytesConsumedBufferIndex; i >= 0; --i)
        {
            var consumedSegment = segments[i];
            // Return all segments unless this is the current segment
            if (consumedSegment != segment)
            {
                ArrayPool<byte>.Shared.Return(consumedSegment.Buffer);
                segments.RemoveAt(i);
            }
        }
    }
}

(int segmentIndex, int segmentOffest) IndexOf(List<BufferSegment> segments, byte value, int startBufferIndex, int startSegmentOffset)
{
    var first = true;
    for (var i = startBufferIndex; i < segments.Count; ++i)
    {
        var segment = segments[i];
        // Start from the correct offset
        var offset = first ? startSegmentOffset : 0;
        var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset);

        if (index >= 0)
        {
            // Return the buffer index and the index within that segment where EOL was found
            return (i, index);
        }

        first = false;
    }
    return (-1, -1);
}

此代码只是得到很多更加复杂。当我们正在寻找分隔符时,我们同时跟踪已填充的缓冲区序列。为此,我们此处使用List查找新行分隔符时表示缓冲数据。其结果是,ProcessLine和IndexOf现在接受List作为参数,而不是一个byte[],offset和count。我们的解析逻辑现在需要处理一个或多个缓冲区序列。

我们的服务器现在处理部分消息,它使用池化内存来减少总体内存消耗,但我们还需要进行更多更改:

  • 我们使用的byte[]和ArrayPool的只是普通的托管数组。这意味着无论何时我们执行ReadAsync或WriteAsync,这些缓冲区都会在异步操作的生命周期内被固定(以便与操作系统上的本机IO API互操作)。这对GC有性能影响,因为无法移动固定内存,这可能导致堆碎片。根据异步操作挂起的时间长短,池的实现可能需要更改。
  • 可以通过解耦读取逻辑和处理逻辑来优化吞吐量。这会创建一个批处理效果,使解析逻辑可以使用更大的缓冲区块,而不是仅在解析单个行后才读取更多数据。这引入了一些额外的复杂性
  • 我们需要两个彼此独立运行的循环。一个读取Socket和一个解析缓冲区。
  • 当数据可用时,我们需要一种方法来向解析逻辑发出信号。
  • 我们需要决定如果循环读取Socket“太快”会发生什么。如果解析逻辑无法跟上,我们需要一种方法来限制读取循环(逻辑)。这通常被称为“流量控制”或“背压”。
  • 我们需要确保事情是线程安全的。我们现在在读取循环和解析循环之间共享多个缓冲区,并且这些缓冲区在不同的线程上独立运行。
  • 内存管理逻辑现在分布在两个不同的代码段中,从填充缓冲区池的代码是从套接字读取的,而从缓冲区池取数据的代码是解析逻辑。
  • 我们需要非常小心在解析逻辑完成之后我们如何处理缓冲区序列。如果我们不小心,我们可能会返回一个仍由Socket读取逻辑写入的缓冲区序列。

在线体验或下载完整私有化部署包:

https://kf.shengxunwei.com

希望能够打造: 开放、开源、共享。努力打造 .net 社区的一款优秀开源产品。

钟意的话请给个赞支持一下吧,谢谢~

标签:bytesConsumed,buffer,segments,缓冲区,2000,var,net,segment,屏录
From: https://www.cnblogs.com/sheng_chao/p/18431157

相关文章

  • .NET 9 中 LINQ 新增的功能
    .NET9中LINQ新增的功能 思维导航LINQ介绍安装.NET9VSCode中编写ASP.NETCoreWeb应用VSCode中创建.NET9控制台应用CountBy方法AggregateBy方法Index(IEnumerable) 方法参考文章C#/.NET/.NETCore拾遗补漏LINQ介绍语言集成查询(LINQ)是一系列直接......
  • .NET 6.0 + WPF 使用 Prism 框架实现导航
    .NET6.0+WPF使用Prism框架实现导航 合集-.NET基础知识(4) 1..NET9优化,抢先体验C#13新特性08-202.《黑神话:悟空》神话再现,虚幻引擎与Unity/C#谁更强?08-213..NET6.0+WPF使用Prism框架实现导航09-114.2024年C#高效开发:精选实用类库09-23收起 ......
  • .NET 最好用的验证组件 FluentValidation
    .NET最好用的验证组件FluentValidation 合集-.NET开源工具(18)  阅读目录前言项目介绍项目使用高级用法项目地址总结最后前言一个.NET验证框架,支持链式操作,易于理解,功能完善,组件内提供十几种常用验证器,可扩展性好,支持自定义验证器,支持本地化多......
  • .NET 8.0 文档管理系统网盘功能的实现
    .NET8.0文档管理系统网盘功能的实现 合集-.NET开源工具(18)  阅读目录前言项目介绍项目技术项目结构项目使用项目展示项目地址总结最后前言大家好,今天推荐一个文档管理系统Dorisoy.Pan。Dorisoy.Pan是一个基于.NET8和WebAPI构建的文档管......
  • .NET 压缩/解压文件
    .NET压缩/解压文件 本文为大家介绍下.NET解压/压缩zip文件。虽然解压缩不是啥核心技术,但压缩性能以及进度处理还是需要关注下,针对使用较多的zip开源组件验证,给大家提供个技术选型参考之前在《.NETWebSocket高并发通信阻塞问题-唐宋元明清2188-博客园(cnblogs.com)》......
  • ASP.NET Core SignalR :学习消息通讯,实现一个消息通知
    ASP.NETCoreSignalR:学习消息通讯,实现一个消息通知  什么是SignalR    目前我用业余时间正在做一个博客系统,其中有个功能就是评论通知,就是假如A用户评论B用户的时候,如果B用户首页处于打开状态,那么就会提示B用户有未读消息。暂时用SignalR来实现这个功能。我也是......
  • 记.Net Framework中wwwroot文件限制用户访问
    背景项目.NetFramework做的,已经线上跑了很多年了,突然发现用户上传的文件都被放到了wwwroot//Content/Upload目录,这些文件都是比较重要的,程序用来读取解析数据的,但是被直接可以公开访问了。其实要改也很简单,代码改一下,文件挪一下位置就可以了,但是如果这样改就是一个线上大Bug裸......
  • 记.Net Core Host服务使用Dapper内存溢出问题
    背景项目要做一个数据迁移,牵扯大概60多张表,几千万数据,这些数据都被放到了csv文件中并拆分成了10w条记录一个文件。思路是使用.NetCoreHost开一个线程去读取文件夹csv根据业务导入到表就可以。ps:第一次用Dapper做这种处理问题在导入过程中,因为我们的服务器内存只有8G。经常......
  • 使用.NET并行任务库(TPL)与并行Linq(PLINQ)充分利用多核性能
    使用.NET并行任务库(TPL)与并行Linq(PLINQ)充分利用多核性能 前言最近比较闲,(项目要转Java被分到架构组,边缘化人员,无所事事哈哈哈哈)记录一下前段时间用到的.NET框架下采用并行策略充分利用多核CPU进行优化的一个方法起因是项目中有个结算的方法,需要汇总一个月的数据......
  • C#|.net core 基础 - 深拷贝的五大类N种实现方式
    C#|.netcore基础-深拷贝的五大类N种实现方式 合集-C#|.netcore基础(6) 1.C#|.netcore基础-“hello”.IndexOf(“\0”,2)中的坑08-302.C#|.netcore基础-如何判断连续子序列09-033.C#|.netcore基础-值传递vs引用传递09-194.C#|.netcore基础-扩展数......