1.Consumer一对一消费Partition
(1).简介
这种类型是经典模式,每一个线程单独创建一个KafkaConsumer消费一个partition,用于保证线程安全。
(2).代码示例
/**
 * One-consumer-per-thread pattern: each thread owns a dedicated KafkaConsumer,
 * because KafkaConsumer is NOT thread-safe. Partitions are assigned manually,
 * and offsets are committed per partition with commitSync().
 */
public class KafkaConsumerRunner implements Runnable {
    private static final String topicName = "steven";
    // Shutdown flag: run() keeps polling while this is false.
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Typed <String, String> to match the String deserializers configured
    // below (the original used a raw type).
    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerRunner() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test");
        // Auto-commit is disabled: offsets are committed manually in run(),
        // so auto.commit.interval.ms would be ignored and is omitted.
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        // Explicit assignment of partitions 0 and 1 — assign() bypasses
        // consumer-group rebalancing entirely.
        TopicPartition p0 = new TopicPartition(topicName, 0);
        TopicPartition p1 = new TopicPartition(topicName, 1);
        consumer.assign(Arrays.asList(p0, p1));
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                    // Process every record fetched for this partition.
                    for (ConsumerRecord<String, String> record : pRecord) {
                        System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                    }
                    // Commit the NEXT offset to consume for this partition:
                    // last processed offset + 1, per the Kafka commit contract.
                    long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                    Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                    offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    consumer.commitSync(offset);
                }
            }
        } catch (WakeupException e) {
            // wakeup() is the expected shutdown signal; only rethrow when
            // the exception arrived while we were NOT shutting down.
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    /** Signals the poll loop to stop and interrupts any blocking poll(). */
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
/**
 * Driver for the one-consumer-per-thread sample: runs a KafkaConsumerRunner
 * on its own thread for 15 seconds, then asks it to shut down.
 */
public class ConsumerThreadSample {
    public static void main(String[] args) {
        KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner();
        Thread thread = new Thread(kafkaConsumerRunner);
        thread.start();
        try {
            // Let the consumer poll for a while before stopping it.
            Thread.sleep(15000);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it,
            // then fall through to shut the consumer down cleanly.
            Thread.currentThread().interrupt();
        }
        kafkaConsumerRunner.shutdown();
    }
}
(3).代码运行结果
运行自定义分区负载均衡发送示例代码向主题名为steven的partition0和partition1各发送5条数据,然后再执行ConsumerThreadSample代码,可以观察到控制台输出如下信息。
partition = 0,offset = 0,key = key-0,value = value-0
partition = 0,offset = 1,key = key-2,value = value-2
partition = 0,offset = 2,key = key-4,value = value-4
partition = 0,offset = 3,key = key-6,value = value-6
partition = 0,offset = 4,key = key-8,value = value-8
partition = 1,offset = 0,key = key-1,value = value-1
partition = 1,offset = 1,key = key-3,value = value-3
partition = 1,offset = 2,key = key-5,value = value-5
partition = 1,offset = 3,key = key-7,value = value-7
partition = 1,offset = 4,key = key-9,value = value-9
2.一个Consumer多个事件处理器
(1).代码示例
/**
 * Driver for the one-consumer-many-workers sample: a single ConsumerExecutor
 * polls Kafka and hands each record to a pool of worker threads.
 */
public class ConsumerRecordThreadSample {
    private static final String topicName = "steven";
    private static final String brokerList = "127.0.0.1:9092";
    private static final String groupId = "test";

    public static void main(String[] args) {
        int workerNum = 5;
        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, topicName);
        consumers.execute(workerNum);
        try {
            // Keep the JVM alive while workers process records.
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it,
            // then fall through to shut everything down.
            Thread.currentThread().interrupt();
        }
        consumers.shutdown();
    }
}
/**
 * Single consumer feeding a worker thread pool: one KafkaConsumer polls the
 * topic and submits each record to an ExecutorService for processing.
 * Offsets are auto-committed (enable.auto.commit=true).
 */
public class ConsumerExecutor {
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;
    // Stop flag for the poll loop; volatile because shutdown() is expected
    // to be called from a different thread than execute().
    private volatile boolean stopped = false;

    public ConsumerExecutor(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Polls until shutdown() is called, dispatching each record to the pool.
     * The bounded queue plus CallerRunsPolicy throttles the poll loop when
     * workers fall behind. The consumer is closed here, on the polling
     * thread, because KafkaConsumer is not thread-safe.
     */
    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            // The original looped on while(true), which made shutdown()
            // unreachable; the stop flag + wakeup() fixes that.
            while (!stopped) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (final ConsumerRecord<String, String> record : records) {
                    executors.submit(new ConsumerRecordWorker(record));
                }
            }
        } catch (WakeupException e) {
            // Expected when shutdown() interrupts a blocking poll; rethrow
            // only if it arrived while we were not stopping.
            if (!stopped) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    /** Stops the poll loop and drains the worker pool. */
    public void shutdown() {
        stopped = true;
        // Breaks out of a blocking poll() so execute() can exit promptly.
        consumer.wakeup();
        if (executors != null) {
            executors.shutdown();
            try {
                // awaitTermination is guarded by the null check above —
                // the original called it unconditionally and could NPE.
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case.");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Worker task that processes one Kafka record (stand-in for real work such
 * as writing the record to a database).
 */
public class ConsumerRecordWorker implements Runnable {
    // Typed and final: the original used a raw ConsumerRecord and a
    // mutable field.
    private final ConsumerRecord<String, String> record;

    public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() {
        // Placeholder for real processing (e.g. a database insert);
        // the thread name is passed as a %s argument instead of the
        // original's string concatenation — output is identical.
        System.err.printf("Thread-%s,partition = %d,offset = %d,key = %s,value = %s%n", Thread.currentThread().getName(), record.partition(), record.offset(), record.key(), record.value());
    }
}
(2).代码运行结果
运行自定义分区负载均衡发送示例代码向主题名为steven的partition0和partition1各发送5条数据,然后再执行ConsumerRecordThreadSample代码,可以观察到控制台输出如下信息。
partition = 0,offset = 0,key = key-0,value = value-0
partition = 0,offset = 1,key = key-2,value = value-2
partition = 0,offset = 2,key = key-4,value = value-4
partition = 0,offset = 3,key = key-6,value = value-6
partition = 0,offset = 4,key = key-8,value = value-8
partition = 1,offset = 0,key = key-1,value = value-1
partition = 1,offset = 1,key = key-3,value = value-3
partition = 1,offset = 2,key = key-5,value = value-5
partition = 1,offset = 3,key = key-7,value = value-7
partition = 1,offset = 4,key = key-9,value = value-9