RocketMQ高可用机制
集群部署模式
1. 单master模式
2. 多master模式
配置
配置文件broker.properties的brokerClusterName需要保持一致
brokerId需要为0,0代表为0
优缺点
优点:多master集群,一个topic在每个master中都有,相当于对topic进行了横向扩展。当有很多生产者往topic中发送消息时,可以负载到多个master节点上,提高写入数据的效率。
缺点:如果某个master宕机,则这个master上的数据将不可用。根本原因还是没有对master上的数据进行主从备份的原因。多个master节点各自存着自己的数据,不会相互备份。
3. 多master多slave模式
上面多master模式提到,没有slave节点的话,master节点的数据很容易造成丢失。所以,我们需要给每个master节点配备slave节点,就形成了多master多slave模式。生产者发送消息至master,master再copy消息至slave节点,进行消息的备份。在copy数据时,又分为同步复制和异步复制两种模式。
异步复制:
生产者发送消息至master后,master就会反馈给生产者消息发送成功。master会单独开启一个线程,将数据copy到slave中。这样造成的风险就是如果master挂了,而数据还没有copy到slave节点,就会造成数据的丢失。
同步复制:
生产者发送消息至master节点后,master节点copy数据到slave节点。当copy成功时,master节点才会告诉生产者,消息发送成功。这样保证了数据的备份,但是会影响性能。为了高可用,还是要选择这种模式,因为选用异步模式,根本没解决多master模式所造成的问题。
结论:
RocketMQ集群模式推荐 使用多主多从同步模式。生产者发送消息第一时间是存入内存中的,要持久化消息,需要刷盘刷到磁盘中。我们做了同步复制消息后,就可以采用异步刷盘的方式,将消息进行持久化。这样避免刷盘对性能造成的影响。
在集群中,NameServer之间是没有通信的,多个NameServer之间只是 数据的备份。所以NameServer服务器选两台就够了
刷盘与主从同步
- 同步刷盘与异步刷盘
- 同步复制与异步复制
一般场景会使用同步复制和异步刷盘
主从复制快,同时不会丢
消息并发度怎么解决
要解决消费并发,就要利用Queue,一个topic可以分出多个Queue,没一个queue可以放到不同的硬件上提高并发。
消息存储结构
- commitlog 消息存储目录
- config 运行期间一些配置信息
- consumerqueue 消息消费队列存储目录
- index 消息索引文件存储目录
- abort 如果存在文件则Broker非正常关闭
- checkpoint 文件检查点
- 存储commitLog文件最后一次刷盘时间戳
- consumerqueue最后一次刷盘时间
- index 索引文件最后一次刷盘时间戳
过期文件删除
非当前写文件在一定时间间隔没有再次被更新,默认42小时,然后被删除
零拷贝与MMAP
在计算机操作执行中,CPU不需要先将数据从某处内存复制到另一个特定区域.
这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽
RocketMQ分布式事务
代码:
public class TransactionProducer {
public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest1234";
public static final int MESSAGE_COUNT = 10;
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
Message msg =
new Message(TOPIC, tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
tips:
半事务 只存在commitLog 但是不构建索引
执行本地事务 当返回unkonow时执行事务回查
事务回查,默认30s,在事务回查中服务挂掉,会命中其他相同producerGroup进行事务回查
消息生产的默认选择队列策略
-
选择队列
获取这个topic的路由信息,位于哪个broker,哪个queue
-
往路由信息的的指定服务器发送
-
判断是否发送成功,是则结束
-
否->判断是否超过重试次数(默认是2)
如果没有超过重试次数,默认选择规避策略,选择其他broker的队列
故障延迟机制策略
记录Broker发送时长,根据公式算出故障规避时间
生产与消费的负载均衡
-
producer
Roundbin方式轮训
-
consumer
默认平均值 c1:1,2 c2:3,4 c3:5,6
ByCircle c1:1 c2:2 c3:3 c1:4 c2:5 c3:6
死信队列
当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
正常情况下无法被消费的消息称为 死信消息(Dead-Letter Message),存储死信消息的特殊队列称为 死信队列(Dead-Letter Queue)。
对于 无序消息集群消费 下的重试消费,默认允许每条消息最多重试 16 次,如果消息重试 16 次后仍然失败,消息将被投递至 死信队列
特征
- 不会再被消费者正常消费
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除
特性
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。名称为 %DLQ%consumerGroup@consumerGroup
- 如果一个 Group ID 未产生死信消息,则不会为其创建相应的死信队列
- 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic
面试题
1.为什么使用消息队列?
解耦 异步 削峰 结合项目业务场景回答
2.消息队列有什么优点和缺点?
-
优点 解耦 异步 削峰
-
缺点
-
系统的可用性降低
-
复杂性提高
-
一致性问题
-
3.为什么选择RocketMQ?
-
性能
经受住阿里 天猫的考验,可用性高;性能高;易拓展
-
功能
功能完善 事务消息 消息重试 死信队列 定时消息
-
易用
跨平台,跨语言.多协议接入