首页 > 其他分享 >使用微服务Spring Cloud集成Kafka实现异步通信(消费者)

使用微服务Spring Cloud集成Kafka实现异步通信(消费者)

时间:2024-10-01 13:51:43浏览次数:12  
标签:Spring boot springframework kafka cloud org Kafka Cloud

1、本文架构

本文目标是使用微服务Spring Cloud集成Kafka实现异步通信。其中Kafka Server部署在Ubuntu虚拟机上,微服务部署在Windows 11系统上,Kafka Producer微服务和Kafka Consumer微服务分别注册到Eureka注册中心。Kafka Producer和Kafka Consumer之间通过Kafka Server实现异步通信。

出于便于测试的目的,我通过浏览器触发Kafka Producer发送消息,观察Kafka Consumer的后台是否打印出接收到的消息内容。

Ubuntu 上部署Kafka Server,详见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的搭建过程和完整代码,详见博文:微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

Kafka Producer微服务的完整代码,详见博文:使用微服务Spring Cloud集成Kafka实现异步通信-CSDN博客

本文的重点是实现下图中的深蓝色部分:Kafka Consumer微服务。

2、创建Spring boot项目(Kafka Consumer微服务项目):

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka-consumer -DarchetypeArtifactId=maven-archetype-quickstart

项目代码的完整目录如下图所示:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>com.test</groupId>
  <artifactId>microservice-kafka-consumer</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>microservice-kafka-consumer</name>
  <url>http://maven.apache.org</url>
  
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RELEASE</version>
    <relativePath/> 
  </parent>
  
  <dependencies>
  	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>         
	 
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>


  </dependencies>
  
    <dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>               
    </dependencies>
  </dependencyManagement>
  
  <build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
  
</project>

编辑application.yml,配置kafka消费者:

consumer:
      #消费的主题
      topic: test-topic
      #消费者组id
      group-id: test-group
      #是否自动提交偏移量
      enable-auto-commit: true
      #提交偏移量的间隔-毫秒
      auto-commit-ms: 1000
      #客户端消费的会话超时时间-毫秒
      session-timeout-ms: 10000
      #实现DeSerializer接口的反序列化类键
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #实现DeSerializer接口的反序列化类值
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:
  port: 8030
spring:
  application:
    name: microservice-kafka-consumer
  kafka:
    bootstrap-servers: 192.168.23.131:9092
    consumer:
      group-id: test-group
      enable-auto-commit: true
      auto-commit-ms: 1000
      session-timeout-ms: 10000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8080/eureka/
  instance:
    prefer-ip-address: true            

App.java的完整代码如下:

package com.test;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.kafka.annotation.KafkaListener;


@SpringBootApplication
@EnableDiscoveryClient
public class App 
{
	
	  @KafkaListener(topics = "mydemo1")
    public void listen(String msg) throws Exception {
        System.out.println( "-----> Recv a msg: " + msg );
    }
    
    public static void main( String[] args ){
        System.out.println( "Hello World!" );
        SpringApplication.run(App.class, args);
    }
}

3、测试

在浏览器输入,触发Kafka Producer向Kafka Server发送消息:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

在Kafka Consumer的后台打印出收到的消息:

标签:Spring,boot,springframework,kafka,cloud,org,Kafka,Cloud
From: https://blog.csdn.net/catontower/article/details/142662660

相关文章

  • SpringBoot实现社区医院数据集成解决方案
    1系统概述1.1研究背景随着计算机技术的发展以及计算机网络的逐渐普及,互联网成为人们查找信息的重要场所,二十一世纪是信息的时代,所以信息的管理显得特别重要。因此,使用计算机来管理社区医院信息平台的相关信息成为必然。开发合适的社区医院信息平台,可以方便管理人员对社区......
  • 社区医疗健康管理:SpringBoot技术应用
    1系统概述1.1研究背景随着计算机技术的发展以及计算机网络的逐渐普及,互联网成为人们查找信息的重要场所,二十一世纪是信息的时代,所以信息的管理显得特别重要。因此,使用计算机来管理社区医院信息平台的相关信息成为必然。开发合适的社区医院信息平台,可以方便管理人员对社区......
  • Java项目实战II基于Java+Spring Boot+MySQL的大创管理系统(源码+数据库+文档)
    目录一、前言二、技术介绍三、系统实现四、文档参考五、核心代码六、源码获取全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者一、前言在当前创新创业氛围浓厚的背景下,大学生创新创业项目(简称“大创”)如雨后春笋般涌现,为校园内外注入了无限活力。然而,项目......
  • Java项目实战II基于Java+Spring Boot+MySQL的免税商品优选购物商城(源码+数据库+文档)
    目录一、前言二、技术介绍三、系统实现四、文档参考五、核心代码六、源码获取全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者一、前言随着全球贸易的日益繁荣和消费者需求的多样化,免税商品购物已成为众多旅行者和消费者的热门选择。为了提供一个更加便捷......
  • spring 实用小技巧
    1.@PropertySource注解加载properties文件,加载完成之后,放到了环境变量Environment当中,通过Environment对象可以直接获取代码如下所示:定义一个my.properties文件,写入两个属性:my.app.name=rjmy.app.version=1.0.0在启动类上添加注解@PropertySource,引入my.pro......
  • spring aop记录
     使用:importorg.aspectj.lang.JoinPoint;importorg.aspectj.lang.annotation.After;importorg.aspectj.lang.annotation.Before;importorg.aspectj.lang.annotation.Aspect;importorg.springframework.stereotype.Component;@Aspect@ComponentpublicclassLog......
  • 足球青训俱乐部管理:Spring Boot技术革新
    3系统分析3.1可行性分析可行性分析是该平台系统进行投入开发的基础第一步,必须对其进行可行性分析才能够降低不必要的需要从而使资源合理利用,更具有性价比和降低成本,同时也是系统平台的成功的未雨绸缪的一步。3.1.1技术可行性技术可行性一是考虑客观的技术可行性,二是......
  • Spring Boot技术在足球青训管理中的创新应用
    3系统分析3.1可行性分析可行性分析是该平台系统进行投入开发的基础第一步,必须对其进行可行性分析才能够降低不必要的需要从而使资源合理利用,更具有性价比和降低成本,同时也是系统平台的成功的未雨绸缪的一步。3.1.1技术可行性技术可行性一是考虑客观的技术可行性,二是......
  • SSM项目实战II基于Spring Boot的新闻资讯系统的设计与实现(开发文档+数据库+源码)
    目录一、前言二、技术介绍三、系统实现四、论文参考五、核心代码六、源码获取全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者一、前言前言随着互联网技术的迅猛发展,信息传播的速度日益加快,新闻资讯已成为人们日常生活中不可或缺的一部分。为了满足广大用......
  • springboot基于java的智慧点餐系统(源码+文档+调试+vue+前后端分离)
    收藏关注不迷路!!......