一、RocketMQ 核心的四大组件:
Producer:就是消息生产者,可以集群部署。它会先和 NameServer 集群中的随机一台建立长连接,得知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立长连接,支持多种负载平衡模式发送消息。
Consumer:消息消费者,也可以集群部署。它也会先和 NameServer 集群中的随机一台建立长连接,得知当前要消息的 Topic 存在哪台 Broker Master、Slave上,然后它们建立长连接,支持集群消费和广播消费消息。
Broker:主要负责消息的存储、查询消费,支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。
NameServer:类似Zookeeper,是一个很简单的 Topic 路由注册中心,支持 Broker 的动态注册和发现,保存 Topic 和 Borker 之间的关系。通常也是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。
二、rocketmq基本工作流程:
1、先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker在启动的时候会注册自己配置的Topic信息到NameServer集群的每一台机器中。即每一个NameServer均有该broker的Topic路由配置信息,并向所有 NameServer 定期(每 30s)发送心跳包,包括:IP、Port、TopicInfo;NameServer 也会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。
2、这样每个 NameServer 就知道集群所有 Broker 的相关信息,此时 Producer 上线会根据配置文件中的NameServer 地址自动连接一个NameServer ;每 30s 会从连接的 NameServer 获取 Topic 和 Broker 的映射关系存在本地内存中,从 NameServer 就可以得知它要发送的某 Topic 消息在哪个 Broker 上,和对应的 Broker (Master 角色的)建立长连接,发送消息。
3、Consumer 上线也可以从 NameServer 得知它所要接收的 Topic 是哪个 Broker ,和对应的 Master、Slave 建立连接,接收消息。
可以理解为如下:
name server:注册中心broker:消息处理
procucer:生成消息
consumer:消费消息
每个组件都可以部署成集群模式进行水平扩展。
消息由topic区分消息类型(一级分类):如订单消息,物流消息等
tag为二级分类
message queue为消息类型下的消息队列。
用于并行发送和接受消息。
四、基础
分布式事务:
对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。
事务消息:
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。
半事务消息:
暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。
本地事务状态:
Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认令。
// 描述本地事务执行状态 public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}
RocketMQ中的消息回查设置:
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:
transactionTimeout=20,指定TM在 20 秒内应将最终确认状态发送给TC,否则引发消息回查。默认为 60 秒
transactionCheckMax=5,指定最多回查 5 次,超过后将丢弃消息并记录错误日志。默认 15 次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为 10 秒。默认为 60 秒。
五、Topic与Broker的关系:
- Borker中有一个或多个Topic
- Topic中有一个或多个MessageQueue
Topic可以自动创建和手动创建;
1、手动创建也叫预先创建,就是在使用Topic之前就创建,可以通过命令行或者通过RocketMQ的管理界面(可视化控制台)创建Topic。
/**
* 创建topic,参数分别是:borker的名称,topic的名称,queue的数量
* broker要和虚拟机broker.conf配置文件中brokername的名字一致
* newTopic的名字随便起,queueNum8的意思是新建的消息队列数为8个
*/
producer.createTopic("broker_haoke_im","my-topic",8);
2、自动创建就是设置了autoCreateTopicEnable =true;
TBW102 是啥用的?就是一个接受自动创建主题的 Broker, 启动会把这个默认Topic(主题)的Broker登记到 NameServer,这样当 Producer 发送新 Topic 的消息时候就得知哪个 Broker 可以自动创建主题,然后发往那个 Broker。
而 Broker 接受到这个消息的时候发现没找到对应的主题,但是它接受创建新主题,这样就会创建对应的 Topic 路由信息。
假设此时发送方还在连续快速的发送消息,那 NameServer 上其实还没有关于这个 Topic 的路由信息,所以有机会让别的允许自动创建的 Broker 也创建对应的 Topic 路由信息,这样集群里的 Broker 就能接受这个 Topic 的信息,达到负载均衡的目的,但也有个别 Broker 可能,没收到。
如果发送方这一次发了之后 30s 内一个都不发,之前的那个 Broker 随着心跳把这个路由信息更新到 NameServer 了,那么之后发送该 Topic 消息的 Producer 从 NameServer 只能得知该 Topic 消息只能发往之前的那台 Broker ,这就不均衡了,如果这个新主题消息很多,那台 Broker 负载就很高了。
所以不建议线上开启允许自动创建主题,即 autoCreateTopicEnable 参数。
Tags的使用
tag(标签): 标签可以被认为是对topic的进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。区分相同topic下不同种类的消息。生产到哪个topic的哪个tag下,消费者也是从topic的哪个tag进行消费,即实现消息的过滤。
建议一个应用一个 Topic,利用 tages 来标记不同业务,因为 tages 设置比较灵活,且一个应用一个 Topic 很清晰,能直观的辨别。
Keys的使用
如果有消息业务上的唯一标识,请填写到 keys 字段中,方便日后的定位查找。
queue(队列): queue是消息的物理管理单位,而topic是逻辑管理单位。一个topic下可以有多个queue,默认自动创建是4个,手动创建是8个
六、下面以windows服务器为例演示使用rocketmq如下:
1、下载rocketmq的安装包:https://rocketmq.apache.org/zh/download
2、下载rocketmq仪表盘(也就是可视化操作界面,是一个完整的java项目可以用idea运行)
3、修改conf/broker.conf配置在末尾添加如下配置(IP使用自己的),并保存。
brokerIP1=192.168.31.199
namesrvAddr=192.168.31.199:9876
4、配置ROCKET_HOME环境变量,路径使用下载路径;path中配置%ROCKET_HOME%\bin即可
5、启动Namesrv
在rocketmq文件的bin目录下,进入cmd使用如下命令:start mqnamesrv.cmd
6、启动Broker:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true (也就是说,producer使用RocketMQTemplate
发送的消息,就算Booker上的topic之前不存在,rocket也会帮我们创建好)
7、将仪表盘项目导入idea,然后打开application.properties文件修改rocket.config.namesrvAddr=localhost:9876;
8、启动仪表盘项目:浏览器输入http://localhost:8080/#/即可看到可视化界面;
9、java代码创建生产者和消费者:
创建普通springboot项目,添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
10、修改配置文件
# 应用名称
spring:
application:
name: rocket-producer
# 应用服务 WEB 访问端口
server:
port: 8002
rocketmq:
name-server: localhost:9876
producer:
group: my-group
11、创建测试代码
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class SendMessage {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Scheduled(fixedRate = 5000)
public void run(){
//发送消息
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
}
}
12、创建消费者项目(同上)
消费端测试代码:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
class MyConsumer1 implements RocketMQListener<String> {
/**
*需要注意的是,onMessage()封装了ACK机制,消费者往外抛异常时,RocketMQ认为消费失败,重新发送该条消息,否则默认消费成功
*/
@Override
public void onMessage(String s) {
System.out.println(s);
}
}