首页 > 其他分享 >消息的消费组配置、堆积总结

消息的消费组配置、堆积总结

时间:2023-10-01 11:00:24浏览次数:35  
标签:总结 消费 max 配置 bytes selector 吞吐量 检查点 堆积

一、Azure EventHub
1.1、消费客户端负载均衡

引用包:Azure.Messaging.EventHubs;EventHubConsumerClient简单消费;不能持久化保存所有权\检查点,做负载均衡;需要自定义实现 EventProcessor<TPartition>,如通过Redis实现所有权获取\设置、检查点获取\设置。

参考文档:

https://devblogs.microsoft.com/azure-sdk/eventhubs-clients/

https://devblogs.microsoft.com/azure-sdk/custom-event-processor/

1.2、提供吞吐量,避免堆积

eventBatchMaximumCount ---消息批处理最大数量

EventProcessorOptions:

  PrefetchCount ---Event处理器 缓冲区预取数量

  ConnectionOptions.ReceiveBufferSizeInBytes---EventHub 接收缓冲区字节

增大这3个参数的值,可明显提高吞吐量。

protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, TPartition partition, CancellationToken cancellationToken)
{
    if (events.Any())
    {
        //一次多个消息可并发处理
        
        var parallelQuery = Partitioner
        //负载平衡方式执行
        .Create(array, true)
        .GetPartitions(partitionCount)
        .AsParallel().Select(async (selector) =>
        {
            using (selector)
            {
                while (selector.MoveNext())
                {
                    byte[] eventBodyBytes = selector.Current.EventBody.ToArray();

                    string body = Encoding.UTF8.GetString(eventBodyBytes);
                    //反序列化,业务逻辑处理
                }
            }
        });
        await Task.WhenAll(parallelQuery.ToArray());

        //消费成功设置检查点,保存redis
    }
}

二、kafka
2.1、提供吞吐量,避免堆积

引用包:Confluent.Kafka,topic分片和消费组数量很多,如:12个,解决消费堆积办法:

使用自动提交策略,不要手动Commit(consumeResult),它是阻塞方法。
ConsumerConfig:

  EnableAutoCommit(enable.auto.commit) 启动自动提交,默认true

  AutoCommitIntervalMs(auto.commit.interval.ms) 自动提交时间间隔,默认5秒

其他可配置参数(ConsumerConfig 有相对应的属性)的默认值,对吞吐量影响不大,如:

  fetch.max.bytes
  max.partition.fetch.bytes
  queued.max.messages.kbytes         
  queued.min.messages
  fetch.wait.max.ms
  receive.message.max.bytes
  socket.receive.buffer.bytes

consumerBuilder.Consume() 一次仅返回单条,先收集到ActionBlock<string> 再异步并发消费;ActionBlock的参数 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 设置并发度。

问题:应用发布时可能重复消费、消费ActionBlock异常时可能丢失消费。

适用场景:业务流程对 及时性 优先于 准确性\完整性,生产者有定时重复数据、消费保存数据使用是有降级兜底,采用自动提交消费offset的方案,提高吞吐量。

 

标签:总结,消费,max,配置,bytes,selector,吞吐量,检查点,堆积
From: https://www.cnblogs.com/yinyunpan/p/17738639.html

相关文章

  • 2023-2024-1 20231312 《 计算机基础与程序设计》第1周学习总结
    作业信息|这个作业属于哪个课程|<班级的链接>2023-2024-1-计算机基础与程序设计||这个作业要求在哪里|<作业要求链接>2023-2024-1计算机基础与程序设计第一周作业||这个作业的目标|<快速浏览一遍教材《计算机科学概论》,并提出自己不......
  • pytest: 在配置文件pytest.ini中自定义选项/变量
    1pytest.ini中有很多pytest框架自带的选项,即便你没有定义一个pytest.ini文件,这些选项依然存在并影响pytest的很多行为执行pytest-h可以看到这些选项及默认值上面这些默认选项及其默认值可以被被系统自动读取,不信我们在test中打印一个出来看看选取xfail_strict,上图中可以看......
  • 学期2023-2024-1 学号20231315 《计算机基础与程序设计》第一周学习总结
    学期2023-2024-1学号20231315《计算机基础与程序设计》第一周学习总结作业信息这个作业属于哪个课程?2023-2024-1《计算机基础与程序设计》这个作业要求在哪里?2023-2024-1《计算机基础与程序设计》这个作业的目标?**快速浏览一遍教材计算机科学概论,课本每章提出......
  • 9.29每日总结
    今日学习时间两小时,开始学习软考知识,学习下午题第一道大题的解题方法        ......
  • 每日总结
    今日收获背单词!!我真的服了,每天不爆出来几个错就不是电脑啦~~~~技术的进步!明天预计预计明后两天将erp的基本框架搭好,然后就开始进行动态的设计和开发(压力还是蛮大的!!!)背单词必不可少的啦!......
  • 第八九章,知识完整性总结
    第八章-使用系统调用进行文件操作目录1.系统调用操作系统中进程以内核模式和用户模式运行,系统调用可以暂时让处于用户模式的进程拥有内核模式的高权限,以便拥有必要的操作权限。2.系统调用手册可以使用 man2stat等命令来查看操作说明:man2stat:查看stat(),fstat(......
  • #2023-2024-1 20231311《计算机基础与程序设计》第一周学习总结
    作业信息这个作业属于哪个课程https://edu.cnblogs.com/campus/besti/2023-2024-1-CFAP/这个作业的要求在哪里https://edu.cnblogs.com/campus/besti/2023-2024-1-CFAP/homework/13009这个作业的目标快速浏览一遍教材计算机科学概论,课本每章提出至少一个自己不懂的......
  • 2023.9.23 总结
    T1题面:从左往右有n个格子,编号\(1\)至\(n\)。一开始每个格子都有1颗糖果。你总共需要进行\(k\)次操作,每次操作把从某个格子取\(1\)颗糖(前提是该格子有糖),放到另一个格子。当\(k\)次操作全部结束以后,从左往右检查,这\(n\)个格子的糖果数量。求这\(n\)个格子总共有多少......
  • 2023-2024-1 20231304《计算机基础与程序设计》第一周学习总结
    2023-2024-120231304《计算机基础与程序设计》第一周学习总结作业信息这个作业属于哪个课程<班级的链接>2023-2024-1-计算机基础与程序设计这个作业要求在哪里<作业要求的链接>2023-2024-1计算机基础与程序设计第一周作业这个作业的目标快速浏览预习《计算机科......
  • 2023-2024 20231324《计算机基础与程序设计》第1周学习总结
    这个作业属于哪个课程2023-2024-1《计算机基础与程序设计》这个作业的要求在哪里2023-2024-1计算机基础与程序设计第一周作业这个作业的目标快速浏览教材《计算机科学概论》,提出自己不懂或最想解决的问题并在期末回答作业正文本博客链接https://ww......