顺序消息
生产端顺序生产
消费端顺序消费
一般都是局部顺序消息。生产端根据 shardingkey 对队列数量取模,把同一个 shardingkey 的消息发送到同一个队列
而消费端也要确保消费这个队列时是一个线程消费的
首先是 consumer 中注册的 Listener 来指定是顺序消息消费还是并发消费
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("C-CLOUD-UPSTREAM-YSS"); consumer.subscribe("MEDIA_MESSAGE_UPSTREAM_YSS", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 consumer.setConsumeTimestamp("20181109221800"); consumer.setNamesrvAddr("10.10.168.3:10812"); // MessageListenerOrderly 或 MessageListenerConcurrently consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
PushConsumer 拉取到的消息提交到线程池后,会根据注册的 Listener 类型来决定是由 ConsumeMessageConcurrentlyService 还是 ConsumeMessageOrderlyService 来处理
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; // 并发消费服务 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } // 启动 并发消费/顺序消费 服务的定时任务线程池 this.consumeMessageService.start();
ConsumeMessageOrderlyService
广播模式下,订阅组下所有的队列都要被所有的消费者消费,所以不存在问题
集群模式下,要保证消费时的顺序性,就要 同一时刻,一个消费队列只能被一个消费者中的一个线程消费。用加锁来实现:
- 定时在 broker 对队列加 clientId 锁,保证rebalance时一个队列也只能被一个消费者消费
- consumer 端对 MessageQueue 加锁,保证当前线程占有该队列
- consumer 端对 ProcessQueue 加锁,保证当前线程占有该队列
注:
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是当前消费者订阅的topic分配给当前消费者的所有队列
MessageQueue,ProcessQueue 存的都是当前消费者订阅的topic分配给当前消费者的队列,但是存的内容侧重点有所不同
MessageQueue: (topic,brokerName,queueId) 主要存队列与broker的关系;
ProcessQueue: 存队列在消费者端的一些状态等
1.定时在 broker 对队列加 clientId 锁,保证rebalance时一个队列也只能被一个消费者消费
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start
初始化了一个定时线程池,然后在ConsumeMessageOrderlyService
启动的时候,创建了一个任务,1s执行一次,lockMQPeriodically
这个方法就是给当前这个客户端所消费的所有队列去borker进行上锁。
这里的客户端是指订阅了某个 topic 并且指定了顺序消费的客户端,全部队列是指这个 topic 下的所有队列中,分配给这个消费者的所有队列。
我们知道rocketmq集群模式一般都是队列粒度的负载均衡,已经是一个队列只能被一个消费者消费的,那为什么还要在 broker 对队列加上client 的锁呢?
分配给这个消费者的所有队列在发生 rebanlance 时可能会被指派给别的消费者,而在 rebalance 时可能会有两个消费者,所以要去 broker 上锁,保证 rebalance 前后只有获取了独占锁的消费者才可以消费。
public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } catch (Throwable e) { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }
上锁过程:
- 将该消费者订阅的 topic 分配给当前消费者的所有队列,按照 broker 进行分组 HashMap<String, Set<MessageQueue>> brokerMqs 内容为 brockerName-该broker下该消费者消费的所有队列
- 遍历上面根据 broker 分组过的 brokerMqs ,按照不同的 broker,对一个 broker 下该消费者被分配的所有队列,一次性向这个 broker 申请加锁
- 如果加锁成功, ProcessQueue 的 isLocked 设为 true,否则设为 false
注:
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是当前消费者订阅的topic分配给当前消费者的所有队列
MessageQueue,ProcessQueue 存的都是当前消费者订阅的topic分配给当前消费者的队列,但是存的内容侧重点有所不同
MessageQueue: (topic,brokerName,queueId) 主要存队列与broker的关系;
ProcessQueue: 存队列在消费者端的一些状态等
public abstract class RebalanceImpl { public void lockAll() { // ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是当前消费者订阅的topic分配给当前消费者的所有队列 // MessageQueue: (topic,brokerName,queueId) 主要存队列与broker的关系; // ProcessQueue: 存队列在消费者端的一些状态等 // 用这个 processQueueTable 构建 brokerMqs: brockerName-该broker下该消费者消费的所有队列 HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName(); Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator(); // 遍历 map ,即根据 broker 对这个 broker 分配给这个消费者的所有队列,一次性发送加锁请求 while (it.hasNext()) { Entry<String, Set<MessageQueue>> entry = it.next(); final String brokerName = entry.getKey(); final Set<MessageQueue> mqs = entry.getValue(); if (mqs.isEmpty()) { continue; } // 找到 broker FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); if (findBrokerResult != null) { // 构建加锁请求体 LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); requestBody.setClientId(this.mQClientFactory.getClientId()); // 对这个 broker 分配给这个消费者的所有队列,一次性发送加锁请求 requestBody.setMqSet(mqs); try { // 向 broker 请求加锁,返回的 lockOKMQSet 是加锁成功的 MessageQueue Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mq : mqs) { // ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是当前消费者订阅的topic分配给当前消费者的所有队列 // MessageQueue: (topic,brokerName,queueId) 主要存队列与broker的关系; // ProcessQueue: 存队列在消费者端的一些状态等 ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { // 如果加锁成功 if (lockOKMQSet.contains(mq)) { if (!processQueue.isLocked()) { log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); } // processQueue Locked 状态为 true processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } // 如果加锁失败 else { // processQueue Locked 状态为 false processQueue.setLocked(false); log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq); } } } } catch (Exception e) { log.error("lockBatchMQ exception, " + mqs, e); } } } } }
broker 端:
下面这段代码,对里面的逻辑进行整合概括就是:或者执行时不再包含被重新分配的队列,也就不会对这些已被自己加锁的队列进行续期。那新分配的消费者判断队列锁已经过期,就可以该队列加锁成功。
- 被当前请求的client加锁的队列:进行续期,更新最新加锁时间
- 没有被任何client加锁的队列:加上当前请求的client锁,更新最新加锁时间
- 被其它client加锁的队列:判断是否过期,如果过期,将加锁client替换为当前请求的client,并进行续期,更新最新加锁时间
上面的第三点就保证了,如果rebalance服务将某些队列分配了新的消费者,而旧的消费者不再每秒执行一次(挂掉时)定时去broker加队列锁任务lockMQPeriodically,
或者执行时请求的队列不再包含被重新分配的队列,也就不会对这些之前被自己加锁的队列进行续期。那新分配的消费者判断这些队列上的队列锁已经过期,就可以对它们加锁成功。
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) { Set<MessageQueue> lockedMqs = new HashSet<>(mqs.size()); Set<MessageQueue> notLockedMqs = new HashSet<>(mqs.size()); // mqs 是 requestBody 的客户端所有请求加锁的队列 for (MessageQueue mq : mqs) { // 进行分类: // this.isLocked 还会在里面对已经被clientId加锁了的,进行续期,更新最新加锁时间 if (this.isLocked(group, mq, clientId)) { // 已经被请求的client加锁的队列 lockedMqs.add(mq); } else { // 没有被请求的client加锁的队列,包括两类: // 1.没有被任何client加锁的队列 // 2.被其它client而不是请求的client加锁的队列 notLockedMqs.add(mq); } } if (!notLockedMqs.isEmpty()) { try { // 对下面这段代码加同步锁 this.lock.lockInterruptibly(); try { ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); if (null == groupValue) { groupValue = new ConcurrentHashMap<>(32); this.mqLockTable.put(group, groupValue); } // 遍历没有被请求的client加锁的队列,包括两类: // 1.没有被任何client加锁的队列 // 2.被其它client而不是请求的client加锁的队列 for (MessageQueue mq : notLockedMqs) { LockEntry lockEntry = groupValue.get(mq); // 队列没有被任何client锁上,加上属于该client的锁 if (null == lockEntry) { lockEntry = new LockEntry(); lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); log.info( "RebalanceLockManager#tryLockBatch: lock a message which has not been locked yet, " + "group={}, clientId={}, mq={}", group, clientId, mq); } // 1.没有被任何client加锁的队列,在上面被当前请求的client加锁成功 // 这种情况需要进行续期,更新最新加锁时间 if (lockEntry.isLocked(clientId)) { lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); lockedMqs.add(mq); continue; } String oldClientId = lockEntry.getClientId(); // 2.被其它client而不是请求的client加锁的队列 // 这种情况需要检查锁是否过期, 如果过期, 将client替换为当前请求的client,并更新加锁时间 if (lockEntry.isExpired()) { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( "RebalanceLockManager#tryLockBatch: try to lock a expired message queue, group={}, " + "mq={}, old client id={}, new client id={}", group, mq, oldClientId, clientId); lockedMqs.add(mq); continue; } log.warn( "RebalanceLockManager#tryLockBatch: message queue has been locked by other client, " + "group={}, mq={}, locked client id={}, current client id={}", group, mq, oldClientId, clientId); } } finally { this.lock.unlock(); } } catch (InterruptedException e) { log.error("RebalanceLockManager#tryBatch: unexpected error, group={}, mqs={}, clientId={}", group, mqs, clientId, e); } } // 所以最后加锁成功的包含: // 1.没被任何client加锁,这里新被请求的client加锁 2.被其它client加锁,但是锁已经过期,这里将加锁的client替换为当前请求的client return lockedMqs; } private boolean isLocked(final String group, final MessageQueue mq, final String clientId) { ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); if (groupValue != null) { LockEntry lockEntry = groupValue.get(mq); if (lockEntry != null) { boolean locked = lockEntry.isLocked(clientId); if (locked) { // 对已经被当前请求的client加了锁的队列进行续期 lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); } return locked; } } return false; }
processQueue 在 broker 获得了队列锁之后,是怎么用的呢?
集群模式在 ConsumeMessageOrderlyService 消费消息时,会判断只有 this.processQueue.isLocked() 才会消费。也就是说消费者获取了这个锁,才能消费消息:
class ConsumeRequest implements Runnable { private final ProcessQueue processQueue; private final MessageQueue messageQueue; public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) { this.processQueue = processQueue; this.messageQueue = messageQueue; } public ProcessQueue getProcessQueue() { return processQueue; } public MessageQueue getMessageQueue() { return messageQueue; } @Override public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || this.processQueue.isLocked() && !this.processQueue.isLockExpired()) { // ....... // 暂时省略中间的消费代码 } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } } }
2.消费时 consumer 对消费代码块加锁,保证同一时间队列只能被获得broker队列锁的消费者的一个线程消费
标签:加锁,队列,broker,client,mq,消息,processQueue,RocketMQ,延迟 From: https://www.cnblogs.com/suBlog/p/17611943.html