This script does essentially the same job as elasticsearch-dump: both are built on the scroll API (the _reindex API is also implemented on top of scroll under the hood).
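For comparison: when the source and destination indices live in the same cluster, the built-in _reindex API (also scroll-based internally) can do the same copy. A minimal sketch, assuming the index names from the config file below:
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=['127.0.0.1:9200'])
# _reindex copies index-test1 into index-test2 within a single cluster
es.reindex(
    body={"source": {"index": "index-test1"}, "dest": {"index": "index-test2"}},
    wait_for_completion=True,
)
The script below is mainly useful when copying between two clusters without touching reindex.remote.whitelist, or when you want direct control over batch size and pacing.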
Dependencies
# This demo targets ES 7.x; if the script below throws errors, adjust the version of the Python elasticsearch package accordingly
pip install elasticsearch==7.13.1
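To confirm which client version got installed:
python -c "import elasticsearch; print(elasticsearch.__version__)"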
Configuration file
vim configs.py
# -*- coding: utf-8 -*-
# source ES cluster
es_source_host = ['127.0.0.1:9200']  # multiple nodes can be listed, separated by commas
es_source_index = "index-test1"
# destination ES cluster
es_dest_host = ['127.0.0.1:9200']
es_dest_index = "index-test2"
# number of documents to fetch per scroll batch
batch_size = 2000
# sleep time between batches (in seconds)
sleep_time = 0
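If either cluster requires authentication, credentials can be embedded in the host URL (a hypothetical example, adjust to your setup):
es_source_host = ['http://elastic:your-password@127.0.0.1:9200']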
Main program
vim run.py
# -*- coding: utf-8 -*-
import json
import time
import configs
from elasticsearch import Elasticsearch
src_es = Elasticsearch(hosts=configs.es_source_host, maxsize=16)
dest_es = Elasticsearch(hosts=configs.es_dest_host, maxsize=16)

start_ts = time.time()
scroll_time = '5m'  # how long to keep each scroll context alive
src_index_name = configs.es_source_index
dest_index_name = configs.es_dest_index
def create_dest_index():
    """Create the destination index (the error is only printed if it already exists)."""
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": {"index": {"number_of_shards": 4}}},
        )
    except Exception as e:
        print(str(e))
def update_dest_index_setting(time_dur, replicas):
    """Adjust refresh_interval and number_of_replicas on the destination index."""
    try:
        res = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            body={"index.refresh_interval": time_dur, "number_of_replicas": replicas},
        )
        print(res)
    except Exception as e:
        print(str(e))
def update_dest_index_mapping():
    """Copy the mapping of the source index onto the destination index."""
    dest_mapping = src_es.indices.get_mapping(index=configs.es_source_index)[configs.es_source_index]["mappings"]
    try:
        res = dest_es.indices.put_mapping(body=dest_mapping, index=configs.es_dest_index)
        print(res)
    except Exception as e:
        print(str(e))
def migrate():
    query = {
        "query": {
            "match_all": {}  # copy every document
        }
    }
    # counter, used at the end to report how many scroll rounds were needed
    count = 0
    # open the scroll context and fetch the first batch
    response = src_es.search(index=src_index_name, scroll=scroll_time, body=query, size=configs.batch_size)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']
    # turn the first batch into the action/source pairs that the bulk API expects
    data_list1 = []
    for hit in hits:
        _id, _source = hit["_id"], hit["_source"]
        data_list1.append({"index": {"_index": dest_index_name, "_id": _id}})
        data_list1.append(_source)
    # write the first batch into the destination ES
    if data_list1:
        dest_res = dest_es.bulk(index=dest_index_name, body=data_list1)
        if dest_res["errors"]:
            for item in dest_res["items"]:
                if "error" in item["index"]:
                    print(f"Failed operation: {item['index']}")
        else:
            print("Bulk operations completed successfully!")
        count += 1
    # keep scrolling until no more hits come back
    while True:
        if not hits:
            break
        response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        # print("scroll_id ---> ", scroll_id)
        hits = response['hits']['hits']
        # build the bulk payload for this batch
        data_list2 = []
        for hit in hits:
            _id, _source = hit["_id"], hit["_source"]
            data_list2.append({"index": {"_index": dest_index_name, "_id": _id}})
            data_list2.append(_source)
        if not data_list2:
            break
        dest_res = dest_es.bulk(index=dest_index_name, body=data_list2)
        if dest_res["errors"]:
            for item in dest_res["items"]:
                if "error" in item["index"]:
                    print(f"Failed operation: {item['index']}")
        else:
            print("Bulk operations completed successfully!")
        time.sleep(configs.sleep_time)
        count += 1
    stop_ts = time.time()
    print('scroll rounds:', count, 'elapsed (seconds):', int(stop_ts - start_ts))
if __name__ == '__main__':
    create_dest_index()                  # create the destination index
    update_dest_index_setting("60s", 0)  # temporarily relax refresh/replicas to speed up bulk writes
    update_dest_index_mapping()          # copy the mapping from the source index
    migrate()                            # copy the data
    update_dest_index_setting("1s", 1)   # restore refresh and replicas so the data is durable
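Optionally, the scroll context can be released as soon as the copy finishes instead of waiting for the 5-minute timeout to expire. A small sketch to place at the end of migrate(), reusing its local scroll_id:
    # optional: release the scroll context explicitly (add at the end of migrate())
    try:
        src_es.clear_scroll(scroll_id=scroll_id)
    except Exception as e:
        print(str(e))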
Run
python run.py
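To confirm the copy is complete, compare document counts on both sides. A small optional sketch reusing configs.py (the destination count only reflects refreshed documents, so it converges once refresh_interval is back to 1s):
# check_counts.py -- optional sanity check, not part of run.py
from elasticsearch import Elasticsearch
import configs

src = Elasticsearch(hosts=configs.es_source_host)
dst = Elasticsearch(hosts=configs.es_dest_host)
print("source docs:", src.count(index=configs.es_source_index)["count"])
print("dest docs:  ", dst.count(index=configs.es_dest_index)["count"])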
Performance
In testing, the speed was quite good.
Test data set:
docs: 639566
primary size: 179.78MB
Elapsed time:
elasticsearch-dump took 7 minutes.
The Python script took 4 minutes (possibly thanks to raising refresh_interval before the migration?).