首页 > 其他分享 >SpringBoot-kafka集成

SpringBoot-kafka集成

时间:2024-05-27 16:57:19浏览次数:31  
标签:集成 SpringBoot private kafka topic props CONFIG public

代码运行版本

springboot.version=2.7.7
spring-kafka.version=2.8.11

1 POM

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<!-- springboot依赖管理中有kafka的预管理版本,这里不用写版本就可以-->
	<version>2.8.11</version>
</dependency>

2 JavaConfig

KafkaConfiguration.java 生产者消费者配置类
TopicProperties 主题属性类

2.1 KafkaConfiguration.java

@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.batch.concurrency}")
    private Integer batchConcurrency;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;


    /**
     * 生产者配置信息
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        // 配置Kafka生产者参数
        Map<String, Object> props = new HashMap<>();
        // 设置消息确认模式为无需等待确认
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        // 指定Kafka集群的初始连接地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 设置消息发送失败时的重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        // 设置批次发送的消息大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        // 设置消息在发送缓冲区中等待的最长时间
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 设置发送缓冲区的大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        // 设置键的序列化方式为字符串序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 设置值的序列化方式为字符串序列化
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * 生产者工厂
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * 生产者模板
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    /**
     * 消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // 消费者组ID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // 自动重置偏移量的策略
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Kafka服务器初始连接地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 单次最大拉取记录数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // 是否自动提交偏移量
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // 消费者会话超时时间
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // 请求超时时间
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 键的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * 消费者批量工厂
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //设置并发量,小于或等于Topic的分区数
        factory.setConcurrency(batchConcurrency);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }
}

3.2 TopicProperties.java

@ConfigurationProperties(prefix = "spring.kafka.topic")
@Component
@Data
public class TopicProperties {

    /**
     * testTopic
     */
    private String testTopic;
}

3.Producer & Consumer

3.1 发送消息

生产者发送消息,或者直接使用kafkaUtils中的方法

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

...

private boolean sendKafkaMsg(Source source) {
        kafkaTemplate.send(topicProperties.getSourceTopic(), JSONObject.toJSONString(source)).addCallback(new CommonListenableFutureCallback());
        return true;
    }

3.2 消费消息

@Slf4j
@Component
public class SourceTopicListener {

    @Autowired
    private SourceService sourceService;

    @Autowired
    private RobotEventBuilder robotEventBuilder;

	// 这里的batchFactory是前面配置的bean
    @KafkaListener(topics = {"${spring.kafka.topic.source-topic}"}, containerFactory = "batchFactory")
    public void consumer(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
        List<Source> sources = new ArrayList<>();
        for (ConsumerRecord<String, String> record : consumerRecords) {
            Source source = JSONObject.parseObject(record.value(), Source.class);
            sources.add(source);
        }
        
		// 消息确认
        ack.acknowledge();
    }
}

3.3 消息发送回调

@Slf4j
public class CommonListenableFutureCallback implements ListenableFutureCallback<SendResult<String, String>> {

    @Override
    public void onFailure(Throwable ex) {
        log.error("Kafka 消息发送失败", ex);
    }

    @Override
    public void onSuccess(SendResult<String, String> result) {
        log.info("kafka 消息发送成功");
    }
}

4 application.yml

# [kafka]
spring:
  kafka:
    # 指定kafka server的地址,集群配多个,中间,逗号隔开
    bootstrap-servers: 100.25.199.94:9092
    producer:
      # 重试次数
      retries: 3
      # 批量发送的消息数量
      batch-size: 1000
      # 32MB的批处理缓冲区
      buffer-memory: 33554432

    # 默认消费者组
    consumer: 
      group-id: analyze-service-dev
      # 最早未被消费的offset
      auto-offset-reset: earliest
      # 批量一次最大拉取数据量
      max-poll-records: 10
      # 是否自动提交
      enable-auto-commit: false
      # 自动提交时间间隔,单位ms
      auto-commit-interval: 1000
      # 批消费并发量,小于或等于Topic的分区数
      batch.concurrency: 3
    topic: 
      # source
      source-topic: evt_source_topic_dev
      # event
      event-topic: evt_event_topic_dev

5 Utils

@Component
public class KafkaUtils {

    @Value("${spring.kafka.bootstrap-servers}")
    private String springKafkaBootstrapServers;

    private AdminClient adminClient;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 初始化AdminClient
     * '@PostConstruct该注解被用来修饰一个非静态的void()方法。
     * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
     * PostConstruct在构造函数之后执行,init()方法之前执行。
     */
    @PostConstruct
    private void initAdminClient() {
        Map<String, Object> props = new HashMap<>(1);
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
        adminClient = KafkaAdminClient.create(props);
    }

    /**
     * 新增topic,支持批量
     */
    public void createTopic(Collection<NewTopic> newTopics) {
        adminClient.createTopics(newTopics);
    }

    /**
     * 删除topic,支持批量
     */
    public void deleteTopic(Collection<String> topics) {
        adminClient.deleteTopics(topics);
    }

    /**
     * 获取指定topic的信息
     */
    public String getTopicInfo(Collection<String> topics) {
        AtomicReference<String> info = new AtomicReference<>("");
        try {
            adminClient.describeTopics(topics).all().get().forEach((topic, description) -> {
                for (TopicPartitionInfo partition : description.partitions()) {
                    info.set(info + partition.toString() + "\n");
                }
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return info.get();
    }

    /**
     * 获取全部topic
     */
    public List<String> getAllTopic() {
        try {
            return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return Lists.newArrayList();
    }

    /**
     * 往topic中发送消息
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    public void sendMessage(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }
}

标签:集成,SpringBoot,private,kafka,topic,props,CONFIG,public
From: https://www.cnblogs.com/a999/p/18215639

相关文章

  • SpringBoot项目增加日志
    1、需求:在项目下每天创建一个日志文件1、在项目resources目录下新建logback-spring.xml文件2、编辑logback-spring.xml<?xmlversion="1.0"encoding="UTF-8"?><configuration><!--控制台输出--><appendername="CONSOLE"class="ch.......
  • springboot2.7.18配置跨域
    一.springboot整合security后一般要配置security的跨域和mvc的跨域才能实现跨域1.spring-security配置跨域@BeanpublicSecurityFilterChainfilterChain(HttpSecurityhttp)throwsException{http.csrf().disable().cors();returnhttp.build();......
  • springboot入门
    springboot创建的方式方式一:访问网站构建项目,下载到本地导入开发工具方式二:使用IDEASpringInitializr创建初始化(推荐使用)方式二的步骤: 发现不能选择java8,修改ServerURL为https://start.aliyun.com/ 这样就可以选择java8了,点击下一步 点击创建 就创建好了新的......
  • Springboot Redis 性能优化(基于 Lettuce)
    1.SpringbootRedis性能优化(基于Lettuce)1.1.为什么是Lettuce1.2.参数优化1.2.0.1.SpringbootRedis所有参数项1.2.1.最终参数配置1.SpringbootRedis性能优化(基于Lettuce)1.1.为什么是LettuceSpringboot2.x.x开始默认使用lettuce作为redis客户......
  • 基于SpringBoot+Vue的实验室管理系统设计与实现毕设(文档+源码)
            目录一、项目介绍二、开发环境三、功能介绍四、核心代码五、效果图六、源码获取:        大家好呀,我是一个混迹在java圈的码农。今天要和大家分享的是一款基于SpringBoot+Vue的实验室管理系统,项目源码请点击文章末尾联系我哦~目前有各类成品......
  • 基于SpringBoot+Vue的火车订票管理系统设计与实现毕设(文档+源码)
            目录一、项目介绍二、开发环境三、功能介绍四、核心代码五、效果图六、源码获取:        大家好呀,我是一个混迹在java圈的码农。今天要和大家分享的是一款基于SpringBoot+Vue的火车订票管理系统,项目源码请点击文章末尾联系我哦~目前有各类成品......
  • FFmpeg开发笔记(二十四)Linux环境给FFmpeg集成AV1的编解码器
    ​AV1是一种新兴的免费视频编码标准,它由开放媒体联盟(AllianceforOpenMedia,简称AOM)于2018年制定,融合了GoogleVP10、MozillaDaala以及CiscoThor三款开源项目的成果。据说在实际测试中,AV1标准比H.265(HEVC)的压缩率提升了大约27%。由于AV1具有性能优势,并且还是免费授权,因此各大流......
  • 计算机毕业设计springboot+vue学生档案学籍信息管理系统java
    本文所设计的学籍系统的设计与实现拥有前端和后端,前端使用Vue.js框架和创建,后端使用Springboot框架创建,开发语言采用Java,使用Mysql数据库对后台数据进行存储。将IDEA作为主要的开发工具。接着进行系统的需求分析、功能设计、数据库设计,最后进行编码实现。技术栈ide工具:IDEA......
  • 1915springboot VUE 宠物寄养平台系统开发mysql数据库web结构java编程计算机网页源码m
    一、源码特点 springbootVUE宠物寄养平台系统是一套完善的完整信息管理类型系统,结合springboot框架和VUE完成本系统,对理解JSPjava编程开发语言有帮助系统采用springboot框架(MVC模式开发),系统具有完整的源代码和数据库,系统主要采用B/S模式开发。springbootVUE宠物寄养......
  • springboot宠物领养管理系统论文
    目录摘要IAbstractII第1章绪论31.1项目研究的背景31.2开发目的和意义31.3国内外研究现状4第2章系统开发工具52.1Java编程语言52.2B/S模式52.3MySQL数据库62.4后端框架介绍72.4.1SpringBoot介绍72.4.2Mybatis介绍72.4.3SpringMvc介......