首页 > 其他分享 >springboot整合kafka

springboot整合kafka

时间:2023-07-13 14:58:20浏览次数:48  
标签:springboot springframework kafka public 整合 import org properties

一、引入依赖 (kafka的版本和springboot的版本对不上的话,启动会报错,包类不存在)

   <dependency>
       <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.5.1.RELEASE</version>
 </dependency>

我的springboot版本:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

二、配置 yml

spring:

  kafka:
    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
    #生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
    producer:
      #0---表示不进行消息接收是否成功的确认
      #1---表示当Leader接收成功时确认
      #all -1---表示Leader和Follower都接收成功时确认
      acks: all
      #设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
      retries: 2
      #每批次发送消息的数量
      batch-size: 16384
      #producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
      buffer-memory: 33554432
      #key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1
        enable:
          idempotence: true
    #消费者的配置
    consumer:
      #是否开启自动提交
      enable-auto-commit: false
      #自动提交的时间间隔
      auto-commit-interval: 100ms
      #key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
      max-poll-records: 15
    listener:
      ack-mode: manual_immediate
      type: batch

上面我配置的是手动Ack,并且批量消息,一次可以拉取15条记录

yaml中的各个配置参数的意思,参考官方文档 https://kafka.apachecn.org/documentation.html#configuration

 

三、springboot自动装配机制

 

package org.springframework.boot.autoconfigure.kafka;

import java.io.IOException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration(
    proxyBeanMethods = false
)
@ConditionalOnClass({KafkaTemplate.class})
@EnableConfigurationProperties({KafkaProperties.class})
@Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})
public class KafkaAutoConfiguration {
    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean({KafkaTemplate.class})
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean({ProducerListener.class})
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener();
    }

    @Bean
    @ConditionalOnMissingBean({ConsumerFactory.class})
    public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> {
            customizer.customize(factory);
        });
        return factory;
    }

    @Bean
    @ConditionalOnMissingBean({ProducerFactory.class})
    public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }

        customizers.orderedStream().forEach((customizer) -> {
            customizer.customize(factory);
        });
        return factory;
    }

    @Bean
    @ConditionalOnProperty(
        name = {"spring.kafka.producer.transaction-id-prefix"}
    )
    @ConditionalOnMissingBean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager(producerFactory);
    }

    @Bean
    @ConditionalOnProperty(
        name = {"spring.kafka.jaas.enabled"}
    )
    @ConditionalOnMissingBean
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        Jaas jaasProperties = this.properties.getJaas();
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }

        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }

        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    @Bean
    @ConditionalOnMissingBean
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }
}

四、生产者:

@RestController
public class MyController {

   @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    @RequestMapping("/send/msg")
    public String sendMsg(){
        for (int i = 0; i < 100; i++) {
            //不指定分区,会随机发到不同的分区
            kafkaTemplate.send("shop-topic",i+"aaaa");

            //指定了key,会用key的hashcode %分区总数,根据结果发送到指定分区,因此同一个key永远发到同一个分区
            kafkaTemplate.send("shop-topic",i+"aaa",i+"aaaa");
        }
        return "ok";
    }
}

五、消费者批量消费(yaml那里要开启批量配置)

/**
 * author: yangxiaohui
 * date:   2023/7/13
 */
@Service
public class OrderService {

    /**
     * kafka是按照消费者分组来消费的,同一条消息,在同一个分组中,只会被消费一次,例如 order服务部署了5个节点,他们的消费者组是一样,那么一条
     * 消息只会被这5个节点中的一个节点消费,groupId会自动创建
     * ack提交的是offset
     * @param consumerRecordList
     * @param acknowledgment
     */
    @KafkaListener(topics = "shop-topic",groupId = "my-group",clientIdPrefix = "orderService")
    public void listenMsg(List<ConsumerRecord> consumerRecordList, Acknowledgment acknowledgment){
        acknowledgment.acknowledge();
        for (ConsumerRecord consumerRecord : consumerRecordList) {
            System.out.println(consumerRecord.value());
        }

    }

}

六、消费者单条消费:一定要配置单条消费逻辑,不然会报错

spring:

  kafka:
    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
    #生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
    producer:
      #0---表示不进行消息接收是否成功的确认
      #1---表示当Leader接收成功时确认
      #all -1---表示Leader和Follower都接收成功时确认
      acks: all
      #设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
      retries: 2
      #每批次发送消息的数量
      batch-size: 16384
      #producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
      buffer-memory: 33554432
      #key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1
        enable:
          idempotence: true
    #消费者的配置
    consumer:
      #是否开启自动提交
      enable-auto-commit: false
      #自动提交的时间间隔
      auto-commit-interval: 100ms
      #key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
      max-poll-records: 15
    listener:
      ack-mode: manual_immediate
      type: single   

注意上面最后的配置那里 type=single

@Service
public class OrderService {

    /**
     * kafka是按照消费者分组来消费的,同一条消息,在同一个分组中,只会被消费一次,例如 order服务部署了5个节点,他们的消费者组是一样,那么一条
     * 消息只会被这5个节点中的一个节点消费,groupId会自动创建
     *
     * @param
     * @param
     */
    @KafkaListener(topics = "shop-topic",groupId = "my-group",clientIdPrefix = "orderService")
    public void listenMsg(ConsumerRecord consumerRecord,Acknowledgment  acknowledgment){
        acknowledgment.acknowledge();
        System.out.println(consumerRecord.value());
    }

}

 七、需要注意的地方:

 

 

标签:springboot,springframework,kafka,public,整合,import,org,properties
From: https://www.cnblogs.com/yangxiaohui227/p/17550433.html

相关文章

  • springboot - 整合flyway
    一、概念官网:https://flywaydb.org/数据库版本控制管理工具,通过集成Flyway可以实现启动项目时自动执行项目迭代升级。Flyway已经支持数据库包括:Oracle,SQLServer,SQLAzure,DB2,DB2z/OS,MySQL(includingAmazonRDS),MariaDB,GoogleCloudSQL,PostgreSQL(includ......
  • 我开源了团队内部基于SpringBoot Web快速开发的API脚手架stater
    我们现在使用SpringBoot做Web开发已经比之前SprngMvc那一套强大很多了。但是用SpringBootWeb做API开发还是不够简洁有一些。每次WebAPI常用功能都需要重新写一遍。或者复制之前项目代码。于是我封装了这么一个抽出SpringBootWebAPI每个项目必备需要重复写的模块,和......
  • springboot下遇到的跨域问题
    后端跨域处理springboot项目注解@CrossOrigin实现WebMvcConfigurer接口中addCorsMappings(CorsRegistryregistry)方法配置CorsFilterfilter过滤器使用注意都可以单独使用,全部配置可能会导致不生效的问题addCorsMappings方法@Overridepublic......
  • springboot+vue前后端分离项目发布上线
    首先呢不用多说就是买阿里云服务器,但是呢,学生免费一个月。前端呢就是配置与后端端口 然后呢就是要打包:npmrunbuild把你的dist文件东西进行上传到服务器。后端springboot呢就是要打jar包上传。然后服务器是先去下载配置jdk,然后就是点击网站选择springboot项目最后一步呢......
  • jwt整合
    1.token问题目前方案是将token作为key,将登录的信息作为值存入到redis里面。2.使用jwt加入jwt之后就可以把redis去掉2.1.添加依赖2.2.创建工具类2.2.1.有效期30分钟2.2.2.令牌密钥暂时设置为1234562.2.3.创建jwt方法2.2.4.解析jwt方法......
  • SpringBoot中使用Netty开发WebSocket服务-netty-websocket-spring-boot-starter开源项
    场景SpringBoot+Vue整合WebSocket实现前后端消息推送:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/114392573SpringCloud(若依微服务版为例)集成WebSocket实现前后端的消息推送:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/114480731若依前后......
  • springboot 自定义整合caffeine 本地缓存
    1、自定义缓存配置类@Data@ConfigurationProperties(prefix="page.cache")publicclassPageCacheProperties{privateCaffeineConfigPropertiescaffeine=newCaffeineConfigProperties();//本地缓存配置privatePageCacheAsyncExecutorConfigpool=newP......
  • SpringBoot与Freemarker整合
    1.需要导入freemarker的pom文件;<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId></dependency>2.需要在application.properties配置文件中配置一些freemarker的参数;serve.port......
  • 二维码简易实现 Vue+Springboot
    Vue:<template><div><img:src="database64"width="150px"/><div>注:请使用手机微信扫码,并于2分钟内绑定员工账号(二维码为账号独属,请勿分享)。</div></div></template><script>import{getQrCode}from"......
  • 4-基于SpringBoot实现SSMP整合
    1.整合JunitSpring整合JUnit的制作方式//加载spring整合junit专用的类运行器@RunWith(SpringJUnit4ClassRunner.class)//指定对应的配置信息@ContextConfiguration(classes=SpringConfig.class)publicclassAccountServiceTestCase{//注入你要测试的对象......