一、事件驱动架构和原理
(一)事件驱动架构组成结构
1、事件和事件驱动架构
但凡在业务需求中出现如“当…发生…时、一旦出现…”等,描述时,我们就应该考虑是否需要在这些场景中引入事件。
所谓事件(Event),就是将系统中所发生的业务状态变更抽取出来形成一系列独立的对象。
而关于如何在系统中生 成、发布以及消费事件,业界也存在一个非常重要的设计模式,即事件驱动架构(Event-Driven Architecture,EDA)模式。
2、事件驱动架构
事件驱动架构有三个角色两个事件,三个角色分别是发布者、消费者、事件中心,两个事件分别是发布者发布事件,消费者订阅事件。
事件驱动架构的核心价值在于解耦。
3、事件驱动架构示例:
创建问诊过程中需要使用到用户的账户信息,无论是单体系统的模块还是分布式系统的服务都需要考虑耦合度问题。
有两种解耦方案,一种是使用RPC远程调用的方式,一种是使用EDA架构的事件通信来解耦,其中远程调用虽然解耦,但是仍然存在一定的耦合度,而事件通信是一个松耦合的方案。
4、事件驱动架构基础结构
对于事件驱动架构来说,有事件Event和时间处理器Handler:
其中对于Event,会有抽象的事件类AbstractEvent,然后不同的事件都会继承AbstractEvent,例如下图的AccountUpdateEvent和AccountCreateEvent;
而Handler会有不同的实现类,例如图中的AccountUpdateEventHandler和AccountCreateEventHandler
而handler和Event之间有对应关系,例如AccountUpdateEventHandler处理AccountUpdateEvent,AccountCreateEventHandler处理AccountCreateEvent。
除了上述对象外,还有事件分派器EventDispacter,其将不同的Event 交由不同的Handler处理。
5、代码演示
(1)定义事件及其实现类
定义事件类接口Event、抽象实现类AbstractEvent,以及具体的事件类UserCreateEvent、UserUpdateEvent
public interface Event {
Class<? extends Event> getType();
}
public abstract class AbstractEvent implements Event {
@Override
public Class<? extends Event> getType() {
return getClass();
}
}
public class UserCreateEvent extends AbstractEvent{
private User user;
public UserCreateEvent(User user){
this.user = user;
}
public User getUser(){
return this.user;
}
}
public class UserUpdateEvent extends AbstractEvent{
private User user;
public UserUpdateEvent(User user){
this.user = user;
}
public User getUser(){
return this.user;
}
}
(2)定义处理类及其实现类
定义事件处理类接口Handler以及其实现类UserCreateHandler和UserUpdateHandler
public interface Handler<E extends Event> {
String onEvent(E event);
}
public class UserCreateHandler implements Handler<UserCreateEvent> {
@Override
public String onEvent(UserCreateEvent event) {
return String.format("User '%s' has been created", event.getUser().getUserName());
}
}
public class UserUpdateHandler implements Handler<UserUpdateEvent> {
@Override
public String onEvent(UserUpdateEvent event) {
return String.format("User '%s' has been updated", event.getUser().getUserName());
}
}
(3)定义事件分配类
定义事件分配类,提供容器初始化,事件注册和分派事件方法。
public class EventDispatcher {
private Map<Class<? extends Event>, Handler<? extends Event>> handlers;
public EventDispatcher(){
handlers = new HashMap<>();
}
// 注册
public void registerHandler(Class<? extends Event> eventType, Handler<? extends Event> handler){
handlers.put(eventType, handler);
}
// 分派事件
public <E extends Event> String dispatch(E event){
Handler<E> handler = (Handler<E>)handlers.get(event.getClass());
if(handler != null){
String result = handler.onEvent(event);
return result;
}
return "error";
}
}
(4)实际方法调用
EventDispatcher eventDispatcher = new EventDispatcher();
eventDispatcher.registerHandler(UserCreateEvent.class, new UserCreateHandler());
eventDispatcher.registerHandler(UserUpdateEvent.class, new UserUpdateHandler());
User user = new User("lcl");
String result = eventDispatcher.dispatch(new UserCreateEvent(user));
System.out.println(result);
result = eventDispatcher.dispatch(new UserUpdateEvent(user));
System.out.println(result);
6、事件驱动架构扩展:
(1)基于事件驱动架构的系统交互过程
在事件驱动架构的领域事件中,生命周期包括生成(Generate)、存储(Store)、分发(Dispatch)和消费(Consume)四个阶段。 上面的代码演示了生成和分发两个阶段,没有处理存储和消费阶段。更狭义的讲,这四个阶段可以分为两个阶段:发布阶段和处理阶段。
(2)事件订阅者模型
对于事件订阅模型,可以分为简单事件处理者、及时转发事件处理者和存储事件处理者。
简单事件处理者:只做简单的消息处理,例如打印日志等
及时转发事件处理者:将消息处理后转发到消息队列
存储事件处理者:将消息存储后再通过时间转发器发送给消息队列
(二)Spring事件驱动应用方式
1、Spring事件驱动架构:
Spring中事件驱动和上面的案例一样,有事件本身ApplicationEvent、事件发布者ApplicationEventPublisher、事件消费者ApplicationListener。
(1)事件
在Spring中,EventObject是事件的基类,其实现了序列化接口,内部只有一个source属性,表示事件的来源,ApplicationEvent作为事件统一的抽象类,只有一个时间属性。
//事件
public class EventObject implements java.io.Serializable {
protected transient Object source;
}
public abstract class ApplicationEvent extends EventObject {
private final long timestamp;
}
(2)事件发布者
//事件发布者
public interface ApplicationEventPublisher {
void publishEvent(ApplicationEvent event);
}
(3)事件订阅者
和上面的案例没什么区别,不过需要提一下的是,在Spring中,ApplicationContext直接继承了ApplicationEventPublisher接口,那么就意味着所有能获取到ApplicationContext的地方都可以作为事件的订阅者处理事件。
另外Spring还提供了语法糖,可以通过@EventListener注解实现对事件的动态监听。
//事件订阅者
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E event);
}
//ApplicationContext直接继承了ApplicationEventPublisher接口
public interface ApplicationContext extends ApplicationEventPublisher{
}
// 通过@EventListener注解实现对事件的动态监听
@EventListener
@Async
public void placeCustomerStaffSyncNotice(CustomerStaffSyncEvent event) { //这里执行具体邮件发送操作
System.out.println("发送邮件: " + event.getEventContent());
}
2、客服系统演进:Spring事件驱动架构实战
(1)定义事件
@Data
public class CustomerStaffSyncEvent implements Serializable {
private String eventId;
private Date eventTime;
private String eventContent;
public CustomerStaffSyncEvent(String eventContent){
this.eventContent = eventContent;
this.eventId = DistributedId.getInstance().getFastSimpleUUID();
this.eventTime = new Date();
}
}
(2)发布事件
@Override
public List<HangzhouCustomerStaff> findAllCustomerStaffs() {
CustomerStaffSyncEvent customerStaffSyncEvent = new CustomerStaffSyncEvent("findAllCustomerStaffs");
publisher.publishEvent(customerStaffSyncEvent);
return customerStaffRepository.findByIsDeletedFalse();
}
(3)订阅事件
@Override
@EventListener
@Async
public void placecustomerStaffSyncNotice(CustomerStaffSyncEvent event) {
// 模拟执行发送邮件操作
log.info("发送邮件【{}】", JSON.toJSONString(event));
}
@EventListener
public void anotherPlacecustomerStaffSyncNotice(CustomerStaffSyncEvent event) {
// 模拟执行发送邮件操作
log.info("另一个监听【{}】", JSON.toJSONString(event));
}
3、Spring内置事件:
在Spring中也大量的使用了这样的发布订阅事件,最为我们熟悉的就是ContextRefreshedEvent,即每当ApplicationContext被刷新时,Spring 就会发布ContextRefreshedEvent事件。
public abstract class AbstractApplicationContext {
protected void finishRefresh() {
......
// 发布ContextRefreshedEvent事件
publishEvent(new ContextRefreshedEvent(this));
......
}
}
在Spring中ContextRefreshedEvent的发布,首先是AbstarctApplicationContext,其提供了refresh方法,在该方法中,调用了finishRefresh方法,进而调用publishEvent方法来发布一个ContextRefreshedEvent事件。
针对Spring容器启动、停止以及关闭阶段等的生命周期阶段,开发人员可以对对应的事件进行订阅,从而实现对Spring事件的有效监听和处理。
事件处理是集成Spring框架的一种主要手段,如下代码所示,自定义一个类,实现ApplicationListener接口,监听的事件类型是容器停止事件ContextStoppedEvent,然后就可以在自定义类中做自己的业务。
@Component
public class TestApplicationListener implements ApplicationListener<ContextStoppedEvent> {
@Override
public void onApplicationEvent(ContextStoppedEvent contextStoppedEvent) {
System.out.println(contextStoppedEvent);
}
}
4、使用事件与Spring框架集成
(1)Dubbo与Spring框架集成
Dubbo框架基于ContextRefreshedEvent事件完成服务发布,在容器刷新时,重新发布服务。
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
public void afterPropertiesSet() {
} ...
public void setApplicationContext(ApplicationContext applicationContext) {
} ...
public void setBeanName(String name) {
} ...
public void onApplicationEvent(ApplicationEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
export();
}
}...
public void destroy() {
}
}
(2)Nacos与Spring集成
Nacos框架基于WebServerInitializedEvent事件完成服务绑定,在Web容器初始化完成后,做绑定操作,这里不需要关心绑定操作做了什么,只需要知道在这进行的操作即可,后面的文章会对这一块做单独的说明。
public abstract class AbstractAutoServiceRegistration<R extends Registration> ApplicationListener<WebServerInitializedEvent> {
public void onApplicationEvent (WebServerInitializedEvent event){
bind(event);
}
public void bind (WebServerInitializedEvent event){
ApplicationContext context = event.getApplicationContext(); ...
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
}
(三)Spring事件驱动实现原理
AbstractApplicationContext:在容器抽象类AbstractApplicationContext中,调用事件多播器ApplicationEventMulticaster的多播方法multicastEvent来发布事件。
public abstract class AbstractApplicationContext {
public void publishEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
...
// 事件多播器ApplicationEventMulticaster,多播方法multicastEvent
getApplicationEventMulticaster().multicastEvent(event, eventType);
if (this.parent != null) {
this.parent.publishEvent(event);
}
}
}
ApplicationEventMulticaster:ApplicationEventMulticaster相当于观察者模式中的Subject,或是前面样例中的EventDispatcher,维护着一个 ApplicationListener列表,提供事件注册、移除,以及事件发布。
public interface ApplicationEventMulticaster {
void addApplicationListener(ApplicationListener listener);
void addApplicationListenerBean(String listenerBeanName);
void removeApplicationListener(ApplicationListener listener);
void removeApplicationListenerBean(String listenerBeanName);
void removeAllListeners();
void multicastEvent(ApplicationEvent event);
}
SimpleApplicationEventMulticaster:在多播器的具体实现类SimpleApplicationEventMulticaster中,循环所有ApplicationListener,并调用ApplicationListener的onApplicationEvent方法。在这里还判断了是否使用了异步处理如果使用了异步处理,则会通过JDK提供的Executor为每个ApplicationListener启 动一个独立的线程来回调onApplicationEvent方法。
public class SimpleApplicationEventMulticaster {
public void multicastEvent(final ApplicationEvent event) {
for (final ApplicationListener listener : getApplicationListeners(event)) {
Executor executor = getTaskExecutor();
if (executor != null) {
// 通过JDK提供的Executor为每个ApplicationListener启 动一个独立的线程来回调onApplicationEvent方法
executor.execute(new Runnable() {
public void run() {
listener.onApplicationEvent(event);
}
});
} else {
listener.onApplicationEvent(event);
}
}
}
}
注册时机:再回到AbstractApplicationContext,先调用initApplicationEventMulticaster方法初始化自定义事件广播器,即获取当前上下文中的所有ApplicationListener,然后在onRefresh方法后注册监听器,也就是将 ApplicationListener 添加到ApplicationEventMulticaster中。
public abstract class AbstractApplicationContext {
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
...
try {
...
//注册用来拦截Bean创建的BeanPostProcessor
registerBeanPostProcessors(beanFactory);
...
// 初始化自定义事件广播器
initApplicationEventMulticaster(); // 获取当前上下文中的所有ApplicationListener
// 执行刷新
onRefresh();
// 注册监听器
registerListeners(); // 将 ApplicationListener 添加到ApplicationEventMulticaster中
...
}
}
}
}
ApplicationListenerDetector:最适合做添加监听器的地方就是BeanPostProcessor,即Bean的后置处理器,在Bean初始化完成后,添加一些额外的功能,在ApplicationListenerDetector中,其实现了DestructionAwareBeanPostProcessor接口,然后在其postProcessAfterInitialization方法中,判断bean是否实现类ApplicationListener接口,如果实现就将其添加到容器中。
class ApplicationListenerDetector implements DestructionAwareBeanPostProcessor {
// 实现BeanPostProcessor接口的postProcessAfterInitialization 方法,完成ApplicationListener的添加
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof ApplicationListener) {
Boolean flag = this.singletonNames.get(beanName);
if (Boolean.TRUE.equals(flag)) {
this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
}
...
}
return bean;
}
}
Spring事件驱动架构总结:
AbstractApplicationContext实现类ApplicationEventPublisher接口,而ApplicationEventPublisher又对于ApplicationEvent有依赖。
AbstractApplicationContext通过ApplicationEventMulticaster添加、删除、发布事件,ApplicationListener来监听事件,ApplicationListener继承自EventListener。
多播器ApplicationEventMuticaster的集体实现是SimpleApplicationEventMuticaster,在该类中循环所有监听者并调用监听方法。
二、消息通信机制及中间件
(一)消息通信机制及实现框架
1、为什么要引入消息中间件
消息通信机制的核心优势:系统扩展、分布式解耦、流量削峰、数据最终一致性
(1)分布式解耦
RPC架构或者是Restful风格对于整个分布式交互来说还是有一定的耦合性的,特别是RPC架构,在调用时必须要使用指定的框架和协议。分布式解耦主要是从技术、空间、时间等维度进行解耦。
技术耦合:以Dubbo为例,必须使用Dubbo框架,服务提供方和消费方的协议要一致等等,如下图所示的RMI,服务双方都必须使用Java进行编码。总的来说就是服务双方已经绑定在一起了,必须要使用相同的语言、技术、框架来进行通信。
空间耦合: 以下图为例,服务端提供了一个入参为id的getUserById方法,那么调用方就必须使用该方法签名进行调用,那么对于服务双发都不能随意更改方法签名,如果改了,就需要重新写代码进行调用,并且需要重新测试、上线等。
时间耦合:当服务提供方出现问题,就会导致调用方调用异常,也就是服务双方必须同时在线,这个调用才能成功。
(2)流量削峰:
流量削峰就好比针对交通早高峰和晚高峰的问题所采用的的错峰限行解决方案,其本质是延缓用户请求的处理时机, 以及层层过滤用户的访问需求,让服务端处理变得平稳,同时也可以节省服务器的资源成本。
要对流量进行削峰,最常用的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送, 中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去。
(3)系统扩展:
例如客户服务要做某一个操作,首先需要验证用户的账户是否可以做后续的操作,那么一般的操作流程是客户服务调用账户服务获取账户信息并校验,这样就存在系统耦合、性能、扩展上的问题。
使用消息中间件,账户服务的变更通过消息队列发送并被客户服务消费,客户服务本地存储一份账户数据,做校验时根据本地数据做校验即可,这样解决了耦合和性能的问题,而账户服务的数据可能不止客户服务一个系统会消费,别的系统也可能会进行消费,因此使用消息队列还解决了系统扩展性的问题。
(4)数据最终一致性
主要是通过本地事务与消息发送的原子性,以及事务参与方与接收消息的可靠性来共同解决,这个问题后续会单独展开。
2、消息通信机制的交互模式
(1)消息通信交互模式
一般而言,我们认为消息通信有两种交互模式:点对点模式和发布订阅模式
比较正统的来说,点对点才有队列的概念,发布订阅才有主题的概念,但是目前消息中间件把这两个概念搞得比较模糊了,只要是消息处理都叫队列,因为一对一也是一对多的一种。因此现在很多消息队列都将其叫做主题,而主题中又包含队列。
(2)消息通信机制的核心功能
无论是点对点模式还是发布订阅模式,所做的事情无外乎消息发布和消息消费。
对于消息发布场景来说,存在的消息类型:普通消息、顺序消息、延迟消息、事务消息、单向消息、批量消息
对于消息消费场景来说,存在以下处理方式:拉(Pull)模式消费、推(Push)模式消费、消息过滤(Filter)
3、消息通信规范和框架
针对于不同的通信方式和核心功能,产生了许多通信规范与框架。
(1) JMS
JMS(Java Messaging Service,Java消息服务)是一种规范 ,JMS对应的核心类如下图所示,有点类似 JDBC 规范,其实原理差不多,按照 JMS 规范定义,就可以自己写一套基于 JMS 的消息中间件。
JMS规范的代表性实现框架是 ActiveMQ,ActiveMQ虽然目前仍然还在用,但是一些高并发的互联网场景,由于其存在性能问题,现在已经不怎么使用了;但是对于其设计和思想还是可以借鉴一下,例如 JMS 规范,目前还是比较主流的规范。
(2) AMQP:
AMQP(Advanced Message Queuing Protocol,高级消息队列协议),AMQP中没有点对点和发布订阅的概念,其是使用Exchange和Queue的动态绑定来实现的一对一(Queue)和一对多(Topic)。
AMQP规范的代表性实现框架是RabbitMQ,RabbitMQ在中小型互联网用比较多,因为其维护成本相对较低,开发起来比较简单,性能也不错。
(3)Kafka
Kafka目前是比较火的一款消息队列,在大数据领域和流式计算领域属于主流的消息中间件。
之前的版本是使用Zookeeper来做分布式协调,新的版本已经移除了对于第三方系统(Zookeeper)的依赖。broker注册到分布式协调器上后,生产产推送数据,消费者消费数据,需要注意的是,在 Kafka 中,消费者只有拉模式。
4、RocketMQ
RocketMQ中有Name Server和Broker,Name Server 是命名服务,类似于Kafka中的Zookeeper,主要用来做broker的注册以及生产者消费者针对于broker的路由。
(二)Spring消息通信解决方案
Spring对于消息通信提供了一整套的解决方案,分别有 Spring Messaging、Spring Integration、Spring Cloud Stream。
最底层的消息通信抽象是Spring Messaging,其脱离于任何的框架,可以将其理解为一个框架,Spring容器自带的,在Spring Core中。
在Spring Messaging之上又封装了一层Spring Integration,其是一个独立的框架,主要是在Spring Messaging提供的基础通信能力外,提供了面向系统集成方面的能力,这种能力有点类似之前的总线架构。目前Spring Integration存在感不是很强,用的不是很多,但是在ESB架构中,还是会有一些应用。
最上层的是Sring Cloud Stream,他可以将RocketMQ、Kafka、RabbitMQ等整合成一套,使用起来像是用了同一个中间件。
1、Spring Messaging
首先是Message消息对象的抽象,只有getPayload和getHeaders两个方法,分别是获取消息体和消息头。
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
然后是消息通道MessageChannel,主要定义了消息发送方法,而没有定义消息消费方法,因为消息消费有推、拉、推垃结合等多种方式,每一种的处理都不太一样,因此对于消息通道的顶层抽象,只设计了消息发送。
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message<?> message, long timeout);
}
通道可以简单理解为是对队列的一种抽象,通道的名称和队列相同,但是作为一种抽象和封装, 各个消息通信系统所特有的队列概念并不会直接暴露在业务代码中,而是通过通道来对队列进行配置。简单的说,同一个内容,在消息中间件的维度叫队列,在业务的角度叫通道,从而保证了不会混淆。
因此通道和队列时一对多的关系,一个队列只能被一个通道使用,而一个通道可以被多个队列使用。
针对于消息消费者来说,有推和拉的模式,因此其提供了PollableChannel和SubscribableChannel。
PollableChannel是拉模式,因此使用receive方法进行接收;
SubscribableChannel是推模式,有subscribe和unsubscribe两个方法,分别是订阅和取消订阅,订阅时传入消息处理器MessageHandler,订阅完成后,处理消息的操作转嫁给了消息处理器MessageHandler的handleMessage方法进行处理。
这里的设计是非常清晰的,不是一个MessageChannel全都搞定的,而是分层、分体系的抽象,该拉的拉、该推的推,该处理消息的处理消息。
public interface PollableChannel extends MessageChannel {
Message<?> receive();
// PollableChannel才有receive的概念
Message<?> receive(long timeout);
}
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
// SubscribableChannel使用回调来响应事件
boolean unsubscribe(MessageHandler handler);
}
public interface MessageHandler {
void handleMessage(Message<?> message) throws MessagingException;
}
2、Spring Integration:
Spring Integration本质上是一种服务总线。对于通过做了很多封装例如:
RendezvousChannel:阻塞式的通道,只能进一个出一个,一个出了之后才能进下一个。
QueueChannel:缓存式通道
ProorityChannel:有优先级的通道
DirectChannel:发送消息和接收消息是同一个线程
ExecutorChannel:多线程通道
3、Spring Cloud Stream:
Spring Cloud Stream 中也有Channel,但是Channel 下不是Queue,而是Binder绑定器,不同的消息中间件两边通过绑定器做适配,从而屏蔽不同消息中间件的差异。
(三)客服系统案例演进
对于客服系统领域事件做了一个抽象,首先是BaseEvent,其是脱离了业务场景的基础事件,包括事件唯一编号和时间发生时间;然后定义了DomainEvent,主要定义了事件类型、操作类型、消息内容,主要是针对业务做了一些扩展。
public abstract class BaseEvent implements Serializable {
//事件唯一编号
private String eventId;
//事件时间
private Date eventTime;
}
public abstract class DomainEvent<T> extends BaseEvent {
//自定义事件类型
private String type;
//事件所对应的操作
private String operation;
//事件对应的领域对象
private T message;
}
针对于客服系统消息通信场景来说,有两个场景可以使用消息队列:
IM消息存储应用场景:原来是 IM 系统自己保存数据到数据库,变为 IM 系统发送消息队列,然后在 IM事件 系统保存数据到数据库。
客服工单创建应用场景:原来的客户工单处理时,需要调用Customer来获取客服信息,可以改成在Customer服务中一旦发生数据变更,则发送消息给工单系统,那么工单系统在创建工单时,就使用本地数据库存储的customer服务。
三、基于RocketMQ实现消息发布
(一)RocketMQ基本概念和架构
1、RocketMQ基本概念:
(1)消息(Message)
消息是RocketMQ生产和消费数据的最小单位,每条消息必须属于一个主题。
可以看到其包含topic、flag等信息,在老的版本中,还会包含Keys、Tags、UserProperty、DelayTimeLevel等信息,在4.9以后的版本中,统一将Keys、Tags、UserProperty、DelayTimeLevel等信息放在了properties中。
public class Message implements Serializable {
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
}
(2)主题(Topic)
Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。
一个生产者可以同时发送多种Topic的消息;
一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。
(3)队列(Queue)
存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。
一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。一个Topic的Queue中的消息只能被一个消费者组(Consumer Group)中的一个消费者消费。一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。
(4)标签(Tag)
为消息设置的标签,用于同一主题下区分不同类型的消息。
来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 Topic是消息的一级分类,Tag是消息的二级分类。
2、RocketMQ系统架构:
(1)RocketMQ角色:
生产者组(Producer Group):
RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。 生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。 一个生产者组可以同时发送多个主题的消息,发送过程支持快速失败并且低延迟。
消费者组(Consumer Group):
一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。 RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现。 消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。
Broker:
Broker充当着消息中转角色,负责存储消息、转发消息。 Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。 Broker同时存储着消息相关的各种元数据。
Name Server:
Broker管理:管理Broker实例的注册,提供心跳检测机制检查Broker是否存活。
路由信息管理:保存Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和 Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。
(2)RocketMQ执行流程:
a、启动NameServer:NameServer监听端口,等待Broker、生产者、消费者连接,相当于一个路由控制中心
b、启动Broker:跟NameServer 保持长连接,定时发送心跳包
c、创建Topic:创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
d、发送消息:跟NameServer建立长连接并获取Topic,然后与队列所在的Broker建立长连接从而向Broker发消息
e、消费消息:跟NameServer建立长连接并获取Topic,然后直接跟Broker建立连接通道,开始消费消息
(二)RocketMQ消息发送方式
单向消息:单向消息主要用在不特别关心发送结果的场景,例如日志发送
public interface MQProducer extends MQAdmin {
void sendOneway(final Message msg) throws ...;
void sendOneway(final Message msg, final MessageQueue mq) throws ...;
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws ...;
...
}
同步消息:同步、可靠地发送方式使用的比较广泛,比如:重要的消息通知
public interface MQProducer extends MQAdmin {
SendResult send(final Message msg) throws ...;
SendResult send(final Message msg, final long timeout) throws ...;
SendResult send(final Message msg, final MessageQueue mq) throws ...;
SendResult send(final Message msg, final MessageQueue mq, final long timeout) throws ...;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws ...;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws ...;
...
}
异步消息:异步消息基于回调(Callback)机制实现对发送结果的异步通知
public interface MQProducer extends MQAdmin {
void send(final Message msg, final SendCallback sendCallback) throws ...;
void send(final Message msg, final SendCallback sendCallback, final long timeout) throws ...;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws ...;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) throws ...;
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws ...;
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) throws ...;
...
}
批量消息:批量发送消息同样支持同步和异步两种发送方式
public interface MQProducer extends MQAdmin {
SendResult send(final Collection<Message> msgs) throws ...;
SendResult send(final Collection<Message> msgs, final long timeout) throws ...;
SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws ...;
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout) throws ...;
void send(final Collection<Message> msgs, final SendCallback sendCallback) throws ...;
void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws ...;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws ...;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws ...;
...
}
普通消息总结:
RocketMQ生产者 - 启动和关闭:
在RocketMQ中,生产者是MQProducer,它有一个默认实现DefaultMQProducer,提供了启动和停止两个方法,启动方法中要与服务端建立长连接并发送心跳,停止方法中要做保证消息发送完成再关闭等处理。
public interface MQProducer extends MQAdmin {
void start() throws MQClientException;
void shutdown();
...
}
(三)客服系统案例演进
上面提到,对于客服系统演示来说,有两个场景可以用消息队列,分别是消息的持久化和工单的创建。
1、消息持久化场景
(1)创建通用事件
ublic abstract class BaseEvent implements Serializable {
private static final long serialVersionUID = 1L;
private String eventId;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date eventTime;
public BaseEvent() {
this.eventId = "Event" + UUID.randomUUID().toString().toUpperCase();
this.eventTime = new Date();
}
public String getEventId() {
return eventId;
}
public Date getEventTime() {
return eventTime;
}
}
@Data
public abstract class DomainEvent<T> extends BaseEvent{
//自定义事件类型
private String type;
//事件所对应的操作
private String operation;
//事件对应的领域对象
private T message;
}
(2)创建消息创建事件
@Data
public class ImMessageCreatedEvent extends DomainEvent<ImMessage> {
}
(3)引入RocketMQ依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
(4)配置RocketMQ信息
rocketmq:
producer:
group: producer_im
name-server: 192.168.124.13:9876
(5)事务持久化逻辑
这里的持久化改为发送MQ消息,由于之前已经有了ImMessageService的实现ImMessageServiceImpl,且ImMessageServiceImpl也注入到了Spring容器,因此单独设置其Service名称为event。
@Service("event")
public class MQImMessageServiceImpl implements ImMessageService {
private final String TOPIC_IM = "topic_im";
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void saveImMessage(ImMessage imMessage) {
ImMessageCreatedEvent event = new ImMessageCreatedEvent();
event.setMessage(imMessage);
event.setOperation("CREATE");
event.setType("IM");
rocketMQTemplate.convertAndSend(TOPIC_IM, event);
}
}
(6)调整Controller的调用
在Controller中可以直接注入ImMessageService并调用其saveImMessage方法,由于调用时默认会走ImMessageServiceImpl,因此需要在注入ImMessageService时设置注入的名称。
@Autowired
@Qualifier("event")
private ImMessageService imMessageService;
2、客服工单创建应用场景
(1)消息发送逻辑
@Component
public class CustomerStaffChangedEventProducer {
private final String TOPIC_STAFF = "topic_staff";
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendCustomerStaffChangedEvent(CustomerStaff customerStaff, String operation){
CustomerStaffEventDTO customerStaffEventDTO = new CustomerStaffEventDTO();
customerStaffEventDTO.setId(customerStaff.getId());
customerStaffEventDTO.setStaffName(customerStaff.getStaffName());
customerStaffEventDTO.setAccountId(customerStaff.getAccountId());
customerStaffEventDTO.setPhone(customerStaff.getPhone());
CustomerStaffChangedEvent event = new CustomerStaffChangedEvent();
event.setType("STAFF");
event.setOperation(operation);
event.setMessage(customerStaffEventDTO);
rocketMQTemplate.convertAndSend(TOPIC_STAFF, event);
}
}
(2)业务逻辑
在之前的代码中,对于工单创建和变更,直接存储了系统,这里改为先发送MQ再存储。
@Override
public Boolean createCustomerStaff(CustomerStaff customerStaff) throws BizException {
customerStaffChangedEventProducer.sendCustomerStaffChangedEvent(customerStaff, "CREATE");
return this.save(customerStaff);
}
@Override
public Boolean updateCustomerStaff(CustomerStaff customerStaff) {
customerStaffChangedEventProducer.sendCustomerStaffChangedEvent(customerStaff, "UPDATE");
return this.updateById(customerStaff);
}
四、基于RocketMQ实现消息消费
(一)RocketMQ消息消费方式
1、消费方式
RocketMQ消费方式有推、拉两种模式,拉模式面向于Topic,推模式面向于Queue。
PUSH:实时性高,但会增加服务端负载;对消费端能力有要求,如果PUSH的速度过快,消费端可能会出现限流问题。
PULL:消费者从服务端拉消息,主动权在消费端,可控性好; PULL的时机很重要,间隔过短则空请求会多浪费资源,间隔太长则消息不能及时处理。
2、RocketMQ消费者
RocketMQ的消费者类信息如下图所示,基类是MQConsumer,针对与推模式和拉模式,分别提供了MQPushConsumer和MQPullConsumer,其默认实现类为DefaultMQPushConsumer和DefaultMQPullConsumer。
public interface MQConsumer extends MQAdmin {
//如果消费失败,消息会被重新发送到Broker并在一定时间之后再次被消费
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws ...;
//基于Topic从消费者缓存中获取消息队列信息
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws...;
}
可以看到MQConsumer中提供了sendMessageBack和fetchSubscribeMessageQueues两个方法,sendMessageBack方法表示如果消费失败,消息会被重新发送到Broker并在一定时间之后再次被消费;fetchSubscribeMessageQueues方法表示基于Topic从消费者缓存中获取消息队列信息。
对于推拉两个模式来的接口类来说:
MQPullConsumer:Pull消费方式中,抓取消息的过程需要开发自己实现,首先基于Topic拿到MessageQueue的集合, 然后遍历MessageQueue集合,并针对每一个MessageQueue批量取消息。取完一次消息后,记录该队列下一次要取的开始偏移量(Offset),直到取完了再换另外一个MessageQueue。
MQPushConsumer:Push消费方式中,消费者把轮询过程进行封装,并注册MessageListener监听器。一旦获取消息,就唤醒MessageListener的consumeMessage方法进行消费,对用户而言感受消息是被推送过来的。
MQPushConsumer定义:
推模式的消费者提供了启动与关闭、注册监听器、消息订阅这三组方法。
public interface MQPushConsumer extends MQConsumer {
// 开启和关闭
void start() throws MQClientException;
void shutdown();
// 注册监听器
@Deprecated
void registerMessageListener(MessageListener messageListener);
void registerMessageListener(final MessageListenerConcurrently messageListener);
void registerMessageListener(final MessageListenerOrderly messageListener);
// 消息订阅
void subscribe(final String topic, final String subExpression) throws...;
@Deprecated
void subscribe(final String topic, final String fullClassName, final String filterClassSource) ...;
void subscribe(final String topic, final MessageSelector selector) throws ...;
void unsubscribe(final String topic);
}
示例:原生的使用样例如下所示,设置了NameServer的地址、topic、消费者组等信息,然后注册监听器并启动消费者;在监听器中,使用了并发处理的监听器,消费完成后,返回消费成功。
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("topic_im");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 消费消息
MessageExt msg = msgs.get(0);
String message = new String(msg.getBody());
System.out.println(message);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
MQPullConsumer定义:
提供了开启和关闭、消息拉取、处理偏移量这三组方法。
public interface MQPullConsumer extends MQConsumer {
// 开启和关闭
void start() throws MQClientException;
void shutdown();
void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
// Pull重载方法组
PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws...; ...
PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws...;
// 处理消费偏移量
void updateConsumeOffset(final MessageQueue mq, final long offset) throws...;
long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws...;
}
示例:原生的使用方法如下所示,配置信息和推模式一样,然后针对于指定的Queue拉取消息,处理完成后,设置其消费偏移量。
public class PullScheduleService {
public static void main(String[] args) throws MQClientException {
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("myConsumerGroup");
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("topic_im", new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
// 消息偏移量
long offset = consumer.fetchConsumeOffset(mq, false);
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> messages = pullResult.getMsgFoundList();
... // 消息消息
break;
...}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
// 设置重新拉取时机
context.setPullNextDelayTimeMillis(3000);
}
}
}); scheduleService.start();
}
}
3、Spring RocketMQ集成方式
针对于在实际业务代码中如何使用RocketMQ,不太建议使用推模式,因为其比较底层,需要自己设置Queue和偏移量,使用推模式比较好,但是原生的推模式API使用起来仍然是比较麻烦,因此可以使用Spring来集成。
对于消息发送来说,上面的代码已经演示,这里再给个样例:
//消息发布
@Service("event")
public class MQImMessageServiceImpl implements ImMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void saveImMessage(ImMessage imMessage) {
rocketMQTemplate.convertAndSend(TOPIC_IM, event);
}
}
对于消费者而言,直接使用@RocketMQMessageListener注解设置消费者的配置信息,然后实现RocketMQListener接口,并实现onMessage方法即可。
//消息消费
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_im", topic = "topic_im")
public class ImMessageConsumer implements RocketMQListener<ImMessageCreatedEvent> {
@Override
public void onMessage(ImMessageCreatedEvent message) { ...}
}
(二)RocketMQ消息可靠性机制
在考虑消息的可靠性时,主要需要了解在哪个环境可能造成消息的丢失,对于整个消息的处理流程来说,主要包含发送消息、消息存储、消息消费。
在生产者发送消息到Broker时,可能存在消息发送失败的情况;
在Broker内部存储消息到磁盘以及主从复制同步时,可能存在落盘失败和同步失败的情况;
Broker把消息推送给消费者或者消费者主动拉取消息时,可能存在消费者消费失败但是消息队列认为消费者消费成功的情况。
针对于以上三个节点,RocketMQ或者业务侧都要做响应的处理,才能保证消息的可靠性。
1、消息发送的可靠性:
上面提到,消息发送可以分为单项发送、同步发送和异步发送:
单向发送:业务方无法根据发送的状态来判断消息是否发送成功,是一种不可靠的消息发送方式,不推荐使用
同步发送:发送完消息后同步检查Broker返回的状态来判断消息是否持久化成功,如果发送超时或者失败,执行重试,注意幂等性
异步发送:消息发送结果会回传给相应的回调函数,可以根据发送的结果来判断是否需要重试来保证消息的可靠性,注意幂等性
因此对于消息发送来说,要保证一定发送成功,需要使用同步发送或异步发送,但是无论使用哪种发送方式,都要做失败重试,对于失败重试来说,可以有不同的重试策略:
如果同步模式发送失败,则轮转到下一个Broker进行重试;
如果异步模式发送失败,则只会在当前Broker进行重试;
最多重试2次;
如果生产者本身向Broker发送消息产生超时异常,就不会再重试。
另外除了RocketMQ本身的重试策略外,业务系统还可以增加定制化的重试逻辑,如把消息存储下来定时发送到Broker,这个已经不是MQ的范畴了,同时也不太建议这么处理。
2、Broker存储端消息可靠性:
对于存储系统来说,为了保证系统的可靠性和性能,都会采用分区和副本的处理,而RocketMQ也不例外,其使用多个Queue来做分区,而每个分区都有副本来保证数据的可靠性,那么对于Broker存储消息可靠性来说,主要存在单一副本的刷盘可靠性和主从复制的可靠性两个维度,及刷盘和复制。
(1)刷盘操作:
同步刷盘:消息写入内存的 PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。数据绝对安全,但是吞吐量不大。
异步刷盘(默认):消息写入到内存的 PageCache就立刻给客户端返回写操作成功,当PageCache中的消息积累到一定的量或定时触发一次写磁盘操作;吞吐量大,性能高,但是PageCache中数据可能丢失,不保证数据绝对安全。
(2)复制操作:
同步复制(推荐):Master和Slave均写成功后才反馈给客户端写成功状态;如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
异步复制:只要Master写成功,即可反馈给客户端写成功状态;系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢。
实际生产环境一般使用异步刷盘和同步复制,RocketMQ的默认值也是这样的,因为刷盘本身是一个非常耗时的操作,且刷盘失败的概率是非常小的,为了在这个节点保证性能,一般采用异步刷盘;但是采用了异步刷盘后,再采用异步复制,就真的会丢失数据,由于复制主要是内存之间的复制,因此性能损耗相比于刷盘来说要小得多,因此为了保证性能和可靠性的平衡,一般采用异步刷盘同步复制的策略。
3、消费可靠性--重试策略
对于消费者消费数据的可靠性来说,只有返回CONSUME_SUCCESS才算消费完成;如果消费失败,就返回CONSUME_LATER,返回CONSUMER_LATER会按照不同的messageDelayLevel时间进行再次消费,最长时间为2个小时后再次进行消费重试;如果消费满16次之后还是未能消费成功则不再重试,会将消息发送到死信队列 ;
在死信队列的数据并不是丢失,而是换了一个Topic存储,还是可以通过RocketMQ提供的相关接口从死信队列获取到相应的消息进行后续处理的。
4、RocketMQ的可靠性总结
消息发送方:通过不同的重试策略保证了消息的可靠发送
Broker服务端:通过不同的刷盘机制以及主从复制来保证消息的可靠存储
消息消费方:通过至少消费成功一次以及消费重试机制来保证消息的可靠消费
(三)客服系统案例演进
1、IM消息存储应用场景
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_im", topic = "topic_im")
public class ImMessageConsumer implements RocketMQListener<ImMessageCreatedEvent> {
@Autowired
private ImMessageService imMessageService;
@Override
public void onMessage(ImMessageCreatedEvent imMessageCreatedEvent) {
log.info("Receive message:【{}】", JSON.toJSONString(imMessageCreatedEvent));
imMessageService.saveImMessage(imMessageCreatedEvent.getMessage());
}
}
2、客服工单创建应用场景
(1)创建客服工单表和本地的客服人员表
CREATE TABLE `customer_ticket` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`ticket_no` varchar(64) NOT NULL COMMENT '工单编号',
`inquire` varchar(255) NOT NULL COMMENT '工单咨询内容',
`user_id` bigint(20) NOT NULL COMMENT '用户Id',
`staff_id` bigint(20) NOT NULL COMMENT '客服人员Id',
`status` int(4) NOT NULL DEFAULT '1' COMMENT '工单状态,1:初始化,2:进行中,3:结束',
`score` int(11) DEFAULT NULL COMMENT '工单评分',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='客服工单表';
CREATE TABLE `local_customer_staff` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`staff_name` varchar(45) NOT NULL COMMENT '客服姓名',
`account_id` bigint(20) NOT NULL COMMENT '账号Id',
`phone` varchar(11) DEFAULT NULL COMMENT '手机号',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8 COMMENT='客服人员表';
(2)消费逻辑
根据消息的操作类型,调用增删改方法操作数据库。
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_staff", topic = "topic_staff")
public class CustomerStaffConsumer implements RocketMQListener<CustomerStaffChangedEvent> {
@Autowired
private ILocalCustomerStaffService localCustomerStaffService;
@Override
public void onMessage(CustomerStaffChangedEvent message) {
System.out.println("Received message : " + message);
CustomerStaffEventDTO dto = message.getMessage();
LocalCustomerStaff localCustomerStaff = new LocalCustomerStaff();
convertLocalCustomerStaff(dto, localCustomerStaff);
String operation = message.getOperation();
if(operation.equals("CREATE")) {
localCustomerStaffService.insertLocalCustomerStaff(localCustomerStaff);
} else if(operation.equals("UPDATE")) {
localCustomerStaffService.updateLocalCustomerStaff(localCustomerStaff);
} else if(operation.equals("DELETE")) {
localCustomerStaffService.deleteLocalCustomerStaff(localCustomerStaff);
}
}
private void convertLocalCustomerStaff(CustomerStaffEventDTO dto, LocalCustomerStaff localCustomerStaff) {
localCustomerStaff.setStaffId(dto.getId());
localCustomerStaff.setStaffName(dto.getStaffName());
localCustomerStaff.setAccountId(dto.getAccountId());
localCustomerStaff.setPhone(dto.getPhone());
}
}
五、RocketMQ高级特性
(一)事务消息机制
RocketMQ的事务消息主要是解决的消息最终一致性,分别从消息发送方和消息接收方分别进行的处理。
消息发送方:解决执行本地事务与发送消息的原子性问题。保证本地事务执行成功,消息一定发送成功。
消息接收方:解决接收消息与本地事务的原子性问题。保证接收消息成功后,本地事务一定执行成功。
RpcketMQ事务消息的机制是使用了半事务消息Topic,生产者发送消息到半事务消息Topic,本地事务成功后,则将半事务消息放到业务Topic,如果事务回滚,则将半事务Topic中的消息设置为已消费,如果服务发送方长时间不提交或回滚事务,消息队列本身有回查机制,可以回查本地事务状态,从而做出对应的处理。
(二)延迟消息机制
延迟消息:当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息。
RocketMQ延时消息的延迟时长不支持随意时的延迟,是通过特定的延迟等级来指定的。默认支持18个等级的延迟消息(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)。
message.setDelayTimeLevel(8); // 对应第八个延迟,即 4m
延时消息使用场景:
订单超时未支付:支付超时时延时消息被消费,自动执行取消订单等逻辑
各类活动场景:活动结束时延时消息被消费,灵活实现活动结束触发的逻辑处理
信息提醒类场景:异步发送各种定制化时间的消息通知
延时队列的实现思想和事务消息一样,都是使用内置消息Topic来暂时存储,然后通过一定的手段最终将其放回原业务Topic。
延时消息实现过程:
1、消息发送:消息发送时,使用的Topic是业务Topic,但是需要设置延时级别
2、修改消息Topic名称和队列信息: 消息队列将Topic的名称修改为SCHEDULE_TOPIC_XXX(XXX是原业务Topic名称),并根据延迟级别确定放入哪个Queue,这里可以看出来,RocketMQ默认了18个延迟级别,因此延迟队列Topic下也有18个Queue,而相同延迟级别的消息会进入相同的Queue。
3、转发消息到延迟级别对应的队列中:根据修改后的Topic和Queue进行投递
4、调度服务消费SCHEDULE_TOPIC_XXX 主题中的消息:通过ScheduleMessageService执行定时任务处理延迟消息Topic
5、将信息重新存储到CommitLog中:消息到期后,需要投递到目标Topic,存储到CommitLog
6、将消息投递到目标Topic中:RocketMQ会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息
7、消费者消费目标Topic中的数据:通过标准消费处理方式进行消息消费
(三)消息过滤机制
在实际应用时,可能只是用某一类的数据,其他类型的数据我们不关心,但是对于消息发送方来说,发送的消息不是单一针对一个系统的,因此发的是全量的信息,那么就需要在消费者业务代码中进行处理,是我需要的类型,就消费,不是我需要的类型,就抛出异常或者返回消费成功。
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group", topic = "topic")
public class Consumer implements RocketMQListener<Event> {
@Override
public void onMessage(Event message) {
// 只需要消费TYPE1类型的消息, 其他消息都是无效消息
if (message.getType == "TYPE1") { ...} else {
throw new Exception("无效的消息类型");
}
}
}
消息的序列化和反序列化以及网络传输都是非常耗性能的处理,对于不需要的消息也被消费者拿到,通过消费者自己判断的这种方式,则会比较耗性能,因此RocketMQ提供了消息过滤机制,直接将消息进行过滤。
RocketMQ的消息过滤分为表达式过滤和类过滤,表达式过滤又分为 Tag过滤和 SQL过滤,类过滤主要是 Filter Server过滤。
1、Tag 过滤
Tag过滤是RocketMQ应用中的主流过滤方式,生产者发送消息时发送带有Tag的消息,消费者消费时,在Broker和消费者两端都会做过滤处理。
首先在 Broker 端进行了初步的过滤,不满足过滤条件的消息会在 Broker 端被忽略,不进行存储和转发。这样可以减少不符合条件的消息在网络传输中的数据量和消费者端的处理开销,提高了系统的性能。
在某些情况下,消费者端也需要进行一定的过滤操作。这是因为在消息传输过程中,可能存在网络延迟、分区负载均衡等因素,导致消息在传输过程中的状态发生变化。虽然在 Broker 端进行了初步的过滤,但在消息到达消费者端之前,仍然可能会发生变化。
在消费者端进行进一步的过滤,可以确保只有符合消费者需求的消息被真正消费和处理。这样可以提高消息消费的准确性和可靠性,避免不必要的消息处理。
总的来说,Broker 端的过滤是为了减少网络传输和降低消费者端的处理开销,而消费者端的过滤是为了确保只有符合消费者需求的消息被真正消费和处理。两者相互配合,可以提高消息系统的性能和可靠性。
消息发送样例:
//Tag过滤:消息发送
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for(
int i = 0;
i< 10;i++)
{
String tag = tags[i % tags.length];
String msg = "hello, 这是第" + (i + 1) + "条消息";
// 创建消息,并设置Tag内容
Message message = new Message("FilterMessageTopic", tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
消息消费样例:
//Tag过滤:消息消费
pushConsumer.subscribe("FilterMessageTopic","TagA || TagC || TagD"); // 订阅主题,并设置Tag过滤条件
2、SQL过滤
只有使用Push推送模式的消费者才能用使用SQL92标准的SQL语句
SQL语法支持:数值比较,如:>,>=,<,<=,=;字符比较,如:=,<>,IN;IS NULL 或者 IS NOT NULL;逻辑符号 AND,OR,NOT
常量类型支持:数值,比如:123,3.1415;字符,比如:'abc';NULL,特殊的常量;布尔值,TRUE 或 FALSE
首先需要修改broker的配置
//生效配置
conf/broker.conf ->enablePropertyFilter=true
在消息发送时设置属性信息
//SQL过滤:消息发送
Message msg = new Message("topic_a", ("test").getBytes());
// 使用UserProperty设置属性
msg.putUserProperty("age","30");
msg.putUserProperty("name","lcl");
producer.send(msg);
最后在消费时使用SQL过滤消息
//SQL过滤:消息消费
consumer.subscribe("topic_a",MessageSelector.bySql("age > 35 and name = 'lcl'")); // 通过MessageSelector计算属性
3、Filter Server过滤:
Filter Server过滤是指在Broker端运行1个或多个消息过滤服务器(Filter Server), RocketMQ允许消息消费者自定义消息过滤实现类并将其代码上传到Filter Server上。
消息消费者向Filter Server拉取消息,FilterServer将消息消费者的拉取命令转发到Broker,然后对返回的消息执行消息过滤逻辑,最终将消息返回给消费端。
由于Filter Server与Broker运行在同一台机器上,消息的传输是通过本地回环通信,不会浪费Broker端的网络资源。
(四)客服系统案例演进
1、客服系统消息延迟场景:
使用 IM 消息存储应用场景来做延迟消息演示。
延迟消息发送:
普通消息发送使用rocketMQTemplate的convertAndSend方法,其将消息实体转换为Spring-Message中的Message对象进行发送,这是Spring对于消息队列的一种抽象;
发送延迟消息场景,rocketMQTemplate提供了较为原生的方法syncSend,需要传入Topic、Message、超时时间、延迟级别。
@Slf4j
@Service("event_delay")
public class MQDelayImMessageServiceImpl implements ImMessageService {
private final String TOPIC_IM = "topic_im_delay";
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void saveImMessage(ImMessage imMessage) {
ImMessageCreatedEvent event = new ImMessageCreatedEvent();
event.setMessage(imMessage);
event.setOperation("CREATE");
event.setType("IM");
log.info("延时消息发送时间:{}", LocalDateTime.now());
rocketMQTemplate.syncSend(TOPIC_IM, MessageBuilder.withPayload(event).build(), 2000, 4);
}
}
延迟消息消费:
延迟消息消费和普通消息消费没有任何区别,这里主要是需要注意,一个消费者组只能消费一个队列,因此这里要修改队列名称和消费者组,用以区分原来的消费者。
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_im_delay", topic = "topic_im_delay")
public class ImDelayMessageConsumer implements RocketMQListener<ImMessageCreatedEvent> {
@Autowired
private ImMessageService imMessageService;
@Override
public void onMessage(ImMessageCreatedEvent imMessageCreatedEvent) {
log.info("Receive message: 【{}】【{}】", LocalDateTime.now(), JSON.toJSONString(imMessageCreatedEvent));
imMessageService.saveImMessage(imMessageCreatedEvent.getMessage());
}
}
2、客服系统消息过滤场景:
使用客服工单创建应用场景来做消息过滤的演示。
带有tag的消息发送:
消息发送不再是单单使用Topic,而是使用Topic:tag的方式。
@Slf4j
@Component
public class CustomerStaffChangedTagEventProducer {
private final String TOPIC_STAFF = "topic_staff_tag";
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendCustomerStaffChangedEvent(CustomerStaff customerStaff, String operation){
CustomerStaffEventDTO customerStaffEventDTO = new CustomerStaffEventDTO();
customerStaffEventDTO.setId(customerStaff.getId());
customerStaffEventDTO.setStaffName(customerStaff.getStaffName());
customerStaffEventDTO.setAccountId(customerStaff.getAccountId());
customerStaffEventDTO.setPhone(customerStaff.getPhone());
CustomerStaffChangedEvent event = new CustomerStaffChangedEvent();
event.setType("STAFF");
event.setOperation(operation);
event.setMessage(customerStaffEventDTO);
log.info("发送消息:【{}】", JSON.toJSONString(event));
String destination = String.format("%s:%s", TOPIC_STAFF, "STAFF");
rocketMQTemplate.convertAndSend(destination, event);
event.setType("OTHER");
log.info("发送消息:【{}】", JSON.toJSONString(event));
destination = String.format("%s:%s", TOPIC_STAFF, "OTHER");
rocketMQTemplate.convertAndSend(destination, event);
}
}
带有tag的消息消费:
消费逻辑除了要调整消费者组、topic外,最主要是加了selectorExpression。
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_staff_tag", topic = "topic_staff_tag", selectorExpression = "STAFF")
public class CustomerStaffTagConsumer implements RocketMQListener<CustomerStaffChangedEvent> {
@Autowired
private ILocalCustomerStaffService localCustomerStaffService;
@Override
public void onMessage(CustomerStaffChangedEvent message) {
System.out.println("Received message : " + message);
CustomerStaffEventDTO dto = message.getMessage();
LocalCustomerStaff localCustomerStaff = new LocalCustomerStaff();
convertLocalCustomerStaff(dto, localCustomerStaff);
String operation = message.getOperation();
if(operation.equals("CREATE")) {
localCustomerStaffService.insertLocalCustomerStaff(localCustomerStaff);
} else if(operation.equals("UPDATE")) {
localCustomerStaffService.updateLocalCustomerStaff(localCustomerStaff);
} else if(operation.equals("DELETE")) {
localCustomerStaffService.deleteLocalCustomerStaff(localCustomerStaff);
}
}
private void convertLocalCustomerStaff(CustomerStaffEventDTO dto, LocalCustomerStaff localCustomerStaff) {
localCustomerStaff.setStaffId(dto.getId());
localCustomerStaff.setStaffName(dto.getStaffName());
localCustomerStaff.setAccountId(dto.getAccountId());
localCustomerStaff.setPhone(dto.getPhone());
}
}
标签:event,08,public,发送,void,消息,final,分布式 From: https://www.cnblogs.com/liconglong/p/17523561.html