首页 > 其他分享 >业务幂等性技术——3

业务幂等性技术——3

时间:2024-12-19 14:31:52浏览次数:9  
标签:String 业务 技术 重试 消息 messageId println out

4、消息幂等性

在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂等等。本章节以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。

4.1 消息重试演示

消息队列的消息幂等性,主要是由MQ重试机制引起的。因为消息生产者将消息发送到MQ-Server后,MQ-Server会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。
在这里插入图片描述
在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。

数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入死信队列,最终进入到数据库中,接着设置定时job查询这些数据,进行手动补偿。

本节中以consumer消费异常为演示主体,因此需要修改RabbitMQ配置文件。

  1. 修改consumer一方的配置文件
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
  1. 设置消费异常
    当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。

4.2 消息幂等解决

要保证消息幂等性的话,其实最终要解决的就是保证多次操作,造成的影响是相同的。那么其解决方案的思路与服务间幂等的思路其实基本都是一致的。

  1. 消息防重表,解决思路与服务间幂等的防重表一致。
  2. redis。利用redis防重。

这两种方案是最常见的解决方案。其实现思路其实都是一致的。
在这里插入图片描述

4.2.1 修改OrderController

    /**
     * 此处为了方便演示,不做基础添加数据库操作
     */
    @PostMapping("/addOrder")
    public String addOrder(){
        String uniqueKey = String.valueOf(idWorker.nextId());
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(uniqueKey);
        messageProperties.setContentType("text/plain");
        messageProperties.setContentEncoding("utf8");
        Message message = new Message("1271700536000909313".getBytes(),messageProperties);
        rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);
        return "success";
    }

4.2.2 修改stockApplication

    @Bean
    public JedisPool jedisPool(){
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        return new JedisPool(poolConfig, "192.168.150.4",6379, 2000, "");
    }

4.2.3 新增消息监听类

/**
 * @author xinlei
 * @date 2024/12/16
 */
@Component
public class ReduceStockListener {
    @Autowired
    private StockService stockService;
    @Autowired
    private JedisPool jedisPool;
    @Autowired
    private StockFlowService stockFlowService;

    @RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
    @Transactional
    public void receiveMessage(Message message) {
        // 获取消息id
        String messageId = message.getMessageProperties().getMessageId();
        Jedis jedis = jedisPool.getResource();
        System.out.println(messageId);
        try {
            // redis锁去重校验
            if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))) {
                System.out.println("重复请求");
                return;
            }
            // mysql状态校验
            if (!(stockFlowService.findByFlag(messageId).size() == 0)) {
                System.out.println("数据已处 理");
                return;
            }
            String goodsId = null;
            try {
                // 获取消息体中goodsId
                goodsId = new String(message.getBody(), "utf-8");
                stockService.reduceStock(goodsId, messageId)
                ;
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            int nextInt = new Random().nextInt(100);
            System.out.println("随机数:" + nextInt);
            if (nextInt % 2 == 0) {
                int i = 1 / 0;
            }
        } catch (RuntimeException e) {
            // 解锁
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
            System.out.println("出现异常了");
            System.out.println(messageId + ":释放锁");
            throw e;
        }
    }
}

4.3 消息缓冲区

对于RabbitMQ的使用,默认情况下,每条消息都会进行分别的ack通知,消费完一条后,再来消费下一条。但是这样就会造成大量消息的阻塞情况。所以为了提升消费者对于消息的消费速度,可以增加consumer数据或者对消息进行批量消费。MQ接收到producer发送的消息后,不会直接推送给consumer。而是积攒到一定数量后,再进行消息的发送。这种方式的实现,可以理解为是一种缓冲区的实现,提升了消息的消费速度,但是会在一定程度上舍弃结果返回的实时性。

对于批量消费来说,也是需要考虑幂等的。对于幂等性的解决方案,沿用刚才的思路即可解决。

标签:String,业务,技术,重试,消息,messageId,println,out
From: https://blog.csdn.net/qq_54698124/article/details/144506860

相关文章

  • 大数据 | 数据存储技术与应用深度解析,HDFS/ Kudu/ 云对象存储/ NoSQL数据库,及数据虚拟
    一、大数据存储的基本需求要理解大数据存储技术的发展方向,首先需要明确大数据存储面临的挑战和需求。1.数据规模:高扩展性大数据通常以TB、PB甚至EB为单位进行计算,因此,数据存储系统需要具备高扩展性,能够随着数据量的增长快速增加存储容量。例如,传感器网络、社交媒体、或......
  • 大数据 | 数据采集与输入核心技术与应用深度解析,Flume/ NiFi/ Kafka/ Sqoop/ Denodo区
    大数据处理的第一步是数据的采集与输入。无论是用于批量处理还是实时分析,数据的采集过程都是整个大数据生命周期的基础。数据采集不仅需要高效、实时地从不同来源获取数据,还需要能够处理各种形式的数据,确保其质量和一致性。采集的数据通常来自多个源头,例如传感器数据、系统日志......
  • 容器化技术全面解析:Docker 与 Containerd 的深入解读
    目录Docker简介1.什么是Docker?2.Docker的核心组件3.Docker的主要功能4.Docker的优点5.Docker的使用场景Containerd简介1.什么是Containerd?2.Containerd的核心特性3.Containerd的架构4.Containerd与Docker的关系5.Containerd的优点6.Con......
  • Windows Server 2022 集群服务器技术提供了一种可靠的方式来提高业务连续性、增强系统
    WindowsServer2022集群服务器简介什么是WindowsServer2022集群服务器?WindowsServer2022集群服务器是一种由多个物理或虚拟服务器组成的系统,这些服务器通过网络连接形成一个群集(Cluster)。群集中的服务器协同工作,共同提供高可用性、负载均衡、灾难恢复等功能。WindowsS......
  • 超绝!基站/Wi-Fi/GPS定位技术详解与应用示例
    今天特别分享定位相关示例,欢迎大家一起来探讨。一、基站/Wi-Fi/GPS定位概述1.1基站定位原理基站定位也就是“LBS定位”,全称是LocationBasedService,它包括两层含义:首先是确定移动设备或用户所在的地理位置;其次是提供与位置相关的各类信息服务。意指与定位相关的各类服务系......
  • 从零开始学习黑客技术,看这一篇就够了
    ......
  • 从零开始学习黑客技术,看这一篇就够了
    ......
  • 20222404 2024-2025-2 《网络与系统攻防技术》实验八实验报告
    1.实验内容(1)Web前端HTML能正常安装、启停Apache。理解HTML,理解表单,理解GET与POST方法,编写一个含有表单的HTML。(2)Web前端javascipt理解JavaScript的基本功能,理解DOM。在(1)的基础上,编写JavaScript验证用户名、密码的规则。在用户点击登陆按钮后回显“欢迎+输入的用户名”尝......
  • 京准科技:电厂DCS自控NTP时钟同步服务器技术方案
    京准科技:电厂DCS自控NTP时钟同步服务器技术方案京准科技:电厂DCS自控NTP时钟同步服务器技术方案京准电子科技官微——ahjzsz随着计算机和网络通信技术的飞速发展,火电厂热工自动化系统数字化、网络化的时代已经到来。一方面它为控制和信息系统之间的数据交换、分析和应用提供了更......
  • 基于Halcon的图像拼接技术
    图像拼接一般分为硬拼接与软件拼接,硬拼接相对简单,将多幅图像按照指定的方式直接组合在一起,形成一个大的图像。在Halcon中,可以使用tile_images或tile_images_offset等算子来实现硬拼接。这里先将硬拼接讲解。常规步骤:1、读取图像:使用read_image算子读取需要拼接的图像。......