Spring Cloud系列目前已经有了Spring Cloud五大核心组件:分别是,Eureka注册中心,Zuul网关,Hystrix熔断降级,openFeign声明式远程调用,ribbon负载均衡。这五个模块,对了,有没有发现,其实我这五个模块中ribbon好像还没有案例例举,目前只有一个Ribbon模块的搭建,后边我会完善的。
今天我们不主要围绕Spring Cloud的五大组件,本篇会以新的模块进行,完成一个以Rabbit MQ消息队列为核心的模块功能设计。在模块进行之前,我们先了解Spring Cloud 的Stream,这个很重要。
Spring Cloud Steam 是一个可以用来作为微服务应用构建消息驱动能力的框架,他可以基于Spring Boot来进行创建一个独立的,并且可以用来生产的Spring 应用程序。他通过使用Spring Integration(一体化)来链接消息嗲了中间件用以实现消息事件驱动的微服务应用。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动装配的实现,并且熟悉Spring的都知道他有个发布-订阅模式,消费组,以及消息分区核心部分。
Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点。
- Binder:Binder是Spring Cloud Stream的核心组件之一,它提供了与消息中间件的连接和交互。通过Binder,开发者可以将应用程序与特定的消息中间件进行集成,而无需关注底层的细节。
- 消息通道:Spring Cloud Stream使用消息通道作为应用程序中消息的传输媒介。消息通道可以是发布-订阅模式或点对点模式,开发者可以根据需求选择合适的通道类型。
- 绑定注解:通过使用绑定注解,开发者可以将消息通道与应用程序中的方法进行绑定。例如,@Input注解用于将方法绑定到输入通道,@Output注解用于将方法绑定到输出通道。
- 消息转换:Spring Cloud Stream支持自动的消息转换,使得开发者可以使用不同的数据格式和协议进行消息的传输。它提供了一些内置的消息转换器,同时也支持自定义的消息转换器。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
在应用程序中配置Rabbit MQ的链接信息,在application.yml或者application.properties中添加配置。
spring.cloud.stream.bindings.input.destination=myInputQueue
spring.cloud.stream.bindings.output.destination=myOutputQueue
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
这里的myInputQueue和myOutputQueue是你自定义的队列名称,你可以根据自己的需求进行命名。
接下来,创建一个消息处理器来处理输入和输出的消息。你可以使用@StreamListener
注解来监听输入消息,并使用@EnableBinding
注解来绑定输入和输出通道。
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class MessageProcessor {
@StreamListener(Sink.INPUT)
public void processMessage(String message) {
// 处理输入消息
System.out.println("Received message: " + message);
// 处理完后可以发送输出消息
// output.send(MessageBuilder.withPayload("Output message").build());
}
}
@EnableBinding(Sink.class)
将输入通道绑定到Sink,@StreamListener(Sink.INPUT)
注解用于监听输入消息。你可以在processMessage
方法中处理输入消息。
如果你想发送输出消息,你可以注入MessageChannel
并使用它发送消息。例如,你可以在MessageProcessor
类中注入MessageChannel
:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(Source.class)
public class MessageProcessor {
private final MessageChannel output;
@Autowired
public MessageProcessor(Source source) {
this.output = source.output();
}
@StreamListener(Sink.INPUT)
public void processMessage(String message) {
// 处理输入消息
System.out.println("Received message: " + message);
// 处理完后发送输出消息
output.send(MessageBuilder.withPayload("Output message").build());
}
}
这样,当有消息发送到输入通道时,processMessage方法将被调用,并处理输入消息。在处理完输入消息后,它将发送一个输出消息到输出通道。