Redis开门之批量插入Pipeline
下发数据同步到Redis中,数据少的话几千条,多则达百万级。其中一个场景是把下发的数据同步到Redis中,数据同步完成后,把数据写入到文件中,下发给客户,客户调用。某天......
产品经理:小A,我发现我们这个数据整体下发的流程耗时有点长啊...从拉取数据到处理下发将近一个小时的时间,每天处理的XX数量也不是很多,看下是哪块耗时比较久,优化下
小A:嗯...我们现在的处理逻辑是所有的任务完成后 才会统一进行同步Redis的操作,而且目前的代码并没有批量处理数据到Redis,而是直接一条一条的处理,就很慢....
产品经理:一条一条?百万的数据 那大概处理多长时间啊?优化下吧...
小A:嗯,我看下怎么处理吧
看了之前同步Redis代码的逻辑(之前不是小A写的),异步处理、线程池、分批次、应有尽有但就是很慢。给出小A 优化前的大概框架。
try {
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
for (JSONObject redisSynchro : json) {
// 同步数据到redis
redisTemplate.opsForList().rightPush("XXX")
// 设置过期时间
redisTemplate.expire()
}
return null;
}
});
} catch (Exception e) {
log.info("同步redis发生意外异常 :" + e.getMessage());
} finally {
countDownLatch.countDown();
}
小A查了下Redis批量处理数据的方法 网上给出的博客就是pipeline操作啊,代码里用的就是executePipelined 执行管道操作。嗯... 不对再查查。executePipelined的执行原理是什么?
-
管道(Pipeline) 是 Redis 提供的一种优化手段,允许客户端将多个命令一次性发送到 Redis 服务器,而不是每次发送一个命令并等待响应。这样可以减少网络延迟,提高操作效率。
-
executePipelined()
方法的本意是将多个 Redis 操作封装为一个管道批量执行,返回所有操作的结果。 -
客户端批量发送命令 → 服务器批量执行命令 → 客户端一次性接收所有结果。
为什么第一次的执行没有通过管道执行呢?
封装层次过高: executePipelined 中使用了 redisTemplate
的操作方法,而不是直接通过 RedisConnection
来执行 Redis 的底层命令。
分散的命令: 每个 rightPush和 expire 方法都各自独立调用,即使这些操作在同一个 executePipelined
方法中,也无法通过 Redis 的原生管道机制优化。
所以如果要想使用Redis的管道操作,应该避免使用高层次封装的redisTemplat。于是乎,小A改变了上面的执行方式
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
StringRedisSerializer serializer = new StringRedisSerializer();
for (ResultInfo result : tmps) {
// 组装数据 key/value
// 检查元素是否已存在
if (检查条件) {
// 如果不存在,则插入
connection.rPush(serializer.serialize(key), serializer.serialize(value));
}
// 设置过期时间
connection.expire(serializer.serialize(key), 30 * 24 * 3600L);
}
return null;
});
使用了 StringRedisSerializer
对键和值进行序列化。序列化方式直接处理 Redis 底层连接的操作,通常适用于高性能的场景。
经过实测13W级数据插入耗时3秒,9W数据耗时1秒。
标签:pipeLine,批量,处理,redis,Redis,数据,executePipelined,redisTemplate From: https://www.cnblogs.com/kuangsun125/p/18586651