面试官:你能说说RabbitMQ是如何保证消息顺序消费的吗?
老任:如果我们想要保证消息是按照顺序进行发送的,发送到队列后,队列的消息应该是先进先出的,我们只需要一个队列配置一个消费者即可(窃喜中......)。
面试官:我们的项目一般都是集群部署的,一个队列就会有多个消费者,怎么实现一个队列中所有顺序消息只能有一个消费者消费呢?
老任:这个好办,项目不做集群部署不就行了。
面试官:回去等通知吧......
一. 场景介绍
很多时候,消息的消费是不用保证顺序的,比如借助mq实现订单超时的处理。但有些时候,业务中可能会存在多个消息需要顺序处理的情况,比如生成订单和扣减库存消息,那肯定是先执行生成订单的操作,再执行扣减库存的操作。
那么这种情况下,是如何保证消息顺序消费的呢?
首先,为了效率,我们可以设置多个队列都来处理顺序执行的消息。另外,我们需要保证每组顺序消费的消息发到同一个队列中,给这些消息设置一个统一的全局id即可。
其次,保证消息的顺序消费。就像上面所说,一个队列对应一个消费者即可,但是在项目的集群部署下,这又该怎么处理呢?针对这种情况,我们可以设置队列的“单活模式”。
x-single-active-consumer:单活模式,表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
二. 模拟代码实现
假设现在我们有两个队列处理顺序消息(消息1-1和1-2属于一组需要顺序消费的消息,消息2-1和2-2属于另一组需要顺序消费的消息),每个队列有两个消费者(模拟消费者集群)。
队列的配置类
package com.qfedu.springbootmq.sequence.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configurationpublic class SeqQueueConfiguration { /** * 创建两个队列,处理顺序消息 */ @Bean public Queue seqQueue1() { return creatQueue("q_seq1"); }
@Bean public Queue seqQueue2() { return creatQueue("q_seq2"); } // 交换机 @Bean public DirectExchange seqDirectExchange() { return new DirectExchange("direct_seq"); }
// 队列绑定交换机,执行路由key @Bean public Binding seqBinding1() { return BindingBuilder.bind(seqQueue1()).to(seqDirectExchange()).with("1"); }
@Bean public Binding seqBinding2() { return BindingBuilder.bind(seqQueue2()).to(seqDirectExchange()).with("2"); }
/** * 创建一个 单活模式的队列 * @param name * @return queue */ private Queue creatQueue(String name) { HashMap<String, Object> args = new HashMap<>(); // x-single-active-consumer 单活模式 队列 // 表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个, // 除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。 args.put("x-single-active-consumer", true); return new Queue(name, true, false, false, args); }}
package com.qfedu.springbootmq.sequence.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class SeqQueueConfiguration {
/**
* 创建两个队列,处理顺序消息
*/
@Bean
public Queue seqQueue1() {
return creatQueue("q_seq1");
}
@Bean
public Queue seqQueue2() {
return creatQueue("q_seq2");
}
// 交换机
@Bean
public DirectExchange seqDirectExchange() {
return new DirectExchange("direct_seq");
}
// 队列绑定交换机,执行路由key
@Bean
public Binding seqBinding1() {
return BindingBuilder.bind(seqQueue1()).to(seqDirectExchange()).with("1");
}
@Bean
public Binding seqBinding2() {
return BindingBuilder.bind(seqQueue2()).to(seqDirectExchange()).with("2");
}
/**
* 创建一个 单活模式的队列
* @param name
* @return queue
*/
private Queue creatQueue(String name) {
HashMap<String, Object> args = new HashMap<>();
// x-single-active-consumer 单活模式 队列
// 表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,
// 除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
args.put("x-single-active-consumer", true);
return new Queue(name, true, false, false, args);
}
}
生产者
package com.qfedu.springbootmq.sequence.producer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;
@Componentpublic class ProducerSeq { @Resource private RabbitTemplate rabbitTemplate;
/** * 根据id,将消息顺序发送到对应的队列 * @param id 业务id * @param msg 业务信息 */ public void send(int id, String msg) { MessageInfo message = new MessageInfo(id, msg); rabbitTemplate.convertAndSend("direct_seq", String.valueOf(id % 2 + 1), message); }}
package com.qfedu.springbootmq.sequence.producer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class ProducerSeq {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 根据id,将消息顺序发送到对应的队列
* @param id 业务id
* @param msg 业务信息
*/
public void send(int id, String msg) {
MessageInfo message = new MessageInfo(id, msg);
rabbitTemplate.convertAndSend("direct_seq", String.valueOf(id % 2 + 1), message);
}
}
消费者
为了模拟,我们可以创建四个消费者,每两个消费者监听一个队列。四个消费者的逻辑一样,只是类名和监听的队列名不一样。消费者1的代码实现如下:
package com.qfedu.springbootmq.sequence.consumer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Random;
@Component@RabbitListener(queues = "q_seq1")public class Consumer11 { @RabbitHandler public void onMessage(MessageInfo message) { System.out.println("c11:" + message.getId() + ":" + message.getMsg()); // 随机休眠 long l = new Random(1000).nextLong(); try { Thread.sleep(l); } catch (InterruptedException e) { e.printStackTrace(); } }}
package com.qfedu.springbootmq.sequence.consumer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
@RabbitListener(queues = "q_seq1")
public class Consumer11 {
@RabbitHandler
public void onMessage(MessageInfo message) {
System.out.println("c11:" + message.getId() + ":" + message.getMsg());
// 随机休眠
long l = new Random(1000).nextLong();
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者2的代码实现:
package com.qfedu.springbootmq.sequence.consumer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Random;
@Component@RabbitListener(queues = "q_seq1")public class Consumer12 { @RabbitHandler public void onMessage(MessageInfo message) { System.out.println("c12:" + message.getId() + ":" + message.getMsg()); // 随机休眠 long l = new Random(1000).nextLong(); try { Thread.sleep(l); } catch (InterruptedException e) { e.printStackTrace(); } }}
package com.qfedu.springbootmq.sequence.consumer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
@RabbitListener(queues = "q_seq1")
public class Consumer12 {
@RabbitHandler
public void onMessage(MessageInfo message) {
System.out.println("c12:" + message.getId() + ":" + message.getMsg());
// 随机休眠
long l = new Random(1000).nextLong();
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者3的代码实现:
package com.qfedu.springbootmq.sequence.consumer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Random;
@Component@RabbitListener(queues = "q_seq2")public class Consumer21 { @RabbitHandler public void onMessage(MessageInfo message) { System.out.println("c21:" + message.getId() + ":" + message.getMsg()); // 随机休眠 long l = new Random(1000).nextLong(); try { Thread.sleep(l); } catch (InterruptedException e) { e.printStackTrace(); } }}
package com.qfedu.springbootmq.sequence.consumer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
@RabbitListener(queues = "q_seq2")
public class Consumer21 {
@RabbitHandler
public void onMessage(MessageInfo message) {
System.out.println("c21:" + message.getId() + ":" + message.getMsg());
// 随机休眠
long l = new Random(1000).nextLong();
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者4的代码实现:
package com.qfedu.springbootmq.sequence.consumer;import com.qfedu.springbootmq.sequence.message.MessageInfo;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Random;
@Component@RabbitListener(queues = "q_seq2")public class Consumer22 { @RabbitHandler public void onMessage(MessageInfo message) { System.out.println("c22:" + message.getId() + ":" + message.getMsg()); // 随机休眠 long l = new Random(1000).nextLong(); try { TimeUnit.MILLISECONDS.sleep(l); } catch (InterruptedException e) { e.printStackTrace(); } }}
package com.qfedu.springbootmq.sequence.consumer;
import com.qfedu.springbootmq.sequence.message.MessageInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
@RabbitListener(queues = "q_seq2")
public class Consumer22 {
@RabbitHandler
public void onMessage(MessageInfo message) {
System.out.println("c22:" + message.getId() + ":" + message.getMsg());
// 随机休眠
long l = new Random(1000).nextLong();
try {
TimeUnit.MILLISECONDS.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试
发送4个消息模拟顺序消费的消息,id为1和3的发送到一个队列,id为2和4的发送到另一个队列。
@Testpublic void testSeq() { for (int i = 1; i <= 4; i++) { producerSeq.send(i, "hello" + i); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }}
@Test
public void testSeq() {
for (int i = 1; i <= 4; i++) {
producerSeq.send(i, "hello" + i);
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出结果:
从结果中可以看到,虽然一个队列配置了两个消费者,但是每对顺序消息只有一个消费者顺序消费。
另外,我们还可以看到队列中“SAC”,表示启用了单活模式,这样我们就实现了这个需求,现在你学会了吗?关注我,干货天天都不断!
标签:顺序,amqp,springframework,保证,RabbitMQ,org,import,message,public From: https://blog.51cto.com/u_14319530/7089033