首页 > 其他分享 >springboot集成kafka

springboot集成kafka

时间:2023-01-21 07:55:05浏览次数:42  
标签:集成 提交 value kafka 监听器 org poll springboot

步骤:

1、引入依赖

<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 </dependency>

2、编写配置文件

spring:
  kafka:
    # kafka集群地址
    bootstrap-servers: localhost:9092 #,172.16.253.38:9093,172.16.253.38:9094
    producer: # ⽣产者
      retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
    # 指定消息key和消息体的编解码⽅式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
    listener:
      # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
      # TIME
      # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有⼀个条件满⾜时提交
      # COUNT_TIME
      # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
      # MANUAL
      # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE
View Code

3、实现生产者&消费者

生产者:

@Resource
    private KafkaTemplate<String, String> KafkaTemplate;    

/**
     * springboot集成kafka
     * @return
     */
    @RequestMapping("/sendMsg")
    public String sendMsg() {
        KafkaTemplate.send(kafkaProperties.getTopic(), "haha");
        return "消息发送成功";

    }

消费者:

@Component
@Slf4j
public class MyConsumer {

    @KafkaListener(topics = "kafka-demo-topic", groupId = "test-group-002")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack){
        String value = record.value();
        log.info("value: {}", value);
        // 手动提交offset,否则消息会重复消费
        ack.acknowledge();
    }
}

 

标签:集成,提交,value,kafka,监听器,org,poll,springboot
From: https://www.cnblogs.com/caesar-the-great/p/17063494.html

相关文章

  • 230120_50_SpringBoot入门
    springboot自动配置原理总结(参考狂神说)以HttpEncodingAutoConfiguration(Http编码自动配置)为例解释自动配置原理;//表示这是一个配置类,和以前编写的配置文件一样,也可以给......
  • spring事件机制,异步发送消息到kafka
    步骤:1、创建eventpublicclassKafkaSendMsgEventextendsApplicationEvent{privateListdtoList;publicKafkaSendMsgEvent(Objectsource,ListdtoL......
  • SpringBoot
    SpringBoot原理初探狂神说:狂神说SpringBoot02:运行原理初探(qq.com)yaml配置注入代替@value赋值法语法对比properties与yamlSpringBoot使用一个全局的配置文件,配置......
  • SpringBoot
    SpringBoot原理初探狂神说:狂神说SpringBoot02:运行原理初探(qq.com) yaml配置注入代替@value赋值法 语法对比properties与yamlSpringBoot使用一个全局的配置文件......
  • springboot允许跨域访问
    前后端开发学习中,vue里面需要跨域访问后台数据可在springboot后台里面添加个配置类即可:packagecom.springboottest.config;importorg.springframework.beans.factor......
  • Kafka的架构
    Kafka的整体架构非常简单,是显式分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个......
  • springboot统一处理异常
    增加业务异常处理类:packagecom.example.demo.config;importlombok.Data;@DatapublicclassBizExceptionextendsRuntimeException{protectedIntegererr......
  • 如何安装配置kafka
    最近项目需要用到kafa进行数据流处理,下面将安装部署kafka的方法简单介绍下。1:配置java环境修改/etc/bashrc文件,添加JAVA_HOMEcat/etc/bashrcexportJAVA_HOME=/root/jdk-......
  • 伯俊ERP与金蝶云星空对接集成连通应收单新增
    伯俊ERP与金蝶云星空对接集成表头表体组合查询连通应收单新增(应收单-标准应收单(KD应收单销售退)数据源系统:伯俊ERP未来,伯俊科技也会砥砺前行,不断为品牌提供更全面的零售终......
  • 42-Springboot整合HignLevelClient----构建复杂检索
    @Test voidsearchTest()throwsIOException{ SearchRequestsearchRequest=newSearchRequest(); //1、指定索引 searchRequest.indices("bank"); //2.1、......