首页 > 其他分享 >Spring Boot 整合 Kafka

Spring Boot 整合 Kafka

时间:2023-04-11 18:56:00浏览次数:37  
标签:-- Spring Boot kafka 消息 server public Kafka

Kafka 环境搭建

kafka 安装、配置、启动、测试说明:

1. 安装:直接官网下载安装包,解压到指定位置即可(kafka 依赖的 Zookeeper 在文件中已包含)
下载地址:https://kafka.apache.org/downloads
示例版本:kafka_2.13-2.8.0.tgz
下载后可本地解压安装,解压位置自选,如 D:\Java 下
解压命令:tar -zxvf kafka_2.13-2.8.0.tgz
PS:可在 idea 命令行窗口或 git 提供的命令窗口中进行命令操作
使用 git 提供的命令窗口:空白文件夹中右键——》Git Bash Here 即可打开

2. 添加地址配置
在 D:\Java\kafka_2.13-2.8.0\config\server.properties 中搜索添加以下两行配置:
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
说明:以上配置默认是注释掉的,可搜索找到,根据需求进行自定义地址配置

重要说明:以下命令操作默认都是在 D:\Java\kafka_2.13-2.8.0\ 即 kafaka 根目录下进行!

3. 使用配置文件方式后台启动/关闭 Zookeeper 服务
启动:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
关闭【自选】:bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties

4. 使用配置文件方式后台启动/关闭 kafka 服务
启动:bin/kafka-server-start.sh -daemon config/server.properties
关闭【自选】:bin/kafka-server-stop.sh -daemon config/server.properties 

5. 服务测试

5.1 创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka

5.2 查看主题(可能需要查一会儿)
bin/kafka-topics.sh --list --zookeeper localhost:2181

说明:发送消息和监听消息需要打开两个窗口进行测试!

5.3 发送消息(kafka 根目录下新建窗口)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
输入以上命令回车后,可继续输入内容测试消息发送

5.4 监听消息(kafka 根目录下新建窗口)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Hello-Kafka --from-beginning
输入以上命令后,可观察消息接收情况,并且可在消息发送窗口继续发送消息测试此监听窗口的接收情况,正常接收,则服务环境搭建成功。

Spring Boot 整合 Kafka

环境:自行创建 Spring Boot 项目,添加测试依赖,并启动 Zookeeper 和 kafka 服务。

注意:Zookeeper 默认好像占用 8080 端口,自己注意端口占用问题。

1. 添加依赖

<!-- spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 添加配置

# kafka 配置
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      # listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

3. 创建消息生产者

@Component
public class KafkaProducer {

    private Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public static final String TOPIC_TEST = "Hello-Kafka";

    public static final String TOPIC_GROUP = "test-consumer-group";

    public void send(Object obj) {
        String obj2String = JSON.toJSONString(obj);
        logger.info("准备发送消息为:{}", obj2String);

        // 发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                logger.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                logger.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }

}

4. 创建消息消费者

@Component
public class KafkaConsumer {

    private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP)
    public void topicTest(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) { // 包含非空值,则执行
            Object msg = message.get();
            logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge(); // 确认成功消费一个消息
        }
    }

}

5. 消息发送测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

    private Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class);

    @Resource
    private KafkaProducer kafkaProducer; // 注意使用自己创建的,看清楚!

    /*
      测试之前需要开启 Kafka 服务
      启动 Zookeeper:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
      启动 Kafka:bin/kafka-server-start.sh -daemon config/server.properties

      测试结果数据:

      准备发送消息为:"你好,我是Lottery 001"
      Hello-Kafka - 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=Hello-Kafka, partition=null,
      headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=你好,我是Lottery 001, timestamp=null),
      recordMetadata=Hello-Kafka-0@47]

      topic_test 消费了: Topic:Hello-Kafka,Message:你好,我是Lottery 001
     */
    @Test
    public void test_send() throws InterruptedException {
        // 循环发送消息
        while (true) {
            kafkaProducer.send("你好,我是Lottery 001");
            Thread.sleep(3500);
        }
    }

}

标签:--,Spring,Boot,kafka,消息,server,public,Kafka
From: https://www.cnblogs.com/luisblog/p/17307292.html

相关文章

  • Springboot集成dubbo完整过程(三)
    准备工作1,准备mysql服务环境2,准备redis服务环境3,准备zookeeper服务环境4,准备逆向生成bean的xml配置文件5,准备slf4j日志xml配置文件6,准备一个sql脚本1,搭建创建服务工程1,创建一个空的父工程,用来统一管理依赖2,创建一个interface接口工程,主要存放业务bean,接口类3,创建一......
  • #yyds干货盘点 springboot和vue搭建前后端项目实现员工的增删改查
    前言我是歌谣今天继续带来前后端项目的开发上次已经开发了部门管理,今天继续开发员工管理后端第一步empcontroller代码packagecom.itheima.controller;importcom.itheima.pojo.Emp;importcom.itheima.pojo.PageBean;importcom.itheima.pojo.Result;importcom.itheima.s......
  • SpringBoot整合ElasticSearch8.x 踩坑记录
    背景jdk版本openjdk-17springboot版本2.6.11pom.xml<!--ElasticSearch提供的依赖--><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.6.2</version>......
  • C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法
    在使用Kafka的过程中,消费者断掉之后,再次开始消费时,消费者会从断掉时的位置重新开始消费。场景再现:比如昨天消费者晚上断掉了,今天上午我们会发现kafka消费的数据不是最新的,而是昨天晚上的数据,由于数据量比较多,也不会及时的消费到今天上午的数据,这个时候就需要我们对偏移量进行重置......
  • Spring-Security
    SecurityConfig@Configuration@EnableWebSecuritypublicclassSecurityConfigimplementsWebMvcConfigurer{@BeanpublicSecurityFilterChainfilterChain(HttpSecurityhttpSecurity)throwsException{//httpSecurity.authorizeHttpRe......
  • spring事务传播行为
      ......
  • spring声明式事务(XML格式)
             ......
  • 【Spring MVC】简单数据绑定
    实体类:  spring-mvc.xml<?xmlversion="1.0"encoding="UTF-8"?><beansxmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:......
  • 自定义SpringBoot Starter
    1.Starter加载原理Springboot通过SpringBootApplication注解启动项目,springboot启动的时候,会将项目中所有声明为bean对象的实例加载到IOC容器。除此之外也会将starter里的bean信息加载到ioc容器,从而做到0配置,开箱即用。1.1加载starter:Springboot项目启动时,Springboot通过@Spri......
  • 【Spring boot】 @Value注解
    一、不通过配置文件的注入属性1.1注入普通字符串直接附在属性名上,在Bean初始化时,会赋初始值@Value("normal")privateStringnormal;1.2注入java系统变量@Value("#{systemProperties['os.name']}")privateStringsystemPropertiesName;//注入操作系统属性1.3注......