首页 > 其他分享 >Kafka 集成SpringBoot

Kafka 集成SpringBoot

时间:2024-02-24 17:22:56浏览次数:32  
标签:集成 SpringBoot -- spring springframework kafka 192.168 org Kafka

1.环境准备

1.Kafka集群环境准备

1.准备一个Kafka集群环境并启动

Kafka 3.6.1 集群安装与部署

2.创建first Topic

/usr/kafka/kafka_2.13-3.6.1/bin/kafka-topics.sh --bootstrap-server 192.168.58.130:9092 --create --partitions 1 --replication-factor 3 --topic first

2.SpringBoot环境准备

通过Spring Initializr新建SpringBoot项目
image

image

初始化后的POM文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.2.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>cn.coreqi</groupId>
	<artifactId>springboot_kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>springboot_kafka</name>
	<description>springboot_kafka</description>
	<properties>
		<java.version>21</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

2.SpringBoot 生产者

1.修改 SpringBoot 配置文件 application.propeties, 添加生产者相关信息

# 应用名称
spring.application.name=springboot_kafka

# 指定 kafka 的地址
spring.kafka.bootstrap-servers=192.168.58.130:9092,192.168.58.131:9092,192.168.58.132:9092

#指定 key 和 value 的序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

2.创建 controller 从浏览器接收数据, 并写入指定的 topic

package cn.coreqi.springboot.controller;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
    // Kafka 模板用来向 kafka 发送数据
    @Resource
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/coreqi")
    public String data(String msg) {
        kafka.send("first", msg);
        return "ok";
    }
}

3.启动 Kafka 消费者

/usr/kafka/kafka_2.13-3.6.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.130:9092 --topic first

4.在浏览器中访问/coreqi 接口并向其发送数据,观察 kafka 消费者控制台情况

http://localhost:8080/coreqi?msg=hello

3.SpringBoot 消费者

1.修改 SpringBoot 配置文件 application.propeties, 添加消费者相关信息

# 应用名称
spring.application.name=springboot_kafka

# 指定 kafka 的地址
spring.kafka.bootstrap-servers=192.168.58.130:9092,192.168.58.131:9092,192.168.58.132:9092

# 指定 key 和 value 的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#指定消费者组的 group_id
spring.kafka.consumer.group-id=coreqi

2.创建类消费 Kafka 中指定 topic 的数据

package cn.coreqi.springboot.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {
    // 指定要监听的 topic
    @KafkaListener(topics = "first")
    public void consumeTopic(String msg) { // 参数: 收到的 value
        System.out.println("收到的信息: " + msg);
    }
}

3.启动 kafka 生产者

/usr/kafka/kafka_2.13-3.6.1/bin/kafka-console-producer.sh --bootstrap-server 192.168.58.130:9092 --topic first

4.启动应用 观察 IDEA 控制台数据打印

标签:集成,SpringBoot,--,spring,springframework,kafka,192.168,org,Kafka
From: https://www.cnblogs.com/fanqisoft/p/18031324

相关文章

  • SpringBoot + Redis 的配置及使用
    一、SpringBoot配置Redis1.1pom引入spring-boot-starter-data-redis包<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></......
  • Kubernetes配合Jenkins实现轻量自动持续集成
    Kubernetes配合Jenkins实现轻量自动持续集成原创 王先森sec 王先森Sec 2024-02-1913:16 北京 听全文这是一个分享运维,DevOps,安全等知识的微信公众号。王先森Sec王先森Sec分享运维,DevOps,安全等知识。23篇原创内容公众号背景介绍在当今的软件开......
  • Kafka消息生产消费的过程
    生产消息流程:创建Topic首先,需要创建一个或多个Topic,它们是消息的存储单元。Topic定义了消息的类别。配置生产者在生产者端,需要配置生产者客户端,指定要连接的Kafka集群的地址和相关配置,比如序列化方式、消息发送确认策略等。生产消息:生产者将消息发送到指定的Topic。生......
  • 利用DevOps和ITSM的集成能促进IT技术
    在当今这个快节奏和技术驱动的世界里,DevOps和IT服务管理(ITSM)理念在培养心理健康和减少技术压力的工作环境中正在发挥重要作用。混合最佳实践,实现更好、更快、更安全的服务。 什么是DevOps?DevOps专注于采用协作、自动化和持续改进的方式,使团队能够执行工作流程。通过跨职......
  • Spring集成Nacos配置中心
    spring版本4.2.8  nacos:1.1.0 jdk1.8引入依赖<dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-spring-context</artifactId><version>1.1.1</version>......
  • Kafka 集成Flume
    1.环境准备1.准备一个Kafka集群环境并启动Kafka3.6.1集群安装与部署2.启动Kafka消费者bin/kafka-console-consumer.sh--bootstrap-server192.168.58.130:9092--topicfirst3.在任意Kafka集群节点上安装Flume......
  • 持续集成工具Jenkins
    1从装修厨房看项目开发效率优化1.1持续部署装修厨房全部装好之后发现灯不亮,电路有问题;冷热水装反了,管路有问题。这些问题要解决就必须把地砖、墙砖拆掉——一个环节有问题,其他环节跟着返工。那怎么做会好一些呢?任何安装完成及时测试,确保其可以正常工作。项目......
  • 玩转SpringBoot:动态排除Starter配置,轻松部署
    引言在软件开发中,进行本地单元测试是一项常规且必要的任务。然而,在进行单元测试时,有时需要启动一些中间件服务,如Kafka、Elasticjob等。举例来说,我曾经遇到过一个问题:项目中使用了Redisson锁,但由于Redisson版本较低,在Mac环境下偶尔会报错#RedisConnectionException:Unabletoin......
  • 得物面试:Kafka消息0丢失,如何实现?
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • flink实时读取kafka数据到mysql flink 读取kafka 依赖 Flink 1.8.0
    flink实时读取kafka数据到mysqlflink读取kafkaFlink提供了Kafka连接器,用于从或向Kafka读写数据。本文总结Flink与Kafka集成中的问题,并对一些疑点进行总结和梳理。问题一:读Kafka的方式登录后复制##读取一个TopicFlinkKafkaConsumer010#FlinkKafkaConsumer010(Stringtopi......