首页 > 其他分享 >springcloud stream kafka实践

springcloud stream kafka实践

时间:2023-07-19 18:23:12浏览次数:33  
标签:stream springcloud springframework kafka org import message annotation cloud

Spring Cloud Stream是Spring Cloud提供的一个用于构建消息驱动的微服务的框架。它简化了消息系统(如Kafka,rabbitMQ)的使用和集成,使开发者可以更专注于业务逻辑的实现。

项目结构如下

 

一、移入依赖

创建一个springboot web项目引入依赖

 1 <properties>
 2     <java.version>1.8</java.version>
 3     <spring-cloud.version>Finchley.SR2</spring-cloud.version>
 4   </properties>
 5 
 6   <dependencyManagement>
 7     <dependencies>
 8       <dependency>
 9         <groupId>org.springframework.cloud</groupId>
10         <artifactId>spring-cloud-dependencies</artifactId>
11         <version>${spring-cloud.version}</version>
12         <type>pom</type>
13         <scope>import</scope>
14       </dependency>
15     </dependencies>
16   </dependencyManagement>
17 
18   <dependencies>
19     <dependency>
20       <groupId>org.springframework.boot</groupId>
21       <artifactId>spring-boot-starter-web</artifactId>
22     </dependency>
23 
24     <dependency>
25       <groupId>org.springframework.cloud</groupId>
26       <artifactId>spring-cloud-stream-binder-kafka</artifactId>
27     </dependency>
28   </dependencies>

二、配置消息中间件

这里先以kafka为例, 事先要准备kafka

 1 # 生成者配置
 2 spring:
 3   kafka:
 4     bootstrap-servers: 192.168.3.100:9092
 5   cloud:
 6     stream:
 7       bindings:
 8         output:
 9           destination: ${kafka.topic}
10         input:
11           destination: ${kafka.topic}
12 kafka:
13   topic: cloud-stream

 

三、消息生产者producer

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.beans.factory.annotation.Qualifier;
 3 import org.springframework.cloud.stream.annotation.EnableBinding;
 4 import org.springframework.cloud.stream.messaging.Source;
 5 import org.springframework.messaging.MessageChannel;
 6 import org.springframework.messaging.support.MessageBuilder;
 7 import org.springframework.stereotype.Component;
 8 
 9 /**
10  * @Classname MessageProducer
11  * @Created by Michael
12  * @Date 2023/7/19
13  * @Description 消息生产者
14  */
15 @Component
16 @EnableBinding(Source.class)
17 public class MessageProducer {
18   @Autowired
19   @Qualifier(Source.OUTPUT)
20   private MessageChannel messageChannel;
21 
22   @Autowired
23   private Source source;
24 
25   /**
26    * 发送消息
27    * @param message
28    */
29   public void send(String message){
30     //通过消息管道发送消息
31 //    messageChannel.send(MessageBuilder.withPayload(message).build());
32     source.output().send(MessageBuilder.withPayload(message).build());
33   }
34 }

四、消息消费者consumer

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.beans.factory.annotation.Qualifier;
 3 import org.springframework.cloud.stream.annotation.EnableBinding;
 4 import org.springframework.cloud.stream.annotation.StreamListener;
 5 import org.springframework.cloud.stream.messaging.Sink;
 6 import org.springframework.integration.annotation.ServiceActivator;
 7 import org.springframework.messaging.SubscribableChannel;
 8 import org.springframework.stereotype.Component;
 9 
10 import javax.annotation.PostConstruct;
11 
12 /**
13  * @Classname MessageConsumer
14  * @Created by Michael
15  * @Date 2023/7/19
16  * @Description 消息消费者
17  */
18 @Component
19 @EnableBinding({Sink.class})
20 public class MessageConsumer {
21   @Autowired
22   @Qualifier(Sink.INPUT)
23   private SubscribableChannel subscribableChannel;
24 
25   //有3中订阅方式
26   //3.1 当 subscribableChannel注入后完成回调,可以拿到MessageHandler对象
27 //  @PostConstruct
28 //  public void init() {
29 //    subscribableChannel.subscribe(message -> {
30 //      System.out.println(message.getPayload());
31 //    });
32 //  }
33 
34   //3.2 使用ServiceActivator
35   @ServiceActivator(inputChannel = Sink.INPUT)
36   public void messageActivator(String message) {
37     System.out.println("@ServiceActivator -> " + message);
38   }
39 
40   //3.3 使用@StreamListener
41   @StreamListener(Sink.INPUT)
42   public void onMessage(String message) {
43     System.out.println("@StreamListener -> " + message);
44   }
45 
46 }

五、测试结果

创建一个controller,接收请求

 1 import com.mike.study.springcloudstreamkafka.producer.MessageProducer;
 2 import org.springframework.beans.factory.annotation.Autowired;
 3 import org.springframework.web.bind.annotation.GetMapping;
 4 import org.springframework.web.bind.annotation.PostMapping;
 5 import org.springframework.web.bind.annotation.RequestParam;
 6 import org.springframework.web.bind.annotation.RestController;
 7 
 8 /**
 9  * @Classname StreamController
10  * @Created by Michael
11  * @Date 2023/7/19
12  * @Description controller
13  */
14 @RestController
15 public class StreamController {
16   @Autowired
17   MessageProducer messageProducer;
18 
19   @GetMapping("send/msg")
20   public boolean sendMsg(@RequestParam("message") String message){
21     messageProducer.send(message);
22     return true;
23   }
24 
25   @PostMapping("send/msg")
26   public boolean sendMsg1(@RequestParam("message") String message){
27     messageProducer.send(message);
28     return true;
29   }
30 }

发起GET/POST请求,请求3次

 

 

 

控制台查看结果

 结果可以看到,3种订阅方式,轮询优先使用@StreamListener, 然后是@ServiceActivator, 最后是@PostConstruct.。

总结,这里使用了GET/POST这2种请求都可以,同时procedure和consumer中没有kafka的有关代码,后期如果切换消息中间件,只需修改配置文件即可。

 

标签:stream,springcloud,springframework,kafka,org,import,message,annotation,cloud
From: https://www.cnblogs.com/lfhappy/p/17566437.html

相关文章

  • AAudioStream
    AAudioStream的实现流程1.概述在开始之前,我们需要了解AAudioStream是什么以及它的作用。AAudioStream是AndroidNDK中的一个类,用于实现低延迟音频数据的读取和写入。它提供了一种高效的方式来进行音频处理和音频录制。在本文中,我们将教会你如何使用AAudioStream这个类。2.实现......
  • keycloak~EventListenerProvider初始化kafka引出的类加载问题
    EventListenerProvider初始keycloak提供的事件处理机制,可以通过实现EventListenerProvider接口来实现自定义的事件处理逻辑。在keycloak启动时,会通过ServiceLoader机制加载所有的EventListenerProvider实现类,并将其注册到keycloak的事件处理机制中。构造方法,在每个keycloak后台......
  • HTTP/2 stream 1 was not closed cleanly before end of the underlying stream解决
    通过gitclone文件时报错HTTP/2stream1wasnotclosedcleanlybeforeendoftheunderlyingstream解决:gitconfig--globalhttp.versioinHTTP/1.1重新gitclone就可以了。 了解HTTP/2与HTTP/1.1区别:https://www.cnblogs.com/flydean/p/15187719.html有问题......
  • JAVA-- 在Java8 Parallel Stream中如何自定义线程池?
    使用ParallelStream时,在适当的环境中,通过适当地使用并行度级别,可以在某些情况下获得性能提升。如果程序创建一个自定义ThreadPool,必须记住调用它的shutdown()方法来避免内存泄漏。ParallelStream默认使用的线程池如下代码示例,ParallelStream并行处理使用的线程池是ForkJoi......
  • java parallelStream 线程堵塞问题笔记
    定义:Stream(流)是JDK8中引入的一种类似与迭代器(Iterator)的单向迭代访问数据的工具。ParallelStream则是并行的流,它通过Fork/Join框架(JSR166y)来拆分任务,加速流的处理过程。最开始接触parallelStream很容易把其当做一个普通的线程池使用,因此也出现了上面提到的开始的时候打标,结束......
  • springcloud - kafka实践
    springcloud可以通过KafkaTemplate来发布消息,让后消费者使用来订阅@KafkaListener主题消息。一、添加依赖1<dependencyManagement>2<dependencies>3<dependency>4<groupId>org.springframework.cloud</groupId>5<artifactId&g......
  • docker kafka-manger
    实现"DockerKafkaManager"的过程及代码解释:整个过程可以分为以下几个步骤:步骤描述步骤一安装Docker步骤二下载KafkaManager镜像步骤三创建并启动KafkaManager容器步骤四配置Kafka集群连接下面是每个步骤具体需要做的事情以及相应的代码:步骤一......
  • springboot - kafka实践
    Kafka是一个开源的分布式流处理平台,由Apache软件基金会开发和维护。它是一种高性能、可持久化、可扩展的消息队列系统,常用于解决大规模数据传输和处理的问题。以下是Kafka的一些核心概念和主要特点:消息和主题:Kafka基于发布订阅模式,消息被发布到一个或多个主题(Topic)中。每条消......
  • Java使用Stream函数对集合进行分组
    1List<Map<String,String>>list=newArrayList<>();2Map<String,String>map1=newHashMap<>();3map1.put("name","卢俊义");4map1.put("book","水浒传"......
  • 深入解析 C++ 中的 ostringstream、istringstream 和 stringstream 用法
    引言:在C++中,ostringstream、istringstream和stringstream是三个非常有用的字符串流类,它们允许我们以流的方式处理字符串数据。本文将深入探讨这三个类的用法和特性,帮助读者更好地理解和应用字符串流操作。1.ostringstream(输出字符串流)ostringstream是C++中用于输出字......