参考:https://blog.csdn.net/zhengzaifeidelushang/article/details/121984379
深入浅出理解kafka原理系列之:java实现kafka消费者
一、pom.xml引入kafka依赖
<!-- Kafka Java client library; supplies the org.apache.kafka.clients consumer and producer classes used by the programs below. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>
二、kafka消费者程序
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal Kafka consumer example.
 *
 * <p>Connects to the configured brokers using SASL/PLAIN, joins the
 * {@code opticsgroup} consumer group, subscribes to {@code optics-topic} and
 * prints every record it receives to standard output.
 */
public class MyConsumer {
    private final static String TOPIC_NAME = "optics-topic";

    /**
     * Polls the topic forever and prints each record.
     *
     * <p>The loop has no exit condition; the method only ends when an exception
     * propagates out of {@code poll} (or the process is killed). The consumer
     * is closed automatically when the try-with-resources block is left.
     *
     * @param args unused
     * @throws ExecutionException   declared for signature compatibility; not thrown by this body
     * @throws InterruptedException declared for signature compatibility; not thrown by this body
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Addresses of the Kafka cluster brokers.
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        // NOTE(review): credentials are hard-coded for this demo and SASL_PLAINTEXT
        // does not encrypt the connection; load secrets from external configuration
        // and prefer SASL_SSL outside of a sandbox.
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Consumer group this client belongs to.
        props.put("group.id", "opticsgroup");
        // Keys and values are deserialized as strings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees close() runs however the loop is exited.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            while (true) {
                // Wait up to one second for new records on each iteration.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
三、kafka生产者程序
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal Kafka producer example.
 *
 * <p>Connects to the configured brokers using SASL/PLAIN and synchronously
 * sends ten string records to {@code optics-topic}, printing the topic,
 * partition and offset reported for each one.
 */
public class MyProducer {
    private final static String TOPIC_NAME = "optics-topic";

    /**
     * Sends records {@code 0..9} one at a time, blocking on each send result.
     *
     * <p>The producer is closed automatically when the try-with-resources
     * block is left, including when a send fails.
     *
     * @param args unused
     * @throws ExecutionException   if a send fails; thrown by {@code Future.get()}
     * @throws InterruptedException if the thread is interrupted while waiting for a send result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Addresses of the Kafka cluster brokers.
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        // NOTE(review): credentials are hard-coded for this demo and SASL_PLAINTEXT
        // does not encrypt the connection; load secrets from external configuration
        // and prefer SASL_SSL outside of a sandbox.
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Acknowledgement mode: "-1" is equivalent to "all", the slowest but safest setting.
        props.put("acks", "-1");
        props.put("retries", 3);
        // Upper bound, in bytes, on a batch of unsent records for one partition;
        // a full batch is sent to the broker.
        props.put("batch.size", 16384);
        // max.request.size (largest request the client may send) is left at its
        // default of 1048576 bytes (1 MB).
        // Time, in milliseconds, a record may wait in the buffer for batching before it is sent.
        props.put("linger.ms", 10);
        // Total memory, in bytes, the producer may use to buffer unsent records.
        // It must be larger than batch.size, otherwise buffer allocation fails.
        // When the buffer is full, send() blocks (up to max.block.ms) rather than flushing.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The original never closed the producer; try-with-resources releases its
        // buffers and network thread even when get() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // key: determines which partition the record goes to; value: the message payload.
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC_NAME, Integer.toString(i), "dd:" + i);
                // get() blocks until the broker acknowledges the record (synchronous send).
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + " | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
            }
        }
    }
}
四、先运行kafka生产者程序,再查看kafka消费者程序的输出(消费者程序需提前启动:新消费者组默认 auto.offset.reset=latest,只能收到其启动之后发送的消息)
标签:java,生产者,import,kafka,apache,props,put,org
From: https://www.cnblogs.com/qsds/p/16643553.html