首页 > 其他分享 >消息队列MQ的使用

消息队列MQ的使用

时间:2024-08-28 14:52:46浏览次数:17  
标签:发送 producer 队列 Queue MQ 消息 new message

承接我的另一篇博客 消息队列MQ-CSDN博客

启动服务

1.启动mqnamesrv

2.启动mqbroker

mqbroker -n 127.0.0.1:9876


应用

1.普通消息

同步发送
public class EasyA {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("easygroup");

        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        //发送消息
        Message message = new Message("easytopic","rocketmq message".getBytes());
        SendResult result = producer.send(message);
        System.out.println(result);

        producer.shutdown();
    }
}
异步发送
public class EasyB {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer producer=new DefaultMQProducer("easygroup");

        producer.setNamesrvAddr("127.0.0.1:9876");

        Message message=new Message("easytopic","Helloworld".getBytes());

        producer.start();

        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
                System.out.println("OK");
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                System.out.println("Error");
            }
        });

        System.out.println("发送了一个消息");

        Thread.sleep(3000);
        producer.shutdown();
    }
}
单向发送

单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ不返回ACK。该方式的消息发送效率最高,但消息可靠性较差

public class EasyC {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("easyGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message message=new Message("easytopic","单向发送消息".getBytes());
        producer.sendOneway(message);

        producer.shutdown();
    }
}

发送成功


定义消息消费者
public class EasyD {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("easyGroup");

        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //指定消费主题
        consumer.subscribe("easytopic","*");
        //设置监听器
        consumer.registerMessageListener(EasyD::consumeMessage);

        consumer.start();
        System.out.println("开始执行");
    }

    //处理方法
    static ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
            final ConsumeConcurrentlyContext context){
        for (MessageExt m:msgs){
            System.out.println(new String(m.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}


2.顺序消息

什么是顺序消息

顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。

默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性


为什么需要顺序消息

例如,现在有TOPIC ORDER STATUS(订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。

根据以上订单状态,生产者从时序上可以生成如下几个消息:

订单T0000001:末支付->订单T0000001:已支付->订单T0000001:发货中->订单T0000001:发货失败

消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:

这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性。


有序性分类

根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序与全局有序。

全局有序

当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序,称为全局有序。

在创建Topic时指定Queue的数量。有三种指定方式:

  1. 在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
  2. 在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
  3. 使用mqadmin命令手动创建Topic时指定Queue数量


分区有序

如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。

把需要有序的消息指定一个队列进行发送

一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果即为选择出的Queue的QueueId。


代码举例

在前面同步发送消息时指定队列:

        //发送消息
        Message message = new Message("easytopic","rocketmq message".getBytes());

        MessageQueue queue=new MessageQueue("easytopic","LAPTOP-93F8NUKL",1);
        SendResult result = producer.send(message,queue);
        System.out.println(result);
        result = producer.send(message,queue);
        System.out.println(result);

发送成功:


3.延时消息

什么是延时消息

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。

采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。


延时等级

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在
RocketMQ服务端的MessageStoreConfig类中的如下变量中:

若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。

level==0,消息为非延迟消息

1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s

level>maxLevel,则level== maxLevel,例如level==20,延迟2h


延时消息实现原理

具体实现方案

Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。不过,在分发前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程:

修改消息的Topic为SCHEDULE TOPIC XXXX

根据延时等级,在consumequeue目录中SCHEDULE TOPIC XXXX主题下创建出相应的queueld目录与consumequkue文件(如果没有这些目录与文件的话)。

修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到
commitlog中的时间。投递时间=消息存储时间+延时等级时间。消息存储时间指的是消息被发送到Broker时的时间戳。

将消息索引写入到SCHEDULE TOPIC XXXX主题下相应的consumequeue中

投递延时消息

将消息重新写入commitlog

代码实例

在发送消息时进行如下设置:

        //发送消息
        Message message = new Message("easytopic","rocketmq message".getBytes());
        message.setDelayTimeLevel(5);

标签:发送,producer,队列,Queue,MQ,消息,new,message
From: https://blog.csdn.net/qq_63161848/article/details/141628890

相关文章

  • 单调队列--滑动窗口最大值(leetcode23)
    目录一、单调队列介绍二、题目应用1.题目描述2.解题碎碎念3.代码示例三、扩展1.与优先队列区别2.单调队列其他题目一、单调队列介绍单调队列是一种特殊的队列数据结构,它能够在常数时间内找到队列中的最值。单调队列可以用来解决一些与最值相关的问题,例如滑动窗口最......
  • 抖音私信回复图片接口-企业号授权到开放平台-调用上传图片并发送私信消息
    抖音私信回复图片接口企业号授权到开放平台调用上传图片并发送私信消息这样用户就可以在客服后台,直接给私信用户发送图片了感兴趣的+\/  : llike620golang代码//获取ClientTokenfunc(this*Douyin)GetClientToken()(string,error){url:="https://open.......
  • Go使用rocketmq实现延迟消息
    生产者packagemainimport( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "time&......
  • Kafka Topic 中明明有可拉取的消息,为什么 poll 不到
    开心一刻今天小学女同学给我发消息她:你现在是毕业了吗我:嗯,今年刚毕业她给我发了一张照片,怀里抱着一只大橘猫她:我的眯眯长这么大了,好看吗我:你把猫挪开点,它挡住了,我看不到她:你是sb吗,滚我解释道:你说的是猫呀可消息刚发出,就出现了红色感叹号,并提示:消息已发出,但被对方拒收了kafka......