Install Kafka:
Installing Kafka on Windows, see: https://blog.csdn.net/sinat_32502451/article/details/133067851
Installing Kafka on Linux, see: https://blog.csdn.net/sinat_32502451/article/details/133080353
Add the dependencies:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.10.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
Kafka configuration:
Add the following settings to application.properties:
### Kafka producer
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
### Kafka consumer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5
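For orientation, the spring.kafka.consumer.* entries above are Spring Boot's names for standard Kafka client settings. The sketch below lists the client-side keys they correspond to, using only the JDK; the class and method names are hypothetical and not part of the original post. spring.kafka.listener.concurrency has no client-side counterpart: it sets how many consumer threads the Spring listener container starts, and threads beyond the topic's partition count stay idle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch only: the native Kafka consumer keys that the
 * spring.kafka.consumer.* properties correspond to.
 * ConsumerSettingsSketch and consumerSettings() are hypothetical names.
 */
class ConsumerSettingsSketch {

    static Map<String, Object> consumerSettings() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");   // spring.kafka.consumer.bootstrap-servers
        props.put("group.id", "myGroup");                   // spring.kafka.consumer.group-id
        props.put("enable.auto.commit", true);              // spring.kafka.consumer.enable-auto-commit
        props.put("auto.commit.interval.ms", 100);          // spring.kafka.consumer.auto-commit-interval
        props.put("max.poll.records", 1);                   // spring.kafka.consumer.max-poll-records
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each native key with the value configured in application.properties
        consumerSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With max.poll.records set to 1, each poll returns at most one record, which keeps the demo easy to follow but is likely too small for real throughput.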
Producer code:
- KafkaProducerService:
The producer sends the message.
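import com.alibaba.fastjson.JSON; // assumption: JSON is fastjson's class; the original does not show this import
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a message and handles the send callback.
     * If the topic does not exist yet, the broker creates it on the first send
     * (this relies on the broker setting auto.create.topics.enable, which defaults to true).
     */
    public void send() {
        MyMsg myMsg = new MyMsg();
        myMsg.setName("lin");
        myMsg.setId("1234");
        // Send the message: topic "myTopic1", key "key", value is the JSON form of myMsg
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("myTopic1", "key", JSON.toJSONString(myMsg));
        // Handle the send result, for example to react to a failed send.
        // Registering a callback is optional.
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Failed to send message. " + ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                ProducerRecord<String, String> producerRecord = result.getProducerRecord();
                RecordMetadata recordMetadata = result.getRecordMetadata();
                System.out.println("Message sent. producerRecord:" + JSON.toJSONString(producerRecord)
                        + ", recordMetadata:" + JSON.toJSONString(recordMetadata));
            }
        });
    }
}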
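The producer uses a MyMsg class that the post does not show. A minimal sketch, assuming it is a plain JavaBean with String id and name fields (inferred from the setters called in send() and from the JSON printed by the consumer); the real class may differ.

```java
/**
 * Hypothetical message payload. The field names are inferred from
 * setId(...) and setName(...) in KafkaProducerService.
 * In a real project this would typically be a public class in MyMsg.java.
 */
class MyMsg {

    private String id;
    private String name;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```

The getters matter here: JSON serializers such as fastjson read JavaBean properties through them, so a class with only private fields and setters would serialize to an empty object.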
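- Calling the producer to send a message:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    // POST /kafka/send triggers one send of the sample message
    @PostMapping(value = "/kafka/send")
    public void send() {
        kafkaProducerService.send();
    }
}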
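To trigger the endpoint without a separate HTTP tool, a small client along these lines might do. It is a sketch that assumes Java 11 or later and that the application listens on localhost at Spring Boot's default port 8080; the post does not state the port, and SendEndpointClient is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that calls the /kafka/send endpoint of the demo application. */
class SendEndpointClient {

    // Assumption: host and port are not given in the post; 8080 is Spring Boot's default.
    static final String SEND_URL = "http://localhost:8080/kafka/send";

    /** Builds a POST request with an empty body, matching the @PostMapping handler. */
    static HttpRequest buildSendRequest() {
        return HttpRequest.newBuilder(URI.create(SEND_URL))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the Spring Boot application to be running
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildSendRequest(), HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

The handler returns void, so the response body is empty; the result of the send shows up in the application's console output instead.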
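Consumer code:
- KafkaConsumerService:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    /**
     * Kafka listener that receives messages.
     * topics: the Kafka topic(s) to listen to; more than one can be given.
     * groupId: the consumer group; optional, and when omitted the listener
     * uses the value of spring.kafka.consumer.group-id.
     */
    @KafkaListener(topics = {"myTopic1"}, groupId = "myGroup")
    public void consume(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("Consumer received message, content: " + consumerRecord.value());
        System.out.println("Offset: " + consumerRecord.offset());
    }
}
Test result:
After the producer is called to send a message, the consumer receives it and prints output similar to the following:
Consumer received message, content: {"id":"1234","name":"lin"}
Offset: 19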
From: https://www.cnblogs.com/expiator/p/17795875.html