首页 > 其他分享 >SpringBoot集成Kafka构建消息系统

SpringBoot集成Kafka构建消息系统

时间:2024-01-14 22:31:41浏览次数:35  
标签:集成 SpringBoot spring springframework kafka topic import org Kafka

一、前言

在我们当前的互联网应用中,消息驱动已经成为一种不可或缺的模式,Kafka作为一款高性能的分布式消息系统,已经成为很多公司在消息驱动架构选择中很重要的工具。我们使用SpringBoot和Kafka快速构建消息驱动应用,应对高并发的消息处理业务。Kafka是分布式发布-订阅消息系统。主要特点是基于pull模式来处理消息消费,追求高性能、高吞吐量,完全分布式的系统。


二、zookeeper安装

Kafka需要在zookeeper环境下运行,所以我们先进行zookeeper的安装。

下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

我们选择一个版本进行下载,需要下载里面的两个文件。

SpringBoot集成Kafka构建消息系统_Zookeeper

SpringBoot集成Kafka构建消息系统_kafka_02

下载完之后进行解压。然后我们需要把bin中的lib文件夹复制到另一个项目中去。

SpringBoot集成Kafka构建消息系统_SpringBoot_03

1.修改配置

将zoo_sample.cfg修改为zoo.cfg

SpringBoot集成Kafka构建消息系统_SpringBoot_04

然后修改这个zoo.cfg文件,如修改日志的路径等。

SpringBoot集成Kafka构建消息系统_SpringBoot_05

2.启动zookeeper

我们进入cmd到bin目录,启动zkServer.cmd。

SpringBoot集成Kafka构建消息系统_Zookeeper_06

启动成功后会出现以下的日志。

SpringBoot集成Kafka构建消息系统_SpringBoot_07

最后我们进入到bin目录启动客户端,执行以下命令:

zkCli.cmd 127.0.0.1:2181

SpringBoot集成Kafka构建消息系统_SpringBoot_08

三、kafka的安装

下载地址:https://kafka.apache.org/downloads.html

SpringBoot集成Kafka构建消息系统_SpringBoot_09

下载之后解压。

SpringBoot集成Kafka构建消息系统_Zookeeper_10

进入到config目录修改server.properties文件。

SpringBoot集成Kafka构建消息系统_kafka_11

主要也是修改日志保存的位置等。

SpringBoot集成Kafka构建消息系统_Zookeeper_12

1.启动kafka

我们cmd进入到bin的windows目录下,执行以下命令。

kafka-server-start.bat ../../config/server.properties

SpringBoot集成Kafka构建消息系统_kafka_13

SpringBoot集成Kafka构建消息系统_kafka_14

四、SpringBoot集成

1.添加依赖

<!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2.application.properties配置

#kafka配置
spring.kafka.bootstrap-servers=127.0.0.1:9092
#发生错误 消息重发的次数
spring.kafka.producer.retries=0
spring.kafka.producer.buffer-memory=33554432
#键的序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#值的序列化方式
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 只要集群的首领节点收到消息 生产者就会收到一个来着服务器的响应
spring.kafka.producer.acks=1
spring.kafka.consumer.auto-commit-interval=1s
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#监听器容器中运行的线程数
spring.kafka.listener.concurrency=5
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false

3.topic初始化

package com.example.nettydemo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author qx
 * @date 2023/12/22
 * @des Kafka配置
 */
@Configuration
public class KafkaConfig {


    @Bean
    public NewTopic topic() {
        // 创建一个名为topic.test的Topic并设置分区数为8 分区副本数为2
        return new NewTopic("topic.test", 8, (short) 2);
    }
}

4.定义一个消息发送端

package com.example.nettydemo.kafka;

import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @author qx
 * @date 2023/12/22
 * @des kafka消息发送端
 */
@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(Object obj) {
        String json = JSONObject.toJSONString(obj);
        kafkaTemplate.send("topic.test", json).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("topic:{}生产者发送消息失败:{}", "topic.test", ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("topic:{}生产者发送消息成功:{}", "topic.test", result.getProducerRecord().value());
            }
        });
    }

}

5.定义一个消息接收者

package com.example.nettydemo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/12/22
 * @des kafka消息接收者
 */
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "topic.test", groupId = "topic.group1")
    public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        System.out.println("record->value:" + record.value());
        Object value = record.value();
        if (value != null) {
            log.info("客户端消费了:topic:[{}],message:[{}]", topic, value);
            ack.acknowledge();
        }
    }
}

6.测试

@SpringBootTest
class NettyDemoApplicationTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    void testSend(){
        kafkaProducer.send("hello world");
    }
 }

执行测试方法后,控制台日志显示生产者消息发送成功的日志。

SpringBoot集成Kafka构建消息系统_kafka_15

我们也在控制台日志中获取到了客户端消费的日志。

SpringBoot集成Kafka构建消息系统_Zookeeper_16

这样我们就使用SpringBoot集成Kafka实现了分布式发布-订阅消息。

标签:集成,SpringBoot,spring,springframework,kafka,topic,import,org,Kafka
From: https://blog.51cto.com/u_13312531/9242741

相关文章