`pom.xml`:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of this demo project. -->
    <groupId>com.kaven</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Compile for Java 8. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!--
            The Java sources contain non-ASCII string literals. Pin the source
            encoding so the build does not fall back to the platform default
            charset and corrupt them.
        -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Kafka Java client (KafkaProducer, ProducerRecord, ProducerConfig, serializers). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
测试代码:
package com.kaven.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal Kafka producer demo: sends each entry of {@link #MESSAGE} to a topic
 * using asynchronous, fire-and-forget sends.
 */
public class ProducerTest {

    /** Payloads to send, one record per entry, in array order. */
    private static final String[] MESSAGE = new String[]{"你好啊", "现在正在测试Kafka的Producer", "先溜了"};

    /**
     * Entry point: sends the demo messages to the {@code new-topic-user} topic.
     *
     * @param args unused
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        send("new-topic-user");
    }

    /**
     * Sends every entry of {@link #MESSAGE} to the given topic.
     *
     * <p>Each record uses the current time in milliseconds (as a string) as its key.
     * Sends are asynchronous and their results are not inspected here.
     *
     * @param name the topic to send to
     * @throws ExecutionException   kept in the signature for callers; not thrown by this body
     * @throws InterruptedException kept in the signature for callers; not thrown by this body
     */
    public static void send(String name) throws ExecutionException, InterruptedException {
        // try-with-resources guarantees the producer is closed even if building or
        // sending a record throws; previously close() was only reached on success.
        try (Producer<String, String> producer = ProducerTest.createProducer()) {
            for (String message : MESSAGE) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                        name,
                        String.valueOf(System.currentTimeMillis()),
                        message
                );
                // Asynchronous send: the returned Future is intentionally ignored.
                producer.send(producerRecord);
            }
        }
    }

    /**
     * Creates a producer with String keys and values.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public static Producer<String, String> createProducer() {
        // Producer configuration.
        Properties properties = new Properties();
        // Broker address used to bootstrap the connection.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.7:9092");
        // Serializer class for record keys.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for record values.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(properties);
    }
}
使用 `kafka-console-consumer.sh` 脚本来获取 `Producer` 发送的消息。
[root@192 kafka_2.13-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --from-beginning --topic new-topic-user
你好啊
现在正在测试Kafka的Producer
先溜了
`KEY` 和 `VALUE` 的序列化器类可以在 `org.apache.kafka.common.serialization` 包中找到(如下图所示),`Kafka` 提供了常用的序列化器。
`Producer` 异步发送消息,`send` 方法会返回一个 `Future`,可以通过调用它的 `get` 方法来阻塞,直到发送完成。
// Send asynchronously, then block on the returned Future via get().
producer.send(producerRecord).get();
使用回调。
/**
 * Sends every entry of {@code MESSAGE} to the given topic and reports the
 * outcome of each send through a callback.
 *
 * <p>On success the callback prints the partition and offset of the record;
 * on failure it prints the exception's stack trace.
 *
 * @param name the topic to send to
 * @throws ExecutionException   kept in the signature for callers; not thrown by this body
 * @throws InterruptedException kept in the signature for callers; not thrown by this body
 */
public static void send(String name) throws ExecutionException, InterruptedException {
    // try-with-resources guarantees the producer is closed even if building or
    // sending a record throws; previously close() was only reached on success.
    try (Producer<String, String> producer = ProducerTest.createProducer()) {
        for (String message : MESSAGE) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                    name,
                    String.valueOf(System.currentTimeMillis()),
                    message
            );
            // Asynchronous send with a completion callback.
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
    }
}
输出:
partition: 1 offset: 28
partition: 2 offset: 21
partition: 0 offset: 22
`Topic` 有三个分区,可见每个分区存储了一条消息。`ProducerRecord` 封装了要发送到 `Kafka` 的消息,包括消息需要发送到的 `Topic` 名称、可选的分区号、可选的键、值等。
指定分区号(`ProducerRecord` 构造函数中的 `1`):
// Build the record with an explicit partition: the second constructor
// argument (1) is the partition number.
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
name,
1,
String.valueOf(System.currentTimeMillis()),
MESSAGE[i]
);
输出:
partition: 1 offset: 29
partition: 1 offset: 30
partition: 1 offset: 31
`Producer` 异步发送与回调就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。