首页 > 其他分享 >SpringBoot30 - 整合Kafka

SpringBoot30 - 整合Kafka

时间:2023-02-24 20:23:55浏览次数:70  
标签:SpringBoot30 -- zookeeper kafka topic 整合 服务器 Kafka

SpringBoot整合Kafka

安装

​ windows版安装包下载地址:https://kafka.apache.org/downloads

​ 下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下文件

​ 建议使用windows版2.8.1版本。

启动服务器

​ kafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,它的作用是注册中心,相关知识请到对应课程中学习。

zookeeper-server-start.bat ..\..\config\zookeeper.properties		# 启动zookeeper
kafka-server-start.bat ..\..\config\server.properties				# 启动kafka

​ 运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口2181。

​ 运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口9092。

创建主题

​ 和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic。

# 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic itheima
# 查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list					
# 删除topic
kafka-topics.bat --delete --zookeeper localhost:2181 --topic itheima

测试服务器启动状态

​ Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。

kafka-console-producer.bat --broker-list localhost:9092 --topic itheima							# 测试生产消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic itheima --from-beginning	# 测试消息消费
整合

步骤①:导入springboot整合Kafka的starter,此坐标由springboot维护版本

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

步骤②:配置Kafka的服务器地址

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order

​ 设置默认的生产者消费者所属组id。

步骤③:使用KafkaTemplate操作Kafka

@Service
public class MessageServiceKafkaImpl implements MessageService {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+id);
        kafkaTemplate.send("itheima2022",id);
    }
}

​ 使用send方法发送消息,需要传入topic名称。

步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

@Component
public class MessageListener {
    @KafkaListener(topics = "itheima2022")
    public void onMessage(ConsumerRecord<String,String> record){
        System.out.println("已完成短信发送业务(kafka),id:"+record.value());
    }
}

​ 使用注解@KafkaListener定义当前方法监听Kafka中指定topic的消息,接收到的消息封装在对象ConsumerRecord中,获取数据从ConsumerRecord对象中获取即可。

总结

  1. springboot整合Kafka使用KafkaTemplate对象作为客户端操作消息队列

  2. 操作Kafka需要配置Kafka服务器地址,默认端口9092

  3. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@KafkaListener。接收消息保存在形参ConsumerRecord对象中

标签:SpringBoot30,--,zookeeper,kafka,topic,整合,服务器,Kafka
From: https://www.cnblogs.com/Ashen-/p/17153011.html

相关文章

  • 整合第三方技术
    整合JUnit整合过程及准备在整合JUnit时不需要勾选任何依赖1.导入测试对应的starter创建模块时,boot自动导入了starter;但在纯手写创建模块时,需要手动导入2.测试类使用@......
  • 一文读懂Kafka Connect核心概念
    概览KafkaConnect是一种用于在ApacheKafka和其他系统之间可扩展且可靠地流式传输数据的工具。它使快速定义将大量数据移入和移出Kafka的连接器变得简单。KafkaC......
  • Kafka 序列化
    序列化生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成......
  • Kafka 上手实战
    参数配置bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以......
  • 整合jUnit4和jUnit5
    整合jUnit41.引入依赖<dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>5.2.6.RELEASE</version></......
  • spring整合框架
    循环引用BeanCurrentlyInCreationException当A对象需要引用B对象,而B对象也需要A对象的时候就是双向,当spring属性填充为双向注入的时候叫做循环依赖,也叫做循环引用spring......
  • 想完全弄懂kafka?看这篇就够了
    有人说世界上有三个伟大的发明:火,轮子,以及Kafka。发展到现在,ApacheKafka无疑是很成功的,Confluent公司曾表示世界五百强中有三分之一的企业在使用Kafka。今天便和大家分......
  • vue总结与整合(走过路过,千万不要错过,进来看看,全篇皆是精华)
    vue总结与整合vue简介 vue是一款用于构建用户界面的js框架,基于标准HTML、CSS和JavaScript构建,并提供了一套声明式的、组件化的编程模型,帮助开发者高效地开发用户界面。......
  • kafka常用操作
    如果把一个项目/微服务当成一个消费组,那么一个topic可能在多个消费组【一个topic被多个项目订阅】,一个消费组可能有多个topic。【一个项目订阅了多个topic】。一个消费组内......
  • 整合mybatis-spring
    一.整合mybatis步骤:第一步:导入相关的jar包:junitmybatismysql数据库spring相关的aop植入mybatis-spring【新包,兼容mybatis和spring】<dependencies>......