首页 > 其他分享 >springboot整合Kafka的快速使用教程

springboot整合Kafka的快速使用教程

时间:2024-05-27 23:30:13浏览次数:20  
标签:教程 springboot zhuoye -- kafka topic 消息 Kafka

       

目录

一、引入Kafka的依赖

二、配置Kafka

三、创建主题

1、自动创建(不推荐)

2、手动动创建

四、生产者代码

五、消费者代码  

六、常用的KafKa的命令


        Kafka是一个高性能、分布式的消息发布-订阅系统,被广泛应用于大数据处理、实时日志分析等场景。Spring Boot作为目前最流行的Java开发框架之一,其简洁的配置和丰富的工具使得与Kafka的集成变得更加容易。本文将介绍如何使用Spring Boot整合Kafka,实现高效的数据处理和消息传递。

一、引入Kafka的依赖

       <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

二、配置Kafka

spring:
  kafka:
    bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,这里有三个地址,用逗号分隔。
    listener:
      ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate,表示消费者在接收到消息后立即手动确认。
      concurrency: 3  #设置消费者的并发数为3
      missing-topics-fatal: false  #设置为false,表示如果消费者订阅的主题不存在,不会抛出异常。
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 设置消息键的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器
      acks: 1  #一般就是选择1,兼顾可靠性和吞吐量 ,如果想要更高的吞吐量设置为0,如果要求更高的可靠性就设置为-1
    consumer:
      auto-offset-reset: earliest #设置为"earliest"表示将从最早的可用消息开始消费,即从分区的起始位置开始读取消息。
      enable-auto-commit: false #禁用了自动提交偏移量的功能,为了避免出现重复数据和数据丢失,一般都是手动提交
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置消息键的反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器

注:kafka的acks有三个值,可以根据实际情况和需求平衡消息系统的吞吐量和数据安全性,来选择对应的值。

  • acks=0:这是最不可靠的模式。当设置为acks=0时,生产者在发送消息后不会等待任何服务器端的确认响应。这种模式下,生产者可以迅速继续发送下一批消息,效率最高,但风险也最大。如果在此模式下发生网络问题或broker故障,发送的消息可能会永久丢失,生产者无法得知消息是否成功到达Kafka broker。因此,这种配置适合于能够容忍少量数据丢失的场景,例如实时数据分析或生成非关键的实时报表。
  • acks=1:这是默认的配置模式,也是一种折衷方案。在这种模式下,生产者会等待分区的领导者节点(leader)确认消息已经成功写入磁盘,才会发送确认信息给生产者。这提高了数据的安全性,因为只要领导者节点保存了消息,即使跟随者(replicas)没有及时同步,消息也不会丢失。然而,如果领导者在同步给所有追随者之前崩溃,那么尚未同步的副本将无法获取该消息,仍然存在消息丢失的风险。
  • acks=all或-1:这是最可靠的模式。在这个模式下,生产者不仅需要领导者节点确认,还会等待所有同步副本(In-sync replicas, ISR)都确认写入消息后才会收到确认。这极大地增强了数据的持久性保证,确保了即使在多个节点故障的情况下,消息也不会丢失。此模式适用于数据可靠性要求非常高的场景,如金融交易系统或重要的日志记录

三、创建主题

    1、自动创建(不推荐)

         不存在的主题,会自动创建,分区数和副本数均为默认值。而默认值可能会不符合某些场景的要求。

在kafka的安装目录conf目录下找到该配置文件server.properties,添加如下配置:
num.partitions=3 #默认3个分区
auto.create.topics.enable=true #开启自动创建主题
default.replication.factor=3 #默认3个副本

    2、手动动创建

         在kafka的安装目录bin目录下,执行如下命令: 

//创建一个有三个分区和三个副本,名为zhuoye的主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye 

四、生产者代码

@Slf4j
@Component
public class ALiYunServiceImpl implents IALiYunService {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Autowired
    private ExecutorService executorService;
    String topicName = "zhuoye";
    @Override
    public void queryECSMetricInfo() {
        //发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的
        List<Message> messages = Collections.synchronizedList(new ArrayList<>());
        boolean flag = true;
        //获取上次查询时间
        Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;
        Long endTime = System.currentTimeMillis();
        try {
            //查询出所有的运行中的实例
            List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");
            if (CollectionUtils.isEmpty(cloudInstances)) {
                return;
            }
            //定义计数器
            CountDownLatch latch = new CountDownLatch(cloudInstances.size());
            //遍历查询
            for (CloudInstanceAssetDto instance : cloudInstances) {
                executorService.submit(() -> {
                    try {
                        //获取内网流出带宽,并将结果封装到消息集合中
                        dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,
                                startTime, endTime, instance, messages);
                    } catch (Exception e) {
                        log.error("获取ECS的指标数据-多线程处理任务异常!", e);
                    } finally {
                        latch.countDown();
                    }

                });
            }
            //等待任务执行完毕
            latch.await();
            //将最终的消息集合发送到kafka
            if (CollectionUtils.isNotEmpty(messages)) {
                for (int i = 0; i < messages.size(); i++) {
                    if (StringUtils.isNotBlank(messages.get(i).getValue())
                            && "noSuchInstance".equals(messages.get(i).getValue())) {
                        continue;
                    }
                  kafkaTemplate.send(topicName,  messages.get(i));
                }
            }
        } catch (Exception e) {
            flag = false;
            log.error("获取ECS的指标数据失败", e);
        }
        //更新记录上次查询时间
        if (flag) {
            QueryTimeRecord queryTimeRecord = new QueryTimeRecord();
            queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟
            queryTimeRecordMapper.updateByBelongId(queryTimeRecord);
        }
    }

这个时候,如果你想看有没有把消息发送到kafka的指定主题可以使用如下命令:

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye 

五、消费者代码  

@Slf4j
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")
    public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){
        try {
            String value = record.value();
            //处理数据,存入openTsDb
            .................
            ................
            ack.acknowledge();//手动提交
        }catch (Exception e){
            log.error("kafa-topic【zhuoye】消费阿里云指标源消息【失败】");
            log.error(e.getMessage());
        }

    }
}

六、常用的KafKa的命令

//创建主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye
//查看kafka是否接收对应的消息
 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye
// 修改kafka-topic分区数
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye
// 查看topic分区数
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye
// 查看用户组消费情况
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

标签:教程,springboot,zhuoye,--,kafka,topic,消息,Kafka
From: https://blog.csdn.net/weixin_50348837/article/details/139250156

相关文章

  • 【2023全网最全最火】Selenium WebDriver教程(建议收藏)
    在本教程中,我将向您介绍SeleniumWebdriver,它是当今市场上使用最广泛的自动化测试框架。它是开源的,可与所有著名的编程语言(如Java、Python、C#、Ruby、Perl等)一起使用,以实现浏览器活动的自动化。通过本文,我将告诉您开始使用SeleniumWebDriver测试Web应用程序所需了解的所有信......
  • springboot项目中数据库连接加密方法
    1、maven添加相应版本的依赖,比如com.github.ulisesbocchiojasypt-spring-boot-starter2.1.22、设置项目启动参数,此参数作为加密的盐值,比如-Djasypt.encryptor.password=盐值3、下载jasypt-xxx.jar包,用此jar生成加密后的数据库连接密码从这里下载http://www.jasypt.org/do......
  • Stable Diffusion 本地部署教程(附一键整合包)
    在上一篇文章中,我们介绍了StableDiffusion模型的基本原理和本地部署的重要性。今天,我们将继续深入探讨如何在本地成功部署StableDiffusion模型,并分享一些实用的技巧和建议。一、环境准备首先,确保你的计算机满足StableDiffusion模型的基本要求。这通常包括足够的内存、......
  • AI大模型入门基础教程(非常详细),AI大模型入门到精通,收藏这一篇就够了!
    什么是AI大模型?AI大模型是指使用大规模数据和强大的计算能力训练出来的人工智能模型。这些模型通常具有高度的准确性和泛化能力,可以应用于各种领域,如自然语言处理、图像识别、语音识别等。为什么要学AI大模型?2024人工智能大模型的技术岗位与能力培养随着人工智能技术......
  • 基于SpringBoot+Vue+uniapp的IT技术交流和分享平台的详细设计和实现(源码+lw+部署文档
    文章目录前言具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......
  • AI 艺术码绘画实操教程
    一、概念说明艺术二维码是一种结合了AI绘画艺术设计元素的二维码,它在保持传统二维码功能的基础上,通过视觉设计上的创新,使得二维码本身成为一种视觉艺术作品。与传统的黑白二维码相比,艺术二维码通常具有以下特点和区别:艺术二维码的特点视觉吸引力:艺术二维码通过颜色、......
  • SpringBoot继承JWT token实现权限的验证(从头开始)
    目录概述前提:我们需要知道的文件的用处第1步:数据库的连接第2步:定义一个标准化响应对象的类第3步:编写请求数据库数据代码第4步:自定义异常处理第5步:导入依赖第6步:自定义拦截器第7步:配置拦截器第8步:生成token第9步:开始测试代码第10步:vue请求示例扩展:自定义注解AuthAc......
  • Unleashing Robotics: Mastering Quaternion Kinematics with Python - Chapter6(原创
    UnleashingRobotics:MasteringQuaternionKinematicswithPython-Chapter6(原创系列教程)(最关键一章)本系列教程禁止转载,主要是为了有不同见解的同学可以方便联系我,我的邮箱[email protected]第6章旋转的数值积分方法和角误差理论1.Runge-Kutta数值积分方法我......
  • 树莓派安装向日葵教程
    树莓派安装向日葵教程RaspberryPi版本:2024-03-15-raspios-bookworm-arm64-full.img下载麒麟arm版本客户端向日葵远程控制app官方下载-贝锐向日葵官网安装依赖包sudoapt-getupdatesudoapt-getinstalllibgtk-3-0或者sudoaptinstalllibappindicator3-1......
  • scrapy教程-本人实测
    scrapydscrapyd介绍Scrapyd是一个用于部署和运行Scrapy爬虫项目的应用程序,由Scrapy的开发者开发。以下是Scrapyd的主要用法和作用:用法:安装Scrapyd服务器:使用pip命令安装Scrapyd,然后在命令行中启动Scrapyd服务。安装Scrapyd客户端:同样使用pip命令安装Scrapyd的客户端,这样......