步骤:
1、引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
2、编写配置文件
spring: kafka: # kafka集群地址 bootstrap-servers: localhost:9092 #,172.16.253.38:9093,172.16.253.38:9094 producer: # ⽣产者 retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编解码⽅式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交 # TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交 # COUNT # TIME | COUNT 有⼀个条件满⾜时提交 # COUNT_TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交 # MANUAL # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATEView Code
3、实现生产者&消费者
生产者:
@Resource private KafkaTemplate<String, String> KafkaTemplate; /** * springboot集成kafka * @return */ @RequestMapping("/sendMsg") public String sendMsg() { KafkaTemplate.send(kafkaProperties.getTopic(), "haha"); return "消息发送成功"; }
消费者:
@Component @Slf4j public class MyConsumer { @KafkaListener(topics = "kafka-demo-topic", groupId = "test-group-002") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack){ String value = record.value(); log.info("value: {}", value); // 手动提交offset,否则消息会重复消费 ack.acknowledge(); } }
标签:集成,提交,value,kafka,监听器,org,poll,springboot From: https://www.cnblogs.com/caesar-the-great/p/17063494.html