首页 > 其他分享 >RabbitMQ如何保证顺序消费

RabbitMQ如何保证顺序消费

时间:2023-08-15 15:02:14浏览次数:32  
标签:顺序 amqp springframework 保证 RabbitMQ org import message public

面试官:你能说说RabbitMQ是如何保证消息顺序消费的吗?

老任:如果我们想要保证消息是按照顺序进行发送的,发送到队列后,队列的消息应该是先进先出的,我们只需要一个队列配置一个消费者即可(窃喜中......)。

面试官:我们的项目一般都是集群部署的,一个队列就会有多个消费者,怎么实现一个队列中所有顺序消息只能有一个消费者消费呢?

老任:这个好办,项目不做集群部署不就行了。

面试官:回去等通知吧......

一. 场景介绍

很多时候,消息的消费是不用保证顺序的,比如借助mq实现订单超时的处理。但有些时候,业务中可能会存在多个消息需要顺序处理的情况,比如生成订单和扣减库存消息,那肯定是先执行生成订单的操作,再执行扣减库存的操作。

那么这种情况下,是如何保证消息顺序消费的呢?

首先,为了效率,我们可以设置多个队列都来处理顺序执行的消息。另外,我们需要保证每组顺序消费的消息发到同一个队列中,给这些消息设置一个统一的全局id即可。

其次,保证消息的顺序消费。就像上面所说,一个队列对应一个消费者即可,但是在项目的集群部署下,这又该怎么处理呢?针对这种情况,我们可以设置队列的“单活模式”。

x-single-active-consumer:单活模式,表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。

二. 模拟代码实现

假设现在我们有两个队列处理顺序消息(消息1-1和1-2属于一组需要顺序消费的消息,消息2-1和2-2属于另一组需要顺序消费的消息),每个队列有两个消费者(模拟消费者集群)。

RabbitMQ如何保证顺序消费_spring

队列的配置类

package com.qfedu.springbootmq.sequence.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configurationpublic class SeqQueueConfiguration {    /**     * 创建两个队列,处理顺序消息     */    @Bean    public Queue seqQueue1() {        return creatQueue("q_seq1");    }
    @Bean    public Queue seqQueue2() {        return creatQueue("q_seq2");    }    // 交换机    @Bean    public DirectExchange seqDirectExchange() {        return new DirectExchange("direct_seq");    }
    // 队列绑定交换机,执行路由key    @Bean    public Binding seqBinding1() {        return BindingBuilder.bind(seqQueue1()).to(seqDirectExchange()).with("1");    }
    @Bean    public Binding seqBinding2() {        return BindingBuilder.bind(seqQueue2()).to(seqDirectExchange()).with("2");    }
    /**     * 创建一个 单活模式的队列     * @param name     * @return queue     */    private Queue creatQueue(String name) {        HashMap<String, Object> args = new HashMap<>();        // x-single-active-consumer 单活模式 队列        // 表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,        // 除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。        args.put("x-single-active-consumer", true);        return new Queue(name, true, false, false, args);    }}

package com.qfedu.springbootmq.sequence.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class SeqQueueConfiguration {
    /**
     * 创建两个队列,处理顺序消息
     */
    @Bean
    public Queue seqQueue1() {
        return creatQueue("q_seq1");
    }


    @Bean
    public Queue seqQueue2() {
        return creatQueue("q_seq2");
    }
    // 交换机
    @Bean
    public DirectExchange seqDirectExchange() {
        return new DirectExchange("direct_seq");
    }


    // 队列绑定交换机,执行路由key
    @Bean
    public Binding seqBinding1() {
        return BindingBuilder.bind(seqQueue1()).to(seqDirectExchange()).with("1");
    }


    @Bean
    public Binding seqBinding2() {
        return BindingBuilder.bind(seqQueue2()).to(seqDirectExchange()).with("2");
    }


    /**
     * 创建一个 单活模式的队列
     * @param name
     * @return queue
     */
    private Queue creatQueue(String name) {
        HashMap<String, Object> args = new HashMap<>();
        // x-single-active-consumer 单活模式 队列
        // 表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,
        // 除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
        args.put("x-single-active-consumer", true);
        return new Queue(name, true, false, false, args);
    }
}

生产者

package com.qfedu.springbootmq.sequence.producer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;
@Componentpublic class ProducerSeq {    @Resource    private RabbitTemplate rabbitTemplate;
    /**     * 根据id,将消息顺序发送到对应的队列     * @param id  业务id     * @param msg 业务信息     */    public void send(int id, String msg) {        MessageInfo message = new MessageInfo(id, msg);        rabbitTemplate.convertAndSend("direct_seq", String.valueOf(id % 2 + 1), message);    }}
package com.qfedu.springbootmq.sequence.producer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;


@Component
public class ProducerSeq {
    @Resource
    private RabbitTemplate rabbitTemplate;


    /**
     * 根据id,将消息顺序发送到对应的队列
     * @param id  业务id
     * @param msg 业务信息
     */
    public void send(int id, String msg) {
        MessageInfo message = new MessageInfo(id, msg);
        rabbitTemplate.convertAndSend("direct_seq", String.valueOf(id % 2 + 1), message);
    }
}

消费者

为了模拟,我们可以创建四个消费者,每两个消费者监听一个队列。四个消费者的逻辑一样,只是类名和监听的队列名不一样。消费者1的代码实现如下:

package com.qfedu.springbootmq.sequence.consumer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Random;
@Component@RabbitListener(queues = "q_seq1")public class Consumer11 {    @RabbitHandler    public void onMessage(MessageInfo message) {        System.out.println("c11:" + message.getId() + ":" + message.getMsg());        // 随机休眠        long l = new Random(1000).nextLong();        try {            Thread.sleep(l);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

package com.qfedu.springbootmq.sequence.consumer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;


@Component
@RabbitListener(queues = "q_seq1")
public class Consumer11 {
    @RabbitHandler
    public void onMessage(MessageInfo message) {
        System.out.println("c11:" + message.getId() + ":" + message.getMsg());
        // 随机休眠
        long l = new Random(1000).nextLong();
        try {
            Thread.sleep(l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者2的代码实现:

package com.qfedu.springbootmq.sequence.consumer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Random;
@Component@RabbitListener(queues = "q_seq1")public class Consumer12 {    @RabbitHandler    public void onMessage(MessageInfo message) {        System.out.println("c12:" + message.getId() + ":" + message.getMsg());        // 随机休眠        long l = new Random(1000).nextLong();        try {            Thread.sleep(l);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

package com.qfedu.springbootmq.sequence.consumer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;


@Component
@RabbitListener(queues = "q_seq1")
public class Consumer12 {
    @RabbitHandler
    public void onMessage(MessageInfo message) {
        System.out.println("c12:" + message.getId() + ":" + message.getMsg());
        // 随机休眠
        long l = new Random(1000).nextLong();
        try {
            Thread.sleep(l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者3的代码实现:

package com.qfedu.springbootmq.sequence.consumer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Random;
@Component@RabbitListener(queues = "q_seq2")public class Consumer21 {    @RabbitHandler    public void onMessage(MessageInfo message) {        System.out.println("c21:" + message.getId() + ":" + message.getMsg());        // 随机休眠        long l = new Random(1000).nextLong();        try {            Thread.sleep(l);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

package com.qfedu.springbootmq.sequence.consumer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;


@Component
@RabbitListener(queues = "q_seq2")
public class Consumer21 {
    @RabbitHandler
    public void onMessage(MessageInfo message) {
        System.out.println("c21:" + message.getId() + ":" + message.getMsg());
        // 随机休眠
        long l = new Random(1000).nextLong();
        try {
            Thread.sleep(l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者4的代码实现:

package com.qfedu.springbootmq.sequence.consumer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Random;
@Component@RabbitListener(queues = "q_seq2")public class Consumer22 {    @RabbitHandler    public void onMessage(MessageInfo message) {        System.out.println("c22:" + message.getId() + ":" + message.getMsg());        // 随机休眠        long l = new Random(1000).nextLong();        try {            TimeUnit.MILLISECONDS.sleep(l);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

package com.qfedu.springbootmq.sequence.consumer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;


@Component
@RabbitListener(queues = "q_seq2")
public class Consumer22 {
    @RabbitHandler
    public void onMessage(MessageInfo message) {
        System.out.println("c22:" + message.getId() + ":" + message.getMsg());
        // 随机休眠
        long l = new Random(1000).nextLong();
        try {
            TimeUnit.MILLISECONDS.sleep(l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试

发送4个消息模拟顺序消费的消息,id为1和3的发送到一个队列,id为2和4的发送到另一个队列。

@Testpublic void testSeq() {    for (int i = 1; i <= 4; i++) {        producerSeq.send(i, "hello" + i);    }    try {        Thread.sleep(2000);    } catch (InterruptedException e) {        e.printStackTrace();    }}

@Test
public void testSeq() {
    for (int i = 1; i <= 4; i++) {
        producerSeq.send(i, "hello" + i);
    }
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

输出结果:

RabbitMQ如何保证顺序消费_java_02

从结果中可以看到,虽然一个队列配置了两个消费者,但是每对顺序消息只有一个消费者顺序消费。

RabbitMQ如何保证顺序消费_java_03

另外,我们还可以看到队列中“SAC”,表示启用了单活模式,这样我们就实现了这个需求,现在你学会了吗?关注我,干货天天都不断!

标签:顺序,amqp,springframework,保证,RabbitMQ,org,import,message,public
From: https://blog.51cto.com/u_14319530/7089033

相关文章

  • 《高级程序员 面试攻略 》RabbitMQ 如何实现可靠性
    RabbitMQ提供了多种机制来实现消息传递的可靠性。下面是一些常见的方法:1.持久化消息:RabbitMQ允许将消息标记为持久化,以确保即使在发生故障或重启后,消息也不会丢失。通过将消息的`deliverymode`设置为2,可以将消息标记为持久化消息。1.持久化队列:创建持久化队列可以确保即使......
  • 《高级程序员 面试攻略 》RocketMQ 如何保证顺序性
    RocketMQ提供了一种称为顺序消息的机制来确保消息的顺序性。下面是一些关键的方法和概念:1.顺序消息:顺序消息是指在发送和消费过程中,消息按照特定的顺序进行处理。RocketMQ通过将消息发送到同一个消息队列(MessageQueue)来实现顺序消息。每个消息队列都有一个全局唯一的标识符(Me......
  • 7.3 C/C++ 实现顺序栈
    顺序栈是一种基于数组实现的栈结构,它的数据元素存储在一段连续的内存空间中。在顺序栈中,栈顶元素的下标是固定的,而栈底元素的下标则随着入栈和出栈操作的进行而变化。通常,我们把栈底位置设置在数组空间的起始处,这样在进行入栈和出栈操作时,只需要维护栈顶指针即可。顺序栈的实现比......
  • 使用线程实现ACB的顺序输出
    在java中可以使用join方法来实现,join会阻塞当前方法,调用的当前方法执行结束后,才会继续往下执行!publicclassFoo{publicFoo(){}publicvoidA(){System.out.println("A");}publicvoidB(){System.out.println("B");}......
  • dp-矩阵链相乘顺序
    矩阵链相乘顺序目录矩阵链相乘顺序问题描述举例说明问题分析程序问题描述A1,A2,..,An表示n个矩阵的序列,其中Ai为\(P_{i−1}×P_i\)阶矩阵,i=1,2,...,n。向量P=<P0,P1,P2..Pi>表示矩阵链的输入,其中P0是A1的行数,P1是A1的列数,P1是A2的行数,以此类推。计算这个矩阵需要做n−1次两......
  • 查找算法——顺序查找
    基于无序链表的顺序查找:在查找中我们一个一个地顺序遍历符号表中的所有键并使用equals()方法来寻找与被查找的键匹配的键。无序链表查找的性能上面get()方法中查找第一个键需要1次比较,查找第二个需要2次比较,如此这般,平均比较次数为(1+2+...+N)/N,也就是(N+1)/2~N/2。比较的总次数......
  • Windows10下安装docker 并启动 rabbitmq
    1.官网下载docker:https://hub.docker.com/editions/community/docker-ce-desktop-windows 2.安装docker:运行下载好的DockerforWindowsInstaller.exe 3.安装好后启动docker: 4.修改成国内镜像仓库:https://q3a36twa.mirror.aliyuncs.com  5.拉取rabbitmq,在cmd窗......
  • rabbitMQ初识
    消息队列-RabbitMQ认识MQ同步和异步通讯微服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像发邮件,不需要马上回复。同步通讯同步调用的优点:时效性较强,可以立即得到结果同步调用的问题:耦合度高性能和吞吐能力下降有额外的资源消耗有级......
  • RabbitMq应用示例(RabbitMq第三篇)
    一、应用示例:参考文献:(写的很好的一篇文章,小编已经参照该博主的内容完整的编写了一遍,可能会有一些遐思,后续会把自己编写的代码也补充一下)Springboot整合RabbitMq,用心看完这一篇就够了_springbootrabbitmq_小目标青年的博客-CSDN博客简单的消息推送流程图:黄色的圈圈就是我们的消息......
  • 顺序查找(线性查找)
    博客地址:https://www.cnblogs.com/zylyehuo/#_*_coding:utf-8_*_fromcal_timeimport*@cal_timedeflinear_search(li,val):forind,vinenumerate(li):ifv==val:returnindelse:returnNoneli=list(range(100000......