首页 > 其他分享 >使用SpringBoot对接Kafka

使用SpringBoot对接Kafka

时间:2024-06-17 09:57:10浏览次数:13  
标签:SpringBoot Spring 对接 KafkaTemplate springframework Kafka import org

Kafka是什么,以及如何使用SpringBoot对接Kafka

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("test_topic", message);
    }
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。

5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaLis {

    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;

import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private KafkaService kafkaService;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaService.sendMessage(message);
        return "Message sent successfully";
    }
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

三、启动与验证

首先自然是启动 Kafka ,然后是启动我们的Spring Boot项目

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

最后检查我们的项目日志:

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

        producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
        defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
        messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

        send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
        send(String topic, V data):向指定的 Kafka 主题发送一条消息。
        send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
        execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
        inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClass ConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结
        今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

标签:SpringBoot,Spring,对接,KafkaTemplate,springframework,Kafka,import,org
From: https://blog.csdn.net/2401_82767224/article/details/139716451

相关文章

  • 基于springboot的球队训练信息管理系统源码数据库
    传统办法管理信息首先需要花费的时间比较多,其次数据出错率比较高,而且对错误的数据进行更改也比较困难,最后,检索数据费事费力。因此,在计算机上安装球队训练信息管理系统软件来发挥其高效地信息处理的作用,可以规范信息管理流程,让管理工作可以系统化和程序化,同时,球队训练信息管理系......
  • 基于springboot的青年公寓服务平台源码数据库
    传统信息的管理大部分依赖于管理人员的手工登记与管理,然而,随着近些年信息技术的迅猛发展,让许多比较老套的信息管理模式进行了更新迭代,房屋信息因为其管理内容繁杂,管理数量繁多导致手工进行处理不能满足广大用户的需求,因此就应运而生出相应的青年公寓服务平台。本青年公寓服务......
  • 基于springboot的无人智慧超市管理系统【附源码】
    效果如下:首页注册商品信息个人中心管理员主页面仓储信息管理订单管理界面研究背景近年来,由于计算机技术和互联网技术的快速发展,使得所有企事业单位内部都是数字化、信息化、无纸化的发展趋势,随着趋势的发展,各种决策系统、辅助系统也应运而生,其中无人智慧超......
  • 基于springboot的学生宿舍管理系统(带 1w+字文档)
    基于springboot的学生宿舍管理系统(带1w+字文档)基于springboot+vue前后端分离的学生宿舍管理系统:前端vue2、elementui,后端maven、springmvc、spring、mybatis;项目简介本项目可供学习参考,商业慎用。项目带完整安装部署教程+1w+文档。技术栈IntelliJIDEA/Ecl......
  • 【JavaWeb】SpringBootWeb请求响应
    前言在上一次,我们开发了springbootweb的入门程序。基于SpringBoot的方式开发一个web应用,浏览器发起请求/hello后,给浏览器返回字符串“HelloWorld~”。其实呢,是我们在浏览器发起请求,请求了我们的后端web服务器(也就是内置的Tomcat)。而我们在开发web程序时呢,定义了一个控......
  • springboot与flowable(3):启动、审批、各个Service服务
    一、启动流程        流程定义与实例的关系类似于Java的类与对象,通过定义的id创建流程实例,编写测试代码:packageorg.example.flowabledemo2;importorg.flowable.engine.RuntimeService;importorg.flowable.engine.runtime.ProcessInstance;importorg.ju......
  • Docker部署SpringBoot项目
    准备服务器安装Docker下载dockerWindows版本并登录根据项目需要在项目根目录下创建Dockerfile文件#使用官方的OpenJDK8作为基础镜像FROMopenjdk:8-jdk-alpine#维护者信息LABELmaintainer="name"#添加一个应用程序的工作目录WORKDIR/app#将JAR文件添加到......
  • 【JavaWeb】SpringBoot基础
    SpringBootWeb入门HTTP协议Web服务器-Tomcat前言Spring的官网(https://spring.io)。Spring的官方提供很多开源的项目,我们可以点击上面的projects,看到spring家族旗下的项目。Spring发展到今天已经形成了一种开发生态圈,Spring提供了若干个子项目,每个项目用于完成特定......
  • springboot + uniapp 头像上传功能及样式模版
    springboot+uniapp头像上传和预览功能及样式模版该头像上传使用uni.chooseImage方法从本地相册选择图片或使用相机拍照。再通过uni.uploadFile将本地资源上传到服务器。具体使用方法请参考uniapp官网API。以下是前端效果图:上传之后效果点击头像预览效果......
  • SpringBoot源码学习之AOP切面编程原理
    1.前言&目录AOP切面编程主要用于抽取与具体业务逻辑无关的逻辑并组织起来以另一种方式重新与业务逻辑“耦合”在一起。比如,在WEB项目中,往往需要对接口做鉴权、性能统计、日志记录、事务处理等,这些逻辑跟业务逻辑无关、是独立的,但它也是多数业务逻辑都需要的。将这些横跨多种......