首页 > 其他分享 >9. RabbitMQ系列之消息发布确认

9. RabbitMQ系列之消息发布确认

时间:2022-10-15 20:12:42浏览次数:84  
标签:RabbitTemplate connectionFactory 系列 确认 CachingConnectionFactory simpleRabbitTemp

Publisher Confirms发布确认是用于实现可靠发布的RabbitMQ扩展。
我们将使用发布确认来确保已发布的消息已安全到达代理。我们将介绍几种使用publisher确认的策略,并解释其优缺点

首先检查application.yml文件

spring:
  rabbitmq:
    host: 127.0.0.1
    # 之前博客未加端口,此处新增
    port: 5672
    username: guest
    password: guest
    virtualHost: /
1. 单独发布消息
  • 新增配置文件PublishConfirmConfig.java
@Configuration
public class PublishConfirmConfig {

    @Bean("myRabbitConnectionFactory")
    public ConnectionFactory myRabbitConnectionFactory(RabbitProperties rabbitProperties){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate simpleRabbitTemplate(ConnectionFactory myRabbitConnectionFactory) {
        CachingConnectionFactory connectionFactory = (CachingConnectionFactory) myRabbitConnectionFactory;
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
        connectionFactory.setPublisherReturns(true);
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
  • 新增发送文件PublishConfirmSender.java
@Component
public class PublishConfirmSender {

    private RabbitTemplate simpleRabbitTemplate;

    public PublishConfirmSender(RabbitTemplate simpleRabbitTemplate) {
        this.simpleRabbitTemplate = simpleRabbitTemplate;
    }

    public void oneSender() {
        boolean sendFlag = simpleRabbitTemplate.invoke(operations -> {
            simpleRabbitTemplate.convertAndSend("direct", "orange", "orange msg");
            return simpleRabbitTemplate.waitForConfirms(5000);
        });
        if (sendFlag) {
            System.out.println("消息已成功发送");
        }
    }
}

  • 测试发送
@SpringBootTest
public class RabbitTest {
    @Autowired
    private PublishConfirmSender publishConfirmSender;

    @Test
    public void testOneSender() {
        publishConfirmSender.oneSender();
    }
}

1

2. 批量消息发布确认
@Component
public class PublishConfirmSender {

    ............

    public void batchSender() {
        boolean sendFlag = simpleRabbitTemplate.invoke(operations -> {
            for (int i = 0; i < 50; i++) {
                simpleRabbitTemplate.convertAndSend("direct", "orange", "orange " + i + "msg");
                if (i % 10 == 0) {
                    if (simpleRabbitTemplate.waitForConfirms(5000)) {
                        System.out.println(i / 10 + "批次消息已全部成功发送");
                    }
                }
            }
            return simpleRabbitTemplate.waitForConfirms(5000);
        });
        if (sendFlag) {
            System.out.println("消息已全部成功发送");
        }
    }
}

2

3. 发布服务器异步确认
@Component
public class PublishConfirmSender {
	......
    @Bean
    @Primary
    public RabbitTemplate asyncRabbitTemplate(ConnectionFactory myRabbitConnectionFactory) {
        CachingConnectionFactory connectionFactory = (CachingConnectionFactory) myRabbitConnectionFactory;
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}
@Component
public class PublishConfirmSender {
     public void asyncSender() {
        asyncRabbitTemplate.invoke(operations -> {
            for (int i = 0; i < 50; i++) {
                String body = "orange " + i + "msg";
                simpleRabbitTemplate.convertAndSend("direct", "orange", body);
            }
            return null;
        }, (deliveryTag, multiple) -> {
            System.out.format("消息已确认. Sequence number: %d, multiple: %b%n", deliveryTag, multiple);
        }, (deliveryTag, multiple) -> {
            System.err.format("消息未确认. Sequence number: %d, multiple: %b%n",deliveryTag, multiple);
        });
    }
}

3

4. 总结

在某些应用程序中,确保将发布的消息发送给代理是至关重要的。Publisher确认有助于满足此要求的RabbitMQ功能。Publisher确认本质上是异步的,但也可以同步处理它们。没有明确的方法来实现publisher确认,这通常归结为应用程序和整个系统中的约束。典型技术包括

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但很难判断何时出错。
  • 异步处理:最佳性能和资源利用率,在发生错误时进行良好控制,但需要正确处理。

欢迎关注公众号算法小生沈健的技术博客

标签:RabbitTemplate,connectionFactory,系列,确认,CachingConnectionFactory,simpleRabbitTemp
From: https://www.cnblogs.com/shenjian-online/p/16794929.html

相关文章