首页 > 其他分享 >SpringBoot日常:集成Kafka

SpringBoot日常:集成Kafka

时间:2025-01-08 17:00:17浏览次数:3  
标签:集成 SpringBoot props springframework kafka org put import Kafka

文章目录

本章内容主要介绍如何在springboot项目对kafka进行整合,最终能达到的效果就是能够在项目中通过配置相关的kafka配置,就能进行消息的生产和消费。

1、pom.xml文件

原本项目用 Spring Boot 的版本为2.6.X,所以这里用spring-cloud-starter-stream-kafka的版本用的是2.2.1.RELEASE,也可以用其他版本,但是注意兼容性,不然会编译运行报错

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.2</version>  <!-- 确保与 Spring Boot 2.6.x 兼容 -->
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
	<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <version>2.2.1.RELEASE</version>
    </dependency>
</dependencies>

2、application.yml

添加kafka的相关配置

spring:
  kafka:
    bootstrap-servers: 192.168.102.179:9092
    producer:
      acks: 1
      retries: 0
      batch-size: 30720000
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消费者配置
    consumer:
      group-id: test-kafka
      #是否开启手动提交 默认自动提交
      enable-auto-commit: true
      #如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔
      auto-commit-interval: 5000
      #earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #一次调用 poll() 操作时返回的最大记录数 默认为 500 条
      max-poll-records: 2
      #kafka session timeout
      session:
        timeout:
          ms: 300000
    listener:
      #kafka 没有创建指定的 topic 下  项目启动是否报错 true  false
      missing-topics-fatal: false
      #Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
      type: single
      ack-mode: manual_immediate

3、生产者配置类

添加一个生产者配置类KafkaProducerConfig ,主要设置消息的序列化方式等消息处理方式

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 码至终章
 * @Date 2025/1/8 11:33
 * @Version 1.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean("myProducerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(4);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }

}

4、消费者配置类

创建一个消费者配置类KafkaConsumerConfig,主要设置一些消息的接收处理配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 码至终章
 * @Date 2025/1/8 12:09
 * @Version 1.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitIntervalMs;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean("myConsumerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(12);
        //是否自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //kafak 服务器
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        //消费组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //一次调用poll()操作时返回的最大记录数,默认值为500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //自动提交时间间隔 默认 5秒
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        return props;
    }


    /**
     * 消费者工厂
     */
    @Bean("myContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
        // 并发创建的消费者数量
        factory.setConcurrency(3);
        // 开启批处理
        factory.setBatchListener(true);
        //拉取超时时间
        factory.getContainerProperties().setPollTimeout(1500);
        //是否自动提交 ACK kafka 默认是自动提交
        if (!enableAutoCommit) {
            //共有其中方式
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        }
        return factory;
    }
}

5、消息订阅

创建一个消费者监听消息类,里面对主题消息监听,这里的测试主题为testone

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Author 码至终章
 * @Date 2025/1/8 14:19
 * @Version 1.0
 */
@Slf4j
@Component
public class MyKafkaConsumer {

    @KafkaListener(id = "my-kafka-consumer",
            idIsGroup = false, topics = "topicone",
            containerFactory = "myContainerFactory")
    public void listen(String message) {
        log.info("接收到主题消息,消息内容:{}", message);
    }
}

6、生产者发送消息

为了方便调用测试,这里在controller编写一个方法发送消息

@RestController
@Slf4j
public class TestController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping ("/sendMessage")
    public void sendMessage(@RequestParam String message) {
        this.kafkaTemplate.send("topicone", message);
    }
}

7、测试发送消息

这里简单用postman调用接口发送一条消息
在这里插入图片描述
从idea的程序控制台可以看到消费者监听可以正常接收到消息
在这里插入图片描述

标签:集成,SpringBoot,props,springframework,kafka,org,put,import,Kafka
From: https://blog.csdn.net/qq_32590535/article/details/145008917

相关文章

  • springboot水环境检测系统-计算机设计毕业源码43284
    目录1绪论1.1选题背景1.2选题的目的意义1.3论文结构与章节安排2系统分析2.1.1技术可行性分析2.1.2 经济可行性分析2.1.3法律可行性分析2.2系统流程分析2.2.1添加信息流程2.2.2修改信息流程2.2.3删除信息流程2.3 系统功能分析2.3.1功能......
  • springboot城乡居民医疗信息管理系统-计算机设计毕业源码70573
    目 录摘要Abstract绪论1.1 选题背景1.2研究内容1.3本文的组织结构2相关技术介绍2.1MySQL数据库2.2Java编程语言2.3SpringBoot框架介绍3 系统需求分析与设计3.1可行性分析3.1.1技术可行性分析3.1.2经济可行性分析3.1.3法律可行性分析......
  • ECCV2020 | DEM | 通过调整不同输入、多样性集成和区域拟合来提高对抗样本的可迁移性
    ImprovingtheTransferabilityofAdversarialExampleswithResized-Diverse-Inputs,Diversity-EnsembleandRegionFitting摘要-Abstract引言-Introduction相关工作-RelatedWork方法-Methodology梯度攻击方法-Gradient-BasedAttackMethods观察分析调整输入尺寸......
  • SpringBoot汽车服务系统p79hp(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表用户,汽车类型,汽车信息,汽车品牌,汽车颜色开题报告内容一、研究背景与意义随着汽车保有量的持续增长,消费者对汽车服务的需求日益多样化与个性化。然而,传统汽车......
  • SpringBoot企业员工自助管理系统01ncs(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表员工,考勤信息,工作审批,费用报销,办公资源开题报告内容一、课题背景与意义随着信息技术的飞速发展,企业对于内部管理的信息化需求日益增强。传统的人工管理方式......
  • 基于SpringBoot实现的保障性住房管理系统
     ......
  • 基于SpringBoot的进销存系统
    一、系统概述进销存系统主要为企业生产经营中的进货、出货、批发销售、付款等全过程提供跟踪、管理和辅助的解决方案。从接获订单合同开始,到物料采购、入库、领用,再到产品完工入库、交货、回收货款、支付原材料款等每一个环节,系统都能提供详尽准确的数据。二、技术架构基......
  • 最牛逼的关于SpringBootJpa批量写入的问题讲解
    SpringBootjpa默认批量写入性能很差分析很多人认为是一条sql一条sql执行的问题,故而改为了insertintotablevalues(),()一条sql执行多条数据插入,当然这样没有问题,也是常用的解决办法;但本质上是事务控制的问题,默认save,或者saveAll(本质也是save),一个save就是一条sql的i......
  • 向日葵SDK集成 C#
    用向日葵官方的demo,运行远程桌面一直不行,不知道是我哪里操作有问题还是怎么的,后来找到下面老哥的文章,用他的demo试了就可以。一、前置条件前往向日葵开发平台注册开发者身份花生壳,向日葵,蒲公英软硬件嵌入,DDNS,远程控制,智能组网接入-贝锐(oray.com)创建应用,获取APPID和APP......
  • webapi 集成 之 freesql 注入
    usingEasyCaching.SQLite;usingjxc.Repository;usingjxc.Service;namespacejxc.Api;publicclassProgram{publicstaticvoidMain(string[]args){WebApplicationBuilderbuilder=WebApplication.CreateBuilder(args);//Addser......