首页 > 其他分享 >聊聊 RocketMQ 生产者

聊聊 RocketMQ 生产者

时间:2023-11-02 16:32:31浏览次数:31  
标签:顺序 生产者 发送 mq 消息 聊聊 final RocketMQ

这篇文章,我们从源码的角度探寻 RocketMQ Producer 的实现机制。

1 基础配置

我们先展示生产者发送消息的示例代码。

// 1. 初始化默认生产者,传递参数生产者组名
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
// 2. 设置名字服务地址 
producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
// 3. 启动生产者服务 
producer.start();
// 4. 定义消息对象 
Message msg = new Message(*TOPIC* /* Topic */,
        *TAG* /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.*DEFAULT_CHARSET*) /* Message body */
);
msg.setKeys("");
// 5. 发送消息
// 示例普通消息
SendResult sendResult = producer.send(msg);
// 示例异步回调
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // do something
    }
    @Override
    public void onException(Throwable e) {
        // do something
    }
});
// 示例oneway发送
producer.sendOneway(msg);

发送流程如下:

  1. 初始化默认生产者,传递参数生产者组名;
  2. 设置名字服务地址 ;
  3. 启动生产者服务;
  4. 定义消息对象 ;
  5. 生产者支持普通发送oneway 发送异步回调三种方式发送消息 。

2 发送消息流程

2.1 构造函数

下图展示了生产者DefaultMQProducer 类的构造函数,包装类 DefaultMQProducerImpl 是我们这一小节的核心。

构造函数包含两个部分:

  1. 初始化实现类 DefaultMQProducerImpl ;

  2. 根据是否开启消息轨迹参数 enableMsgTrace 判断是否增加消息轨迹逻辑 。

2.2 启动生产者

DefaultMQProducer 类的 start 方法,本质上是调用包装类 DefaultMQProducerImpl 的 start 方法。

进入 DefaultMQProducerImpl 类,查看该类的逻辑 。

01 检测配置

判断生产者组是否合法,生产者名称不能和默认生产者组名称相同。

02 创建客户端实例

MQClientInstance 对象通过 MQClientManager 这个单例类创建 ,标志着一个客户端实例,是非常核心的类,每一个实例对象有一个唯一的 clientId

  • 生产者表/消费者表引用

  • 路由信息

03 注册本地生产者

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

注册本地生产者的本质是修改客户端实例的生产者表引用:

MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);

04 启动客户端实例

实例启动后,会启动通讯模块、定时任务、负载均衡服务、消费者拉取服务。

下图展示了生产者发送消息时,IDEA 里的线程 DUMP 图:

我们需要重点讲讲定时任务 startScheduledTask方法 , 定时任务如下图:

我们重点关注发送心跳更新路由两个任务。

  • 发送心跳: 定时任务每隔 30 秒将客户端信息发送到 Broker 。

当 Broker 收到心跳请求之后,会通过生产者管理器 ProducerManager、消费者管理器ConsumerManager分别更新生产者客户端缓存、消费者客户端缓存。

  • 更新路由

对于生产者来讲,它需要知道需要发送消息的主题对应的路由信息 , 因此需要定时更新路由信息。

更新逻辑比较简单,首先从名字服务获取主题路由信息对象 topicRoute,然后更新 DefaultMQProducerImpl主题发布信息topicPublishInfoTable对象 。

2.3 发送消息

进入 DefaultMQProducerImpl 类,查看发送消息方法 sendDefaultImpl

笔者将发送消息流程简化如下:

  • 获取主题发布信息;

  • 根据路由算法选择一个消息队列,也就是 selectOneMessageQueue方法;

  • 调用 sendKernelImpl发放消息对象,封装成发送结果对象 sendResult

01 尝试获取主题发布信息

我们知道 MQClientInstance 的定时任务每隔30秒会更新生产者实现类的topicPublishInfoTable,但若第一次发送消息时,若缓存中无数据时候,还是要重新拉取一次。

02 根据路由算法选择一个消息队列

RocketMQ 存储模型包含三部分: 数据文件 commitlog消费文件 consumequeue索引文件 indexfile

因此根据 RocketMQ 的存储模型设计,**对于生产者来讲,发送消息时,必须指定该主题对应的队列。**路由算法,我们会在路由机制这一节重点讲解。

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

03 调用实例客户端 API 发送消息

通过路由机制选择一个 messageQueue 之后,调用实例客户端 API 发送消息。

Broker 端在收到发送消息请求后,调用处理器 SendMessageProcessor处理请求,处理完成后,将响应结果返回给生产者客户端,客户端将接收到的数据组装成 SendResult对象。

3 路由机制

进入DefaultMQProducerImpl#selectOneMessageQueue 方法:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

路由机制通过调用 MQFaultStrategyselectOneMessageQueue 方法 ,这里有一个 sendLatencyFaultEnable 开关变量,默认为 false 。

public class MQFaultStrategy {
    //省略部分代码 日志
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    private boolean sendLatencyFaultEnable = false;
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    //省略部分代码 get/set方法 
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 发送延迟错误策略
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            return tpInfo.selectOneMessageQueue();
        }
        // 默认策略
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }
}

这里有两个逻辑分支 :

  1. sendLatencyFaultEnable 为 false , 通过 TopicPublishInfo 中的 messageQueueList 中选择一个队列(MessageQueue)进行发送消息 ;
  2. sendLatencyFaultEnable 为 true ,开启延迟容错机制

3.1 默认机制

// TopicPublishInfo 类
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
}

默认机制有两个要点:

  1. 循环遍历该主题下所有的队列 ;
  2. 若上一个失败的 Broker 参数值存在,需要过滤掉上一个失败的 Broker 。

3.2 延迟容错机制

所谓延迟容错机制,是指发送消息时,若某个队列对应的 Broker 宕机了,在默认机制下很可能下一次选择的队列还是在已经宕机的 broker ,没有办法规避故障的broker,因此消息发送很可能会再次失败,重试发送造成了不必要的性能损失。

因此 producer 提供了延迟容错机制来规避故障的 Broker 。

sendLatencyFaultEnable 开关为 true 时,在随机递增取模的基础上,代码逻辑会再去过滤掉 not available 的 Broker 。

if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
     return mq;

所谓的" latencyFaultTolerance ",是指对之前失败的,按一定的时间做退避。

例如,如果上次请求的latency超过 550Lms,就退避 3000Lms;超过1000L,就退避 60000L ;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance 机制是实现消息发送高可用的核心关键所在。

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

发送消息时捕捉到异常同样会调用 updateFaultItem 方法:

endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

endTimestamp - beginTimestampPrev等于消息发送耗时,如果成功发送第三个参数传的是 false ,发送失败传 true。

继续查看 MQFaultStrategy#updateFaultItem 源码:

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
private long computeNotAvailableDuration(final long currentLatency) {
     for (int i = latencyMax.length - 1; i >= 0; i--) {
         if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
     }
     return 0;
}

computeNotAvailableDuration方法会判断当前消息发送耗时,位于哪一个延迟级别,然后选择对应的 duration 。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

如果isolation 为 true,该 broker 会得到一个10分钟规避时长 ,也就是 600000L 毫秒 。

如果 isolation 为 false,假设 currentLatency 为 600L , 那么规避时间 30000L 毫秒。

查看 LatencyFaultToleranceImpl#updateFaultItem 源码:

public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    // 从缓存中获取失败条目
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        //若缓存中没有,则创建
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        // broker的开始可用时间=当前时间+规避时长
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        // 更新旧的失败条目
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

FaultItem 为存储故障 broker 的类,称为失败条目,每个条目存储了 broker 的名称、消息发送延迟时长、故障规避开始时间。

该方法主要是对失败条目的一些更新操作,如果失败条目已存在,那么更新失败条目,如果失败条目不存在,那么新建失败条目,其中失败条目的startTimestamp为当前系统时间加上规避时长,startTimestamp 是判断 broker 是否可用的时间值:

public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

4 顺序消息

顺序消息可以保证消息的消费顺序和发送的顺序一致,即先发送的先消费,后发送的后消费,常用于金融证券、电商业务等对消息指令顺序有严格要求的场景。

4.1 如何保证顺序消息

消息的顺序需要由以下三个阶段保证:

  • 消息发送

    如上图所示,A1、B1、A2、A3、B2、B3 是订单 A 和订单 B 的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单A的消息发送和消费都按照 A1、A2、A3 的顺序。

    如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时 RocketMQ 支持将 Sharding Key 相同(例如同一订单号)的消息序路由到一个队列中。

    RocketMQ 版服务端判定消息产生的顺序性是参照同一生产者发送消息的时序。不同生产者、不同线程并发产生的消息,云消息队列 RocketMQ 版服务端无法判定消息的先后顺序。

  • 消息存储

    顺序消息的 Topic 中,每个逻辑队列对应一个物理队列,当消息按照顺序发送到 Topic 中的逻辑队列时,每个分区的消息将按照同样的顺序存储到对应的物理队列中。

    对于 kafka 来讲,1个主题会有多个分区,数据存储在每个分区,分区里文件以 Segment 文件串联起来。

    对于 RocketMQ 来讲 , 存储模型包含三部分: 数据文件 commitlog消费文件 consumequeue索引文件 indexfile

    kafka 和 RocketMQ 文件模型很类似,只不过 kafka 的文件数据都会存储在不同的分区里,而 RocketMQ 的数据都存储在 CommitLog 文件里 ,不同的消息会存储在不同的消费队列文件里,便于提升消费者性能(索引)。

    所以我们只需要将特定的消息发送到特定的逻辑队列里,对于 kafka 来讲是分区 partition ,对于 RocketMQ 来讲,就是消费队列 messageQueue 。

  • 消息消费

    RocketMQ 按照存储的顺序将消息投递给 Consumer,Consumer 收到消息后也不对消息顺序做任何处理,按照接收到的顺序进行消费。

    Consumer 消费消息时,同一 Sharding Key 的消息使用单线程消费,保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致。

4.2. 生产者发送顺序消息

下面的代码展示生产者如何发生顺序消息 。

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

 String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  for (int i = 0; i < 100; i++) {
        int orderId = i % 10;
        Message msg =
                new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                 @Override
                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
        System.out.printf("%s%n", sendResult);
}
producer.shutdown();

发送顺序消息需要定制队列选择器 MessageQueueSelector

SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

进入 DefaultMQProducerImpl#sendSelectImpl, 查看顺序消费发送的实现逻辑。

private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
     // 省略代码
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
            List<MessageQueue> messageQueueList =
       mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
            Message userMessage = MessageAccessor.cloneMessage(msg);
            String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
            userMessage.setTopic(userTopic);

    // 调用 selector 的select 方法,传递相关参数,选择某一个队列 
            mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
        } catch (Throwable e) {
            throw new MQClientException("select message queue threw exception.", e);
        }
     
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTime) {
            throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
        }
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
        } else {
            throw new MQClientException("select message queue return null.", null);
        }
    }
    validateNameServerSetting();
    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

从上面的顺序消息发送代码,我们得到两点结论:

  1. 顺序消息发送时,需要实现 MessageQueueSelectorselect方法 ;
  2. 发送顺序消息时,若发送失败没有重试。

参考文档:

https://developer.aliyun.com/article/918025

标签:顺序,生产者,发送,mq,消息,聊聊,final,RocketMQ
From: https://blog.51cto.com/u_15564913/8150159

相关文章

  • 聊聊RNN与Attention
    RNN系列:聊聊RNN&LSTM聊聊RNN与seq2seqattentionmechanism,称为注意力机制。基于Attention机制,seq2seq可以像我们人类一样,将“注意力”集中在必要的信息上。Attention的结构seq2seq存在的问题seq2seq中使用编码器对时序数据进行编码,然后将编码信息传递给解码器。此时,编码器的......
  • 聊聊性能测试的左移右移
    前面的文章《测试左移右移,到底是什么》中,分享过我对于测试左移右移的一些思考和实践方法。有同学在后台留言问我:常规的性能测试一般都是在测试阶段集成测试时候才开始介入,很容易测试时间不够,可不可以借鉴测试左移右移的思路,更早的介入和发现性能风险,然后在测试阶段更专注于分析......
  • Kafka-生产者性能调优
    (一)参数调优参数调优相关代码在实际的kafka开发中,我们会发现,无论是生产者还是消费者,都需要构建一个Properties对象,里面设置了很多参数。在这段代码中有很多常用的参数配置,在线上使用时,我们要根据实际的数据量和数据大小来决定这些配置的具体值。Propertiesprops=newProperti......
  • Kafka-生产者、broker、消费者的调优参数总结
     生产环境下,为了尽可能提升Kafka的整体吞吐量,可以对Kafka的相关配置参数进行调整,以达到提升整体性能的目的。本文主要从Kafka的不同组件出发,讲解各组件涉及的配置参数和参数含义。一、生产者(producer.properties或者代码中)1、acks:Producer需要Leader确认的Producer请求的应答......
  • 聊聊多层嵌套的json的值如何解析/替换
    前言前阵子承接了2个需求,一个数据脱敏,一个是低代码国际化多语言需求,这两个需求有个共同特点,都是以json形式返回给前端,而且都存在多层嵌套,其中数据脱敏的数据格式是比较固定,而低代码json的格式存在结构固定和不固定2种格式。最后不管是数据脱敏或者是多语言,业务抽象后,都存在需要......
  • 生产者消费者模式下实现多batch延时推理
    生产者消费者模式下实现多batch延时推理需求分析在实际推理过程中为了实现较高的吞吐量和较高的资源利用率,往往会使用多线程来收集多次请求,并组合形成多batch下的模型推理,一种常见的实现便是生产者和消费者模式,其需求如下:生产者收集提交的请求,消费者对请求进行消费,并将结果返......
  • 聊聊昨日ChatGPT全球宕机事件,带给我们的警示
    作者|卖萌酱,王二狗昨日,ChatGPT崩了!许多人发现无论是ChatGPT或是ChatGPTPLUS都不能正常工作了。还连带了全球数以万计的依赖ChatGPTAPI的热门AI应用也纷纷崩溃。有Twitter网友调侃到,昨日受ChatGPT宕机的影响,全球的生产力下降了50%,打工人一片哀嚎。更有网友上传了一段视频,真......
  • 聊聊RNN&LSTM
    RNN用于解决输入数据为,序列到序列(时间序列)数据,不能在传统的前馈神经网络(FNN)很好应用的问题。时间序列数据是指在不同时间点上收集到的数据,这类数据反映了某一事物、现象等随时间的变化状态或程度,即输入内容的上下文关联性强。整体结构x、o为向量,分别表示输入层、输出层的值......
  • Java基础 什么是生产者和消费者
    在Java中,"生产者-消费者"(Producer-Consumer)是一种常见的并发编程模型,用于协调多个线程之间的工作,其中一些线程充当生产者,而其他线程充当消费者。这模型通常用于处理共享数据的情况,其中生产者线程生成数据并将其放入共享缓冲区,而消费者线程则从缓冲区中取出数据并进行处理。主要特......
  • Java基础 等待唤醒机制——生产者代码实现
    packagepojo.xc01;publicclassCookextendsThread{@Overridepublicvoidrun(){while(true){synchronized(Desk.lock){if(Desk.count==0)break;if(Desk.foodFlog==1){//桌子上有食物,就等待......