首页 > 其他分享 >【RocketMQ】消息的发送过程之 Broker 故障延迟或者容错机制

【RocketMQ】消息的发送过程之 Broker 故障延迟或者容错机制

时间:2024-10-29 19:31:25浏览次数:5  
标签:队列 Broker private public 容错 mq final RocketMQ

1  前言

上节我们主要看了下消息生产者的启动以及消息的发送过程,内容比较多,篇幅比较长,有一些细节没看到,比如 Broker 的故障延迟机制,所以这节我们就单独来看一下这块内容。

还有我们要知道的是,这个机制默认是关闭的:

// ClientConfig
/**
 * 开启消息发送的客户端容错机制
 * Enable the fault tolerance mechanism of the client sending process.
 * DO NOT OPEN when ORDER messages are required. 顺序消息不要开这个
 * Turning on will interfere with the queue selection functionality, 作用在选择消息队列的时候
 * possibly conflicting with the order message. 可能会跟顺序消息产生冲突
 * SEND_LATENCY_ENABLE = com.rocketmq.sendLatencyEnable
 * START_DETECTOR_ENABLE = com.rocketmq.startDetectorEnable
 */
private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false"));
private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false"));

2  选择消息队列

DefaultMQProducerImpl 在选择消息队列的时候,是交给了 MQFaultStrategy 来处理:

// DefaultMQProducerImpl#selectOneMessageQueue 选择i消息队列
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName, resetIndex);
}

也就是我们上节看到的这里:

// MQFaultStrategy#selectOneMessageQueue
// tpInfo tryToFindTopicPublishInfo获取的路由信息
// lastBrokerName 就是上一次选择的执行发送消息失败的Broker名称 启用 Broker 故障延迟机制用到
// resetIndex 第一次发送为false 重试发送为true
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
    // 获取当前线程的 Broker 过滤器
    BrokerFilter brokerFilter = threadBrokerFilter.get();
    // 重置当前线程的 Broker 过滤器
    brokerFilter.setLastBrokerName(lastBrokerName);
    // 是否开启Broker 故障延迟机制 默认关闭
    if (this.sendLatencyFaultEnable) {
        // 重置索引 重置 TopicPublishInfo内部的sendWhichQueue队列的索引
        if (resetIndex) {
            tpInfo.resetIndex();
        }
        // 尝试选择一个满足可用性过滤器和Broker过滤器的消息队列
        MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter);
        if (mq != null) {
            return mq;
        }
        // 如果上述选择失败,尝试选择一个满足可访问性过滤器和Broker过滤器的消息队列
        mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter);
        if (mq != null) {
            return mq;
        }
        // 如果都选择失败,退而求其次,选择任意一个消息队列
        return tpInfo.selectOneMessageQueue();
    }
    // 如果未启用Broker 故障延迟机制,则直接使用Broker过滤器选择消息队列
    MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter);
    if (mq != null) {
        return mq;
    }
    // 如果选择失败,退而求其次,选择任意一个消息队列
    return tpInfo.selectOneMessageQueue();
}

这块就是选择消息队列的核心思想:

对于不开启容错机制的,先根据当前线程的上次执行异常的 Broker 名称过滤筛选一个消息队列,当然 lastBrokerName 也可以是空的(比如第一次发送的时候),是空的话,那就跟下边的空参的 selectOneMessageQueue 类似,随机选择一个消息队列即可。

对于开启了容错机制的,它大概有四步:

(1)重试会重置当前线程的 index 

(2)根据 availiableFilter  + 名称过滤器 筛选消息队列

(3)如果步骤2为空,再次根据 reachableFilte + 名称过滤器r 筛选消息队列

(4)如果步骤3为空,进入兜底空参的 selectOneMessageQueue 随机选择一个消息队列。

可以看到两种方式最后的兜底其实都一样的,无论如何都要选择一个消息队列哈。然后对于开启了容错机制的,会有两次过滤,也是我们本节要理解的重点。

3  容错机制

3.1  核心类

我们先回顾下容错机制相关的核心类:

MQFaultStrategy:选择消息队列入口主逻辑控制以及内部的latencyFaultTolerance起到衔接筛选队列的作用

LatencyFaultTolerance 接口:维护容错信息筛选队列 实现类是 LatencyFaultToleranceImpl 内部的faultItemTable 维护当前出现故障的信息

FaultItem 故障项类:故障明细实例

TopicPublishInfo:从自己当前的队列中以及筛选器过滤出符合条件的消息队列

我这里画了一下这几个类的执行过程:

要理解好执行流程,就要理解好每个核心类的核心属性的由来很重要,接下来我们看看。

3.2  TopicPublishInfo

根据过滤器筛选队列的方法中,涉及到 messageQueueList 和 sendWhichQueue 两个属性,看看它的由来。

// TopicPublishInfo#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(QueueFilter ...filter) {
    return selectOneMessageQueue(this.messageQueueList, this.sendWhichQueue, filter);
}

我们先看看 messageQueueList 属性,TopicPublishInfo 是 tryToFindTopicPublishInfo 方法来的,也就是根据某个 Topic 获取它的路由信息。而远程请求的响应结果是 TopicRouteData 类型的,它是经过 MQClientInstance#topicRouteData2TopicPublishInfo 方法进行转换得到的,

// MQClientInstance#topicRouteData2TopicPublishInfo
// 用于将请求的路由信息 TopicRouteData 转换为生产者适用的 TopicPublishInfo
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    // List<MessageQueue> messageQueueList = new ArrayList<>();
    TopicPublishInfo info = new TopicPublishInfo();
    info.setTopicRouteData(route);

    if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
        // 处理有序主题配置
        ...
    } else if (route.getOrderTopicConf() == null
        && route.getTopicQueueMappingByBroker() != null
        && !route.getTopicQueueMappingByBroker().isEmpty()) {
        // 处理静态主题配置
        ...
    } else {
        // 处理普通主题配置 我们主要看这里
        // 当前 Topic 的队列信息
        List<QueueData> qds = route.getQueueDatas();
        // 排序下(根据里边的 Broker 名称升序排序
        Collections.sort(qds);
        // 循环处理
        for (QueueData qd : qds) {
            // 队列是可写的话 计算公式:(perm & 2) == 2
            if (PermName.isWriteable(qd.getPerm())) {
                BrokerData brokerData = null;
                // 找这个队列的 Broker 信息
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (bd.getBrokerName().equals(qd.getBrokerName())) {
                        brokerData = bd;
                        break;
                    }
                }
                // 没有的话 忽略掉这个队列
                if (null == brokerData) {
                    continue;
                }

                // broker 没有 master 信息 也忽略这个队列
                if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    continue;
                }
                // 遍历该队列可写的队列数  默认 4个
                for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                    // 创建队列信息
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    // 放进 info 对象的 messageQueueList 集合中
                    info.getMessageQueueList().add(mq);
                }
            }
        }
        // 不是顺序消息的 Topic
        info.setOrderTopic(false);
    }

    return info;
}

好了,也就是获取到该 Topic 在 Broker 中的分布情况,然后根据队列的数量,创建出来对应的消息队列对象,最后统一存放到 TopicPublishInfo 的 messageQueueList 集合中。

再看下 sendWhichQueue 属性,它是 volatile 修饰的成员变量,也是随着实例化创建出来的:

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
}

再看下 ThreadLocalIndex 类:

public class ThreadLocalIndex {
    // 本地线程变量
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>();
    // 随机数
    private final Random random = new Random();
    private final static int POSITIVE_MASK = 0x7FFFFFFF;

    public int incrementAndGet() {
        // 获取当前线程的计数器
        Integer index = this.threadLocalIndex.get();
        // 第一次为空的话 随机数
        if (null == index) {
            index = random.nextInt();
        }
        // 设置到本地线程去
        this.threadLocalIndex.set(++index);
        // 与 POSITIVE_MASK 取模
        return index & POSITIVE_MASK;
    }
    
    // 重置
    public void reset() {
        // 获取随机数
        int index = Math.abs(random.nextInt(Integer.MAX_VALUE));
        if (index < 0) {
            index = 0;
        }
        // 设置进去
        this.threadLocalIndex.set(index);
    }
}

messageQueueList 存放当前 Topic 分布在所有 Broker 上的队列信息,比如有两个 Broker 都有分布吗,每个 Broker 创建 4个队列存储消息,那么这个集合里有8条数据,每个 Broker 个 4个消息队列。

sendWhichQueue:是一个存放在本地线程的随机数

3.3  FaultItem 故障明细

我们看下这个类:

public class FaultItem implements Comparable<FaultItem> {
    // broker 名称也就是哪个 broker 故障了
    private final String name;
    // 当前的延迟时间
    private volatile long currentLatency;
    // 恢复时间 也就是到哪个时间戳时就不故障了  System.currentTimeMillis() > startTimestamp 说明就不故障了
    private volatile long startTimestamp;
    // 检查时间 搭配 detect 使用 也就是背后有个线程来检查这些故障是不是好了  LatencyFaultToleranceImpl有调度线程池 下边会看
    private volatile long checkStamp;
    // 是否可达
    private volatile boolean reachableFlag;
    public FaultItem(final String name) {
        this.name = name;
    }
    // 更新延迟时间
    public void updateNotAvailableDuration(long notAvailableDuration) {
        // 当前时间戳 + 延迟时间 大于上次设置的恢复时间  说明是需要延长恢复时间
        if (notAvailableDuration > 0 && System.currentTimeMillis() + notAvailableDuration > this.startTimestamp) {
            // 重新设置恢复时间
            this.startTimestamp = System.currentTimeMillis() + notAvailableDuration;
            log.info(name + " will be isolated for " + notAvailableDuration + " ms.");
        }
    }
    ...
}

name、currentLatency、startTimestamp应该比较好理解,我i们看下 checkStamp 和 reachableFlag 两个属性值的由来。

首先看 checkStamp,它是跟检查相关的,在 LatencyFaultToleranceImpl 类里有个 startDetector 方法,提交一个调度任务,用来检查这些故障明细:

// LatencyFaultToleranceImpl#startDetector
// private volatile boolean startDetectorEnable = false;
// private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
//     @Override
//     public Thread newThread(Runnable r) {
//         return new Thread(r, "LatencyFaultToleranceScheduledThread");
//     }
// });
public void startDetector() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // 是否开启了检测  默认不开启
                if (startDetectorEnable) {
                    // 检查
                    detectByOneRound();
                }
            } catch (Exception e) {
                log.warn("Unexpected exception raised while detecting service reachability", e);
            }
        }
    // 每隔 3秒 执行一趟
    }, 3, 3, TimeUnit.SECONDS);
}
// private int detectTimeout = 200; 检查超时时间
// private int detectInterval = 2000; 检查间隔时间 默认 2秒
// private final ServiceDetector serviceDetector; 检查的方法
public void detectByOneRound() {
    // 遍历
    for (Map.Entry<String, FaultItem> item : this.faultItemTable.entrySet()) {
        FaultItem brokerItem = item.getValue();
        // checkStamp 发挥作用了 初始为0 后续每检查一次重置检查时间
        if (System.currentTimeMillis() - brokerItem.checkStamp >= 0) {
            // 重置检查时间 = 当前时间 + 检查间隔时间 2秒
            brokerItem.checkStamp = System.currentTimeMillis() + this.detectInterval;
            // 获取到该 Broker 的地址
            String brokerAddr = resolver.resolve(brokerItem.getName());
            // 如果为空 直接移除 说明 Broker 不存在了都
            if (brokerAddr == null) {
                faultItemTable.remove(item.getKey());
                continue;
            }
            // 如果没设置检查 那都不知道怎么检查 还检查啥 不检查了
            if (null == serviceDetector) {
                continue;
            }
            // 判断是不是好了
            boolean serviceOK = serviceDetector.detect(brokerAddr, detectTimeout);
            // 如果好了的话 设置故障明细的 reachableFlag = true 表示可以给我发消息了
            if (serviceOK && !brokerItem.reachableFlag) {
                log.info(brokerItem.name + " is reachable now, then it can be used.");
                brokerItem.reachableFlag = true;
            }
        }
    }
}

那我们顺便看下 serviceDetector 以及启动故障发现 startDetector 的入口:

serviceDetector 是由实例化 MQFaultStrategy 的时候传进来的:

public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) {
    this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector);
    this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
    this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
    this.setStartDetectorEnable(cc.isStartDetectorEnable());
    this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
}

而 MQFaultStrategy 的实例化又是在实例化 DefaultMQProducerImpl 的时候:

// DefaultMQProducerImpl
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    ...
    // 检查
    ServiceDetector serviceDetector = new ServiceDetector() {
        @Override
        public boolean detect(String endpoint, long timeoutMillis) {
            Optional<String> candidateTopic = pickTopic();
            if (!candidateTopic.isPresent()) {
                return false;
            }
            try {
                MessageQueue mq = new MessageQueue(candidateTopic.get(), null, 0);
                mQClientFactory.getMQClientAPIImpl()
                        .getMaxOffset(endpoint, mq, timeoutMillis);
                return true;
            } catch (Exception e) {
                return false;
            }
        }
    };
    // 实例化 MQFaultStrategy
    this.mqFaultStrategy = new MQFaultStrategy(defaultMQProducer.cloneClientConfig(), new Resolver() {
        @Override
        public String resolve(String name) {
            return DefaultMQProducerImpl.this.mQClientFactory.findBrokerAddressInPublish(name);
        }
    }, serviceDetector);
}

那我们再看看 startDetector 的入口,相通的,是在启动生产者的时候,通过调用 MQFaultStrategy 的 startDetector方法继而调用 LatencyFaultToleranceImpl 的 startDetector 方法。

// DefaultMQProducerImpl#start
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            ...
            if (startFactory) {
                mQClientFactory.start();
            }
            // 启动 MQFaultStrategy 的检查
            this.mqFaultStrategy.startDetector();
            ...
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    RequestFutureHolder.getInstance().startScheduledTask(this);
}
// MQFaultStrategy#startDetector
public void startDetector() {
    this.latencyFaultTolerance.startDetector();
}

看完 checkStamp,它是配合检查的调度任务及时移除已经恢复的故障明细,以及启动故障检查的入口,我们继续看下 reachableFlag 的变动由来,它的值的变动是经过 updateFaultItem 方法来变化的:

// LatencyFaultToleranceImpl#updateFaultItem
// name broker名称
// currentLatency 延迟时间
// notAvailableDuration 不可用时间
// reachable 是否可达
// private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration, final boolean reachable) {
    // map 中看有没有当前 broker 的信息
    FaultItem old = this.faultItemTable.get(name);
    // 如果不存在 直接创建对象放进去
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.updateNotAvailableDuration(notAvailableDuration);
        faultItem.setReachable(reachable);
        old = this.faultItemTable.putIfAbsent(name, faultItem);
    }
    // 存在的话  直接更新
    if (null != old) {
        old.setCurrentLatency(currentLatency);
        old.updateNotAvailableDuration(notAvailableDuration);
        old.setReachable(reachable);
    }
    // 不可达 打印日志
    if (!reachable) {
        log.info(name + " is unreachable, it will not be used until it's reachable");
    }
}

那谁来调用 updateFaultItem 的呢?是由 MQFaultStrategy 来的:

// MQFaultStrategy#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, final boolean reachable) {
    // 当开启了故障延迟的话
    if (this.sendLatencyFaultEnable) {
        // 计算延迟时间 isolation 为 true 说明需要延长 每次延长 10秒 为false 直接用参数 currentLatency
        long duration = computeNotAvailableDuration(isolation ? 10000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration, reachable);
    }
}
// 计算延迟时间
// private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
// private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i]) {
            return this.notAvailableDuration[i];
        }
    }
    return 0;
}

那它又是谁调用的呢? 是 DefaultMQProducerImpl 来的:

// DefaultMQProducerImpl#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, boolean reachable) {
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
}

那它又是谁调用的呢?是在消息发送的核心方法里:

private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    ...
    // 获取当前 Topic 的路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        boolean resetIndex = false;
        for (; times < timesTotal; times++) {
            // 选择队列
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // 1、都发送成功了 说明可达
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    // 2、客户端异常 还没开始请求 也可达
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
                    log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    // 远程异常但不确定是不是 Broker 造成的 
                    if (this.mqFaultStrategy.isStartDetectorEnable()) {
                        // 3、如果开启了异常发现 设置为不可达 因为发现会让他恢复的
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false);
                    } else {
                        // 4、不明确 设置可达
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true);
                    }
                    log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    // 5、明确 Broker 异常 不可达
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false);
                    log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    exception = e;
                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        if (sendResult != null) {
                            return sendResult;
                        }
                        throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    // 6、中断异常  可达
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
                    log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    throw e;
                }
            } else {
                break;
            }
        }
        ...
        throw mqClientException;
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

总共大概 6个 地方会涉及到可达的更改。

好啦,关于 FaultItem 我们知道 checkStamp 是搭配 defect 来检查故障明细的,reachableFlag 的变动是在消息发送方法里根据当前的异常信息来做更新。

看了 TopicPublishInfo 以及 FaultItem 已经把 MQFaultStrategy 和 LatencyFaultToleranceImpl 都串着看了,就不看了哈。

4  小结

好啦,本节主要对选择消息队列时的 Broker 故障延迟机制的核心类进行了深入的了解以及他们的协同关系,有理解不对的地方还请指正哈。

标签:队列,Broker,private,public,容错,mq,final,RocketMQ
From: https://www.cnblogs.com/kukuxjx/p/18512035

相关文章

  • 【RocketMQ】路由中心 NameServer
    1  前言上节我们准备了源码以及环境的运行,其中我们启动的时候,会首先启动NameServer,那么这节我们先看下组件 NameServer,看看它主要是干什么的,在整个生产消费的链路中充当了什么角色,发挥着什么作用。2  NameServerRocketMQ路由管理、服务注册及服务发现的机制,NameServer......
  • 【RocketMQ】源码以及环境搭建
    1  前言本节我们开始看一下RocketMQ相关的东西,我们主要看一条链路,大致如下:(1)环境的搭建,源码的下载(2)消息的结构以及相关类可能也会看下消息的存储(3)消息的生产以及发送过程(4)消息的消费过程大概看着四方面的内容,本节主要看下源码的下载以及环境的搭建。在看之前,我们顺便回......
  • 【系统设计】构建容错系统的6种有效方法:确保系统稳定性的关键策略
    在当今高度互联和依赖技术的时代,系统的可靠性和稳定性对企业和用户来说至关重要。无论是电子商务平台、金融系统还是社交媒体应用,任何系统的宕机或故障都可能导致用户体验受损,甚至带来巨大的经济损失。容错系统作为提高系统可靠性的重要手段,能够在部分组件或模块出现故障......
  • RocketMQ 消息堆积了怎么解决
    目录引言消息堆积的原因RocketMQ的基本架构解决消息堆积的方法4.1扩大消费者规模4.2调整消息优先级4.3优化消费逻辑4.4消息重试与死信队列4.5监控与报警机制实现解决堆积的步骤5.1扩大消费者规模的配置5.2调整消息优先级的配置5.3优化消费逻辑的示例5.4......
  • 手把手Linux安装RocketMQ教程
    手把手Linux安装RocketMQ教程1.下载rocketmq安装包2.创建目录并将压缩包上传至服务器3.配置RocketMQ4.启动RocketMQ5.关闭RocketMQ6.测试RocketMQ7.mqadmin查看服务状态8.配置启动脚本1.namesrv脚本2.broker脚本3.单脚本启动4.单脚本停止待完善1.开启自启动配置2.安装ro......
  • 浅析RocketMQ
    SpringBoot引入RocketMQ快速构建单机RocketMQhttps://www.haveyb.com/article/3079参考这篇文章,快速构建单机RocketMQ项目引入jar包和配置<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter&......
  • 最新 Seata 集成了RocketMQ事务消息,Seata 越来越 牛X 了! yyds !
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • RocketMQ - 总结
    1.为什么要使用MQ,使用场景是什么异步:减少请求响应时间,实现非核心流程异步化(架构设计原则,能异步就不要同步)解耦:屏蔽异构平台的细节,生产者消费者可自行扩展修改系统能力只需遵循消息约束,生产者消费者不受对方影响流量削峰:消息堆积能力,消息保存在MQ中,消费端以稳定的速率拉......
  • TS - 运维中容错方式
    容错方式错误始终会出现,不可避免,但可以尽最大可能延迟发生和降低错误的影响。消除单点实现系统更高的可用性,首先要消除单点,通过负载均衡分配流量,部署多个业务服务,存多份数据。节点数越多,可用性就越高,根据实际情况避免浪费资源。特性开关实现简单的特性开关,通过配置文件或者......
  • 面试必备!值得收藏!不容错过的30+ 大语言模型面试问题及答案
    引言大语言模型(LLMs)现在在数据科学、生成式人工智能(GenAI,即一种借助机器自动产生新信息的技术)和人工智能领域越来越重要。这些复杂的算法提升了人类的技能,并在诸多行业中推动了效率和创新性的提升。近年来,大语言模型的发展飞速,在复杂数据分析和自然语言处理等任务中得到了......