首页 > 数据库 >MongoDB批量导入Redis优化迭代笔记

MongoDB批量导入Redis优化迭代笔记

时间:2023-06-20 16:04:05浏览次数:45  
标签:迭代 MongoDB Redis System long 查询 mongodb new document

背景

统计最近五天所有content信息的正文字节数(正文字段占用较多),然后根据这个大小,推送存在redis要配置多少的内存。

统计方法

1.在mongodb中查询

db.content_.aggregate([
  {
    $match: {
      updatetime: {
        $gte: 1686134400000,  // 对应日期 "2023-06-07T00:00:00Z" 的毫秒数
        $lte: 1686671999000   // 对应日期 "2023-06-12T23:59:59Z" 的毫秒数
      }
    }
  }
  ,
  {
    $group: {
      _id: null,
      avgSize: {
        $avg: { $strLenBytes: "$content", $encoding: "utf8" }
      }
    }
  }
])

**测试结果:**content字段每条约占3000字节,一天接近20万数据,redis每天存最新一天的数据,设置过期时间5天, 5天x20万数据x3000字节约等于3GB数据。

同步redis采用生产消费者模式

static MyBlockingQueue<Iterator<DBObject>> queue = new MyBlockingQueue<>(24);

private static final ExecutorService executorService = Executors.newFixedThreadPool(6*4); //创建一个固定大小的线程池,线程池大小,根据任务类型,因为都是io查询,线程大部分在等待带宽,所以线程数=cupx2或者3,4都可以,需要测试哪种性能最好。我本地cup6核。
   public static void main(String[] args) throws UnknownHostException {
        long startTime = System.currentTimeMillis();

        //1.连接mongodb,查询时间范围的数据
        int totalPages = 0;
        // 输入日期
        long startTimestamp = LocalDate.parse("2023-06-15").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
        long endTimestamp = LocalDate.parse("2023-06-16").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
        long ExTimestamp = LocalDate.parse("2023-06-11").atStartOfDay(ZoneOffset.UTC).toEpochSecond();

        // 创建 MongoTemplate 实例
        MongoClientURI uri = new MongoClientURI("mongodb://127.0.0.1:2580/zb_contxxx");
        SimpleMongoDbFactory factory = new SimpleMongoDbFactory(uri);
        MongoTemplate mongoTemplate = new MongoTemplate(factory);

        System.out.println("开始查询");
        //添加一个标志,用来判断是否为最后一个查询任务
        Object queryFinishedMarker = new Object();
        for (long i = startTimestamp; i <endTimestamp ; i+=3600) {
            long startTimestamp2 = i;
            executorService.execute(() -> {
                Query query = new Query(Criteria.where("updatetime").gte(startTimestamp2).lte(endTimestamp));

                Iterator<DBObject> cursor = mongoTemplate.getCollection("zb_content_document")
                        //组装查询条件
                        .find(query.getQueryObject())
                        //设置批量从数据库中获取的数据量
                        .batchSize(2000)
                        .iterator();
                try {
                    queue.put(cursor);
                     // 判断是否为最后一个查询任务,是则添加标记对象
                    if (startTimestampTemp + 3600 >= endTimestamp) {
                        queue.put((Iterator<DBObject>) queryFinishedMarker);
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }

        System.out.println("查询完毕");
        //3.添加到redis中
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        JedisPool jedisPool = new JedisPool(poolConfig, "172.0.0.1", 6379);
        Jedis jedis;
        jedis = jedisPool.getResource();
        Pipeline pipeline = jedis.pipelined();
        try {
            while (true){
                Iterator<DBObject> cursor = queue.take();
                if (cursor == queryFinishedMarker) {
                    // 所有数据已经消费完毕,退出循环
                    break;
                }
                while (cursor.hasNext()) {
                    DBObject document = cursor.next();
                    //过期时间为数据跟新时间减去自定义时间达到,早入库的早过期,晚入库的晚过期的效果,比如今天是20号,20号入库的数据5天后过期,19号入库的数据4天后过期。
                    pipeline.setex(document.get("_id").toString(), (int) ((Long)document.get("updatetime") - ExTimestamp), (String) document.get("content"));
                    System.out.println(document.toString());
                    System.out.println("总数:" + totalPages++);
                    pipeline.sync();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 异常处理
        } finally {
            if (jedis != null) {
                jedis.close(); // 释放连接
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("执行总耗时:"+(endTime - startTime));
    }

在上述代码中:

1.我们采用redis pipeline管道进行数据批量提交,而不是一条一条提交,提高效率。

2.将mongodb查库采用每小时一个查询,弃用翻页查询采用游标查询,一次查询2000条。

3.采用多线程生产者消费者模式,每个线程查询一个小时的数据,将结果加入到队列中,redis负责消费队列中的数据。

测试结果:

消费者一直处于饥饿状态,生产者的速度太慢,生产消费者模式效果达不到预期。

改进建议:

删除生产者消费者模式,改为多线程查询数据直接插入redis中。

最终完整代码:

public static void main(String[] args) throws UnknownHostException {
        long startTime = System.currentTimeMillis();

        //1.连接mongodb,查询时间范围的数据,翻页查询
        int totalPages = 0;
        // 输入日期
        long startTimestamp = LocalDate.parse("2023-06-17").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
        long endTimestamp = LocalDate.parse("2023-06-18").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
        long ExTimestamp = LocalDate.parse("2023-06-12").atStartOfDay(ZoneOffset.UTC).toEpochSecond();

        //查询语句

        // 创建 MongoTemplate 实例
        MongoClientURI uri = new MongoClientURI("mongodb://127.0.0.1:xxx/zb_contxxx");
        SimpleMongoDbFactory factory = new SimpleMongoDbFactory(uri);
        MongoTemplate mongoTemplate = new MongoTemplate(factory);

        System.out.println("开始查询");
        //添加一个标志,用来判断是否为最后一个查询任务
        Object queryFinishedMarker = new Object();

        JedisPoolConfig poolConfig = new JedisPoolConfig();
        JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379);
        Jedis jedis;
        jedis = jedisPool.getResource();

        for (long i = startTimestamp; i <endTimestamp ; i+=3600) {
            long startTimestampTemp = i;
            //线程池数量,线程池数量根据任务类型,你是io型任务还是计算密集类型,大部分时间是花在io等待数据返回,所以线程池数量可以开到cpu数 x2最少,你可以从x2开始往上试,×3,x4,甚至x5
            executorService.execute(() -> {
                //看下updatetime字段的索引,mongodb的底层也是b+树,看看他有没有索引,索引的区分度高不高,如果不高,需要换查询方法,换成id之类的区分度高的有索引的。
                Query query = new Query(Criteria.where("updatetime").gte(startTimestampTemp).lte(endTimestamp));

                Iterator<DBObject> cursor = mongoTemplate.getCollection("zb_content_document")
                        //组装查询条件
                        .find(query.getQueryObject())
                        //设置批量从数据库中获取的数据量
                        .batchSize(5000)
                        .iterator();

                Pipeline pipeline = jedis.pipelined();
                while (cursor.hasNext()) {
                    DBObject document = cursor.next();
                    pipeline.setex(document.get("_id").toString(), (int) ((Long)document.get("updatetime") - ExTimestamp), (String) document.get("content"));
                    System.out.println(document.toString());
                    pipeline.sync();
                }
            });

        }
        long endTime = System.currentTimeMillis();
        System.out.println("执行总耗时:"+(endTime - startTime));
    }

代码导入版本迭代记录

  1. 将mongodb查出来,redis插入数据
  2. redis改成管道,一次批量提交数据
  3. 优化mongodb查询,mongodb底层是b+树,需要我们看一下查询时的索引是否存在,以及索引的分辨度高不高。
  4. mongodb改成使用迭代器分批查询 75万数据80分钟
  5. 添加线程池,使用多线程生产消费者模式
  6. 添加标志位用来结束程序
  7. 实际测试,消费者一直处于饥饿状态,删除生产消费者模式,改为mongodb和redis用同一个线程
  8. 优化线程池数量,线程执行任务都是io处理,所以线程的大部分时间是等待io返回,带宽处理,所以线程数可以多开一些,从cup数x2往上测试。

结果总结

经过五天的测试,每天加入一天的数据,并设置5天过期时间,加上内存碎片率的为1.4,实际需要内存最少3GBx1.4=4.2GB内存,为了增加一些冗余,设置redis内存5个GB。

标签:迭代,MongoDB,Redis,System,long,查询,mongodb,new,document
From: https://blog.51cto.com/u_15836311/6523347

相关文章

  • 使用docker-compose同时启动MySQL和Redis
    环境查看安装docker和docker-composeapt-yinstalldocker.iodocker-compose设置docker-compose配置文件root@iZ2zebcd9hncu1371fetliZ:/data/docker-compose#pwd/data/docker-composeroot@iZ2zebcd9hncu1371fetliZ:/data/docker-compose#catdocker-compose.ymlvers......
  • 缓存方案之Redis
    Redis简介  Redis是RemoteDictionaryServer(Redis)的缩写,或许光听名字你就能猜出它大概是做什么的。不错,它是一个由SalvatoreSanfilippo编写的key-value存储系统,是一个使用ANSIC语言编写、遵守BSD协议、支持网络、可基于内存亦可持久化的日志型的Key-Value数据库,并提供多种......
  • 强化学习从基础到进阶-常见问题和面试必知必答[2]:马尔科夫决策、贝尔曼方程、动态规划
    强化学习从基础到进阶-常见问题和面试必知必答[2]:马尔科夫决策、贝尔曼方程、动态规划、策略价值迭代1.马尔科夫决策核心词汇马尔可夫性质(Markovproperty,MP):如果某一个过程未来的状态与过去的状态无关,只由现在的状态决定,那么其具有马尔可夫性质。换句话说,一个状态的下一个状态......
  • 强化学习从基础到进阶-常见问题和面试必知必答[2]:马尔科夫决策、贝尔曼方程、动态规划
    强化学习从基础到进阶-常见问题和面试必知必答[2]:马尔科夫决策、贝尔曼方程、动态规划、策略价值迭代1.马尔科夫决策核心词汇马尔可夫性质(Markovproperty,MP):如果某一个过程未来的状态与过去的状态无关,只由现在的状态决定,那么其具有马尔可夫性质。换句话说,一个状态的下一个状态......
  • 代码随想录算法训练营第十二天| 递归遍历 (必须掌握)迭代遍历 统一迭代
    递归遍历重点:1,TreeNode的自定义2,val=0== val=NULL;代码:1voidpreRecursor(TreeNode*root,vector<int>&result)2{3if(root==NULL)4return;5result.push_back(root->val);6preRecursor(root->left,result);7......
  • 1.redis常见数据类型-字符串String、列表List、集合Set、Hash哈希、Zset有序集合
    背景:这里说的数据类型是value的数据类型,key的类型都是字符串。命令不区分大小写,而key的值是区分大小写的 help@+数据类型会出现命令提示比如help@string,help@list常见命令:keys*查看当前库所有key(匹配:keys*1)existskey判断某个key是否存在typekey查看你的......
  • lettuce+redisTemplate实现redis单击和集群的整合
    lettuce+redisTemplate实现redis单击和集群的整合Springboot整合redis是非常方便的,大致包含如下四部分pomstart相关jar的引入properties/yaml基础配置信息configbean的initbean的注入及使用如果遇到网上的自动装配的实例直接跳过吧,哪怕再小的公司,密码也会加密处理......
  • Windows环境下Redis的安装以及Redis Desktop Manager的下载安装
    ————本文介绍了Windows环境下Redis的安装,以及Redis数据库管理工具RedisDesktopManager的下载和安装目录|一、Windows环境下安装Redis||--|--||二、RedisDesktopManager的下载及安装|一、Windows环境下安装Redis下载地址:https://github.com/tporadowski/redis/......
  • 2023-06-19:讲一讲Redis分布式锁的实现?
    2023-06-19:讲一讲Redis分布式锁的实现?答案2023-06-19:Redis分布式锁最简单的实现要实现分布式锁,确实需要使用具备互斥性的Redis操作。其中一种常用的方式是使用SETNX命令,该命令表示"SETifNotExists",即只有在key不存在时才设置其值,否则不进行任何操作。通过这种方式,两个客户端......
  • 2023-06-19:讲一讲Redis分布式锁的实现?
    2023-06-19:讲一讲Redis分布式锁的实现?答案2023-06-19:Redis分布式锁最简单的实现要实现分布式锁,确实需要使用具备互斥性的Redis操作。其中一种常用的方式是使用SETNX命令,该命令表示"SETifNotExists",即只有在key不存在时才设置其值,否则不进行任何操作。通过这种方式,两个客户端进程......