首页 > 其他分享 >Mocha MemoryBufferQueue 设计概述

Mocha MemoryBufferQueue 设计概述

时间:2024-01-30 20:55:45浏览次数:25  
标签:Consumer Partition MemoryBufferQueue Mocha 概述 var Segment public

目录

前言

Mocha 是一个基于 .NET 开发的 APM 系统,同时提供可伸缩的可观测性数据分析和存储平台。

更多关于 Mocha 的介绍,可以参考 https://www.cnblogs.com/eventhorizon/p/17979677

Mocha 会需要收集大量的数据,为处理这些数据,我们需要有一个缓冲区。初期我们实现了一个基于内存的缓冲区,下文称之为 MemoryBufferQueue。

Buffer 模块的代码地址:
https://github.com/dotnetcore/mocha/tree/main/src/Mocha.Core/Buffer

本文介绍的版本是 v0.1.0,后续版本可能会有变化。

MemoryBufferQueue 功能概述

MemoryBufferQueue 将数据缓冲到内存中,消费者可以从队列中获取数据,当队列中无数据时,消费者会异步等待数据到来。

MemoryBufferQueue 提供了以下功能:

  1. 支持创建多个 Topic,每个 Topic 都是一个独立的队列。
  2. 支持创建多个 Consumer Group,每个 Consumer Group 的消费进度都是独立的。支持多个 Consumer Group 并发消费同一个 Topic。
  3. 支持同一个 Consumer Group 创建多个 Consumer,以负载均衡的方式消费数据。
  4. 支持数据的批量消费,可以一次性获取多条数据。
  5. 支持重试机制,当消费者处理数据失败时,可以选择不确认消费,这样数据会被重新消费。

需要注意的是,当前版本出于简化实现的考虑,暂不支持消费者的动态扩容和缩容,需要在创建消费者时指定消费者数量。

Buffer 模块 API 设计

MemoryBufferQueue 的出发点的是在项目初期提供一个性能足够高的内存缓存队列。后期随着项目的发展,我们可能会将其替换为别的实现,比如支持持久化的队列。

为了解耦,Buffer 模块使用 Interface 进行了抽象。

public interface IBufferQueue
{
    IBufferProducer<T> CreateProducer<T>(string topicName);

    IBufferConsumer<T> CreateConsumer<T>(BufferConsumerOptions options);

    IEnumerable<IBufferConsumer<T>> CreateConsumers<T>(BufferConsumerOptions options, int consumerNumber);
}

internal interface IBufferQueue<T>
{
    string TopicName { get; }

    IBufferProducer<T> CreateProducer();

    IBufferConsumer<T> CreateConsumer(BufferConsumerOptions options);

    IEnumerable<IBufferConsumer<T>> CreateConsumers(BufferConsumerOptions options, int consumerNumber);
}

public interface IBufferProducer<in T>
{
    string TopicName { get; }

    ValueTask ProduceAsync(T item);
}
public interface IBufferConsumer<out T>
{
    string TopicName { get; }

    string GroupName { get; }

    IAsyncEnumerable<IEnumerable<T>> ConsumeAsync(CancellationToken cancellationToken = default);

    ValueTask CommitAsync();
}

public class BufferConsumerOptions
{
    public required string TopicName { get; init; }

    public required string GroupName { get; init; }

    public bool AutoCommit { get; init; }

    public int BatchSize { get; init; } = 100;
}

数据通过 Producer 写入 BufferQueue,由 Consumer 进行消费。

我们对 BufferQueue 有以下的要求:

  • 同一个数据类型 下的 不同 Topic 的 BufferQueue 互不干扰。

  • 同一个 Topic 下的 不同数据类型 的 BufferQueue 互不干扰。

BufferQueue

因此我们设计了两个层级的接口:

  • IBufferQueue:根据 TopicName类型参数 T 将请求转发给具体的 IBufferQueue<T> 实现(借助 KeyedService 实现),其中参数 T 代表 Buffer 所承载的数据实体的类型。

  • IBufferQueue<T>:具体的 BufferQueue 实现,负责管理 Topic 下的数据。属于 Buffer 模块的内部实现,不对外暴露。

IBufferQueue

Buffer 模块提供了通过 ServiceCollection 进行注册的扩展方法:

public static class BufferServiceCollectionExtensions
{
    public static IServiceCollection AddBuffer(
        this IServiceCollection services,
        Action<BufferOptionsBuilder> configure)
    {
        services.AddSingleton<IBufferQueue, BufferQueue>();
        configure(new BufferOptionsBuilder(services));
        return services;
    }
}

MemoryBufferQueue 模块通过提供 BufferOptionsBuilder 来进行配置:

public static class BufferOptionsBuilderExtensions
{
    public static BufferOptionsBuilder UseMemory(
        this BufferOptionsBuilder builder,
        Action<MemoryBufferOptions> configure)
    {
        var options = new MemoryBufferOptions(builder.Services);
        configure(options);

        return builder;
    }
}

下面是配置和使用 MemoryBufferQueue 的示例:

var services = new ServiceCollection();

services.AddBuffer(options =>
{
    options.UseMemory(bufferOptions =>
    {
        bufferOptions.AddTopic<MochaSpan>("otlp-span", Environment.ProcessorCount);
    });
});

var provider = services.BuildServiceProvider();

var bufferQueue = provider.GetRequiredService<IBufferQueue>();

var producer = bufferQueue.CreateProducer<MochaSpan>("otlp-span");

var consumers = bufferQueue.CreateConsumers<MochaSpan>(new BufferConsumerOptions
{
    TopicName = "otlp-span",
    GroupName = "test",
    AutoCommit = true, // 配置为 false 时,需要手动调用 CommitAsync 方法
    BatchSize = 100
}, 2);

var consumerTasks = consumers.Select(async consumer =>
{
    await foreach (var batch in consumer.ConsumeAsync())
    {
        foreach (var item in batch)
        {
            Console.WriteLine(item);
        }
        // 如果 AutoCommit 为 false,需要手动调用 CommitAsync 方法
        // await consumer.CommitAsync();
    }
});

Task.Run(async () =>
{
    for (int i = 0; i < 1000; i++)
    {
        await producer.ProduceAsync(new MochaSpan());
    }
});

await Task.WhenAll(consumerTasks);

MemoryBufferQueue 的设计

Partition 的设计

为了保证消费速度,MemoryBufferQueue 将数据划分为多个 Partition,每个 Partition 都是一个独立的队列,每个 Partition 都有一个对应的消费者线程。

Producer 以轮询的方式往每个 Partition 中写入数据。
Consumer 最多不允许超过 Partition 的数量,Partition 按平均分配到组内每个 Customer 上。
当一个 Consumer 被分配了多个 Partition 时,以轮训的方式进行消费。
每个 Partition 上会记录不同消费组的消费进度,不同组之间的消费进度互不干扰。

Partition

对并发的支持

Producer 支持并发写入。

Consumer 消费时是绑定 Partition 的,为保证能正确管理 Partition 的消费进度,Consumer 不支持并发消费。

如果要增加消费速度,需创建多个 Consumer。

Partition 的动态扩容

Partition 的基本组成单元是 Segment,Segment 代表保存数据的数组,多个 Segment 通过链表的形式组合成一个 Partition。

当一个 Segment 写满后,通过在其后面追加一个 Segment 实现扩容。

Segment 中用于保存数据的数组的每一个元素称为 Slot,每个 Slot 都有一个Partition 内唯一的自增 Offset。

Segment

Segment 的回收机制

每次在 Partition 中新增 Segment 时,会从头判断此前的 Segment 是否已经被所有消费组消费完,回收最后一个消费完的 Segment 作为新的 Segment 追加到 Partition 末尾使用。

SegmentRecycle

欢迎关注个人技术公众号

标签:Consumer,Partition,MemoryBufferQueue,Mocha,概述,var,Segment,public
From: https://www.cnblogs.com/eventhorizon/p/17997954

相关文章

  • 鸿蒙Stage模型--概述
    Stage模型:HarmonyOS3.1DevelperPreview版本开始新增的模型,是目前主推且会长期演进的模型。在该模型中,由于提供了AbilityStage、WindowStage等类作为应用组件和Window窗口的“舞台”,因此称这种应用模型为Stage模型。设计思想Stage模型之所以成为主推模型,源于其设计思想。Stage模......
  • 《系统科学方法概论》第二章第一、二节概述
    一.1,系统工程:就是以组织建立或者是经营管理某一系统为目的的工程。2,系统工程内容:系统工程思想,系统工程方法论3,系统工程的本质是:方法论二.系统工程发展史1,古代的系统工程思想和实践例如:都江堰水利工程,2,现代系统工程(1)生产管理系统工程阶段20世纪初至20世纪20年代(2)......
  • 鸿蒙OS 融合搜索概述
    HarmonyOS融合搜索为开发者提供搜索引擎级的全文搜索能力,可支持应用内搜索和系统全局搜索,为用户提供更加准确、高效的搜索体验。基本概念全文索引记录字或词的位置和次数等属性,建立的倒排索引。全文搜索通过全文索引进行匹配查找结果的一种搜索引擎技术。全局搜索可以在系统全......
  • Slint-UI移植到任意平台-概述-Rust
    前言本文仅为笔者记忆,个人经验写着玩,目前1.3.2版本。注:本文尚未完成。注:本文尚未完成。注:本文尚未完成。本人目前想要移植一种贴近前端技术的GUI到裸机上,但是裸机不支持UNIX环境,所以绝大部分框架都用不了(如Flutter/skia等),最后发现Slint最合适。Slint有三种渲染器{femtovg/Op......
  • [office] Excel转dbf技巧及其注意事项概述
    1.DBF文件只会保存工作表中命名区域或当前区域中的数据:当以dBASE(DB2、DB3或DB4)格式保存Excel工作表、且该工作表中包含一个名为“Database”的区域时,只有命名区域中的数据会保存到dBASE文件中。如果区域命名之后又添加了新记录,则必须重新定义包括新记录的“Database”区域后,才能......
  • 1、mysql概述
    1.连接管理与安全性每个客户端连接都会在服务器进程中拥有一个线程,这个连接的查询只会在这个单独的线程中执行,该线程只能轮流在某个CPU核心或者CPU中运行。服务器会负责缓存线程,因此不需要为每一个新建的连接创建或者销毁线程。当客户端(应用)连接到MySOL服务器时,服务器需要......
  • Easysearch:语义搜索、知识图和向量数据库概述
    什么是语义搜索?语义搜索是一种使用自然语言处理算法来理解单词和短语的含义和上下文以提供更准确的搜索结果的搜索技术。旨在更好地理解用户的意图和查询内容,而不仅仅是根据关键词匹配,还通过分析查询的语义和上下文来提供更准确和相关的搜索结果。传统的关键词搜索主要依赖于对关键......
  • Easysearch:语义搜索、知识图和向量数据库概述
    什么是语义搜索?语义搜索是一种使用自然语言处理算法来理解单词和短语的含义和上下文以提供更准确的搜索结果的搜索技术。旨在更好地理解用户的意图和查询内容,而不仅仅是根据关键词匹配,还通过分析查询的语义和上下文来提供更准确和相关的搜索结果。传统的关键词搜索主要依赖于对......
  • Linux图形栈概述
    Linux图形栈概述图形渲染相关概念https://www.x.org/wiki/Development/Documentation/Glossary/直接渲染架构DRIDRI(DirectRenderingInfrastructure):是现代Linux上的图形栈架构,允许用户态程序直接向图形硬件发出命令,主要用途是为OpenGL的Mesa提供硬件加速。零拷贝零拷......
  • 指纹面容识别登录流程概述
    近来在帮忙处理一个IOS端,指纹/面容登录的需求。独立的原生IOS开发人员,已经被优化掉了,我是革命一块儿砖,哪里需要哪里搬,-_-||。在此,对期间遇到的一些实践问题,做一个梳理备忘,也希望可以给其他产品及码农提供参考。本文主要侧重于,整体的移动端指纹/面容实现用户登录的解......