
4.5 Kafka Consumer API: Multithreaded Concurrent Processing

Posted: 2022-10-24 18:01:52


1. One Consumer per Partition
(1). Overview
This is the classic pattern: each thread creates its own KafkaConsumer instance and consumes its own partition. Because no consumer instance is ever shared between threads, thread safety is guaranteed.

(2). Code Example

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerRunner implements Runnable {
    private static final String topicName = "steven";

    // Shutdown flag: the thread keeps polling while this is false
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // KafkaConsumer is not thread-safe; it is up to the developer to handle
    // thread safety, so each thread creates its own KafkaConsumer instance
    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerRunner() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        TopicPartition p0 = new TopicPartition(topicName, 0);
        TopicPartition p1 = new TopicPartition(topicName, 1);
        consumer.assign(Arrays.asList(p0, p1));
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                    // Process the records of this partition
                    for (ConsumerRecord<String, String> record : pRecord) {
                        System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n",
                                record.partition(), record.offset(), record.key(), record.value());
                    }
                    // Report the new offset back to Kafka: commit the offset of
                    // the last processed record in this partition, plus one
                    long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                    Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                    offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    consumer.commitSync(offset);
                }
            }
        } catch (WakeupException e) {
            // Ignore the wakeup exception if we are shutting down
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
public class ConsumerThreadSample {
    public static void main(String[] args) {
        KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner();
        Thread thread = new Thread(kafkaConsumerRunner);
        thread.start();
        try {
            // Let the consumer run for 15 seconds, then shut it down
            Thread.sleep(15000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        kafkaConsumerRunner.shutdown();
    }
}

(3). Running the Code
Run the custom-partitioner producer example to send 5 messages each to partition 0 and partition 1 of the topic steven, then run ConsumerThreadSample. The console prints the following:

partition = 0,offset = 0,key = key-0,value = value-0
partition = 0,offset = 1,key = key-2,value = value-2
partition = 0,offset = 2,key = key-4,value = value-4
partition = 0,offset = 3,key = key-6,value = value-6
partition = 0,offset = 4,key = key-8,value = value-8

partition = 1,offset = 0,key = key-1,value = value-1
partition = 1,offset = 1,key = key-3,value = value-3
partition = 1,offset = 2,key = key-5,value = value-5
partition = 1,offset = 3,key = key-7,value = value-7
partition = 1,offset = 4,key = key-9,value = value-9
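The sample above runs a single thread over both partitions; the classic pattern scales by starting one thread per partition, each owning its own KafkaConsumer assigned to exactly one TopicPartition. The lifecycle of that arrangement can be sketched without a broker by replacing the poll/commit loop with simulated records (the `PartitionWorker` class and the five-record limit below are made up for illustration; a real worker would poll, process, and `commitSync` as in `KafkaConsumerRunner`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

public class PerPartitionThreads {

    // One worker per partition; in the real pattern each worker would
    // own its own KafkaConsumer assigned to exactly one TopicPartition
    static class PartitionWorker implements Runnable {
        final int partition;
        final AtomicBoolean closed = new AtomicBoolean(false);
        final List<String> processed;

        PartitionWorker(int partition, List<String> processed) {
            this.partition = partition;
            this.processed = processed;
        }

        @Override
        public void run() {
            int offset = 0;
            // Stand-in for the poll/process/commitSync loop; here we just
            // "consume" five simulated records per partition and stop
            while (!closed.get() && offset < 5) {
                processed.add("partition=" + partition + ",offset=" + offset);
                offset++;
            }
        }

        void shutdown() {
            // A real worker would also call consumer.wakeup() here
            closed.set(true);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> processed = new CopyOnWriteArrayList<>();
        List<Thread> threads = new ArrayList<>();
        // The topic "steven" has two partitions, so start two worker threads
        for (int p = 0; p < 2; p++) {
            Thread t = new Thread(new PartitionWorker(p, processed), "consumer-p" + p);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(processed.size()); // prints 10
    }
}
```

Because each thread touches only its own consumer, no synchronization on the consumer itself is needed; the shared list here exists only so the sketch has observable output.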

2. One Consumer with Multiple Event-Handler Threads
(1). Code Example

public class ConsumerRecordThreadSample {
    private static final String topicName = "steven";

    private static final String brokerList = "127.0.0.1:9092";

    private static final String groupId = "test";

    public static void main(String[] args) {
        int workerNum = 5;
        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, topicName);
        consumers.execute(workerNum);
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumers.shutdown();
    }
}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerExecutor {
    private final KafkaConsumer<String, String> consumer;

    private ExecutorService executors;

    public ConsumerExecutor(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        // A single consumer thread polls and hands each record to the worker pool
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
            for (final ConsumerRecord<String, String> record : records) {
                executors.submit(new ConsumerRecordWorker(record));
            }
        }
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
        if (executors != null) {
            executors.shutdown();
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case.");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }
}
public class ConsumerRecordWorker implements Runnable {
    private final ConsumerRecord<String, String> record;

    public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() {
        // Stand-in for real work, e.g. writing the record to a database
        System.err.printf("Thread-" + Thread.currentThread().getName()
                + ",partition = %d,offset = %d,key = %s,value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
}

(2). Running the Code
Run the custom-partitioner producer example to send 5 messages each to partition 0 and partition 1 of the topic steven, then run ConsumerRecordThreadSample. The console prints the following:

partition = 0,offset = 0,key = key-0,value = value-0
partition = 0,offset = 1,key = key-2,value = value-2
partition = 0,offset = 2,key = key-4,value = value-4
partition = 0,offset = 3,key = key-6,value = value-6
partition = 0,offset = 4,key = key-8,value = value-8

partition = 1,offset = 0,key = key-1,value = value-1
partition = 1,offset = 1,key = key-3,value = value-3
partition = 1,offset = 2,key = key-5,value = value-5
partition = 1,offset = 3,key = key-7,value = value-7
partition = 1,offset = 4,key = key-9,value = value-9
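One detail worth noting in the worker-pool example: ConsumerExecutor builds its pool with a bounded ArrayBlockingQueue and ThreadPoolExecutor.CallerRunsPolicy, so when the queue fills up, submitted tasks are not dropped; instead the polling thread runs them itself, which naturally throttles polling. This behavior can be demonstrated standalone without Kafka (the tiny pool and queue sizes below are made up so the policy triggers quickly):

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        // One worker and a one-slot queue so the rejection policy triggers fast
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        Set<String> executingThreads = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 20; i++) {
            pool.submit(() -> {
                // Record which thread actually ran the task
                executingThreads.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(5); // simulate slow record processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // With the queue full, some tasks ran on the submitting thread itself
        System.out.println(executingThreads.contains("main")); // prints true
    }
}
```

This backpressure is also why the pattern cannot guarantee per-partition processing order: records from the same partition may be handled concurrently by different workers, and with enable.auto.commit=true offsets may be committed before a worker has finished its record. If ordering or at-least-once processing matters, the one-consumer-per-partition pattern from section 1 with manual commits is the safer choice.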


From: https://blog.51cto.com/u_15843693/5790760
