首页 > 其他分享 >RabbitMq消息的百分百投递

RabbitMq消息的百分百投递

时间:2022-10-31 21:02:45浏览次数:65  
标签:百分百 应答 RabbitMq gmallCorrelationData 消息 test 投递 where id

RabbitMq消息的百分百投递

​ 在RabbitMq作为消息中间件的时候,存在消息丢失的情况;在大部分业务中是不允许的;

尤其是在使用中间件来保证数据的最终一致性时,消息丢失则无法保证消息的最终一致性;

下图展示了消息丢失的三种情况

所以消息的百分百投递就必须在这三个位置消息不发生丢失

第一处位置:生产者到交换机

通过confirmCallback

生产者投递消息后,如果exchang收到消息后,会给生产者⼀个ACK。生产者通过ACK,可以确认这条消息是否正常发送到exchang,这种方式是消息可靠性投递的核心

#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: CORRELATED

代码实现

@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback,
                                             RabbitTemplate.ReturnCallback{
   @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RedisTemplate redisTemplate;
    @PostConstruct
    public void MQProducerAckConfig(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    //交换机应答方法     成功或失败都应答
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //每次发消息 此方法都会被执行
        if(ack){
            log.info("交换机成功应答:" + cause);
        }else{
            log.error("交换机失败应答:失败的原因:" + cause);
            //重新发送消息
            retrySendMessage(correlationData);
        }
    }
                                                  
    //重新发消息
    private void retrySendMessage(CorrelationData correlationData) {
        //附加对象
        GmallCorrelationData gmallCorrelationData =
                (GmallCorrelationData)correlationData;
        //判断重新发送几次了
        int retryCount = gmallCorrelationData.getRetryCount();
        if(retryCount < 3){
            retryCount++;
            gmallCorrelationData.setRetryCount(retryCount);
            rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange()
            ,gmallCorrelationData.getRoutingKey(),
                    gmallCorrelationData.getMessage(),gmallCorrelationData);
            log.info("交换机重新发送消息:第" + retryCount + "次数");
        }else{
            // 打印日志 记录  重试了2次 还是发不出去

        }

    }
 }

第二处位置:由交换机到队列

//队列应答 方法  只有失败了才应答
@Override
public void returnedMessage(Message message, int replyCode,
                            String replyText, String exchange, String routingKey) {
    log.error("队列失败应答:消息:" + new String(message.getBody()));
    log.error("队列失败应答:失败码:" + replyCode);
    log.error("队列失败应答:失败消息:" + replyText);
    log.error("队列失败应答:交换机:" + exchange);
    log.error("队列失败应答:routingKey:" + routingKey);

    Object uuid = message.getMessageProperties().getHeader("spring_returned_message_correlation");
    String json = (String) redisTemplate.opsForValue().get(uuid);
    GmallCorrelationData gmallCorrelationData =
            JSONObject.parseObject(json, GmallCorrelationData.class);
    //重新发送
    retrySendMessage(gmallCorrelationData);

由于要保证消息的百分百投递,所以必须要进行消息失败重发

就有可能导致消息被重复消费----->也就必须要保证消息的幂等性

消息的幂等性:

幂等性:对一个操作,请求一次和请求多次,所产生的结果是一样的,这叫幂等性。

哪些情况需要判断幂等
1、多次点击某一个按钮
2、回退之后再次点击
3、Feign的远程调用失败,并且使用的是Feign的重试机制
4、消息重复消费
5、其它业务

哪些操作出现幂等
1、select操作
select * from test where id=1;//幂等性
select * from test where id>1;//可能满足也可能不满足
2、delete删除
delete from test where id=1;//幂等性
delete from test where id>1;//可能满足也可能不满足
3、update操作
update test set c=1 where id=1;//幂等性
update test set c=c+1 where id=1;//一定不满足
4、insert操作
insert into test(userId,userName) values(100,'zs');
假若userId唯一索引或者主键索引:满足===数据库层面约束
假若userId不是唯一索引或者主键索引:不满足

保证消息幂等性的方法:

1、Token机制====检验订单重复提交
1.1在执行业务前先在服务端生成一个Token(自定义规则生成),第一份存到容器中【Redis】,第二份返回前端
1.2前端得到后端返回的token,一般将其存放到cookie中或者请求头中
1.3在执行业务的时候,获取到前端携带的token和容器中的token做比较
1.3若相等,则代表是第一次提交,接着执行业务,最后将该Token从容器中删除
1.4若不相等,则证明不是第一次提交,就不能执行业务,返回错误消息给前端

 Token机制的危险性:
 1、获取、判断和删除要保证原子性。====lua脚本
 2、删除Token的顺序
   2.1先判断====执行业务====删除Token【项目中采用的这个】
   2.2先判断====删除Token===执行业务 【建议用下面方式】

2、各种锁机制
1、悲观锁
select * from test where id =1 for update; //写锁
update
insert
delete
select * from test where id =1 lock in share mode;//加读锁
注意:查询条件的id要是主键或者唯一索引,不然就可能锁表的情况。

2、乐观锁
实现就是加version
update stock set count=count-1,version=version+1 where oderId=1 and version=1;

   场景:读多写少

​ 3、分布式锁
Redis:setnx
​ 同一时间可能多个服务【不同机器】都去修改同一条数据,使用分布锁
​ MySQL
​ Redis
​ Zk

3、各种唯一约束【数据库层面】
主键约束或者唯一键约束
insert into test(userId,userName) values(100,'zs');====注意【主键不能自增】
因此注意在分库分表的时候,同一个请求根据路由计算之后,业务请求要落到同一个库同一张表。

​ 2、Redis===set防重

​ 3、防重表结合事务使用
​ 除开设计业务表之外,在建一个防重表。

第三处位置:

从对列到消费者的位置必须要保证是手动签收

//配置文件
spring:
   rabbitmq:
     host: 192.168.18.230
     port: 5672
     username: admin
     password: 123
     listener:
       simple:
         acknowledge-mode: manual//设置手动签收
//手动签收对应的方法
basicAck 同意签收 支持批量,设置入参mutiple为true
basicReject 拒绝签收,不支持批量,支持是否重新入队,设置入参requeue为true
basicNack 拒绝签收,支持批量,支持是否重新入队,设置入参requeue为true

以及保证消息的持久化--->https://www.cnblogs.com/xuzhidong/p/16845459.html

标签:百分百,应答,RabbitMq,gmallCorrelationData,消息,test,投递,where,id
From: https://www.cnblogs.com/xuzhidong/p/16845761.html

相关文章

  • RabbitMq
    RabbitMq什么是MQ:是一种存放消息的队列;还是一种跨进程的通讯机制,用于两个微服务之间的消息通讯;消息中间件作用于分布式系统之间的通讯且必须是异步处理的场景提......
  • RabbitMQ
    我这边的使用场景:多名用户进行同一操作。(具体一点:我这边的系统哦,数据库用的是mysql,操作是导入excel,可以这么说百分之六十的页面都有批量导入,而且一个页面不止一个批量导入......
  • 某 .NET RabbitMQ SDK 有采集行为,你怎么看?
    一:背景1.讲故事前几天有位朋友在微信上找到我,说他的一个程序上了生产之后,被运维监控定位到这个程序会向一个网址为:http://m.365ey.net上不定期打数据,而且还是加密的格......
  • Spring Cloud Stream RabbitMQ消费消息失败
    丢弃消息这种场景生产环境应该不怎么用。入死信队列cloud:stream:bindings:hking:destination:hkingcontent-type:application/j......
  • .Net Core&RabbitMQ限制循环消费
    前言当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。循环场景生产者发送100条消息到Rabbit......
  • RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群
    一、为什么要搭建RabbitMQ集群?未部署集群的缺点如果RabbitMQ集群只有一个broker节点,那么该节点的失效将导致整个服务临时性的不可用,并且可能会导致message的丢失(尤其是在非......
  • Python RabbitMQ pika的安装及direct路由模式的使用
    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有......
  • java-rabbitmq-官网实例02
    java-rabbitmq-官网实例02描述:  1.定义持久化队列,发送持久化消息,消息接受者需要手动应答,MQ才会删除队列中的消息 2.使用channel.basicQos......
  • java-rabbitmq-官网实例01
    java-rabbitmq-官网实例01描述:最简单实例,使用非持久化队列,生产者发布消息,MQ将消息推送给消费者消费,之后MQ在队列中删除该消息依次运行:D1_Send.......
  • java-rabbitmq-交换机介绍
    java-rabbitmq-交换机介绍RabbitMQ的四种交换机直连交换机:Directexchange扇出交换机:Fanoutexchange主题交换机:Topicexchange首部交换机:H......