一、设置消费者多线程 参数
private static final int CONSUMER_THREAD_NUM = 1; //订阅topic Map<String, Integer> topicCountMap = Collections.singletonMap(topic, CONSUMER_THREAD_NUM); Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);
1.1 在 Kafka 的旧版 API 中(特别是 0.8 版本及其之前的版本),createMessageStreams
方法用于创建从 Kafka 主题读取数据的消息流。这个方法通常存在于 Kafka 的高级消费者 API 中。
topicCountMap
参数是一个映射,其键是 Kafka 主题的名称,值是该主题应该有多少个消费者线程或流来消费数据。这通常用于指定每个主题应有多少并发读取线程。
具体来说,topicCountMap
的作用如下:
- 主题的识别:键(Key)是 Kafka 主题的名称。这告诉 Kafka 消费者你想从哪些主题中读取数据。
- 并发控制:值(Value)是整数,表示你希望为该主题创建多少个消息流。每个消息流都会有一个单独的消费者线程来读取主题中的消息。例如,如果你为某个主题指定了值
3
,那么将会有三个并发的消费者线程读取该主题的消息。
topicCountMap
中指定了值 3,并不意味着这三个消费者线程将仅消费该主题的一个分区。实际上,这三个消费者线程将尝试消费该主题的所有分区,但具体哪个线程消费哪个分区取决于 Kafka 的分区分配策略。
在 Kafka 中,一个消费者组内的消费者线程会共同消费该组订阅的所有主题的所有分区。每个分区只能由组内的一个消费者线程消费,以确保消息的顺序性和不重复消费。
当你有多个消费者线程在消费者组内时,Kafka 会使用分区分配策略(如 Range 或 RoundRobin)来决定每个消费者线程应该消费哪些分区。这意味着,如果你有 3 个消费者线程,并且主题有 4 个分区,那么可能有两个线程各消费一个分区,而第三个线程消费剩下的两个分区。
因此,为某个主题指定值 3 意味着你希望为该主题启动 3 个消费者线程,并让这些线程共同消费该主题的所有分区。具体每个线程消费哪些分区,则取决于 Kafka 的分区分配策略和当前消费者组的状态
二、关于kafka 消费者 消费者线程和消费分区说明
1、 kafka 消费者 设置3个消费线程 ,三个消费者线程将尝试消费该主题的所有分区,每个分区只能由组内的一个消费者线程消费,以确保消息的顺序性和不重复消费,如kafka有3个分区 只有一个消费者但设置3个消费线程那应该每个消费者线程消费一个分区,但不般不这样设置 正常一个消费者设置一个消费线程,一个消费者线程消费这3个分区。 如果想提高消费速度可以在拉取消息 处理业务逻辑时开启多个线程。 设置消费者线程数量private static final int CONSUMER_THREAD_NUM = 1; //订阅topic Map<String, Integer> topicCountMap = Collections.singletonMap(topic, CONSUMER_THREAD_NUM); Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);
2、分区分配策略有哪些 Range(范围)分配策略: 它基于消费者实例的总数和分区的总数进行整除运算,以确保分区尽可能均匀地分配给所有消费者 (默认选择) RoundRobin(轮询)分配策略:RoundRobin策略将消费者组内的所有消费者以及它们所订阅的所有主题的分区按照某种顺序(通常是hashcode)进行排序,然后通过轮询的方式逐个将分区分配给每个消费者 通过 :partition.assignment.strategy 可以配置分区分配策略 标签:消费,消费者,分区,主题,kafka,线程,参数,多线程,Kafka From: https://www.cnblogs.com/liyanbofly/p/18084446