多级缓存同步更新操作的优化策略——MQ
文章部分内容参考京东技术文章,在文章的基础上添加自己的见解,如有侵权请告知删除
https://mp.weixin.qq.com/s/2SQP1c-zMWdkIu4XRnBV0A
参考资料:
技术背景
在像商城板块一样的情景里面,大型活动会在短时间内容易产生超出预期的流量,这意味着页面的热点数据需要超高频率地存取,这就要求系统符合高性能、高并发、高可用的属性,在单纯添加机器效益较低和中间件等依赖服务已经遇到瓶颈的时候,就需要有使用以下的方法来解决。
高并发解决方案
限流
限流的多少主要按照以下的顺序进行考虑:
- 进行限流后对用户体验的影响多少
- 活动产生的数据量
- 限流之后对系统架构的复杂度和成本的影响
当然,限流的多少基于历史数据进行判定会让限流拥有更高的效率
重缓存(多级缓存)
本地缓存的构建的两种方式:
- 一级缓存:本地+mysql+推模式更新。
- 两级缓存:本地+r2m+mysql+推模式更新(这里面的R2M是指分布式缓存中间件Redis Cluster)
重点策略概要
结合MQ的推模式更新实现多级存储的同步
采用MQ的原因:
原先直接顺序实现多级缓存的每一级同步工作,存在一些弊端,如果本地缓存机器或者Redis机器down机了,那么往后的缓存同步操作就要进行等待,这意味着同一个时刻如果有多个更新操作产生,使用了锁机制来解决并发带来的超出限定的问题,但是这个时候还是会出现多个同步操作被阻塞的问题,迟迟不能返回结果,那么与同步操作挂钩的操作可能就会因此无法继续,在高并发情境下极容易出现系统无法处理短时间内过多的请求而down机的问题
采用推模式更新的原因:
数据从一个数据源主动地推送到缓存中,而不是由缓存去拉取数据,这种模式能确保缓存中的数据在更新时立即得到同步,减少了缓存和数据源之间的延迟和不一致性
需要使用的相关技术:
数据库触发器、消息变化数据表、MQ
触发器+MQ实现推模式更新过程:
-
创建消息变化数据表
CREATE TABLE message_queue ( id INT AUTO_INCREMENT PRIMARY KEY, message JSON, processed BOOLEAN DEFAULT FALSE );
[!TIP]
- 消息需要是JSON类型的,里面包含数据项:
operation
和content
- 需要有一个状态字段作为判定是否已经发出这个消息
- 消息需要是JSON类型的,里面包含数据项:
-
使用触发器检测存储对应数据的表,如果数据有变化就写入消息队列表
DELIMITER // -- 修改SQL语句的结束符为//,这样子可以防止在触发器体内使用END语句时,MySQL误认为语句结束 CREATE TRIGGER after_insert_update_delete AFTER INSERT OR UPDATE OR DELETE ON your_table FOR EACH ROW -- 指定触发器对表中的每一行的插入、更新或者删除操作生效 BEGIN DECLARE msg JSON; -- 声明一个变量为JSON类型 IF (NEW.id IS NOT NULL) THEN SET msg = JSON_OBJECT('operation', 'INSERT/UPDATE', 'id', NEW.id, 'data', NEW.data); ELSE SET msg = JSON_OBJECT('operation', 'DELETE', 'id', OLD.id); END IF; INSERT INTO message_queue (message) VALUES (msg); END; // DELIMITER ; -- 插入 DELIMITER // CREATE TRIGGER after_insert_seckill_sku AFTER INSERT ON SECKILL_SKU FOR EACH ROW BEGIN DECLARE msg JSON; SET msg = JSON_OBJECT('operation', 'INSERT', 'id', NEW.id, 'data', NEW.seckillStock); INSERT INTO MESSAGE_QUEUE (message) VALUES (msg); END; // DELIMITER ; -- 更新 DELIMITER // CREATE TRIGGER after_update_seckill_sku AFTER UPDATE ON SECKILL_SKU FOR EACH ROW BEGIN DECLARE msg JSON; SET msg = JSON_OBJECT('operation', 'UPDATE', 'id', NEW.id, 'data', NEW.seckillStock); INSERT INTO MESSAGE_QUEUE (message) VALUES (msg); END; // DELIMITER ; -- 删除 DELIMITER // CREATE TRIGGER after_delete_seckill_sku AFTER DELETE ON SECKILL_SKU FOR EACH ROW BEGIN DECLARE msg JSON; SET msg = JSON_OBJECT('operation', 'DELETE', 'id', OLD.id); INSERT INTO MESSAGE_QUEUE (message) VALUES (msg); END; // DELIMITER ;
[!TIP]
- 这里面的your_table是存储热点信息的表
- 如果是插入或更新操作,创建一个JSON对象,包含操作类型
'INSERT/UPDATE'
,新行的id
和数据NEW.data
如果是删除操作,创建一个JSON对象,包含操作类型'DELETE'
和被删除行的id
NEW
和OLD
是 MySQL 触发器中使用的两个特殊关键字,它们用于引用在触发器中触发的行的新值和旧值,对于这里面的NEW.data对应的是新插入或者更新的行的data字段,所以自己可以根据自己设计的表字段来添加这里面的数据项- MySQL需要为每种事件分别定义触发器,所以不能一次性为三种操作进行添加触发器
-
写一个Java类,采用定时任务专门监听消息变化数据表的变化,然后将其作为消息利用Kafka发送出来
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class SyncInfoProducer { @Resource private KafkaTemplate<String,String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; @Component public class DatabaseChangeListener { @Autowired private DataSource dataSource; @Autowired private SyncInfoProducer syncInfoProducer; @Scheduled(fixedDelay = 5000) // 每5秒检查一次数据库变化,结合系统的情况自行自定义就好 @Transactional public void checkDbChanges() { List<Integer> processedIds = new ArrayList<>(); try (Connection connection = dataSource.getConnection(); PreparedStatement ps = connection.prepareStatement("SELECT id, message FROM logisticOrder.MESSAGE_QUEUE WHERE processed = FALSE"); // 查询所有未发送的消息出来 ResultSet rs = ps.executeQuery()) { while (rs.next()) { int id = rs.getInt("id"); String message = rs.getString("message"); syncInfoProducer.sendMessage("cache_updates", message); processedIds.add(id); // 对所有发送的消息的id存储进List里面 } } catch (Exception e) { e.printStackTrace(); } markMessagesAsProcessed(processedIds); } private void markMessagesAsProcessed(List<Integer> ids) { if (ids.isEmpty()) return; String idList = ids.toString().replaceAll("[\\[\\]]", ""); String updateQuery = "UPDATE message_queue SET processed = TRUE WHERE id IN (" + idList + ")"; try (Connection connection = dataSource.getConnection(); PreparedStatement ps = connection.prepareStatement(updateQuery)) { ps.executeUpdate(); } catch (Exception e) { e.printStackTrace(); } } }
[!TIP]
-
markMessagesAsProcessed()方法的说明
String idList = ids.toString().replaceAll("[\\[\\]]", "");
-
ids.toString()
:将ID列表转换为字符串形式,结果类似于"[1, 2, 3]"
。 -
.replaceAll("[\\[\\]]", "")
:使用正则表达式移除字符串中的方括号,结果变为"1, 2, 3"
。这一步是为了生成适用于SQLIN
子句的格式。
-
-
上面的很多都用到了
PreparaedStatement
及其对应各种操作的执行方法
-
-
监听方法接收到JSON格式的消息之后就要获得里面的操作类型字段,从而对Redis和LocalCache进行对应的操作
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Component public class CacheUpdateConsumer { @Autowired private StringRedisTemplate stringRedisTemplate; private Map<String, JsonNode> localCache = new HashMap<>(); @KafkaListener(topics = "cache_updates", groupId = "cache-update-group") public void receiveMessage(String message) { try { ObjectMapper mapper = new ObjectMapper(); JsonNode jsonMessage = mapper.readTree(message); String operation = jsonMessage.get("operation").asText(); String id = jsonMessage.get("id").asText(); switch (operation) { case "INSERT": case "UPDATE": JsonNode data = jsonMessage.get("data"); // 这里的data也是自己在发消息的时候设置的数据项名字 // 更新Redis缓存 stringRedisTemplate.opsForValue().set("your_key_prefix:" + id, data.toString()); // 更新本地缓存 localCache.put(id, data); break; case "DELETE": // 删除Redis缓存 stringRedisTemplate.delete("your_key_prefix:" + id); // 删除本地缓存 localCache.remove(id); break; } } catch (Exception e) { e.printStackTrace(); } } public JsonNode getFromLocalCache(String id) { return localCache.get(id); } }
[!TIP]
- 这里面的
your_key_prefix
前缀要注意一致性
- 这里面的
以上方法由于我个人的数据库添加触发器之后出现解决不了的堆栈溢出问题,所以实际上本人没有使用触发器的方法,而是直接在业务层直接实现异步进行同步操作
业务层MQ实现异步处理更新操作
一些必备的功能
由于我的数据采用三级存储结构来存储的,所以我写了一个方法专门从三级存储结构里面获得数据,按照顺序:一级本地缓存->二级Redis缓存->三级数据库
public String getDataFrom_3Storage(String cache_name, String local_keyname, String redis_keyname,String databaseOP, int skuId, boolean isFramework){
Collection<Cache> cacheCollection = cacheManager.getCache(cache_name); // 直接getCache的话返回的是一个集合Collection
String data_asString = null;
for(Cache cache:cacheCollection){
// 只需要拿一次就够了,因为Cache里面可以有同名的,但是我们为了避免混乱就避免缓存名重复
data_asString = cache.get(local_keyname, String.class);
break;
}
if(data_asString!=null){
log.info("______从本地缓存中获得{}数据:{}______",databaseOP,data_asString);
return data_asString; // 使用了Layering-cache框架的注解存储Redis缓存的时候由于序列化器的原因会多一个双引号
}
else {
// 从Redis里面获得数据
data_asString = stringRedisTemplate.boundValueOps(redis_keyname).get();
if(isFramework){
data_asString = data_asString.substring(1,data_asString.length()-1);
}
if(data_asString !=null){
log.info("______从Redis缓存中获得{}数据:{}______",databaseOP,data_asString);
return data_asString;
}
// 从数据库里面获得数据
else {
if(Objects.equals(databaseOP, "STOCK")){
data_asString = String.valueOf(seckillSkuMapper.selectStockBySkuId(skuId));
}
else if(Objects.equals(databaseOP, "LIMIT")){
data_asString = String.valueOf(seckillSkuMapper.selectLimitBySkuId(skuId));
}
log.info("______从数据库中获得{}数据:{}______",databaseOP,data_asString);
return data_asString;
}
}
}
三级存储结构的异步更新操作
先更新一级缓存,之所以三级里面选择本地缓存优先更新,主要是因为本系统里面的数据获取顺序是先本地缓存再Redis缓存再数据库,提前更新好本地缓存能最大可能地减少数据未更新的情况出现
@Transactional
public boolean syncUpdateData_3Storage(String cache_name,String local_keyname,String redis_keyname,String databaseOP,int change,int skuId,boolean isFramework){
// 本地缓存的同步更新
Collection<Cache> cacheCollection = cacheManager.getCache(cache_name);
for(Cache cache:cacheCollection){
String old_LocalValue = cache.get(local_keyname,String.class);
if(old_LocalValue == null){
log.error("^^^^^^本地缓存同步更新失败^^^^^^");
return false;
}
String new_LocalValue = String.valueOf(Integer.parseInt(old_LocalValue)+change);
cache.evict(local_keyname);
cache.put(local_keyname,new_LocalValue);
break;
} // 执行一遍就完成
SyncInfo syncInfo = new SyncInfo();
syncInfo.setCacheName(cache_name);
syncInfo.setLocalKeyName(local_keyname);
syncInfo.setRedisKeyName(redis_keyname);
syncInfo.setDatabaseOP(databaseOP);
syncInfo.setChange(change);
syncInfo.setSkuId(skuId);
syncInfo.setFramework(isFramework);
log.info("开始同步Redis缓存与数据库");
kafkaTemplate.executeInTransaction(kafkaOperations -> {
kafkaOperations.send("sync-Redis-DB",JSONObject.toJSONString(syncInfo));
return true;
});
return true;
}
注意点:
- Layering-Cache多级缓存框架的getCache方法返回类型为
Collection<Cache>
- 使用@Transactional注解结合KafkaTemplate的executeTransactional方法实现事务功能,这个很必要,因为消息队列很大的一个主题就是数据一致性问题的处理
@Component
@Slf4j
public class Sync3StorageUtils {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private SeckillRedisCaffeineUtils seckillRedisCaffeineUtils;
@Resource
private SeckillSkuMapper seckillSkuMapper;
@KafkaListener(topics = "sync-Redis-DB")
public void sync_execute(ConsumerRecord<String,String> record){
SyncInfo syncInfo = JSONObject.parseObject(record.value(), SyncInfo.class);
Long surplus_expiration = stringRedisTemplate.boundValueOps(syncInfo.getRedisKeyName()).getExpire(); // 剩余过期时间
String old_RedisValue = seckillRedisCaffeineUtils.getDataFrom_3Storage(syncInfo.getCacheName()
, syncInfo.getLocalKeyName()
, syncInfo.getRedisKeyName()
, syncInfo.getDatabaseOP()
, syncInfo.getSkuId()
, syncInfo.isFramework()); // 也从三级存储空间拿数据
if(old_RedisValue == null || surplus_expiration == null){
log.error("^^^^^^Redis缓存同步更新失败^^^^^^");
return;
}
String new_RedisValue = String.valueOf(Integer.parseInt(old_RedisValue)+ syncInfo.getChange());
stringRedisTemplate.boundValueOps(syncInfo.getRedisKeyName() ).set(new_RedisValue,surplus_expiration, TimeUnit.SECONDS);
// 数据库信息的同步更新,限购不需要更新,只有库存量需要同步更新
if(Objects.equals(syncInfo.getDatabaseOP(), "STOCK")){
seckillSkuMapper.updateStock(syncInfo.getSkuId());
}
}
}
注意点:
- 这里同步Redis的时候,如果不考虑剩余的刷新时间,那么直接重置过期时间不合理的时候可能会出现Redis堆积过多本应该淘汰的缓存,所以直接先从Redis里面拿过期时间
原先是将三级存储的更新操作采用顺序实现,改为异步处理之后,接口也丝滑了不少
之后我会把我的购买接口写一篇博客,这个接口以多级缓存为底层,实现包含限购、校验、同步、存取等功能
上面的内容是本人自己的见解得来,难免有不够合理的地方,也欢迎读者在评论区予以指正!
标签:缓存,String,多级,syncInfo,MQ,import,data,id From: https://blog.csdn.net/m0_73500407/article/details/139534014