Bus 简介
Spring Cloud Bus 是 Spring Cloud 体系内的消息总线,支持 RabbitMQ 和 Kafka 两种消息中间件。所谓消息总线,简单理解就是一个消息中心,众多微服务实例都可以连接到总线上,实例可以往消息中心发送或接收信息,例如:实例 A 发送一条消息到总线上,总线上的实例 B 可以接收到信息(实例 B 订阅了实例 A),消息总线充当一个中间者的角色,使得实例 A 和实例 B 解耦
Spring Cloud Bus 实战
Spring Cloud Bus 可以将 Spring 事件机制和 Stream 结合在一起,具体机制如下:
- 在需要发布或者监听事件的应用中增加
@RemoteApplicationEventScan
注解,通过该注解
可以启动 Stream 中消息通道的绑定 - 对于事件发布,需要承
ApplicationEvent
的扩展类RemoteApplicationEvent
,通过ApplicationContext.publishEvent()
发布事件时,Spring Cloud Bus 会对所要发布的事件进行包装,形成消息,通过默认的 Spring Cloud Bus 消息通道发送到消息中间件 - 对于事件监听者,则不需要进行任何变更,仍旧按照 Spring 的方式实现消息的监听i
安装并启动 ZooKeeper 和 Kafka,创建事件发布者项目,引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
定义用户事件类 UserEvent,实现 RemoteApplicationEvent
@Data
@Slf4j
@EqualsAndHashCode(callSuper = true)
public class UserEvent extends RemoteApplicationEvent {
public UserEvent(Object source, String originService, String destination) {
super(source, originService, destination);
}
}
- originService:对于事件发布者来说 originService 就是自己
- destinationService:将事件发布到哪些微服务实例,配置的格式为
{serviceld):{appContextId)
,在配置时 serviceld 和 appContextld 可以使用通配符,比如userservice:**
会将事件发布给 userservice 微服务
发布消息代码如下
@Slf4j
@RestController
public class TestCon {
@Autowired
private ApplicationContextHolder holder;
@GetMapping("/test/userEvent")
public void userAdd() {
User user = new User();
user.setId("2");
user.setName("tom");
user.setAge(50);
ApplicationContext ctx = ApplicationContextHolder.getApplicationContext();
UserEvent event = new UserEvent(user, ctx.getId(), "*:**");
ctx.publishEvent(event);
}
}
在配置文件中添加如下配置:
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers: localhost:9092
在启动类添加 @RemoteApplicationEventScan
注解
@SpringBootApplication
@RemoteApplicationEventScan
public class Server01Application {
public static void main(String[] args) {
SpringApplication.run(Server01Application.class, args);
}
}
创建事件接收者项目,引入和事件发布者同样的依赖,将 UserEvent 类复制到该模块下,实现事件监听类UserEventListener
@Slf4j
@Component
public class UserEventListener implements ApplicationListener<UserEvent> {
@Override
public void onApplicationEvent(UserEvent event) {
log.info("收到用户事件: {}", event);
}
}
加上事件发布者同样的配置和启动类注解
启动两个项目,请求事件发布者的 /test/userEvent
接口,即可发布和接收事件