首页 > 其他分享 >RockerMq发送消息之顺序消息

RockerMq发送消息之顺序消息

时间:2023-09-23 17:44:57浏览次数:30  
标签:RockerMq OrderStep orderList orderDemo 发送 消息 new setDesc

顺序消息

        消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

        顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

        下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

image

消费者监听消息:会独立的开出一个线程去消费,例如独立开出一个线程去消费张三的队列消息。

创建一个订单类:用于模拟业务

package com.zgf.mq.rocketmq.order;

import java.util.ArrayList;
import java.util.List;

/**
 * 订单的步骤
 */
public class OrderStep {
    private long orderId;
    private String desc;

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }


    /**
     * 生成模拟订单数据
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();
        /**
         *  15103111039L: 创建、付款、推送、完成
         *  15103111065L:创建、付款、完成
         *  15103117235L:创建、付款、完成
         */
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

生产者代码:


/**
 * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // 1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2.指定NameServer地址
        producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3.启动producer
        producer.start();
        // 构建消息集合
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        // 发送消息
        for (int i = 0;i<orderSteps.size();i++) {
            // 需要发送的消息实体
            String body = dateStr+" Hello RocketMQ " + orderSteps.get(i);
            Message message = new Message("OrderTopic","Order",""+i,body.getBytes());
            /**
             * 参数1:消息对象
             * 参数2:MessageQueueSelector 消息队列的选择器
             * 参数3:选择队列的业务标识(订单id)
             */
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 * 来实现消息的自定义路由逻辑,以确保消息被按照特定的规则发送到合适的队列。这对于需要发送顺序消息或者自定义消息路由的场景非常有用。
                 * @param mqs 一个包含了所有可用消息队列的列表。你可以在这个列表中选择一个队列来发送消息。
                 * @param msg 要发送的消息对象。
                 * @param arg 自定义参数,可以用于根据业务逻辑来选择消息队列。通常情况下,你可以使用这个参数来实现选择队列的逻辑。
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 这里的arg,就是我们的 orderSteps.get(i).getOrderId()
                    Long orderId = (Long) arg;
                    // 根据订单id选择要发送的queue
                    long index = orderId % mqs.size(); // 例如 1 % 10  =1(可用的队列,即1号队列)
                    return mqs.get((int) index);
                }
            }, orderSteps.get(i).getOrderId());// 订单id,没有特殊的情况,都是唯一的
            System.out.println("发送结果:"+sendResult);
        }
        producer.shutdown(); // 关闭
    }
}

这里引入了新的内容:MessageQueueSelector接口,该接口注释也写的很清楚,就不介绍了。

消费者代码:

/**
 * 顺序消息的消费
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3.订阅主题Topic和Tag
        consumer.subscribe("OrderTopic","*");//消费Tag1中的消息,通过 || xxx 可以消费多个消息,通过*可以消费OrderTopic下的所有消息
		 /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 4.设置回调函数,处理消息,用于监听消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            /**
             * 如果消耗失败,不建议抛出异常,而不是返回ConsumerOrderlyStatus.SSUSPEND_CURRENT_QUEUE_A_MOMEN
             * @param msgs    msgs.size() >= 1 <br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
             * @param context
             * @return 消息状态
             */
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("线程:【"+Thread.currentThread().getName()+"】: 消费消息:"+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

结果:
image
从上面的结果,我们可以看出来实现了分区有序,即一个线程只完成唯一标识的订单消息。

顺序消息缺陷

  • 消费顺序消息的并行度依赖于队列的数量 ;
  • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题;
  • 遇到消息失败的消息,无法跳过,当前队列消费暂停;

标签:RockerMq,OrderStep,orderList,orderDemo,发送,消息,new,setDesc
From: https://www.cnblogs.com/zgf123/p/17724787.html

相关文章

  • RocketMQ发送消息之同步异步单向
    官网教程:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart基于双主双从异步方式开启的前提下,在maven项目中引入下列依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1&l......
  • SQL Server 发送邮件功能
    execsp_configure'showadvancedoptions',1RECONFIGUREWITHOVERRIDEgoexecsp_configure'databasemailxps',1RECONFIGUREWITHOVERRIDEgo--2.创建邮件帐户信息EXECmsdb..Sysmail_add_account_sp@ACCOUNT_NAME='OCTMamiETL'......
  • Kafka的消息传递保证和一致性
    前言通过前面的文章,相信大家对Kafka有了一定的了解了,那接下来问题就来了,Kafka既然作为一个分布式的消息队列系统,那它会不会出现消息丢失或者重复消费的情况呢?今天咱们就来一探。实现机制Kafka采用了一系列机制来实现消息传递的保证和一致性,关键点:至少一次的消息传递(AtLeastOnceD......
  • 消息队列中,如何保证消息的顺序性?
    本文选自:advanced-java作者:yanglbme问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以前做过一个mysqlbinlog同步的系统,压......
  • 消息队列中,如何保证消息的顺序性?
    本文选自:advanced-java作者:yanglbme问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以前做过一个mysqlbinlog同步......
  • Kafka的消息传递保证和一致性
    前言通过前面的文章,相信大家对Kafka有了一定的了解了,那接下来问题就来了,Kafka既然作为一个分布式的消息队列系统,那它会不会出现消息丢失或者重复消费的情况呢?今天咱们就来一探。实现机制Kafka采用了一系列机制来实现消息传递的保证和一致性,关键点:至少一次的消息传递(AtLeas......
  • redis消息队列——发布订阅
    一、相关依赖<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><grou......
  • Kafka消息压缩算法性能调优与选择
    前言Kafka作为一款高性能的分布式消息队列,其消息压缩算法的选择和调优对于系统性能的提升至关重要。本文将深入探讨Kafka消息压缩算法的性能调优和选择。压缩算法的选择Kafka支持多种压缩算法,包括gzip、snappy和lz4。这些算法各有优缺点,需要根据实际情况进行选择。gzipgzip是......
  • Kafka消息过期与清理策略深入研究
    背景Kafka是一个高性能、高可靠、分布式的消息队列系统,被广泛应用于大数据领域。在Kafka中,消息的过期与清理是一个非常重要的问题,本文将深入探讨Kafka中的消息过期与清理策略。Kafka消息过期在Kafka中,消息的过期是通过消息的时间戳(timestamp)来实现的。Kafka支持两种时间戳:消息创......
  • Kafka消息消费者位移存储性能测试
    背景Kafka是一个高性能、分布式的消息队列,被广泛应用于大数据领域。在Kafka中,消费者位移存储是非常重要的一部分,它记录了消费者消费消息的位置,以便在消费者宕机或者重启后能够继续消费未消费的消息。在实际应用中,消费者位移存储的性能对于Kafka的整体性能有着重要的影响。本文将......