首页 > 数据库 >RedisTemplate Pipeline 封装和使用,优化Redis操作

RedisTemplate Pipeline 封装和使用,优化Redis操作

时间:2023-04-05 19:56:20浏览次数:49  
标签:Pipeline return Redis private new 操作 public RedisTemplate

前言

公司游戏项目服务端运行过程中,依赖 Redis 保存游戏过程数据。与客户端一次交互会对 Redis 进行大量的操作,在内网开发环境下测试,一次操作开销在 1-2 ms,建立连接和网络开销累积在一起大大降低了响应速度,这种场景下可以使用 Redis Pipeline 进行优化。

Redis Pipeline

Redis 服务本身并没有专门的 Pepeline 功能,而是客户端自行实现的一种交互方式。简单说就是与 Redis 服务器建立一次 Socket 连接,然后将多个操作指令发送给 Redis 服务器,并获取操作结果。一次连接,一次网络交互自然大大减少了开销。

思路

公司使用的是Spring中StringRedisTemplate提供的API,它底层已经实现了两种 Pipeline 操作,其最终都是建立连接,将设置的操作一次性提交给 Redis 服务器,然后获取到操作结果列表。只不过一种偏向底层操作,另一种经过了封装使用更加方便,我选择了后者。如下:

// 偏底层操作
public <T> T execute(RedisCallback<T> action) {
	return execute(action, isExposeConnection());
}
	
// 经过封装
public List<Object> executePipelined(final SessionCallback<?> session) {
	return executePipelined(session, valueSerializer);
}

但是,实际使用过程中,对缓存的操作往往和业务逻辑互相穿插,将代码写在 SessionCallback 实现中,代码扩展性和易读性会大大降低,再次封装是非常有必要的。

封装大致思路是,实现一个管道对象,通过管道对象记录需要执行的操作,最后调用 StringRedisTemplate 实现的 Pipeline 来提交这些操作并获取返回值。

实现

通过调用封装的方法,接收参数生成 Consumer 函数,函数记录了接收的参数,并记录需要执行的对应 Jedis RedisOperations 方法,放入操作队列中。最终管道提交的时候会将队列中的函数全部执行。

生成 Consumer 函数的同时,会返回一个 Supplier 函数,函数记录了获取返回值的方式,在管道提交后通过它来获取返回值。其实它只是记录了操作在队列中的序号,根据序号获取结果列表中对应的结果。关键代码如下:

import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

/**
 * Redis 管道操作,利用一次与Redis服务器交互,执行多个操作并获取返回结果
 * 步骤:
 * 1、构建管道对象,利用 **Ops 对象添加 Redis 操作,添加操作会返回 Supplier 函数
 * 2、利用 Supplier 函数获取对应操作返回值
 * 3、调用 Supplier 函数或者执行 execute() 都会提交管道中的操作
 */
public class StringRedisPipeline {

    /**
     * 为了避免一次性执行的操作数太多,占用过多 Redis 服务器缓存空间。程序会按批次提交,这个值代表一批操作最大数量
     */
    private static final int OPERATION_BATCH_SIZE = 50;

    private final StringRedisTemplate stringRedisTemplate;

    /**
     * 操作队列
     */
    private final Queue<Consumer<RedisOperations>> operationSetterQueue;

    /**
     * 操作结果
     */
    private final List<Object> opsResults;

    /**
     * 操作结果索引
     */
    private int opsResultIndex;

    private ValueOperations valueOperations;
    private HashOperations hashOperations;
    private ListOperations listOperations;
    private SetOperations setOperations;
    private ZSetOperations zSetOperations;
    private HyperLogLogOperations hyperLogLogOperations;
    private GeoOperations geoOperations;
    private StreamOperations streamOperations;

    /**
     * 构建一个管道对象
     */
    public static StringRedisPipeline build(StringRedisTemplate stringRedisTemplate) {
        return new StringRedisPipeline(requireNonNull(stringRedisTemplate));
    }

    /**
     * Pipeline 执行所有操作
     *
     * @return true:成功执行,false:管道中无操作
     */
    public boolean execute() {
        if (this.operationSetterQueue.isEmpty()) {
            return false;
        }
        for (List<Consumer<RedisOperations>> operationSetters = splitABatchOfOperation();
             !operationSetters.isEmpty(); operationSetters = splitABatchOfOperation()) {
            this.opsResults.addAll(executeWithPipeline(operationSetters));
        }
        return true;
    }

    /**
     * {@link RedisOperations#delete(Object)}
     */
    public StringRedisPipeline delete(String key) {
        addOperation(operations -> operations.delete(key));
        return this;
    }

    /**
     * {@link RedisOperations#delete(Collection)}
     */
    public Supplier<Long> delete(Collection<String> keys) {
        return addOperation(operations -> operations.delete(keys));
    }

    /**
     * {@link RedisOperations#keys(Object)}
     */
    public Supplier<Set<String>> keys(String pattern) {
        return addOperation(operations -> operations.keys(pattern));
    }

    public ValueOperations opsForValue() {
        if (valueOperations == null) valueOperations = new ValueOperations(this);
        return valueOperations;
    }

    public HashOperations opsForHash() {
        if (hashOperations == null) hashOperations = new HashOperations(this);
        return hashOperations;
    }

    public ListOperations opsForList() {
        if (listOperations == null) listOperations = new ListOperations(this);
        return listOperations;
    }

    public SetOperations opsForSet() {
        if (setOperations == null) setOperations = new SetOperations(this);
        return setOperations;
    }

    public ZSetOperations opsForZSet() {
        if (zSetOperations == null) zSetOperations = new ZSetOperations(this);
        return zSetOperations;
    }

    public HyperLogLogOperations opsForHyperLogLog() {
        if (hyperLogLogOperations == null) hyperLogLogOperations = new HyperLogLogOperations(this);
        return hyperLogLogOperations;
    }

    public GeoOperations opsForGeo() {
        if (geoOperations == null) geoOperations = new GeoOperations(this);
        return geoOperations;
    }

    public StreamOperations opsForStream() {
        if (streamOperations == null) streamOperations = new StreamOperations(this);
        return streamOperations;
    }

    <T> Supplier<T> addOperation(Consumer<RedisOperations> operationSetter) {
        this.operationSetterQueue.offer(operationSetter);
        int resultIndex = this.opsResultIndex++;
        return () -> {
            if (resultIndex >= this.opsResults.size()) this.execute();
            return (T) opsResults.get(resultIndex);
        };
    }

    /**
     * 按配置大小分割出一组操作
     */
    private List<Consumer<RedisOperations>> splitABatchOfOperation() {
        if (operationSetterQueue.isEmpty()) {
            return Collections.emptyList();
        }
        List<Consumer<RedisOperations>> operationSetters = new LinkedList<>();
        for (int i = OPERATION_BATCH_SIZE; i > 0 && !operationSetterQueue.isEmpty(); i--) {
            operationSetters.add(operationSetterQueue.poll());
        }
        return operationSetters;
    }

    /**
     * 通过 Redis 管道执行所有操作
     *
     * @return 操作结果
     */
    private List<Object> executeWithPipeline(List<Consumer<RedisOperations>> operationSetters) {
        return this.stringRedisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) {
                operationSetters.forEach(operationSetter -> operationSetter.accept(operations));
                return null;
            }
        });
    }

    private StringRedisPipeline(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = requireNonNull(stringRedisTemplate);
        this.operationSetterQueue = new LinkedList<>();
        this.opsResults = new ArrayList<>();
    }
}

使用例子

/**
 * 部分无返回值的操作可以采用流式编程调用管道
 */
@Test
void add() {
  StringRedisPipeline jedisPipeline = StringRedisPipeline.build(stringRedisTemplate);
  jedisPipeline.opsForValue()
    .set(KEY_PREFIX + "key1", "1")
    .set(KEY_PREFIX + "key2", "2")
    .and().opsForHash()
    .put(KEY_PREFIX + "key3", "hashKey1", "1")
    .put(KEY_PREFIX + "key3", "hashKey2", "2")
    .execute();
}

/**
 * 多个获取返回值的操作通过管道执行
 */
@Test
void get() {
  StringRedisPipeline jedisPipeline = StringRedisPipeline.build(stringRedisTemplate);
  Supplier<String> valu1 = jedisPipeline.opsForValue().get(KEY_PREFIX + "key1");
  Supplier<String> valu2 = jedisPipeline.opsForValue().get(KEY_PREFIX + "key2");
  Supplier<String> hashValue1 = jedisPipeline.opsForHash().get(KEY_PREFIX + "key3", "hashKey1");
  Supplier<String> hashValue2 = jedisPipeline.opsForHash().get(KEY_PREFIX + "key3", "hashKey2");
  assertEquals("1", valu1.get());
  assertEquals("2", valu2.get());
  assertEquals("1", hashValue1.get());
  assertEquals("2", hashValue2.get());
}

全部源码见 GitHub 项目 Spring Redis Pipeline

标签:Pipeline,return,Redis,private,new,操作,public,RedisTemplate
From: https://www.cnblogs.com/shuiyao3/p/17290709.html

相关文章

  • Redis布隆过滤器的原理和安装使用
    前言本文讲述布隆过滤器(RedisBloom)的基本原理和安装使用。RedisBloom是什么?RedisBloom是Redis中过滤器模块,可以用来判断值是否存在,常用来解决缓存穿透问题。查询数据时,先用RedisBloom判断数据是否存在,不存在则直接返回,存在则从缓存/数据库获取后返回。比如查询接......
  • Redis 在排行榜中的应用
    1.Redis的SortedSet数据类型1.1SortedSet数据类型的特点SortedSet有序集合是Redis提供的一种重要的数据类型。它是由不重复且有序的字符串元素组成的,而且每个元素都会关联一个double类型的分数,通过该分数来为集合中的成员进行从小到大的排序。SortedSet的......
  • Redis持久化RDB和AOF原理解析、使用和优缺点对比
    前言本文讲述Redis两种持久化方式RDB和AOF优缺点以及原理。为何需要持久化?Redis是基于内存操作的,进程终止、服务器宕机后内存数据会丢失,但是在很多使用场景中我们希望数据不丢失,服务重启之后数据还能恢复到停机前的状态,特别是使用Redis做数据库的情况。Redis持久化......
  • Redis 在身份认证中的应用
    1.Redis在Session共享问题中的应用传统Session-Cookeis身份认证方法中,一个Session只保存在一台服务器上,适合域单体应用。随着项目规模的增加,项目的架构也不断向微服务分布式集群演进,传统的Session-Cookie方式在集群环境下就不能很好的工作了,这时就产生了Session共......
  • Redis 缓存机制
    1.Redis缓存缓存(cache),原始意义是指访问速度比一般随机存取存储器(RAM)快的一种高速存储器,通常它不像系统主存那样使用DRAM技术,而使用昂贵但较快速的SRAM技术。缓存的设置是所有现代计算机系统发挥高性能的重要因素之一。Redis因读写性能较高,它非常适合作为存贮数据......
  • 在Linux部署Redis主从和哨兵集群实现高可用
    前言本文主要讲述在Linux系统中配置和部署Redis主从集群和哨兵,实现高可用和自动故障迁移。准备工作参考Redis单机部署安装3个Redis服务作一主二从,本文准备了6380(主)、6381(从)和6382(从)。参考Redis单机部署安装3个Redis服务作哨兵集群,本文准备了26380......
  • 在Linux部署Redis Cluster集群
    前言本文讲述在Linux系统部署RedisCluster实现数据分片的具体步骤。请参考Redis单机部署下载编译。RedisCluster是什么?RedisCluster是官方提供的一种用数据分片来实现横向扩容的解决方案,由一个或多个Redis服务组成一个无主集群。对Key使用哈希算法将数据分散......
  • 在Linux部署Redis代理Predixy实现数据分片
    前言本文以predixy-1.0.5为例,讲述Redis代理Predixy安装过程。Predixy是一款高性能全特征Redis代理,支持Redis-sentinel和Redis-cluster。作者拿其它常用代理做了性能测评,Predixy在各个维度性能都是最优的,与其他代理的功能对比。我们可以通过取模、随机、一致性哈希......
  • 在Linux部署Redis代理Twemproxy实现数据分片
    前言本文主要讲述Redis代理Twemproxy安装过程。Twemproxy是推特开源用于Memcached和Redis的轻量级代理。这里以0.5.0版本为例。我们可以通过取模、随机、一致性哈希等算法将数据分散在多个Redis服务来实现水平扩展。但是客户端直连就需要跟每个Redis服务产生连接,......
  • Redis——(主从复制、哨兵模式、集群)的部署及搭建
    重点:主从复制:主从复制是高可用redis的基础,主从复制主要实现了数据的多机备份,以及对于读操作的负载均衡和简单的故障恢复。哨兵和集群都是在主从复制基础上实现高可用的。缺点:故障恢复无法自动化,写操作无法负载均衡,存储能力受到单机的限制。哨兵:在主从复制的基础上,哨兵......