多线程消费方式
方式1:一个线程对应一个消费者
消费者数量不大于分区数,最好也能对等起来
方式2:多线程消费同一个分区
位移提交和顺序控制的处理非常复杂,不推荐
方式1:消费者==分区数
// Approach 1: one consumer thread per partition (threadNum should equal the
// topic's partition count; extra threads would sit idle).
int threadNum = 4;
for (int i = 0; i < threadNum; i++) {
    // Fix: the original called `new KafkaConsumer(config, "tcp_link").start()`,
    // but KafkaConsumer has no (Properties, String) constructor and no start()
    // method. The wrapper thread below is what must be constructed and started.
    new KafkaConsumerThread(config, "tcp_link").start();
}

/**
 * One KafkaConsumer per thread: each thread owns its own consumer instance
 * (KafkaConsumer is not thread-safe) subscribed to a single topic.
 *
 * @param config consumer configuration (bootstrap servers, deserializers, group id, ...)
 * @param topic  topic this thread's consumer subscribes to
 */
public KafkaConsumerThread(Properties config, String topic) {
    this.kafkaConsumer = new KafkaConsumer<>(config);
    this.kafkaConsumer.subscribe(Collections.singleton(topic));
}
每个线程可以顺序消费各自分区中的消息,但也有一个问题:每个线程都要维护一个独立的TCP连接,如果分区数和线程数都很大,系统开销会非常可观。
方式3:poll与处理消息模块拆分
处理消息模块用多线程
/**
 * Approach 3: a single poller thread hands each polled batch to a worker pool
 * (MetricParser tasks). Workers record the highest processed offset per
 * partition into a shared map; the poller commits from that map, so offsets
 * are only committed after processing — at-least-once delivery.
 */
public class KafkaConsumerBase {
    public final Properties configs;

    public KafkaConsumerBase() {
        this.configs = buildConfig();
    }

    /** Builds the consumer configuration used by the poller thread. */
    public Properties buildConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8080");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        // Fix: was `true`. Auto-commit would commit the poll position before the
        // worker pool finished processing, losing messages on a crash. Offsets
        // are committed manually from the shared map instead.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return properties;
    }

    public void Scheduled(String[] args) {
        List<String> topics = Arrays.asList("tcp_link", "agent");
        KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread(this.configs,
                topics, Runtime.getRuntime().availableProcessors());
        // Fix: the thread was created but never started, so nothing consumed.
        kafkaConsumerThread.start();
    }

    /** Poller thread: polls Kafka, dispatches batches to a worker pool, commits offsets. */
    public static class KafkaConsumerThread extends Thread {
        private final KafkaConsumer<String, String> kafkaConsumer;
        private final ThreadPoolExecutor executor;
        // Fix: was never initialized — every `synchronized (offsets)` threw NPE.
        // Shared with MetricParser workers; all access is guarded by this map's lock.
        private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        /**
         * @param configs   consumer configuration
         * @param topics    topics to subscribe to
         * @param threadNum worker pool size (e.g. number of CPU cores)
         */
        public KafkaConsumerThread(Properties configs, List<String> topics, int threadNum) {
            this.kafkaConsumer = new KafkaConsumer<>(configs);
            kafkaConsumer.subscribe(topics);
            // Bounded queue + CallerRunsPolicy: when workers fall behind, the
            // poller runs the task itself, which naturally throttles polling.
            executor = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(10));
                    if (!records.isEmpty()) {
                        executor.submit(new MetricParser(records, offsets));
                    }
                    // Fix: was the no-arg commitSync(), which commits the raw poll
                    // position and ignores the worker-maintained map. Commit exactly
                    // the offsets workers have finished, then reset the map.
                    // (Also moved outside the isEmpty check so idle polls still
                    // flush pending offsets.)
                    synchronized (offsets) {
                        if (!offsets.isEmpty()) {
                            kafkaConsumer.commitSync(new HashMap<>(offsets));
                            offsets.clear();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}
/**
 * Worker task that processes one polled batch and records, per partition, the
 * next offset to commit (last processed offset + 1) into a map shared with the
 * poller thread. The poller performs the actual commit.
 */
public class MetricParser extends Thread {
    // Batch handed over by the poller; one task per poll() result.
    public final ConsumerRecords<String, String> records;
    // Shared offset map; every access is guarded by synchronized (offsets).
    public Map<TopicPartition, OffsetAndMetadata> offsets;

    public MetricParser(ConsumerRecords<String, String> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = records;
        this.offsets = offsets;
    }

    @Override
    public void run() {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(partition);
            // Commit position is one past the last offset actually processed.
            long nextOffset = batch.get(batch.size() - 1).offset() + 1;
            synchronized (offsets) {
                OffsetAndMetadata pending = offsets.get(partition);
                // Only advance — never move the pending commit position backwards,
                // since batches may finish out of order across worker threads.
                if (pending == null || pending.offset() < nextOffset) {
                    offsets.put(partition, new OffsetAndMetadata(nextOffset));
                }
            }
        }
    }
}
标签:消费,offsets,kafkaConsumer,kafka,records,put,new,public
From: https://www.cnblogs.com/yuanbaobao/p/17973521