首页 > 其他分享 >升讯威在线客服系统的并发高性能数据处理技术:高性能TCP服务器技术

升讯威在线客服系统的并发高性能数据处理技术:高性能TCP服务器技术

时间:2023-09-06 12:55:13浏览次数:39  
标签:bytesConsumed 读取 buffer 升讯威 TCP var 高性能 缓冲区 segment

我在业余时间开发维护了一款免费开源的升讯威在线客服系统,也收获了许多用户。对我来说,只要能获得用户的认可,就是我最大的动力。

最近客服系统成功经受住了客户现场组织的压力测试,获得了客户的认可。
客户组织多名客服上线后,所有员工同一时间打开访客页面疯狂不停的给在线客服发消息,系统稳定无异常无掉线,客服回复消息正常。消息实时到达无任何延迟。

https://kf.shengxunwei.com/


我会通过一系列的文章详细分析升讯威在线客服系统的并发高性能技术是如何实现的,使用了哪些方案以及具体的做法。

本篇介绍数据传输方面的管线技术。

Pipelines诞生于.NET Core团队,为使Kestrel成为业界最快的Web服务器之一。最初从作为Kestrel内部的实现细节发展成为可重用的API,它在.Net Core 2.1中作为可用于所有.NET开发人员的最高级BCL API(System.IO.Pipelines)提供。

使用NetworkStream的TCP服务器

在Pipelines之前用.NET编写的典型代码如下所示:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // 在buffer中处理一行消息
    ProcessLine(buffer);
}

此代码可能在本地测试时正确工作,但它有几个潜在错误:

一次ReadAsync调用可能没有收到整个消息(行尾)。
它忽略了stream.ReadAsync()返回值中实际填充到buffer中的数据量。(译者注:即不一定将buffer填充满)
一次ReadAsync调用不能处理多条消息。
这些是读取流数据时常见的一些缺陷。为了解决这个问题,我们需要做一些改变:

我们需要缓冲传入的数据,直到找到新的行。
我们需要解析缓冲区中返回的所有行

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
        if (bytesRead == 0)
        {
            // EOF 已经到末尾
            break;
        }
        // 跟踪已缓冲的字节数
        bytesBuffered += bytesRead;

        var linePosition = -1;

        do
        {
            // 在缓冲数据中查找找一个行末尾
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // 根据偏移量计算一行的长度
                var lineLength = linePosition - bytesConsumed;

                // 处理这一行
                ProcessLine(buffer, bytesConsumed, lineLength);

                // 移动bytesConsumed为了跳过我们已经处理掉的行 (包括\n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

这一次,这可能适用于本地开发,但一行可能大于1KiB(1024字节)。我们需要调整输入缓冲区的大小,直到找到新行。

因此,我们可以在堆上分配缓冲区去处理更长的一行。我们从客户端解析较长的一行时,可以通过使用ArrayPool避免重复分配缓冲区来改进这一点。

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // 在buffer中计算中剩余的字节数
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // 将buffer size翻倍 并且将之前缓冲的数据复制到新的缓冲区
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // 将旧的buffer丢回池中
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF 末尾
            break;
        }

        // 跟踪已缓冲的字节数
        bytesBuffered += bytesRead;

        do
        {
            // 在缓冲数据中查找找一个行末尾
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // 根据偏移量计算一行的长度
                var lineLength = linePosition - bytesConsumed;

                // 处理这一行
                ProcessLine(buffer, bytesConsumed, lineLength);

                // 移动bytesConsumed为了跳过我们已经处理掉的行 (包括\n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

这段代码有效,但现在我们正在重新调整缓冲区大小,从而产生更多缓冲区副本。它将使用更多内存,因为根据代码在处理一行行后不会缩缓冲区的大小。为避免这种情况,我们可以存储缓冲区序列,而不是每次超过1KiB大小时调整大小。

此外,我们不会增长1KiB的 缓冲区,直到它完全为空。这意味着我们最终传递给ReadAsync越来越小的缓冲区,这将导致对操作系统的更多调用。

为了缓解这种情况,我们将在现有缓冲区中剩余少于512个字节时分配一个新缓冲区:

public class BufferSegment
{
    public byte[] Buffer { get; set; }
    public int Count { get; set; }

    public int Remaining => Buffer.Length - Count;
}

async Task ProcessLinesAsync(NetworkStream stream)
{
    const int minimumBufferSize = 512;

    var segments = new List<BufferSegment>();
    var bytesConsumed = 0;
    var bytesConsumedBufferIndex = 0;
    var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };

    segments.Add(segment);

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer
        if (segment.Remaining < minimumBufferSize)
        {
            // Allocate a new segment
            segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
            segments.Add(segment);
        }

        var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining);
        if (bytesRead == 0)
        {
            break;
        }

        // Keep track of the amount of buffered bytes
        segment.Count += bytesRead;

        while (true)
        {
            // Look for a EOL in the list of segments
            var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'\n', bytesConsumedBufferIndex, bytesConsumed);

            if (segmentIndex >= 0)
            {
                // Process the line
                ProcessLine(segments, segmentIndex, segmentOffset);

                bytesConsumedBufferIndex = segmentOffset;
                bytesConsumed = segmentOffset + 1;
            }
            else
            {
                break;
            }
        }

        // Drop fully consumed segments from the list so we don't look at them again
        for (var i = bytesConsumedBufferIndex; i >= 0; --i)
        {
            var consumedSegment = segments[i];
            // Return all segments unless this is the current segment
            if (consumedSegment != segment)
            {
                ArrayPool<byte>.Shared.Return(consumedSegment.Buffer);
                segments.RemoveAt(i);
            }
        }
    }
}

(int segmentIndex, int segmentOffest) IndexOf(List<BufferSegment> segments, byte value, int startBufferIndex, int startSegmentOffset)
{
    var first = true;
    for (var i = startBufferIndex; i < segments.Count; ++i)
    {
        var segment = segments[i];
        // Start from the correct offset
        var offset = first ? startSegmentOffset : 0;
        var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset);

        if (index >= 0)
        {
            // Return the buffer index and the index within that segment where EOL was found
            return (i, index);
        }

        first = false;
    }
    return (-1, -1);
}

此代码只是得到很多更加复杂。当我们正在寻找分隔符时,我们同时跟踪已填充的缓冲区序列。为此,我们此处使用List查找新行分隔符时表示缓冲数据。其结果是,ProcessLine和IndexOf现在接受List作为参数,而不是一个byte[],offset和count。我们的解析逻辑现在需要处理一个或多个缓冲区序列。

我们的服务器现在处理部分消息,它使用池化内存来减少总体内存消耗,但我们还需要进行更多更改:

  • 我们使用的byte[]和ArrayPool的只是普通的托管数组。这意味着无论何时我们执行ReadAsync或WriteAsync,这些缓冲区都会在异步操作的生命周期内被固定(以便与操作系统上的本机IO API互操作)。这对GC有性能影响,因为无法移动固定内存,这可能导致堆碎片。根据异步操作挂起的时间长短,池的实现可能需要更改。
  • 可以通过解耦读取逻辑和处理逻辑来优化吞吐量。这会创建一个批处理效果,使解析逻辑可以使用更大的缓冲区块,而不是仅在解析单个行后才读取更多数据。这引入了一些额外的复杂性
  • 我们需要两个彼此独立运行的循环。一个读取Socket和一个解析缓冲区。
  • 当数据可用时,我们需要一种方法来向解析逻辑发出信号。
  • 我们需要决定如果循环读取Socket“太快”会发生什么。如果解析逻辑无法跟上,我们需要一种方法来限制读取循环(逻辑)。这通常被称为“流量控制”或“背压”。
  • 我们需要确保事情是线程安全的。我们现在在读取循环和解析循环之间共享多个缓冲区,并且这些缓冲区在不同的线程上独立运行。
  • 内存管理逻辑现在分布在两个不同的代码段中,从填充缓冲区池的代码是从套接字读取的,而从缓冲区池取数据的代码是解析逻辑。
  • 我们需要非常小心在解析逻辑完成之后我们如何处理缓冲区序列。如果我们不小心,我们可能会返回一个仍由Socket读取逻辑写入的缓冲区序列。

复杂性已经到了极端(我们甚至没有涵盖所有案例)。高性能网络应用通常意味着编写非常复杂的代码,以便从系统中获得更高的性能。

System.IO.Pipelines的目标是使这种类型的代码更容易编写。

使用System.IO.Pipelines的TCP服务器

让我们来看看这个例子的样子System.IO.Pipelines:

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    return Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // 从PipeWriter至少分配512字节
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try 
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // 告诉PipeWriter从套接字读取了多少
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // 标记数据可用,让PipeReader读取
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // 告诉PipeReader没有更多的数据
    writer.Complete();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();

        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position = null;

        do 
        {
            // 在缓冲数据中查找找一个行末尾
            position = buffer.PositionOf((byte)'\n');

            if (position != null)
            {
                // 处理这一行
                ProcessLine(buffer.Slice(0, position.Value));

                // 跳过 这一行+\n (basically position 主要位置?)
                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
            }
        }
        while (position != null);

        // 告诉PipeReader我们以及处理多少缓冲
        reader.AdvanceTo(buffer.Start, buffer.End);

        // 如果没有更多的数据,停止都去
        if (result.IsCompleted)
        {
            break;
        }
    }

    // 将PipeReader标记为完成
    reader.Complete();
}

我们的行读取器的pipelines版本有2个循环:

FillPipeAsync从Socket读取并写入PipeWriter。
ReadPipeAsync从PipeReader中读取并解析传入的行。
与原始示例不同,在任何地方都没有分配显式缓冲区。这是管道的核心功能之一。所有缓冲区管理都委托给PipeReader/PipeWriter实现。

这使得使用代码更容易专注于业务逻辑而不是复杂的缓冲区管理。

在第一个循环中,我们首先调用PipeWriter.GetMemory(int)从底层编写器获取一些内存; 然后我们调用PipeWriter.Advance(int)告诉PipeWriter我们实际写入缓冲区的数据量。然后我们调用PipeWriter.FlushAsync()来提供数据给PipeReader。

在第二个循环中,我们正在使用PipeWriter最终来自的缓冲区Socket。当调用PipeReader.ReadAsync()返回时,我们得到一个ReadResult包含2条重要信息,包括以ReadOnlySequence形式读取的数据和bool IsCompleted,让reader知道writer是否写完(EOF)。在找到行尾(EOL)分隔符并解析该行之后,我们将缓冲区切片以跳过我们已经处理过的内容,然后我们调用PipeReader.AdvanceTo告诉PipeReader我们消耗了多少数据。

在每个循环结束时,我们完成了reader和writer。这允许底层Pipe释放它分配的所有内存。

System.IO.Pipelines
除了处理内存管理之外,其他核心管道功能还包括能够在Pipe不实际消耗数据的情况下查看数据。

PipeReader有两个核心API ReadAsync和AdvanceTo。ReadAsync获取Pipe数据,AdvanceTo告诉PipeReader不再需要这些缓冲区,以便可以丢弃它们(例如返回到底层缓冲池)。

流量控制

在一个完美的世界中,读取和解析工作是一个团队:读取线程消耗来自网络的数据并将其放入缓冲区,而解析线程负责构建适当的数据结构。通常,解析将比仅从网络复制数据块花费更多时间。结果,读取线程可以轻易地压倒解析线程。结果是读取线程必须减慢或分配更多内存来存储解析线程的数据。为获得最佳性能,在频繁暂停和分配更多内存之间存在平衡。

为了解决这个问题,管道有两个设置来控制数据的流量,PauseWriterThreshold和ResumeWriterThreshold。PauseWriterThreshold决定有多少数据应该在调用PipeWriter.FlushAsync之前进行缓冲停顿。ResumeWriterThreshold控制reader消耗多少后写入可以恢复。

IO调度

通常在使用async / await时,会在线程池线程或当前线程上调用continuation SynchronizationContext。

在执行IO时,对执行IO的位置进行细粒度控制非常重要,这样可以更有效地利用CPU缓存,这对于Web服务器等高性能应用程序至关重要。Pipelines公开了一个PipeScheduler确定异步回调运行位置的方法。这使得调用者可以精确控制用于IO的线程。

实践中的一个示例是在Kestrel Libuv传输中,其中IO回调在专用事件循环线程上运行。


简介

升讯威在线客服与营销系统是一款客服软件,但更重要的是一款营销利器。

https://kf.shengxunwei.com/

  • 可以追踪正在访问网站或使用 APP 的所有访客,收集他们的浏览情况,使客服能够主动出击,施展话术,促进成单。
    访* 客端在 PC 支持所有新老浏览器。包括不支持 WebSocket 的 IE8 也能正常使用。
  • 移动端支持所有手机浏览器、APP、各大平台的公众号对接。
  • 支持访客信息互通,可传输访客标识、名称和其它任意信息到客服系统。
  • 具备一线专业技术水平,网络中断,拔掉网线,手机飞行模式,不丢消息。同类软件可以按视频方式对比测试。

标签:bytesConsumed,读取,buffer,升讯威,TCP,var,高性能,缓冲区,segment
From: https://www.cnblogs.com/sheng_chao/p/17682021.html

相关文章

  • Linux应用编程_网络通信TCP/UDP
    (1)网络协议被分为5层 1)应用层:直接为用户的应用进程提供服务 HTTP协议,FTP协议,DNS,POP3,SNMP,Telnet 2)运输层(传输层):负责向两个主机中进程之间的通信提供服务 (基于TCP/UDP) (1)传输控制协议TCP(TransmissionControlProtocol): 1)数据传输的单位是报文段 2)面向......
  • 【原创】基于QT编写的支持IPv4/IPv6双协议栈,TCP/UDP双模式,DLL内存加载的模块化远控木
    本人已经本科毕业一年有余,在平常实习过程中,发现大佬都对我的本科毕设--双协议栈远控木马感兴趣。据我所知,目前流行的C2远控软件中,MSF支持IPv4和IPv6,但是MSF生成的单个木马只是支持其中的一种协议,而不是双协议栈。CobaltStrike目前尚无IPv6的使用案例。其他支持双协议栈的C2软件......
  • 视频汇聚/视频云存储/视频监控管理平台EasyCVR启动时打印starting server:listen tcp,
    视频云存储/安防监控EasyCVR视频汇聚平台基于云边端智能协同,可实现视频监控直播、视频轮播、视频录像、云存储、回放与检索、智能告警、服务器集群、语音对讲、云台控制、电子地图、H.265自动转码H.264、平台级联等。为了便于用户二次开发、调用与集成,旭帆科技TSINGSEE青犀视频也......
  • 云计算环境中高性能计算的挑战与对策
    文章目录云计算中的高性能计算挑战1.资源竞争:2.网络延迟:3.数据传输效率:4.虚拟化开销:5.节点异构性:高性能计算在云计算环境中的对策1.定制化虚拟机镜像:2.弹性资源调整:3.高效数据传输:4.任务并行度:未来发展和展望1.边缘计算的结合:2.量子计算的应用:3.智能任务调度:结论......
  • 服务器显卡:驱动高性能计算和人工智能应用
    本文分享自天翼云开发者社区《服务器显卡:驱动高性能计算和人工智能应用》,作者:不知不觉一、引言随着高性能计算和人工智能应用的不断发展,服务器显卡的性能显得越来越重要。服务器显卡是服务器硬件配置中的一个关键组件,它不仅提供基本的图形渲染能力,还在高性能计算和人工智能应......
  • tcpdump命令详解
    1、https://blog.csdn.net/wj31932/article/details/106570542/目录1 概述 2命令格式3 常见协议输出格式4 常用的选项和对应参数5 表达式5.1根据偏移量预过滤6 使用方法执行结果范例 6.1Tcpdump -D显示对应的系统的网口,用于-i后面指定网络接口用 6.2-i 网......
  • 探索 Rust:高性能系统编程语言的魅力与安装指南
    Rust是什么?Rust是一门系统编程语言,旨在提供高性能、并发安全和内存安全的编程环境。它于2010年首次发布,由Mozilla开发,并在开源社区的支持下不断发展壮大。优点内存安全:Rust最重要的特点之一是内存安全。它通过引入所有权(ownership)、借用(borrowing)和生命周期(lifetimes)的概念,允......
  • 高性能网络通信模型——Reactor 和 Proactor
    原来8张图,就能学废Reactor和Proactor(qq.com)高并发编程--Reactor模式与Proactor模式(qq.com)Reactor模型Reactor,翻译为反应器,他是一个被动的感觉,可以理解为接收到客户端事件后,Reactor模型会根据事件类型调用相应的代码进行处理。Reactor模型也叫作Dispatcher模式,底层是......
  • 具有200MHz入门级微控制器R7FA6E2BB2CBC、R7FA6E2BB3CNE、R7FA6E2B92CBB是高性能、小
    一、简介RA6E2组是RA6系列中最新的入门级微控制器,基于带有TrustZone的200MHzArm®Cortex®-M33内核。RA6E2MCU作为入门级微控制器,在追求成本优化的同时提供了最佳的性能。与RA4E2组的引脚和外设兼容,使其成为要求更高性能、小尺寸和低引脚数的应用的理想选择。二、器件规格1......
  • c# socket tcp 通信 结构体 字节流 大端序列 小端序列
    SeerAGV_2/SeerMessage.csusingSystem.Reflection;usingSystem.Runtime.InteropServices;namespaceSeerAGV{publicstructSeerMessageHead{publicbytesync;publicbyteversion;publicushortnumber;publicuintl......