首页 > 其他分享 >kafka消息只能在一台服务器消费的问题解决过程

kafka消息只能在一台服务器消费的问题解决过程

时间:2024-04-19 10:15:46浏览次数:31  
标签:消费 xxapp03 xx kafka 一台 线程 服务器 976bc2af

场景:

kafka消费端应用部署在两台机器上,其中一台能消费到生产端发出的kafka消息,另一台服务器接收不到任何消息。

解决过程:

一、从消费端启动日志中找出所有消费端线程

2024-04-23 20:04:44,726 [xx_xxapp03-1556011171628-976bc2af_watcher_executor] INFO  kafka.consumer.RangeAssignor -
                [Ip:|User:]Consumer xx_xxapp03-1556011171628-976bc2af rebalancing the following partitions: ArrayBuffer(0, 1) for topic xx_import_data with consumers: List(xx_xxapp03-1556011171628-976bc2af-0, xx_xxapp03-1556021084370-e7a7a47d-0, xx_xxapp04-1556011055427-2927d364-0)
2024-04-23 20:04:44,726 [xx_xxapp03-1556011171628-976bc2af_watcher_executor] INFO  kafka.consumer.RangeAssignor -
                [Ip:|User:]xx_xxapp03-1556011171628-976bc2af-0 attempting to claim partition 0

二、分析启动日志
从日志第二行可以看出有三个消费端线程xx_xxapp03-1556011171628-976bc2af-0, xx_xxapp03-1556021084370-e7a7a47d-0,xx_xxapp04-1556011055427-2927d364-0消费两个分区partitions: ArrayBuffer(0, 1)。
我们知道kafka的一个partition 只能让一个消费者线程消费,那么排在最后的消费端线程肯定消费不到消息,因为只有2个partition已经分别被前两个消费线程消费了,如果前两个消费线程和最后一个消费线程分别位于两台不同的机器上,则有一台机器消费不到kafka消息,问题就是出自这里。

三、修改kafka消费线程生产相关的代码

public void startConsumer() {
        consumerConnector = consumerConnectorFactory.createConsumerConnector(zookeeperConnect, groupId);
        LOGGER.info("CSIRAS 启动 Kafka topic xx_import_data Kafka Consumer连接");
        // topic信息容器
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        //cosumer线程数
        int consumerThreadNum = partitionNum / serverNum != 0 ? partitionNum / serverNum : 1;
        topicCountMap.put(topic, consumerThreadNum);
        // 获取消息流
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        // 获取消息列表
        List<KafkaStream<byte[], byte[]>> kafkaStreamList = consumerMap.get(topic);
        ......
}

int consumerThreadNum = partitionNum / serverNum != 0 ? partitionNum / serverNum : 1;

通过上面的代码控制每个服务器应用上的消费线程数,保证每个应用不会生产过多无效的消费线程。

标签:消费,xxapp03,xx,kafka,一台,线程,服务器,976bc2af
From: https://www.cnblogs.com/John-2011/p/18145180

相关文章

  • 一次 kafka 消费者的性能调优过程
    背景最近上线了一个kafka的消费者,数据规模大概是低峰期单机每分钟消费88W条,QPS14666。上线后看了下数据,进程CPU到了132%。 8核的机器,单进程CPU132倒也还好,但还是想看看,到底是咋回事。过程第一次排查&优化(协程池化->约为0优化)于是就开始采集pprof的数据。golangpprof的采......
  • Kafka 线上性能调优
    Kafka线上性能调优是一项综合工程,不仅仅是Kafka本身,还应该从硬件(存储、网络、CPU)以及操作系统方面来整体考量,首先我们要有一套生产部署方案,基于这套方案再进行调优,这样就有了可靠的底层保证,才能保证Kafka集群整体的稳定性。1.线上部署方案1.1操作系统我们知道Kafka是......
  • 低功耗蓝牙系列芯片CH57x,CH58x,CH59x实现蓝牙多通道配对/回连同一台电脑或多台电脑实
    依旧以老朋友CH583为例,我们随便打开一个HID例程,还是以HID_Keyboard例程做讲解:1、直奔主题到我们的库函数初始化函数中去,红圈圈中的SNVAddr配置信息为我们此次配对连接后所获取到的信息存储的地址区,配对信息中包含:对方MAC地址,设备信息,协商所得IRK密钥等关键信息,都会保存到SNVAddr......
  • MSSQL 数据库服务器磁盘空间报警 -
    如服务器上有创建MSSQLReplication,则会自动创建distribution数据库,有时distribution数据库日志文件过大解决方案:1.第一种方案:查看日志大小, dbccsqlperf(logspace),查看哪个数据库日志文件过大,如果有数据库日志文件非常大,就需要通过检查日志的VLF使用情况来进行诊断,日志文件......
  • kafka中文输出乱码
    目录乱码问题通常是由于编码不一致导致的。Kafka在处理消息时,如果生产者和消费者的字符编码设置不一致,可能会导致乱码。解决方法:确认Kafka生产者和消费者的字符编码设置一致。对于Java生产者和消费者,可以在配置中设置字符编码:props.put("key.serializer","org.apache.kafk......
  • 服务器raid卡,守护数据安全,赋能新质生产力
    RAID卡,全称为独立冗余磁盘阵列卡,在数据中心、服务器、网络存储等领域得到广泛应用,RAID卡通过不同的RAID级别实现数据容错和冗余。例如,RAID0主要适用于需要高速数据传输但对数据安全要求不高的场景,如数据的缓存;RAID1使用镜像备份确保数据不因硬盘故障而丢失。然而市面上的Raid卡......
  • 用海豚调度器定时调度从Kafka到HDFS的kettle任务脚本
    在实际项目中,从Kafka到HDFS的数据是每天自动生成一个文件,按日期区分。而且Kafka在不断生产数据,因此看看kettle是不是需要时刻运行?能不能按照每日自动生成数据文件?为了测试实际项目中的海豚定时调度从Kafka到HDFS的Kettle任务情况,特地提前跑一下海豚定时调度这个任务,看看到底什么......
  • golang+kafka
    目录1.安装JDK、Zookeeper、Scala、kafka2.启动kafka3.创建topics4.查看topics5.打开一个producer6.打开一个consumer7.测试发送和接收消息Windows下安装Kafka1.安装JDK、Zookeeper、Scala、kafka安装Kafka之前,需要安装JDK、Zookeeper、Scala。Kafka依赖Zookeeper,......
  • 在Go语言中往Kafka中发送数据,通常会使用Sarama库
    目录Sarama简介基本步骤示例代码Sarama简介Sarama是一个用Go语言编写的ApacheKafka客户端库,由Shopify公司最初开发,并在后来被IBM接管维护。Sarama库提供了一套完整的Kafka功能支持,包括生产者(Producer)、消费者(Consumer)以及消费者组(ConsumerGroup)等组件,允许开发者在Go应用程序......
  • 批量扫描并上报所有服务器已信任的authorized_keys
    https://www.cnblogs.com/iAmSoScArEd/p/18140656-我超怕的codefromflaskimportFlask,requestimportcsvapp=Flask(__name__)@app.route('/',methods=['POST'])defreceive_data():data=request.data.decode('utf-8')......