Publisher Confirms发布确认是用于实现可靠发布的RabbitMQ扩展。
我们将使用发布确认来确保已发布的消息已安全到达代理。我们将介绍几种使用publisher确认的策略,并解释其优缺点
首先检查application.yml文件
spring:
rabbitmq:
host: 127.0.0.1
# 之前博客未加端口,此处新增
port: 5672
username: guest
password: guest
virtualHost: /
1. 单独发布消息
- 新增配置文件PublishConfirmConfig.java
@Configuration
public class PublishConfirmConfig {
@Bean("myRabbitConnectionFactory")
public ConnectionFactory myRabbitConnectionFactory(RabbitProperties rabbitProperties){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(rabbitProperties.getHost());
cachingConnectionFactory.setPort(rabbitProperties.getPort());
cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
cachingConnectionFactory.setVirtualHost("/");
return cachingConnectionFactory;
}
@Bean
public RabbitTemplate simpleRabbitTemplate(ConnectionFactory myRabbitConnectionFactory) {
CachingConnectionFactory connectionFactory = (CachingConnectionFactory) myRabbitConnectionFactory;
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
connectionFactory.setPublisherReturns(true);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
- 新增发送文件PublishConfirmSender.java
@Component
public class PublishConfirmSender {
private RabbitTemplate simpleRabbitTemplate;
public PublishConfirmSender(RabbitTemplate simpleRabbitTemplate) {
this.simpleRabbitTemplate = simpleRabbitTemplate;
}
public void oneSender() {
boolean sendFlag = simpleRabbitTemplate.invoke(operations -> {
simpleRabbitTemplate.convertAndSend("direct", "orange", "orange msg");
return simpleRabbitTemplate.waitForConfirms(5000);
});
if (sendFlag) {
System.out.println("消息已成功发送");
}
}
}
- 测试发送
@SpringBootTest
public class RabbitTest {
@Autowired
private PublishConfirmSender publishConfirmSender;
@Test
public void testOneSender() {
publishConfirmSender.oneSender();
}
}
2. 批量消息发布确认
@Component
public class PublishConfirmSender {
............
public void batchSender() {
boolean sendFlag = simpleRabbitTemplate.invoke(operations -> {
for (int i = 0; i < 50; i++) {
simpleRabbitTemplate.convertAndSend("direct", "orange", "orange " + i + "msg");
if (i % 10 == 0) {
if (simpleRabbitTemplate.waitForConfirms(5000)) {
System.out.println(i / 10 + "批次消息已全部成功发送");
}
}
}
return simpleRabbitTemplate.waitForConfirms(5000);
});
if (sendFlag) {
System.out.println("消息已全部成功发送");
}
}
}
3. 发布服务器异步确认
@Component
public class PublishConfirmSender {
......
@Bean
@Primary
public RabbitTemplate asyncRabbitTemplate(ConnectionFactory myRabbitConnectionFactory) {
CachingConnectionFactory connectionFactory = (CachingConnectionFactory) myRabbitConnectionFactory;
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
}
@Component
public class PublishConfirmSender {
public void asyncSender() {
asyncRabbitTemplate.invoke(operations -> {
for (int i = 0; i < 50; i++) {
String body = "orange " + i + "msg";
simpleRabbitTemplate.convertAndSend("direct", "orange", body);
}
return null;
}, (deliveryTag, multiple) -> {
System.out.format("消息已确认. Sequence number: %d, multiple: %b%n", deliveryTag, multiple);
}, (deliveryTag, multiple) -> {
System.err.format("消息未确认. Sequence number: %d, multiple: %b%n",deliveryTag, multiple);
});
}
}
4. 总结
在某些应用程序中,确保将发布的消息发送给代理是至关重要的。Publisher确认有助于满足此要求的RabbitMQ功能。Publisher确认本质上是异步的,但也可以同步处理它们。没有明确的方法来实现publisher确认,这通常归结为应用程序和整个系统中的约束。典型技术包括
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但很难判断何时出错。
- 异步处理:最佳性能和资源利用率,在发生错误时进行良好控制,但需要正确处理。