首页 > 其他分享 >RocketMQ - 消费者Rebalance机制

RocketMQ - 消费者Rebalance机制

时间:2023-02-28 09:02:11浏览次数:48  
标签:消费者 MessageQueue final protected Rebalance public RocketMQ

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic 扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的呢?

RebalancePullImpl 和 RebalancePushImpl 两个重平衡实现类,分别被 DefaultMQPullConsumer 和DefaultMQPushConsumer 使用。下面讲一下 Rebalancelmpl 的核心属性和方法
image

核心属性

public abstract class RebalanceImpl {
    protected static final InternalLogger log = ClientLogger.getLog();
    //记录MessageQueue和ProcessQueue的关系。MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    //Topic 路由信息 。保存 Topic 和 MessageQueue的关系。
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
    //真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
    protected String consumerGroup;
    protected MessageModel messageModel;
    //消费分配策略的实现
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    //client实例对象
    protected MQClientInstance mQClientFactory;
}

核心方法

public abstract class RebalanceImpl {
    //为MessageQueue加锁
    public boolean lock(final MessageQueue mq) {}
    //执行Rebalance操作
    public void doRebalance(final boolean isOrder) {}
    //通知Message发生变化,这个方法在Push和Pull两个类中被重写
    public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
                                             final Set<MessageQueue> mqDivided);
    //去掉不再需要的 MessageQueue
    public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
    //执行消息拉取请求
    public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
    //在Rebalance中更新processQueue
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                       final boolean isOrder)

}

Rebalancelmpl 、 RebalancePushImpl 、 RebalancePullImpl 是Rebalance的核心实现,主要逻辑都在Rebalancelmpl中,因为Pull消费者和Push消费者对Rebalance的需求不同,在各自的实现中重写了部分方法,以满足自身需求

如果有一个消费者实例下线了,Broker和其他消费者是怎么做Rebalance的呢
image

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

目前队列分配策略有以下5种实现方法

  • AllocateMessageQueueAveragely:平均分配,也是默认使用的策略(强烈推荐)。
  • AllocateMessageQueueAveragelyByCircle:环形分配策略。
  • AllocateMessageQueueByConfig:手动配置。
  • AllocateMessageQueueConsistentHash:一致性Hash分配。
  • AllocateMessageQueueByMachineRoom:机房分配策略

标签:消费者,MessageQueue,final,protected,Rebalance,public,RocketMQ
From: https://www.cnblogs.com/vipsoft/p/17150065.html

相关文章