首页 > 其他分享 >多级缓存同步更新操作的优化策略——MQ

多级缓存同步更新操作的优化策略——MQ

时间:2024-06-07 19:29:06浏览次数:27  
标签:缓存 String 多级 syncInfo MQ import data id

多级缓存同步更新操作的优化策略——MQ

文章部分内容参考京东技术文章,在文章的基础上添加自己的见解,如有侵权请告知删除

https://mp.weixin.qq.com/s/2SQP1c-zMWdkIu4XRnBV0A

参考资料:

R2M分布式锁原理——https://www.cnblogs.com/Jcloud/p/17098240.html

技术背景

在像商城板块一样的情景里面,大型活动会在短时间内容易产生超出预期的流量,这意味着页面的热点数据需要超高频率地存取,这就要求系统符合高性能、高并发、高可用的属性,在单纯添加机器效益较低和中间件等依赖服务已经遇到瓶颈的时候,就需要有使用以下的方法来解决。

高并发解决方案

限流

限流的多少主要按照以下的顺序进行考虑:

  1. 进行限流后对用户体验的影响多少
  2. 活动产生的数据量
  3. 限流之后对系统架构的复杂度和成本的影响

当然,限流的多少基于历史数据进行判定会让限流拥有更高的效率

重缓存(多级缓存)

多级缓存架构

本地缓存的构建的两种方式:

  • 一级缓存:本地+mysql+推模式更新。
  • 两级缓存:本地+r2m+mysql+推模式更新(这里面的R2M是指分布式缓存中间件Redis Cluster)

重点策略概要

结合MQ的推模式更新实现多级存储的同步

采用MQ的原因:

原先直接顺序实现多级缓存的每一级同步工作,存在一些弊端,如果本地缓存机器或者Redis机器down机了,那么往后的缓存同步操作就要进行等待,这意味着同一个时刻如果有多个更新操作产生,使用了锁机制来解决并发带来的超出限定的问题,但是这个时候还是会出现多个同步操作被阻塞的问题,迟迟不能返回结果,那么与同步操作挂钩的操作可能就会因此无法继续,在高并发情境下极容易出现系统无法处理短时间内过多的请求而down机的问题

采用推模式更新的原因:

数据从一个数据源主动地推送到缓存中,而不是由缓存去拉取数据,这种模式能确保缓存中的数据在更新时立即得到同步,减少了缓存和数据源之间的延迟和不一致性

需要使用的相关技术:

数据库触发器、消息变化数据表、MQ

触发器+MQ实现推模式更新过程:
  1. 创建消息变化数据表

    CREATE TABLE message_queue (
        id INT AUTO_INCREMENT PRIMARY KEY,
        message JSON,
        processed BOOLEAN DEFAULT FALSE
    );
    

    [!TIP]

    1. 消息需要是JSON类型的,里面包含数据项:operationcontent
    2. 需要有一个状态字段作为判定是否已经发出这个消息
  2. 使用触发器检测存储对应数据的表,如果数据有变化就写入消息队列表

    DELIMITER //			-- 修改SQL语句的结束符为//,这样子可以防止在触发器体内使用END语句时,MySQL误认为语句结束
    
    CREATE TRIGGER after_insert_update_delete
    AFTER INSERT OR UPDATE OR DELETE ON your_table
    FOR EACH ROW			-- 指定触发器对表中的每一行的插入、更新或者删除操作生效
    BEGIN
        DECLARE msg JSON;	-- 声明一个变量为JSON类型
        IF (NEW.id IS NOT NULL) THEN
            SET msg = JSON_OBJECT('operation', 'INSERT/UPDATE', 'id', NEW.id, 'data', NEW.data);
        ELSE
            SET msg = JSON_OBJECT('operation', 'DELETE', 'id', OLD.id);
        END IF;
        INSERT INTO message_queue (message) VALUES (msg);
    END;
    //
    
    DELIMITER ;
    
    
    
    -- 插入
    DELIMITER //
    
    CREATE TRIGGER after_insert_seckill_sku
    AFTER INSERT ON SECKILL_SKU
    FOR EACH ROW
    BEGIN
        DECLARE msg JSON;
        SET msg = JSON_OBJECT('operation', 'INSERT', 'id', NEW.id, 'data', NEW.seckillStock);
        INSERT INTO MESSAGE_QUEUE (message) VALUES (msg);
    END;
    //
    
    DELIMITER ;
    
    
    
    
    
    -- 更新
    DELIMITER //
    
    CREATE TRIGGER after_update_seckill_sku
    AFTER UPDATE ON SECKILL_SKU
    FOR EACH ROW
    BEGIN
        DECLARE msg JSON;
        SET msg = JSON_OBJECT('operation', 'UPDATE', 'id', NEW.id, 'data', NEW.seckillStock);
        INSERT INTO MESSAGE_QUEUE (message) VALUES (msg);
    END;
    //
    
    DELIMITER ;
    
    
    
    
    
    
    
    
    -- 删除
    DELIMITER //
    
    CREATE TRIGGER after_delete_seckill_sku
    AFTER DELETE ON SECKILL_SKU
    FOR EACH ROW
    BEGIN
        DECLARE msg JSON;
        SET msg = JSON_OBJECT('operation', 'DELETE', 'id', OLD.id);
        INSERT INTO MESSAGE_QUEUE (message) VALUES (msg);
    END;
    //
    
    DELIMITER ;
    
    

    [!TIP]

    1. 这里面的your_table是存储热点信息的表
    2. 如果是插入或更新操作,创建一个JSON对象,包含操作类型 'INSERT/UPDATE',新行的id和数据NEW.data
      如果是删除操作,创建一个JSON对象,包含操作类型 'DELETE' 和被删除行的 id
    3. NEWOLD 是 MySQL 触发器中使用的两个特殊关键字,它们用于引用在触发器中触发的行的新值和旧值,对于这里面的NEW.data对应的是新插入或者更新的行的data字段,所以自己可以根据自己设计的表字段来添加这里面的数据项
    4. MySQL需要为每种事件分别定义触发器,所以不能一次性为三种操作进行添加触发器
  3. 写一个Java类,采用定时任务专门监听消息变化数据表的变化,然后将其作为消息利用Kafka发送出来

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class SyncInfoProducer {
    
        @Resource
        private KafkaTemplate<String,String> kafkaTemplate;
    
        public void sendMessage(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    }
    
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;
    
    @Component
    public class DatabaseChangeListener {
    
        @Autowired
        private DataSource dataSource;
    
        @Autowired
        private SyncInfoProducer syncInfoProducer;
    
        @Scheduled(fixedDelay = 5000) // 每5秒检查一次数据库变化,结合系统的情况自行自定义就好
        @Transactional
        public void checkDbChanges() {
            List<Integer> processedIds = new ArrayList<>();
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement ps = connection.prepareStatement("SELECT id, message FROM logisticOrder.MESSAGE_QUEUE WHERE processed = FALSE");                            // 查询所有未发送的消息出来
                 ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String message = rs.getString("message");
                    syncInfoProducer.sendMessage("cache_updates", message);
                    processedIds.add(id);	// 对所有发送的消息的id存储进List里面
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            markMessagesAsProcessed(processedIds);
        }
    
        private void markMessagesAsProcessed(List<Integer> ids) {
            if (ids.isEmpty()) return;
            String idList = ids.toString().replaceAll("[\\[\\]]", "");
            String updateQuery = "UPDATE message_queue SET processed = TRUE WHERE id IN (" + idList + ")";
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement ps = connection.prepareStatement(updateQuery)) {
                ps.executeUpdate();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    [!TIP]

    1. markMessagesAsProcessed()方法的说明

      String idList = ids.toString().replaceAll("[\\[\\]]", "");
      
      • ids.toString():将ID列表转换为字符串形式,结果类似于"[1, 2, 3]"

      • .replaceAll("[\\[\\]]", ""):使用正则表达式移除字符串中的方括号,结果变为 "1, 2, 3"。这一步是为了生成适用于SQL IN 子句的格式。

    2. 上面的很多都用到了PreparaedStatement及其对应各种操作的执行方法

  4. 监听方法接收到JSON格式的消息之后就要获得里面的操作类型字段,从而对Redis和LocalCache进行对应的操作

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Component
    public class CacheUpdateConsumer {
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
        private Map<String, JsonNode> localCache = new HashMap<>();
    
        @KafkaListener(topics = "cache_updates", groupId = "cache-update-group")
        public void receiveMessage(String message) {
            try {
                ObjectMapper mapper = new ObjectMapper();
                JsonNode jsonMessage = mapper.readTree(message);
    
                String operation = jsonMessage.get("operation").asText();
                String id = jsonMessage.get("id").asText();
    
                switch (operation) {
                    case "INSERT":
                    case "UPDATE":
                        JsonNode data = jsonMessage.get("data");	// 这里的data也是自己在发消息的时候设置的数据项名字
                        // 更新Redis缓存
                        stringRedisTemplate.opsForValue().set("your_key_prefix:" + id, data.toString());
                        // 更新本地缓存
                        localCache.put(id, data);
                        break;
                    case "DELETE":
                        // 删除Redis缓存
                        stringRedisTemplate.delete("your_key_prefix:" + id);
                        // 删除本地缓存
                        localCache.remove(id);
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public JsonNode getFromLocalCache(String id) {
            return localCache.get(id);
        }
    }
    
    

    [!TIP]

    1. 这里面的your_key_prefix前缀要注意一致性

以上方法由于我个人的数据库添加触发器之后出现解决不了的堆栈溢出问题,所以实际上本人没有使用触发器的方法,而是直接在业务层直接实现异步进行同步操作

业务层MQ实现异步处理更新操作
一些必备的功能

由于我的数据采用三级存储结构来存储的,所以我写了一个方法专门从三级存储结构里面获得数据,按照顺序:一级本地缓存->二级Redis缓存->三级数据库

public String getDataFrom_3Storage(String cache_name, String local_keyname, String redis_keyname,String databaseOP, int skuId, boolean isFramework){
    Collection<Cache> cacheCollection = cacheManager.getCache(cache_name);      // 直接getCache的话返回的是一个集合Collection
    String data_asString = null;
    for(Cache cache:cacheCollection){
        // 只需要拿一次就够了,因为Cache里面可以有同名的,但是我们为了避免混乱就避免缓存名重复
        data_asString = cache.get(local_keyname, String.class);
        break;
    }

    if(data_asString!=null){
        log.info("______从本地缓存中获得{}数据:{}______",databaseOP,data_asString);
        return data_asString;     // 使用了Layering-cache框架的注解存储Redis缓存的时候由于序列化器的原因会多一个双引号
    }
    else {
        // 从Redis里面获得数据
        data_asString = stringRedisTemplate.boundValueOps(redis_keyname).get();
        if(isFramework){
            data_asString = data_asString.substring(1,data_asString.length()-1);
        }
        if(data_asString !=null){
            log.info("______从Redis缓存中获得{}数据:{}______",databaseOP,data_asString);
            return data_asString;
        }
        // 从数据库里面获得数据
        else {
            if(Objects.equals(databaseOP, "STOCK")){
                data_asString = String.valueOf(seckillSkuMapper.selectStockBySkuId(skuId));
            }
            else if(Objects.equals(databaseOP, "LIMIT")){
                data_asString = String.valueOf(seckillSkuMapper.selectLimitBySkuId(skuId));
            }
            log.info("______从数据库中获得{}数据:{}______",databaseOP,data_asString);
            return data_asString;
        }
    }
}
三级存储结构的异步更新操作

先更新一级缓存,之所以三级里面选择本地缓存优先更新,主要是因为本系统里面的数据获取顺序是先本地缓存再Redis缓存再数据库,提前更新好本地缓存能最大可能地减少数据未更新的情况出现

@Transactional
public boolean syncUpdateData_3Storage(String cache_name,String local_keyname,String redis_keyname,String databaseOP,int change,int skuId,boolean isFramework){
    // 本地缓存的同步更新
    Collection<Cache> cacheCollection = cacheManager.getCache(cache_name);
    for(Cache cache:cacheCollection){
        String old_LocalValue = cache.get(local_keyname,String.class);
        if(old_LocalValue == null){
            log.error("^^^^^^本地缓存同步更新失败^^^^^^");
            return false;
        }
        String new_LocalValue = String.valueOf(Integer.parseInt(old_LocalValue)+change);
        cache.evict(local_keyname);
        cache.put(local_keyname,new_LocalValue);
        break;
    }   // 执行一遍就完成

    SyncInfo syncInfo = new SyncInfo();
    syncInfo.setCacheName(cache_name);
    syncInfo.setLocalKeyName(local_keyname);
    syncInfo.setRedisKeyName(redis_keyname);
    syncInfo.setDatabaseOP(databaseOP);
    syncInfo.setChange(change);
    syncInfo.setSkuId(skuId);
    syncInfo.setFramework(isFramework);
    log.info("开始同步Redis缓存与数据库");
    kafkaTemplate.executeInTransaction(kafkaOperations -> {
        kafkaOperations.send("sync-Redis-DB",JSONObject.toJSONString(syncInfo));
        return true;
    });
    return true;
}

注意点:

  1. Layering-Cache多级缓存框架的getCache方法返回类型为Collection<Cache>
  2. 使用@Transactional注解结合KafkaTemplate的executeTransactional方法实现事务功能,这个很必要,因为消息队列很大的一个主题就是数据一致性问题的处理
@Component
@Slf4j
public class Sync3StorageUtils {

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private SeckillRedisCaffeineUtils seckillRedisCaffeineUtils;
    @Resource
    private SeckillSkuMapper seckillSkuMapper;

    @KafkaListener(topics = "sync-Redis-DB")
    public void sync_execute(ConsumerRecord<String,String> record){
        SyncInfo syncInfo = JSONObject.parseObject(record.value(), SyncInfo.class);
        Long surplus_expiration = stringRedisTemplate.boundValueOps(syncInfo.getRedisKeyName()).getExpire(); // 剩余过期时间
        String old_RedisValue = seckillRedisCaffeineUtils.getDataFrom_3Storage(syncInfo.getCacheName()
                , syncInfo.getLocalKeyName()
                , syncInfo.getRedisKeyName()
                , syncInfo.getDatabaseOP()
                , syncInfo.getSkuId()
                , syncInfo.isFramework()); // 也从三级存储空间拿数据
        if(old_RedisValue == null || surplus_expiration == null){
            log.error("^^^^^^Redis缓存同步更新失败^^^^^^");
            return;
        }
        String new_RedisValue = String.valueOf(Integer.parseInt(old_RedisValue)+ syncInfo.getChange());
        stringRedisTemplate.boundValueOps(syncInfo.getRedisKeyName() ).set(new_RedisValue,surplus_expiration, TimeUnit.SECONDS);

        // 数据库信息的同步更新,限购不需要更新,只有库存量需要同步更新
        if(Objects.equals(syncInfo.getDatabaseOP(), "STOCK")){
            seckillSkuMapper.updateStock(syncInfo.getSkuId());
        }
    }
}

注意点:

  1. 这里同步Redis的时候,如果不考虑剩余的刷新时间,那么直接重置过期时间不合理的时候可能会出现Redis堆积过多本应该淘汰的缓存,所以直接先从Redis里面拿过期时间

原先是将三级存储的更新操作采用顺序实现,改为异步处理之后,接口也丝滑了不少

之后我会把我的购买接口写一篇博客,这个接口以多级缓存为底层,实现包含限购、校验、同步、存取等功能

上面的内容是本人自己的见解得来,难免有不够合理的地方,也欢迎读者在评论区予以指正!

标签:缓存,String,多级,syncInfo,MQ,import,data,id
From: https://blog.csdn.net/m0_73500407/article/details/139534014

相关文章

  • vue 连接mqtt
    下载mqtt服务:npminstallmqttconstmqttConfig={//你的MQTT服务器配置protocolId:'MQTT',protocolVersion:4,clean:true,clientId:'xxxx',reconnectPeriod:1000,connectTimeout:60*1000,//will:{//topic:�......
  • MQTTX使用
    windows10-EMQX安装及配置使用教程一、下载安装1.1下载1.2安装1.3设置开机自启动二、连接MQTT2.1MQTT下载安装2.1.1下载2.1.2安装及配置三、EMQX常用命令  本文介绍的是在windows10系统下的emqx的安装、配置及使用教程。一、下载安装1.1下载下载链接:emqx官网-版本......
  • java mqtt自动重连注意点
    1、在使用Java的 org.eclipse.paho.client.mqttv3 MQTT客户端库时,options.setAutomaticReconnect(false) 的设置是用来指定在连接丢失后,客户端是否应该自动尝试重新连接。将其设置为 false 意味着如果连接丢失,客户端不会自动尝试重新连接。然而,即使设置了自动重连为 fa......
  • 使用Redis优化Java应用的性能——高效的缓存实践
    引言:在现代应用开发中,高效的缓存策略是提升性能和用户体验的关键。Redis作为一个高性能的键值存储系统,提供了一种快速存取数据的方式,帮助Java应用处理大量动态信息而无需频繁查询数据库。什么是Redis?Redis是一个开源的键值存储系统,它支持多种类型的数据结构如字符串、哈希、......
  • 谈谈Redis缓存中MySQL的数据如何与Redis同步
    在现代应用程序中,性能和响应速度是至关重要的。为了提高数据访问速度,常常会使用缓存技术。Redis作为一种高性能的内存数据库,常被用作缓存层,而MySQL则作为持久化存储层。如何有效地将MySQL数据与Redis缓存进行同步,是一个关键问题。本文将详细探讨Redis作为缓存时,http://ww......
  • 微服务--MQ安装(+Docker命令应用)
    一、下载镜像方法1:在线拉取 dockerrunrabbitmq:3-management 方法2:从本地加载(即压缩包加载)将镜像包-tar包上传到虚拟机,使用命令加载镜像即可dockerload-imq.tar 补充方法2思路:dockerimages//查看镜像//将tar包上传到tmp目录cd/tmp///进入tmp目录......
  • 互联网大厂的缓存策略:抵抗超高并发的秘密武器,已开源!
    大家好,我是冰河~~最近,有小伙伴私信我:冰哥,我最近出去面试,面试官问我如何设计缓存能让系统在百万级别流量下仍能平稳运行,我当时没回答上来。接着,面试官问我之前的项目是怎么使用缓存的,我说只是缓存了一些数据。当时确实想不到缓存还有哪些用处,估计这次面试是挂了。冰哥,你可以给我讲......
  • # RocketMQ 实战:模拟电商网站场景综合案例(二)
    RocketMQ实战:模拟电商网站场景综合案例(二)===========================================================一、SpringBoot整合Dubbo:dubbo概述1、dubbo概述Dubbo:是阿里巴巴公司开源的一款高性能、轻量级的JavaRPC框架,它提供了三大核心能力:面向接口的远程方法调用......
  • 亲,你有多久没有清理过你电脑的 DNS 缓存了?
    最近明月因为工作关系更换了几次使用的电脑,期间就发现明明另一台电脑访问某个网址是正常,换一台电脑后就会出现无法访问的现象,并且用的还是同一个宽带网络,实在是太诡异了!后来还是突然想起来DNS缓存这个问题,立马清除了那台电脑的DNS缓存后,打不开的网址顺利的呈现出来了。DNS......
  • rabbitMq实现系统内的短信发送设计&动态获取BEAN
    rabbitMq实现系统内的短信发送设计&动态获取BEAN1.短信非系统的重要节点操作,可以在任务完成之后,比如下单成功,发送下单成功的mq消息,短信服务接收到mq消息,动态的判断该短信的code,通过全局公共的父类(调用中台等接口获取全部所有需要的对象参数),获取短信中的{mobile}等参数来替换短......