1.TTL 消息队列的超时时间设置
使用场景介绍---购物付款在指定时间内进行,超过某个时间就会取消(取消后的队列就会加入到死信队列中)
-
通过TTL特有的参数进行插入,设置 x-message-ttl 参数给予超时时间
-
通过MessagePostProcessor类实现方法进行设置 信息 的超时时间
2.Confirm机制和Retrun机制(监听)
Confirm机制用于生产者与交换机之间,确保两者之间的消息通畅,通过回调数据展现两者的情况。
ACK:当消息被borker签收,会回调此方法
# Confirm 的开启 publisher-confirm-type
# Return 的开启 publisher-returns
# correlated 执行ack的时候还会携带数据(消息的元数据)
# none 为禁用
# simple 不推荐使用
spring:
rabbitmq:
port: 5672
virtual-host: cystest
username: root
password: root
host: 192.168.231.3
publisher-confirm-type: correlated
publisher-returns: true
//实现ACK的监听器
@Component
public class MyConfirm implements RabbitTemplate.ConfirmCallback {
@Autowired
RabbitTemplate rabbitTemplate;
// @PostConstruct 服务启动时就会执行该方法,Spring初始化时执行该方法
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
//获取消息的ID
String id = "";
if (Objects.nonNull(correlationData)){
id = correlationData.getId();
}
if (ack){
System.out.println("消息投递成功,id:"+id);
}else {
//可以将消息添加到定时任务,等待下次发送
System.out.println("消息投递失败,原因:"+s);
}
}
//消息没有传递到队列
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息:"+new String(returnedMessage.getMessage().getBody())+",没有成功投递到队列");
}
}
@Service
public class ProducerServiceImpl {
@Autowired
RabbitTemplate rabbitTemplate;
public String sentMessage(String message){
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("cys_confirm_test123","cys",message,correlationData);
return "success";
}
}
@RestController
public class ProducerController {
@Autowired
ProducerServiceImpl service;
@GetMapping("/send")
public String send(String message){
return service.sentMessage(message);
}
}
3.死信队列
其实也是普通队列,只是死信队列中存储的消息是未被正常执行的消息;
1.超时未被消费 2.消息数量大于队列限制数量 3.消息未被签收
设置信息队列的参数以此声明死信队列,再用普通队列绑定死信队列,这样不正常的消息就会进入死信队列。