Docker 搭建 RabbitMQ
- 拉取 RabbitMQ 的镜像执行命令
docker pull rabbitmq:3.7-management
- 执行运行命令
docker run -d --hostname rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 rabbitmq:3-management
- 打开浏览器访问
http://localhost:15672
,账号:admim, 密码: admin
RabbitMQ 消息类型
Topic
topic 既可以实现 direct,也可实现 Fanout,topic 通过 routing_key 进行路由,key 可以存在两种特殊字符“ _ ”与“#”,用于做模糊匹配,其中“_”用于匹配一个单词,“#”用于匹配多个单词(可以是零个) *代表全部的
SpringBoot 整合
通过编写基本配置类,实现交换器和路由器绑定,和实现配置手动确认等,需要实现 RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
接口,实现接口的方法,还需设置配置文件。
基本配置类
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @ClassName RabbitConfig
* @Author zhangjiahao
* @Description RabbitMQ基本配置类
* @Date $ $
**/
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private CachingConnectionFactory connectionFactory;
@PostConstruct
public void init(){
System.out.println("初始化rabbitmq");
rabbitTemplate.setConfirmCallback(this); // 指定 ConfirmCallback
rabbitTemplate.setReturnCallback(this); // 指定 ReturnCallback
}
/**
* 如果消息到达 exchange, 则 confirm 回调, ack = true
* 如果消息不到达 exchange, 则 confirm 回调, ack = false
* 需要设置spring.rabbitmq.publisher-confirms=true
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
Message returnedMessage = correlationData.getReturnedMessage();
if (ack){
System.out.println("消息是否到达Exchange:{}" + "消息成功到达Exchange");
}else {
System.out.println("消息是否到达Exchange:{}" + "消息到达Exchange失败");
}
if (!ack) {
System.out.println("消息到达Exchange失败原因:{}"+ cause);
// 根据业务逻辑实现消息补偿机制
}
}
/**
* exchange 到达 queue, 则 returnedMessage 不回调
* exchange 到达 queue 失败, 则 returnedMessage 回调
* 需要设置spring.rabbitmq.publisher-returns=true
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("queuec处理失败");
System.out.println("消息报文:{}"+ new String(message.getBody()));
System.out.println("消息编号:{}"+ replyCode);
System.out.println("描述:{}"+ replyText);
System.out.println("交换机名称:{}"+exchange);
System.out.println("路由名称:{}"+ routingKey);
// 根据业务逻辑实现消息补偿机制
}
@Bean(name = "consumerlistenerContainer")
public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(3);
factory.setPrefetchCount(5);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
实现 Direct 队列
Direct 配置类
import io.renren.modules.generator.constant.MQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author zhangjiahao
* @description DirectRabbitConfig
* @date 2020-03-23 23:03:21
*/
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue directQueue() {
return new Queue(MQConstant.MQQueueAndExchange.DIRECT_QUEUE, true); //true 是否持久
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange directExchange() {
return new DirectExchange(MQConstant.MQQueueAndExchange.DIRECT_QUEUE_EXCHANGE);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(MQConstant.MQQueueAndExchange.DIRECT_QUEUE_ROUTING_KEY);
}
}
实现 Fanout 队列
Fanout 配置类
import io.renren.modules.generator.constant.MQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName FanoutRabbitConfig
* @Author zhangjiahao
* @Description //TODO $
* @Date $ $
**/
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue fanoutQueue(){
return new Queue(MQConstant.MQQueueAndExchange.FANOUT_QUEUE);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(MQConstant.MQQueueAndExchange.FANOUT_QUEUE_EXCHANGE);
}
@Bean
public Binding fanoutBinding(){
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
}
实现 Topic 队列
Topic 配置类
import io.renren.modules.generator.constant.MQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName TopicRabbitConfig
* @Author zhangjiahao
* @Description //TODO $
* @Date $ $
**/
@Configuration
public class TopicRabbitConfig {
@Bean
public Queue topicQueue(){
return new Queue(MQConstant.MQQueueAndExchange.TOPIC_QUEUE);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MQConstant.MQQueueAndExchange.TOPIC_QUEUE_EXCHANGE);
}
@Bean
public Binding topicBinding(){
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(MQConstant.MQQueueAndExchange.TOPIC_QUEUE_ROUTING_KEY);
}
}
配置文件 yml
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 127.0.0.1
port: 5672
## 手动确认
publisher-confirms: true
publisher-returns: true
编写消费者
消费端的代码基本一致,所以写个 Topic 的代码模板。
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.rabbitmq.client.Channel;
import io.renren.modules.generator.constant.MQConstant;
import io.renren.modules.generator.entity.BrokerMessageLogEntity;
import io.renren.modules.generator.entity.SysMessageEntity;
import io.renren.modules.generator.service.BrokerMessageLogService;
import io.renren.modules.generator.service.SysMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
/**
* @ClassName TopicConsumer
* @Author zhangjiahao
* @Description //TODO $
* @Date $ $
**/
@Slf4j
@Component
public class TopicConsumer {
@Autowired
private SysMessageService sysMessageService;
@Autowired
private BrokerMessageLogService brokerMessageLogService;
@RabbitHandler
@RabbitListener(queues = "topic_queue",containerFactory = "consumerlistenerContainer")
public void topicConsumer(SysMessageEntity msg, Channel channel, Message message) throws IOException {
System.out.println(msg.toString());
BrokerMessageLogEntity one =
brokerMessageLogService.getOne(new QueryWrapper<BrokerMessageLogEntity>().lambda().eq(BrokerMessageLogEntity::getMessageId, msg.getMessageId()).eq(BrokerMessageLogEntity::getStatus, 0));
if (one == null) {
one = new BrokerMessageLogEntity();
one.setCreateTime(msg.getGmtCreate());
one.setUpdateTime(msg.getGmtModified());
one.setStatus("0");
one.setMessageId(msg.getMessageId());
one.setMessage(JSON.toJSONString(msg));
one.setTryCount(MQConstant.MQ_TRY_COUNT);
one.setNextRetry(new Date(System.currentTimeMillis() + MQConstant.MQ_TIME_SPACE));
}
try {
log.info("正常处理数据,{}", msg);
//处理完成 消息设置为已处理
msg.setState(1);
sysMessageService.updateById(msg);
// MQ日志状态已处理
one.setStatus("4");
brokerMessageLogService.saveOrUpdate(one);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
if(one.getTryCount()<1){
log.error("已超过尝试次数,在此失败{}",msg);
msg.setState(-1);
sysMessageService.updateById(msg);
one.setStatus("-1");
brokerMessageLogService.updateById(one);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
// 消息重新放回队列
try {
one.setTryCount(one.getTryCount() - 1);
one.setNextRetry(new Date(System.currentTimeMillis() + MQConstant.MQ_TIME_SPACE));
brokerMessageLogService.updateById(one);
System.out.println("消息[{}]处理失败,重新放回队列" + msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
e.printStackTrace();
} catch (IOException ex) {
log.error("消费失败,{}", msg);
ex.printStackTrace();
}
}
}
}
这里其他的 service 是用于记录 RabbitMQ 的日志,用于处理投递失败和消费失败的情况。
编写测试的生产者
@GetMapping("direct")
public R direct() {
for (int i = 0; i < 11; i++) {
SysMessageEntity sysMessageEntity = new SysMessageEntity();
sysMessageEntity.setCreator("1");
sysMessageEntity.setGmtCreate(new Date());
sysMessageEntity.setGmtModified(new Date());
sysMessageEntity.setMessageId(snowFlake.nextId() + "");
sysMessageEntity.setModifier("1");
sysMessageEntity.setName("测试消费direct-" + i);
sysMessageEntity.setType(0);
sysMessageEntity.setState(0);
CommonCorrelationData commonCorrelationData = new CommonCorrelationData();
commonCorrelationData.setId(sysMessageEntity.getMessageId());
commonCorrelationData.setData(sysMessageEntity);
sysMessageService.save(sysMessageEntity);
rabbitTemplate.convertAndSend(MQConstant.MQQueueAndExchange.DIRECT_QUEUE_EXCHANGE,
MQConstant.MQQueueAndExchange.DIRECT_QUEUE_ROUTING_KEY, sysMessageEntity,commonCorrelationData);
}
return R.ok();
}
@GetMapping("/fanout")
public R fanout() {
for (int i = 0; i < 11; i++) {
SysMessageEntity sysMessageEntity = new SysMessageEntity();
sysMessageEntity.setCreator("1");
sysMessageEntity.setGmtCreate(new Date());
sysMessageEntity.setGmtModified(new Date());
sysMessageEntity.setMessageId(snowFlake.nextId() + "");
sysMessageEntity.setModifier("1");
sysMessageEntity.setName("测试消费fanout-" + i);
sysMessageEntity.setType(0);
sysMessageEntity.setState(0);
sysMessageService.save(sysMessageEntity);
rabbitTemplate.convertAndSend(MQConstant.MQQueueAndExchange.FANOUT_QUEUE_EXCHANGE,
MQConstant.MQQueueAndExchange.FANOUT_QUEUE_ROUTING_KEY, sysMessageEntity);
}
return R.ok();
}
@GetMapping("topic")
public R topic() {
for (int i = 0; i < 11; i++) {
SysMessageEntity sysMessageEntity = new SysMessageEntity();
sysMessageEntity.setCreator("1");
sysMessageEntity.setGmtCreate(new Date());
sysMessageEntity.setGmtModified(new Date());
sysMessageEntity.setMessageId(snowFlake.nextId() + "");
sysMessageEntity.setModifier("1");
sysMessageEntity.setName("测试消费topic-" + i);
sysMessageEntity.setType(0);
sysMessageEntity.setState(0);
sysMessageService.save(sysMessageEntity);
rabbitTemplate.convertAndSend(MQConstant.MQQueueAndExchange.TOPIC_QUEUE_EXCHANGE,
MQConstant.MQQueueAndExchange.TOPIC_QUEUE_ROUTING_KEY, sysMessageEntity);
}
return R.ok();
}
关于 RabbitMQ 的 UI 界面设置
自定义 CorrelationData
因为 MQ 完全监控的原因,可以通过手动确认来处理投递失败的。
/**
* 如果消息到达 exchange, 则 confirm 回调, ack = true
* 如果消息不到达 exchange, 则 confirm 回调, ack = false
* 需要设置spring.rabbitmq.publisher-confirms=true
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
Message returnedMessage = correlationData.getReturnedMessage();
if (ack){
System.out.println("消息是否到达Exchange:{}" + "消息成功到达Exchange");
}else {
System.out.println("消息是否到达Exchange:{}" + "消息到达Exchange失败");
}
if (!ack) {
System.out.println("消息到达Exchange失败原因:{}"+ cause);
// 根据业务逻辑实现消息补偿机制
}
}
通过重写 CorrelationData 来实现完全的监控。
public class CorrelationData implements Correlation {
private final SettableListenableFuture<CorrelationData.Confirm> future = new SettableListenableFuture();
@Nullable
private volatile String id;
private volatile Message returnedMessage;
public CorrelationData() {
}
public CorrelationData(String id) {
this.id = id;
}
@Nullable
public String getId() {
return this.id;
}
public void setId(String id) {
this.id = id;
}
public SettableListenableFuture<CorrelationData.Confirm> getFuture() {
return this.future;
}
@Nullable
public Message getReturnedMessage() {
return this.returnedMessage;
}
public void setReturnedMessage(Message returnedMessage) {
this.returnedMessage = returnedMessage;
}
public String toString() {
return "CorrelationData [id=" + this.id + "]";
}
public static class Confirm {
private final boolean ack;
private final String reason;
public Confirm(boolean ack, @Nullable String reason) {
this.ack = ack;
this.reason = reason;
}
public boolean isAck() {
return this.ack;
}
public String getReason() {
return this.reason;
}
public String toString() {
return "Confirm [ack=" + this.ack + (this.reason != null ? ", reason=" + this.reason : "") + "]";
}
}
}