场景:
kafka消费端应用部署在两台机器上,其中一台能消费到生产端发出的kafka消息,另一台服务器接收不到任何消息。
解决过程:
一、从消费端启动日志中找出所有消费端线程
2024-04-23 20:04:44,726 [xx_xxapp03-1556011171628-976bc2af_watcher_executor] INFO kafka.consumer.RangeAssignor - [Ip:|User:]Consumer xx_xxapp03-1556011171628-976bc2af rebalancing the following partitions: ArrayBuffer(0, 1) for topic xx_import_data with consumers: List(xx_xxapp03-1556011171628-976bc2af-0, xx_xxapp03-1556021084370-e7a7a47d-0, xx_xxapp04-1556011055427-2927d364-0) 2024-04-23 20:04:44,726 [xx_xxapp03-1556011171628-976bc2af_watcher_executor] INFO kafka.consumer.RangeAssignor - [Ip:|User:]xx_xxapp03-1556011171628-976bc2af-0 attempting to claim partition 0
二、分析启动日志
从日志第二行可以看出有三个消费端线程xx_xxapp03-1556011171628-976bc2af-0, xx_xxapp03-1556021084370-e7a7a47d-0,xx_xxapp04-1556011055427-2927d364-0消费两个分区partitions: ArrayBuffer(0, 1)。
我们知道kafka的一个partition 只能让一个消费者线程消费,那么排在最后的消费端线程肯定消费不到消息,因为只有2个partition已经分别被前两个消费线程消费了,如果前两个消费线程和最后一个消费线程分别位于两台不同的机器上,则有一台机器消费不到kafka消息,问题就是出自这里。
三、修改kafka消费线程生产相关的代码
public void startConsumer() { consumerConnector = consumerConnectorFactory.createConsumerConnector(zookeeperConnect, groupId); LOGGER.info("CSIRAS 启动 Kafka topic xx_import_data Kafka Consumer连接"); // topic信息容器 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //cosumer线程数 int consumerThreadNum = partitionNum / serverNum != 0 ? partitionNum / serverNum : 1; topicCountMap.put(topic, consumerThreadNum); // 获取消息流 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); // 获取消息列表 List<KafkaStream<byte[], byte[]>> kafkaStreamList = consumerMap.get(topic); ...... }
int consumerThreadNum = partitionNum / serverNum != 0 ? partitionNum / serverNum : 1;
通过上面的代码控制每个服务器应用上的消费线程数,保证每个应用不会生产过多无效的消费线程。
标签:消费,xxapp03,xx,kafka,一台,线程,服务器,976bc2af From: https://www.cnblogs.com/John-2011/p/18145180