引入依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
编写配置文件:
erver: port: 8080 spring: kafka: bootstrap-servers: 172.16.253.21:9093 producer: # ⽣产者 retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编解码⽅式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上 次提交时间⼤于TIME时提交 # TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理 record数量⼤于等于COUNT时提交 # COUNT # TIME | COUNT 有⼀个条件满⾜时提交 # COUNT_TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调 ⽤Acknowledgment.acknowledge()后提交 # MANUAL # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATE redis: host: 172.16.253.21
生产者:
@RestController public class KafkaController { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/send") public void send() { kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg"); } }
消费者:
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //⼿动提交offset ack.acknowledge(); }
==========================================================
标签:监听器,spring,boot,ListenerConsumer,kafka,提交,org From: https://www.cnblogs.com/xiaobaibailongma/p/17280401.html