首页 > 数据库 >spark通过pipline方式批量插入redis集群方式

spark通过pipline方式批量插入redis集群方式

时间:2022-10-16 21:00:07浏览次数:41  
标签:pipeline String redis jedis new import spark pipline

spark通过pipline方式批量插入redis集群网上资料比较少,但是有一大堆都是单机的方式,spring倒是也有写入redis集群的实现代码,以下整理了spark通过pipline批量写入的方式,速度确实快,不然一条条set进去,真的是天都要黑了。

依赖到的maven有以下(spark忽略):

        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>

 以下是spark集成redis cluster部分例子:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;
import java.util.Set;

public class SparkPiplineRedis {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("test").setMaster("local");
        SparkSession session = SparkSession.builder().config(conf).getOrCreate();

        Dataset<Row> dataset = session.sql("" +
                "select '1001' id,'jeff' name,1 age " +
                "union all " +
                "select '1002' id,'kitty' name,2 age ");

        String hosts = "";
        String ports = "";

        dataset.foreachPartition(iter->{
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(10);// 最大连接数, 默认8个
            jedisPoolConfig.setMaxIdle(10);// redis.maxIdle
            jedisPoolConfig.setMaxWaitMillis(2000);// 2s
            jedisPoolConfig.setTestOnBorrow(true);

            Set<HostAndPort> hostAndPortsSet = new HashSet<HostAndPort>();
            for (String ip : hosts.split(",")) {
                for (String port : ports.split(",")) {
                    hostAndPortsSet.add(new HostAndPort(ip, Integer.parseInt(port)));
                }
            }
            JedisCluster jedisCluster = new JedisCluster(hostAndPortsSet, jedisPoolConfig);
            JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(jedisCluster);

            while (iter.hasNext()){
                Row row = iter.next();
                String id = row.getAs("id").toString();
                String name = row.getAs("name").toString();
                jedisClusterPipeline.hsetByPipeline("TEST:PERSON",id,name);
            }
            jedisClusterPipeline.releaseConnection();
        });

        session.stop();
    }
}

 

jedisCluster管道方式实现代码如下(转自哪里忘了):

import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;
import redis.clients.jedis.util.JedisClusterCRC16;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * 基于JedisCluster实现管道的使用
 * 核心对象:JedisClusterInfoCache和JedisSlotBasedConnectionHandler
 * 使用构造方法将JedisCluster对象传递进来
 */
public class JedisClusterPipeline {

    /**
     * 构造方法
     * 通过JedisCluster获取JedisClusterInfoCache和JedisSlotBasedConnectionHandler
     * @param jedisCluster
     */
    public JedisClusterPipeline(JedisCluster jedisCluster){
        this.jedisCluster = jedisCluster;
        MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
        clusterInfoCache = (JedisClusterInfoCache)metaObject.getValue("connectionHandler.cache");
        connectionHandler = (JedisSlotBasedConnectionHandler)metaObject.getValue("connectionHandler");
    }

    /** 管道命令提交阈值 */
    private final int MAX_COUNT = 10000;
    /** Redis集群缓存信息对象 Jedis提供*/
    private JedisClusterInfoCache clusterInfoCache;
    /** Redis链接处理对象 继承于JedisClusterConnectionHandler,对其提供友好的调用方法 Jedis提供 */
    private JedisSlotBasedConnectionHandler connectionHandler;
    /** Redis集群操作对象 Jedis提供 */
    private JedisCluster jedisCluster;

    /** 存储获取的Jedis对象,用于统一释放对象 */
    private CopyOnWriteArrayList<Jedis> jedisList = new CopyOnWriteArrayList();
    /** 存储获取的Jedis连接池对象与其对应开启的管道,用于保证slot(哈希槽)对应的节点链接的管道只被开启一次 */
    private ConcurrentHashMap<JedisPool, Pipeline> pipelines = new ConcurrentHashMap<>();
    /** 存储每个开启的管道需要处理的命令(数据)数,当计数达到提交阈值时进行提交 */
    private ConcurrentHashMap<Pipeline, Integer> nums = new ConcurrentHashMap<>();

    public void hsetByPipeline(String key, String field, String value){
        Pipeline pipeline = getPipeline(key);
        pipeline.hset(key, field, value);
        nums.put(pipeline, nums.get(pipeline) + 1);
        this.maxSync(pipeline);
    }

    /**
     * 释放获取的Jedis链接
     * 释放的过程中会强制执行PipeLine sync
     */
    public void releaseConnection() {
        jedisList.forEach(jedis -> jedis.close());
    }

    /**
     * 获取JedisPool
     * 第一次获取不到尝试刷新缓存的SlotPool再获取一次
     * @param key
     * @return
     */
    private JedisPool getJedisPool(String key){
        /** 通过key计算出slot */
        int slot = JedisClusterCRC16.getSlot(key);
        /** 通过slot获取到对应的Jedis连接池 */
        JedisPool jedisPool = clusterInfoCache.getSlotPool(slot);
        if(null != jedisPool){
            return jedisPool;
        }else{
            /** 刷新缓存的SlotPool */
            connectionHandler.renewSlotCache();
            jedisPool = clusterInfoCache.getSlotPool(slot);
            if (jedisPool != null) {
                return jedisPool;
            } else {
                throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
            }
        }
    }

    /**
     * 获取Pipeline对象
     * 缓存在pipelines中,保证集群中同一节点的Pipeline只被开启一次
     * 管道第一次开启,jedisList,pipelines,nums存入与该管道相关信息
     * @param key
     * @return
     */
    private Pipeline getPipeline(String key){
        JedisPool jedisPool = getJedisPool(key);
        /** 检查管道是否已经开启 */
        Pipeline pipeline = pipelines.get(jedisPool);
        if(null == pipeline){
            Jedis jedis = jedisPool.getResource();
            pipeline = jedis.pipelined();
            jedisList.add(jedis);
            pipelines.put(jedisPool, pipeline);
            nums.put(pipeline, 0);
        }
        return pipeline;
    }

    /**
     * 管道对应的命令计数,并在达到阈值时触发提交
     * 提交后计数归零
     * @param pipeline
     * @return
     */
    private void maxSync(Pipeline pipeline){
        Integer num = nums.get(pipeline);
        if(null != num){
            if(num % MAX_COUNT == 0){
                pipeline.sync();
                nums.put(pipeline, 0);
            }
        }
    }
}

 

标签:pipeline,String,redis,jedis,new,import,spark,pipline
From: https://www.cnblogs.com/linkmust/p/16797114.html

相关文章

  • 17.Redis之分布式锁
    参考1:https://www.cnblogs.com/wangyingshuo/p/14510524.html参考2: https://blog.csdn.net/Me_xuan/article/details/124418176参考3:https://blog.csdn.net/a745233700......
  • 18.Redis常见的面试题
    参考1:https://lebron.blog.csdn.net/article/details/121456167参考2:https://lebron.blog.csdn.net/article/details/120817994......
  • redis的发布订阅模式
    redis的发布订阅模式redis发布订阅(pub/sub)是一种消息通信模式,消息的发布者不会将消息发送给特定的订阅者,而是通过消息通道(频道)广播出去,让订阅该消息主题(频道)的订阅......
  • Redis 实现分布式锁
    Redis实现分布式锁JVM层面的加锁(synchronized,ReentraLock) 单机版的锁分布式微服务架构中,为了避免各个微服务之间发生冲突和数据故障从而引入一种锁--分布式锁......
  • redis: hash类型
    Hash类型,也叫散列,其value是一个无序字典,类似于Java中的HashMap结构。String结构是将对象序列化为JSON字符串后存储,当需要修改对象某个字段时很不方便:  Hash结构可以......
  • redis:set类型
    Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:无序元素不可重复查找快支持交集......
  • redis:SortedSet类型
    Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底......
  • redis:jedis客户端
    导入依赖<!--jedis--><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.7.0</version></dependency><!--单元......
  • redis:jedis连接池
    Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此我们推荐大家使用Jedis连接池代替Jedis的直连方式。importredis.clients.jedis.*;publicclassJe......
  • 认识redis
    Redis诞生于2009年全称是RemoteDictionaryServer远程词典服务器,是一个基于内存的键值型NoSQL数据库。特征:键值(key-value)型,value支持多种不同数据结构,功能丰富单......