YML
rabbitmq:
host: xxx.xxx.xxx.xxx
port: 5672
virtual-host: dev
username: xxx
password: xxx
publisher-confirm-type: correlated
publisher-returns: true
listener:
direct:
acknowledge-mode: auto
simple:
acknowledge-mode: none
配置
public class RabbitConfig {
@Resource
private RabbitUtil rabbitUtil;
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
if(!ack) log.error("NFT_MQ_ConfirmCallback => cause={}", cause);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
log.info("NFT_MQ_ReturnCallback => message={},replyCode={},replyText={},exchange={},routingKey={}", message, replyCode, replyText, exchange, routingKey);
rabbitUtil.failMessage(message, exchange, routingKey, replyCode + " | " + replyText);
});
return rabbitTemplate;
}
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitKey.delay_exchange, "x-delayed-message", true, false, args);
}
@Bean
public FanoutExchange globalFanoutExchange(){
return new FanoutExchange(RabbitKey.global_fanout_exchange);
}
}
发送
JSONObject msgData = new JSONObject();
msgData.put("loginType", loinType);
msgData.put("loginPlat", "client");
RabbitMessageModel model = RabbitMessageModel.loginModel(RabbitMessageDataType.userLogin, user.getId(), msgData);
rabbitTemplate.convertAndSend(RabbitKey.global_fanout_exchange, "", model);
接收
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "global_fanout_queue_client", durable = "true"),
exchange = @Exchange(name = RabbitKey.global_fanout_exchange, type = ExchangeTypes.FANOUT)
))
public void globalFanoutQueue(RabbitMessageModel model) {
RabbitMessageDataType type = RabbitMessageDataType.typeOf(model.getDataType());
switch (type) {
case workView: workView(model); break;
}
}
-------------------------------------------------------
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = RabbitKey.payment_cancel_delay + "_client", durable = "true"),
exchange = @Exchange(value = RabbitKey.delay_exchange, delayed = "true"),
key = RabbitKey.payment_cancel_delay
)
)
public void cancel(RabbitMessageModel model) {
Payment payment = paymentDao.findById(model.getDataId()).orElse(null);
if(payment == null) return; //有可能事物回滚,没产生支付单
paymentService.cancel(payment.getOrderNumber(), 1);
}
标签:rabbitTemplate,springboot,exchange,配置,RabbitKey,rabbitmq,new,model,public
From: https://www.cnblogs.com/z8080/p/17477751.html