首页 > 其他分享 >spring boot 结合 kafaka

spring boot 结合 kafaka

时间:2023-09-01 16:55:57浏览次数:38  
标签:spring boot springframework kafka kafaka org Kafka public

Spring Boot 可以与 Apache Kafka 集成:

添加 Maven 依赖:
在您的 Spring Boot 项目的 pom.xml 文件中添加以下 Maven 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.1</version> <!-- 请根据实际情况使用最新版本 -->
</dependency>

配置 Kafka 连接:
在 application.properties 或 application.yml 文件中配置 Kafka 的连接信息,例如:

application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group-id
spring.kafka.consumer.auto-offset-reset=earliest

application.yml:

yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group-id
      auto-offset-reset: earliest

创建 Kafka Producer:
创建一个继承自 org.springframework.kafka.core.KafkaTemplate 的自定义类,并重写 send() 方法。在 send() 方法中,您可以将消息发送到 Kafka。例如:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

创建 Kafka Consumer:
创建一个实现 org.springframework.kafka.core.ConsumerRecordHandler 接口的自定义类,并重写 handle() 方法。在 handle() 方法中,您可以处理接收到的 Kafka 消息。例如:

import org.springframework.kafka.core.ConsumerRecordHandler;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer implements ConsumerRecordHandler<String, String> {
    @Override
    public void handle(ConsumerRecord<String, String> record) {
        System.out.println("Received message: " + record.value());
    }
}

配置 Kafka Listener:
在需要接收消息的地方,例如服务层或控制器层,注入您的 Kafka Consumer,并配置消息监听器。例如:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MyService {
    private final KafkaConsumer kafkaConsumer;

    @Autowired
    public MyService(KafkaConsumer kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void listenMessage() {
        kafkaConsumer.handle(new ConsumerRecord<>("my-topic", 0, 0, "", "Hello from Kafka!"));
    }
}

标签:spring,boot,springframework,kafka,kafaka,org,Kafka,public
From: https://www.cnblogs.com/ArthurHenry/p/17672356.html

相关文章

  • SpringSecurity简明教程
    SpringSecurity主要实现UserDetailsService来验证登录的用户信息,和Security的配置类来对登录方式和资源进行限制。案例包含利用数据库进行登录验证、URL访问限制、自定义登录页和利用ajax方式登录、实现自定义过滤器对验证码进行验证,完整代码在https://github.com/say-hey/sprin......
  • Spring源码分析(十三)ApplicationContext详解(下)
    前面两篇文章,已经对ApplicationContext的大部分内容做了介绍,包括国际化,Spring中的运行环境,Spring中的资源,Spring中的事件监听机制,还剩唯一一个BeanFactory相关的内容没有介绍,这篇文章就来介绍BeanFactory,这篇文章介绍,关于ApplicationContext相关的内容总算可以告一段落了。本文对应......
  • SpringCloud 支持 超大上G,多附件上传
    ​ 这里只写后端的代码,基本的思想就是,前端将文件分片,然后每次访问上传接口的时候,向后端传入参数:当前为第几块文件,和分片总数下面直接贴代码吧,一些难懂的我大部分都加上注释了:上传文件实体类:看得出来,实体类中已经有很多我们需要的功能了,还有实用的属性。如MD5秒传的信息。pub......
  • spring容器加载
    1:准备加载Bean工厂---首先肯定告诉我们的程序,我需要加载容器了,从哪里开始加载,可能是从classpath(XML)或者Annotation(注解),接着spring会执行refresh()方法这个方法首先会判断当前是否有容器,如果有的话就关闭,没有就创建2:获得Bean工厂-----spring会解析我们的配置文件,把配置信息,解析成Be......
  • springcloud 跨域问题解决
    问题原因跨域本质是浏览器基于同源策略的一种安全手段同源策略(Sameoriginpolicy),是一种约定,它是浏览器最核心也最基本的安全功能所谓同源(即指在同一个域)具有以下三个相同点协议相同(protocol)主机相同(host)端口相同(port)反之非同源请求,也就是协议、端口、主机其中一项不相同的......
  • Spring 相关 Maven 依赖包
    <?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache......
  • SpringAMQP--消息转换器
        ......
  • SpringAMQP--TopicExchange
             ......
  • SpringAMQP-WorkQueue模型
          ......
  • SpringBoot使用protobuf格式的接口方式
    建立SpringBoot项目,pom.xml内容如下:<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="......