背景
统计最近五天所有content信息的正文字节数(正文字段占用较多),然后根据这个大小,推送存在redis要配置多少的内存。
统计方法
1.在mongodb中查询
db.content_.aggregate([
{
$match: {
updatetime: {
$gte: 1686134400000, // 对应日期 "2023-06-07T00:00:00Z" 的毫秒数
$lte: 1686671999000 // 对应日期 "2023-06-12T23:59:59Z" 的毫秒数
}
}
}
,
{
$group: {
_id: null,
avgSize: {
$avg: { $strLenBytes: "$content", $encoding: "utf8" }
}
}
}
])
**测试结果:**content字段每条约占3000字节,一天接近20万数据,redis每天存最新一天的数据,设置过期时间5天, 5天x20万数据x3000字节约等于3GB数据。
同步redis采用生产消费者模式
static MyBlockingQueue<Iterator<DBObject>> queue = new MyBlockingQueue<>(24);
private static final ExecutorService executorService = Executors.newFixedThreadPool(6*4); //创建一个固定大小的线程池,线程池大小,根据任务类型,因为都是io查询,线程大部分在等待带宽,所以线程数=cupx2或者3,4都可以,需要测试哪种性能最好。我本地cup6核。
public static void main(String[] args) throws UnknownHostException {
long startTime = System.currentTimeMillis();
//1.连接mongodb,查询时间范围的数据
int totalPages = 0;
// 输入日期
long startTimestamp = LocalDate.parse("2023-06-15").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
long endTimestamp = LocalDate.parse("2023-06-16").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
long ExTimestamp = LocalDate.parse("2023-06-11").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
// 创建 MongoTemplate 实例
MongoClientURI uri = new MongoClientURI("mongodb://127.0.0.1:2580/zb_contxxx");
SimpleMongoDbFactory factory = new SimpleMongoDbFactory(uri);
MongoTemplate mongoTemplate = new MongoTemplate(factory);
System.out.println("开始查询");
//添加一个标志,用来判断是否为最后一个查询任务
Object queryFinishedMarker = new Object();
for (long i = startTimestamp; i <endTimestamp ; i+=3600) {
long startTimestamp2 = i;
executorService.execute(() -> {
Query query = new Query(Criteria.where("updatetime").gte(startTimestamp2).lte(endTimestamp));
Iterator<DBObject> cursor = mongoTemplate.getCollection("zb_content_document")
//组装查询条件
.find(query.getQueryObject())
//设置批量从数据库中获取的数据量
.batchSize(2000)
.iterator();
try {
queue.put(cursor);
// 判断是否为最后一个查询任务,是则添加标记对象
if (startTimestampTemp + 3600 >= endTimestamp) {
queue.put((Iterator<DBObject>) queryFinishedMarker);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
System.out.println("查询完毕");
//3.添加到redis中
JedisPoolConfig poolConfig = new JedisPoolConfig();
JedisPool jedisPool = new JedisPool(poolConfig, "172.0.0.1", 6379);
Jedis jedis;
jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
try {
while (true){
Iterator<DBObject> cursor = queue.take();
if (cursor == queryFinishedMarker) {
// 所有数据已经消费完毕,退出循环
break;
}
while (cursor.hasNext()) {
DBObject document = cursor.next();
//过期时间为数据跟新时间减去自定义时间达到,早入库的早过期,晚入库的晚过期的效果,比如今天是20号,20号入库的数据5天后过期,19号入库的数据4天后过期。
pipeline.setex(document.get("_id").toString(), (int) ((Long)document.get("updatetime") - ExTimestamp), (String) document.get("content"));
System.out.println(document.toString());
System.out.println("总数:" + totalPages++);
pipeline.sync();
}
}
} catch (Exception e) {
e.printStackTrace();
// 异常处理
} finally {
if (jedis != null) {
jedis.close(); // 释放连接
}
}
long endTime = System.currentTimeMillis();
System.out.println("执行总耗时:"+(endTime - startTime));
}
在上述代码中:
1.我们采用redis pipeline管道进行数据批量提交,而不是一条一条提交,提高效率。
2.将mongodb查库采用每小时一个查询,弃用翻页查询采用游标查询,一次查询2000条。
3.采用多线程生产者消费者模式,每个线程查询一个小时的数据,将结果加入到队列中,redis负责消费队列中的数据。
测试结果:
消费者一直处于饥饿状态,生产者的速度太慢,生产消费者模式效果达不到预期。
改进建议:
删除生产者消费者模式,改为多线程查询数据直接插入redis中。
最终完整代码:
public static void main(String[] args) throws UnknownHostException {
long startTime = System.currentTimeMillis();
//1.连接mongodb,查询时间范围的数据,翻页查询
int totalPages = 0;
// 输入日期
long startTimestamp = LocalDate.parse("2023-06-17").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
long endTimestamp = LocalDate.parse("2023-06-18").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
long ExTimestamp = LocalDate.parse("2023-06-12").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
//查询语句
// 创建 MongoTemplate 实例
MongoClientURI uri = new MongoClientURI("mongodb://127.0.0.1:xxx/zb_contxxx");
SimpleMongoDbFactory factory = new SimpleMongoDbFactory(uri);
MongoTemplate mongoTemplate = new MongoTemplate(factory);
System.out.println("开始查询");
//添加一个标志,用来判断是否为最后一个查询任务
Object queryFinishedMarker = new Object();
JedisPoolConfig poolConfig = new JedisPoolConfig();
JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379);
Jedis jedis;
jedis = jedisPool.getResource();
for (long i = startTimestamp; i <endTimestamp ; i+=3600) {
long startTimestampTemp = i;
//线程池数量,线程池数量根据任务类型,你是io型任务还是计算密集类型,大部分时间是花在io等待数据返回,所以线程池数量可以开到cpu数 x2最少,你可以从x2开始往上试,×3,x4,甚至x5
executorService.execute(() -> {
//看下updatetime字段的索引,mongodb的底层也是b+树,看看他有没有索引,索引的区分度高不高,如果不高,需要换查询方法,换成id之类的区分度高的有索引的。
Query query = new Query(Criteria.where("updatetime").gte(startTimestampTemp).lte(endTimestamp));
Iterator<DBObject> cursor = mongoTemplate.getCollection("zb_content_document")
//组装查询条件
.find(query.getQueryObject())
//设置批量从数据库中获取的数据量
.batchSize(5000)
.iterator();
Pipeline pipeline = jedis.pipelined();
while (cursor.hasNext()) {
DBObject document = cursor.next();
pipeline.setex(document.get("_id").toString(), (int) ((Long)document.get("updatetime") - ExTimestamp), (String) document.get("content"));
System.out.println(document.toString());
pipeline.sync();
}
});
}
long endTime = System.currentTimeMillis();
System.out.println("执行总耗时:"+(endTime - startTime));
}
代码导入版本迭代记录
- 将mongodb查出来,redis插入数据
- redis改成管道,一次批量提交数据
- 优化mongodb查询,mongodb底层是b+树,需要我们看一下查询时的索引是否存在,以及索引的分辨度高不高。
- mongodb改成使用迭代器分批查询 75万数据80分钟
- 添加线程池,使用多线程生产消费者模式
- 添加标志位用来结束程序
- 实际测试,消费者一直处于饥饿状态,删除生产消费者模式,改为mongodb和redis用同一个线程
- 优化线程池数量,线程执行任务都是io处理,所以线程的大部分时间是等待io返回,带宽处理,所以线程数可以多开一些,从cup数x2往上测试。
结果总结
经过五天的测试,每天加入一天的数据,并设置5天过期时间,加上内存碎片率的为1.4,实际需要内存最少3GBx1.4=4.2GB内存,为了增加一些冗余,设置redis内存5个GB。
标签:迭代,MongoDB,Redis,System,long,查询,mongodb,new,document From: https://blog.51cto.com/u_15836311/6523347