首页 > 其他分享 >KafKa消费开发

KafKa消费开发

时间:2023-06-06 09:01:11浏览次数:36  
标签:消费 Console consumeResult KafKa 开发 ex WriteLine Message consumer

 KafKa消费开发配置

以下代码需要写完整,不完整会出现中断,假死现象,长时间不处理问题。(实际项目代码)

        
        /// <summary>
        ///         - offsets 是自动提交的。
        ///         - consumer.Poll / OnMessage 是用于消息消费的。
        ///         - 没有为轮询循环创建(Poll)二外的线程,当然可以创建
        /// </summary>
        public static void Run_Poll(string brokerList, List<string> topics)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                GroupId = $"Consumer_{topics[0]}_test", //_{topics[0]}  //修改此值会导致数据重新计算
                EnableAutoCommit = false, // 禁止AutoCommit
                //Acks = Acks.Leader, // 假设只需要Leader响应即可
                AutoOffsetReset = AutoOffsetReset.Earliest, // 从最早的开始消费起
                StatisticsIntervalMs=5000,// 5秒内无数据输出侦听中
                SessionTimeoutMs=6000,
                //EnablePartitionEof=true
            };

            try
            {
                using (var consumer = new ConsumerBuilder<Ignore, string>(config).SetErrorHandler((_,e)=> { 
                    LogInfo($"ErrorHandler:{e.Reason}");
                    Console.WriteLine($"ErrorHandler:{e.Reason}");
                }).SetStatisticsHandler((_, json) => {
                    //LogInfo($"{DateTime.Now}  消息监听中...");
                    Console.WriteLine($"{DateTime.Now}  消息监听中...");
                }).Build())
                {
                    consumer.Subscribe(topics);
                    try
                    {
                        while (true)
                        {

                            Stopwatch swIshttp = new Stopwatch();
                            swIshttp.Start();

                            try
                            {
                                System.Threading.CancellationToken cancellationToken = default;
                                var consumeResult = consumer.Consume(cancellationToken);
                                bool returnBl = true;
                                if (consumeResult.Message != null)
                                {
                                    returnBl = SenMsg("", consumeResult.Offset.ToString(), consumeResult.Topic, consumeResult.Message.Value, consumeResult.Timestamp.UtcDateTime);
                                }
                                //Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
                                if (consumeResult.IsPartitionEOF)
                                {
                                    Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                                    Thread.Sleep(1000);
                                    continue;
                                }

                                if (returnBl)//处理完成,做标记
                                {
                                    try
                                    {
                                        consumer.Commit(consumeResult);
                                    }
                                    catch (KafkaException e)
                                    {
                                        LogInfo($"Commit 处理完成,做标记失败 : {e.Message}--{e.StackTrace}");
                                        //Console.WriteLine(e.Message);
                                    }
                                }
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Consume error: {e.Error.Reason}");
                                LogInfo($"Consume error: {e.Error.Reason}");
                            }
                            swIshttp.Stop();
                            if (swIshttp.ElapsedMilliseconds > ElapsedMillisecondsMax)
                            {
                               // Console.WriteLine($"consumer.Consume 用时过长 {swIshttp.ElapsedMilliseconds}");
                            }
                        }
                    }
                    catch (OperationCanceledException ee)
                    {
                        Console.WriteLine("Closing consumer.");
                        LogInfo($"Closing consumer.: {ee.Message}--{ee.StackTrace}");
                        consumer.Close();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"Run_Poll ERR.{ex.Message}");
                        LogInfo($"Run_Poll .: {ex.Message}--{ex.StackTrace}");
                    }
                }
            }catch(Exception ex)
            {
                Console.WriteLine($"Run_Poll 处理失败.{ex.Message}");
                LogInfo($"Run_Poll 处理失败  {ex.Message}  {ex.StackTrace} "); 
            }
        }

  

标签:消费,Console,consumeResult,KafKa,开发,ex,WriteLine,Message,consumer
From: https://www.cnblogs.com/rboc/p/17459528.html

相关文章

  • Vue自定义指令-让你的业务开发更简单
    1、使用场景在日常开发中,我们会将重复代码抽象为一个函数或者组件,然后在需要时调用或者引入。但是,对于某些功能,这种方法可能不够优雅或者不够灵活。例如,我们可能需要在DOM元素上添加一些自定义属性或者绑定一些事件,这些操作可能难以通过函数或组件来实现。这时,自定义指令就派上用......
  • 万字长文讲透 RocketMQ 4.X 消费逻辑
    RocketMQ是笔者非常喜欢的消息队列,4.9.X版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。这篇文章,笔者梳理了RocketMQ的消费逻辑,希望对大家有所启发。1架构概览在展开集群消费逻辑细节前,我们先对RocketMQ4.X架构做一个概览。整体架构中......
  • 万字长文讲透 RocketMQ 4.X 消费逻辑
    RocketMQ是笔者非常喜欢的消息队列,4.9.X版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。这篇文章,笔者梳理了RocketMQ的消费逻辑,希望对大家有所启发。1架构概览在展开集群消费逻辑细节前,我们先对RocketMQ4.X架构做一个概览。整体架构中......
  • stm32开发环境搭建 mdk5 keil 【未完成】
    百度网盘:https://pan.baidu.com/s/1WH3l3_ICLf1w5FMhKeAWaA提取码:7pxa带stm32f4和stm32f1的支持包 破解安装支持包更换汉化包,打开文件所在位置-替换uv4文件更换global——def文件替换主题风格安装astle格式化工具  打开注册机 软件安装——Keil的安装与配置......
  • 搭建本地开发服务器
    原文点此跳转注意在上一个案例的基础上添加本地开发服务器,请保留上个案例的代码。如需要请查看 Webpack使用。搭建本地开发服务器这一个环节是非常有必要的,我们不可能每次修改源代码就重新打包一次。这样的操作是不是太繁琐了。所以本地开发服务器的作用就能体现了,它会自动监听我......
  • Spark消费Kafka
    0.前言之前先写了处理数据的spark,用文件读写测了一批数据,能跑出结果;今天调通了Kafka,拼在一起,没有半点输出,查了半天,发现是之前的处理部分出了问题,把一个不等号打成了等号,把数据全filter没了。很恐怖,我保证这段时间我没动过这段代码,但上次真的跑出东西了啊(尖叫1.配置流程主节点......
  • Taro框架应用优势下的移动App开发创新模式
    最近公司的一些项目需要跨端框架,技术老大选了Taro,实践了一段时间下来,愈发觉得Taro是个好东西,所以在本篇文章中稍微介绍下。 什么是Taro?Taro(或称为Taro框架)是一种用于构建跨平台应用程序的开源JavaScript框架。它基于React和ReactNative,可以用于开发Web、iOS、Android和微......
  • Java开发手册中为什么不建议在for循环中使用"+"进行字符串操作
    场景java开发手册中对于循环体中进行字符串的拼接要求如下:【推荐】循环体内,字符串的连接方式,使用StringBuilder的append方法进行扩展。说明:下例中,反编译出的字节码文件显示每次循环都会new出一个StringBuilder对象,然后进行append操作,最后通过toString方法返回Stri......
  • 广州app软件定制开发公司哪家好?
    广州不仅是一座文化名城,是岭南文化的中心城市和南越古国的国都,还是国内四大一线城市,经济发展名列前茅。在广州,app软件定制开发还是很盛行的,那如果想在广州做app软件定制开发,找哪家app软件定制开发公司好呢?在广州,app软件定制开发公司数量繁多,不过质量参差不齐,如果稍有不慎,很容易碰......
  • 【六月摸鱼计划】飞碟开发计划偶遇的物联网问题
    记录modbus的上位机记录Modbus的上位机可以使用第三方工具或编程语言来实现。以下是使用Python编程实现记录Modbus数据的示例代码:```pythonimportmodbus_tkimportmodbus_tk.definesascstfrommodbus_tkimportmodbus_tcp创建ModbusTCP客户端master=modbus_tcp.TcpMaster......