I recently ran into a performance-tuning problem: checking whether the keys in a roughly 100 GB CSV file exist in MongoDB.
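The snippets below share a common setup. Here is a minimal sketch of it; the collection name, CSV path, and document shape (`{'Key': <string>}`) are assumptions inferred from the queries that follow, while the credentials match the connection string used later in the post:

```python
from datetime import datetime

import pandas as pd
import pymongo

# Hypothetical names; adjust to your environment.
myclient = pymongo.MongoClient('mongodb://root:example@localhost/db?authSource=admin')
mycol = myclient['db']['mycol']   # documents look like {'Key': <string>}
file = 'keys.csv'                 # the ~100 GB CSV: one key per line, no header
```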
Baseline
First, a benchmark with no optimization at all:
```python
batch_size = 10**4 * 3   # 30,000 keys per query
max_size = 10**6 * 5     # stop after 5,000,000 keys
find_count = 0
total = 0
exception_count = 0
begin = datetime.now()

for chunk in pd.read_csv(file, header=None, chunksize=batch_size, names=['key']):
    total += batch_size
    try:
        # Count how many of this chunk's keys exist in the collection.
        find_count += mycol.count_documents({'Key': {'$in': chunk.key.tolist()}})
        print('size: {:,}, find: {}, time: {}'.format(total, find_count, datetime.now() - begin))
    except Exception:
        exception_count += 1
    if total > max_size:
        break

print('size: {:,}, batch_size: {:,}, time: {}'.format(total, batch_size, datetime.now() - begin))
```
size: 5,000,000, batch_size: 30,000, time: 0:00:44
5,000,000 rows took 44 seconds.
Optimization 1: Build an index
Create a hashed index on the Key field:
```json
{
    "Key": "hashed"
}
```
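That is the index spec; as a sketch, the same index can also be created from PyMongo (using the `mycol` handle from the setup at the top):

```python
import pymongo

# Hashed indexes support the equality matches that $in performs.
mycol.create_index([('Key', pymongo.HASHED)])
```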
size: 5,000,000, batch_size: 30,000, time: 0:00:34
Down to 34 seconds.
Optimization 2: Enable network compression
```bash
pip install zstandard
```

```python
import urllib.parse

import pymongo

# compressors='zstd' requires the zstandard package installed above.
myclient = pymongo.MongoClient(
    'mongodb://{}:{}@localhost/db?authSource=admin'.format(
        urllib.parse.quote('root'), urllib.parse.quote('example')),
    compressors='zstd')
```
size: 5,000,000, batch_size: 30,000, time: 0:00:20
Down to 20 seconds.
Optimization 3: Use the index and network compression together
size: 5,000,000, batch_size: 30,000, time: 0:00:09
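Down to 9 seconds. No new code is needed for this run: it is the baseline loop executed with the hashed index from Optimization 1 already built and the compressed client from Optimization 2. As a sketch, the combined setup looks like this (same assumed names as above):

```python
import urllib.parse

import pymongo

# Compressed connection (Optimization 2).
myclient = pymongo.MongoClient(
    'mongodb://{}:{}@localhost/db?authSource=admin'.format(
        urllib.parse.quote('root'), urllib.parse.quote('example')),
    compressors='zstd')
mycol = myclient['db']['mycol']   # hypothetical collection name

# Hashed index (Optimization 1); a no-op if the identical index already exists.
mycol.create_index([('Key', pymongo.HASHED)])
```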
Optimization 4: Use multiple threads
Use multiple threads to decouple reading the CSV from querying MongoDB, so the two can overlap:
```python
import pandas as pd
from datetime import datetime
from queue import Queue
from threading import Thread, Lock

my_queue = Queue(maxsize=10)   # bounded queue: the reader blocks when workers fall behind
num_threads = 2
lock = Lock()

find_count = 0
total = 0
exception_count = 0
begin = datetime.now()

def do_stuff(id, q, mycol):
    global find_count
    global exception_count
    while True:
        keys = q.get()
        try:
            count = mycol.count_documents({'Key': {'$in': keys}})
            with lock:
                find_count += count
            print('id: {}, size: {:,}, find: {}, now: {}'.format(id, total, find_count, datetime.now() - begin))
        except Exception:
            exception_count += 1
            print('size exception: {:,}, find: {}, now: {}'.format(batch_size, find_count, datetime.now() - begin))
        q.task_done()   # one task_done per get, even on failure

for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(i, my_queue, mycol), daemon=True)
    worker.start()

for chunk in pd.read_csv(file, header=None, chunksize=batch_size, names=['key']):
    total += batch_size
    my_queue.put(chunk.key.tolist())
    if total > max_size:
        break

my_queue.join()   # wait until every queued batch has been processed
print('size: {:,}, batch_size: {:,}, find: {}, exception: {}, now: {}'.format(total, batch_size, find_count, exception_count, datetime.now() - begin))
```
size: 5,000,000, batch_size: 30,000, time: 0:00:05
Down to 5 seconds.
Summary
| no optimization | index | network compression | multiple threads | result | speedup |
|---|---|---|---|---|---|
| v | | | | 0:00:45 | |
| | v | | | 0:00:34 | 1.32x |
| | | v | | 0:00:20 | 2.25x |
| | v | v | | 0:00:09 | 5x |
| | v | v | v | 0:00:06 | 7.5x |