首页 > 其他分享 >RocketMQ高可用机制

RocketMQ高可用机制

时间:2023-03-07 13:22:05浏览次数:51  
标签:可用 队列 死信 master 消息 机制 节点 RocketMQ 刷盘

RocketMQ高可用机制

集群部署模式

1. 单master模式

2. 多master模式

配置

配置文件broker.properties的brokerClusterName需要保持一致

brokerId需要为0,0代表为0

优缺点

优点:多master集群,一个topic在每个master中都有,相当于对topic进行了横向扩展。当有很多生产者往topic中发送消息时,可以负载到多个master节点上,提高写入数据的效率。

缺点:如果某个master宕机,则这个master上的数据将不可用。根本原因还是没有对master上的数据进行主从备份的原因。多个master节点各自存着自己的数据,不会相互备份。

3. 多master多slave模式

上面多master模式提到,没有slave节点的话,master节点的数据很容易造成丢失。所以,我们需要给每个master节点配备slave节点,就形成了多master多slave模式。生产者发送消息至master,master再copy消息至slave节点,进行消息的备份。在copy数据时,又分为同步复制和异步复制两种模式。

异步复制:

生产者发送消息至master后,master就会反馈给生产者消息发送成功。master会单独开启一个线程,将数据copy到slave中。这样造成的风险就是如果master挂了,而数据还没有copy到slave节点,就会造成数据的丢失。

同步复制:

生产者发送消息至master节点后,master节点copy数据到slave节点。当copy成功时,master节点才会告诉生产者,消息发送成功。这样保证了数据的备份,但是会影响性能。为了高可用,还是要选择这种模式,因为选用异步模式,根本没解决多master模式所造成的问题。

结论:

RocketMQ集群模式推荐 使用多主多从同步模式。生产者发送消息第一时间是存入内存中的,要持久化消息,需要刷盘刷到磁盘中。我们做了同步复制消息后,就可以采用异步刷盘的方式,将消息进行持久化。这样避免刷盘对性能造成的影响。
在集群中,NameServer之间是没有通信的,多个NameServer之间只是 数据的备份。所以NameServer服务器选两台就够了

刷盘与主从同步

  1. 同步刷盘与异步刷盘
  2. 同步复制与异步复制

一般场景会使用同步复制和异步刷盘

主从复制快,同时不会丢

消息并发度怎么解决

要解决消费并发,就要利用Queue,一个topic可以分出多个Queue,没一个queue可以放到不同的硬件上提高并发。

消息存储结构

  • commitlog 消息存储目录
  • config 运行期间一些配置信息
  • consumerqueue 消息消费队列存储目录
  • index 消息索引文件存储目录
  • abort 如果存在文件则Broker非正常关闭
  • checkpoint 文件检查点
    • 存储commitLog文件最后一次刷盘时间戳
    • consumerqueue最后一次刷盘时间
    • index 索引文件最后一次刷盘时间戳

过期文件删除

非当前写文件在一定时间间隔没有再次被更新,默认42小时,然后被删除

零拷贝与MMAP

在计算机操作执行中,CPU不需要先将数据从某处内存复制到另一个特定区域.

这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽

RocketMQ分布式事务

代码:

public class TransactionProducer {

    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest1234";

    public static final int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);

        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                Message msg =
                    new Message(TOPIC, tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

tips:

  • 半事务 只存在commitLog 但是不构建索引

  • 执行本地事务 当返回unkonow时执行事务回查

  • 事务回查,默认30s,在事务回查中服务挂掉,会命中其他相同producerGroup进行事务回查

消息生产的默认选择队列策略

  • 选择队列

    获取这个topic的路由信息,位于哪个broker,哪个queue

  • 往路由信息的的指定服务器发送

  • 判断是否发送成功,是则结束

  • 否->判断是否超过重试次数(默认是2)

    如果没有超过重试次数,默认选择规避策略,选择其他broker的队列

故障延迟机制策略

记录Broker发送时长,根据公式算出故障规避时间

生产与消费的负载均衡

  • producer

    Roundbin方式轮训

  • consumer

    默认平均值 c1:1,2 c2:3,4 c3:5,6

    ByCircle c1:1 c2:2 c3:3 c1:4 c2:5 c3:6

死信队列

当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

正常情况下无法被消费的消息称为 死信消息(Dead-Letter Message),存储死信消息的特殊队列称为 死信队列(Dead-Letter Queue)。

对于 无序消息集群消费 下的重试消费,默认允许每条消息最多重试 16 次,如果消息重试 16 次后仍然失败,消息将被投递至 死信队列

特征

  • 不会再被消费者正常消费
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除

特性

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。名称为 %DLQ%consumerGroup@consumerGroup
  • 如果一个 Group ID 未产生死信消息,则不会为其创建相应的死信队列
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic

面试题

1.为什么使用消息队列?

解耦 异步 削峰 结合项目业务场景回答

2.消息队列有什么优点和缺点?

  • 优点 解耦 异步 削峰

  • 缺点

    1. 系统的可用性降低

    2. 复杂性提高

    3. 一致性问题

3.为什么选择RocketMQ?

  • 性能

    经受住阿里 天猫的考验,可用性高;性能高;易拓展

  • 功能

    功能完善 事务消息 消息重试 死信队列 定时消息

  • 易用

    跨平台,跨语言.多协议接入

Broker启动流程

消息发送者启动流程

并发消费流程

标签:可用,队列,死信,master,消息,机制,节点,RocketMQ,刷盘
From: https://www.cnblogs.com/Acaak/p/17187791.html

相关文章

  • 亲测可用,ChatGPT 对话技巧
     “Linux终端”“我希望你充当一个linux终端。我会输入命令,你会回复终端应该显示的内容。我希望你只回复一个唯一代码块内的终端输出,没有别的。不要写解释.除非我......
  • JVM内存回收机制
    JVM内存回收机制JVM内存回收机制标签:JVMGC垃圾回收内存管理  0.说明当JVM创建对象遇到内存不足的时候,JVM会自动触发垃圾回收garbagecollecting(简称GC)操作,......
  • MVCC 是什么?InnoDB 是如何实现 MVCC 机制的?
    概念MVCC全称Multi-VersionConcurrencyControl,多版本并发控制。指维护一个数据的多个版本,使得读写操作没有冲突。解决问题MVCC主要解决的问题是可以保证MySQL在读的......
  • 多头自注意力机制实现及代码
    注意力机制是一种在给定文本词向量中查找重要词,并赋予一定重要权值的机制。假设输入序列为X,三个随机初始的矩阵键值K(Key)、查询值Q(Query)和值V(Value)。当Query、K......
  • Reflection反射机制原理、使用场景 及 缺陷
    (目录)反射一个需求引出反射需求如下:根据配置文件re.properties中的指定信息,创建Cat对象并调用方法hi在配置文件中代码:classfullpath=com.panyujie.reflection.Cat,m......
  • Kubernetes的工作机制
    云计算时代的操作系统Kubernetes是一个生产级别的容器编排平台和集群管理系统,能够创建、调度容器,监控、管理服务器。Kubernetes的基本架构操作系统的一个重要功能就是......
  • java基础语法-包机制
    包机制定义包(为了更好的组织类,java提供了包机制,用于区别类名的命名空间。)包语法的语法格式packagepkg1[.pkg2[.pkg3···]];一半利用公司域名倒置作为包名......
  • 微信小程序结合php后台实现登录授权机制详解
    微信小程序应用的用户登录授权机制相当复杂,官方给出了下面一张流程图来解释:下面结合这张图来详细讲述下小程序的登录验证授权机制。首先,小程序应用实现登录验证的前提是......
  • 2.JavaScript如何实现异步编程,可以详细描述EventLoop机制
    单线程和异步js的任务分为同步和异步两种,它们的处理方式也不同,同步任务是直接在主线程上排队执行,异步任务则会被放在任务队列中,若有多个任务(异步任务)则要在任务队列中排......
  • 1.为何try里面放return,finally还会执行,理解其内部机制
    涉及到了一种数据类型:Completion,是js七大标准类型之一为何try里面放return,finally还会执行这种行为就是因为CompletionRecord,在js中,每条语句的执行完成状态都是由Comple......