首页 > 编程语言 >使用Python脚本实现ElasticSearch的在线数据迁移

使用Python脚本实现ElasticSearch的在线数据迁移

时间:2023-07-30 23:02:41浏览次数:36  
标签:index 在线 source Python id dest ElasticSearch scroll es

该脚本的功能,类似于 elasticsearch-dump ,二者都是基于scroll来实现的(包括reindex底层也是scroll)。


依赖包

# 我这里演示的ES是7.x的,如果下面的脚本运行报错,请考虑调整这里的python的elasticsearch包版本
pip install elasticsearch==7.13.1


配置文件

vim configs.py

# -*- coding: utf-8 -*-

# es数据源的信息
es_source_host = ['127.0.0.1:9200']  # 支持多个节点间用逗号分隔
es_source_index = "index-test1"

# es目标库的信息
es_dest_host = ['127.0.0.1:9200']
es_dest_index = "index-test2"

# 每次取的条数
batch_size = 2000

# 每轮休眠的时间(单位秒)
sleep_time = 0


主程序

vim run.py

# -*- coding: utf-8 -*-
import json
import time
import configs

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts = configs.es_source_host,maxsize=16)
dest_es = Elasticsearch(hosts = configs.es_dest_host,maxsize=16)

start_ts = time.time()

scroll_time = '5m'  # 指定 Scroll 上下文的存活时间

src_index_name = configs.es_source_index
dest_index_name = configs.es_dest_index

def create_dest_index():
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": {"index": {"number_of_shards": 4}}},
        )

    except Exception as e:
        print(str(e))

def update_dest_index_setting(time_dur,replicas):
    try:
        res = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            body={"index.refresh_interval": time_dur, "number_of_replicas": replicas},
        )
        print(res)
    except Exception as e:
        print(str(e))


def update_dest_index_mapping():
    dest_mapping = src_es.indices.get_mapping(index=configs.es_source_index)[configs.es_source_index]["mappings"]
    try:
        res = dest_es.indices.put_mapping(body=dest_mapping, index=configs.es_dest_index)
        print(res)
    except Exception as e:
        print(str(e))



def migrate():
    query = {
        "query": {
            "match_all": {}  # 查询所有文档
        }
    }

    # 计数下,用于最后确认scroll的次数
    count = 0

    # 初始化 Scroll 上下文
    response = src_es.search(index=src_index_name, scroll=scroll_time, body=query,size=configs.batch_size)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']

    # 处理第一批结果,拼装bulk需要的数据结构
    data_list1=[]
    for hit in hits:
        data1={}
        _id, _source = hit["_id"], hit["_source"]
        data1["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
        data_list1.append(data1["index"])
        data_list1.append(_source)

    # 把第一次找出的数据,拼装好的结果写入目标ES
    dest_res = dest_es.bulk(index=dest_index_name, body=data_list1)
    if dest_res["errors"]:
        for item in response["items"]:
            if "error" in item["index"]:
                print(f"Failed operation: {item['index']}")
    else:
        print("Bulk operations completed successfully!")

    count += 1

    # 滚动获取剩余结果
    while True:
        if len(hits) < 0:
            break

        response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        # print("scroll_id ---> ", scroll_id )
        hits = response['hits']['hits']

        # 拼装bulk需要的数据结构
        data_list2=[]
        for hit in hits:
            data2={}
            _id, _source = hit["_id"], hit["_source"]
            data2["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
            data_list2.append(data2["index"])
            data_list2.append(_source)

        if len(data_list2) <=0:
            break
        dest_res = dest_es.bulk(index=dest_index_name, body=data_list2)
        if dest_res["errors"]:
            for item in response["items"]:
                if "error" in item["index"]:
                    print(f"Failed operation: {item['index']}")
        else:
            print("Bulk operations completed successfully!")

        time.sleep(configs.sleep_time)

        count += 1


    stop_ts = time.time()
    print('scroll 遍历的次数: ', count, '耗时(秒):', int(stop_ts - start_ts))



if __name__ == '__main__':
    create_dest_index()  # 创建目标索引
    update_dest_index_setting("60s",0)  # 临时降低持久性,提升写入性能
    update_dest_index_mapping()  # 复制mapping
    migrate() # 数据同步
    update_dest_index_setting("1s",1)  # 提升持久性,确保数据安全性


执行

python run.py


效率

测试下来,速度还是很给力的。
测试数据集:
	docs: 639566
	primary size: 179.78MB

耗时:
elasticsearch-dump迁移耗时7分钟。
python脚本迁移耗时 4分钟(可能是因为我脚本里面的迁移前先调大refresh的功劳?)。


标签:index,在线,source,Python,id,dest,ElasticSearch,scroll,es
From: https://blog.51cto.com/lee90/6903652

相关文章

  • Python安装技术类库模块
    方法1:方法2:用如下命令安装即可(注意都得是英文字符):#简单粗暴,但是可能安装到了不同的环境pipinstallsome-package#复杂但是精准还快速C:\Python310\python.exe-mpipinstall-ihttps://pypi.tuna.tsinghua.edu.cn/simplesome-package其中:C:\Python310\python.......
  • Python报错 | ImportError: To be able to use evaluate-metric/seqeval, you need to
    报错信息使用metric=evaluate.load("seqeval")的时候,报如下错误:ImportError:Tobeabletouseevaluate-metric/seqeval,youneedtoinstallthefollowingdependencies['seqeval']using'pipinstallseqeval'forinstance'错误原因这个错误提示表......
  • Python - 使用 Matplotlib 可视化在 NetworkX 中生成的图形
    介绍Python代表了一种灵活的编码语言,以其易用性和清晰性而闻名。这提供了许多库和组件,用于简化不同的任务,包括创建图形和显示。NetworkX代表了一个高效的Python工具包,用于构建、更改和研究复杂网络的排列、移动和操作。然而,Matplotlib是一个流行的工具包,用于在Python中创建静......
  • python数据分析师入门-学习笔记(爬虫-序言)
    爬虫到底是什么概括爬虫是批量化自动获取既有数据批量化自动既有数据通常获取既有数据特殊批量注册一批账号批量去领取优惠券批量自动下单购物自动做任务(签到)实际应用企业中:竞品调研数据采集办公自动化个人:比如看小说有的网站收费有的网站不收费......
  • python爬虫基础
    前言都说爬虫简介1、首先我们需要知道爬虫是什么?爬虫实际上是一段程序,我们可以通过这段程序从互联网上获取到我们想要的数据,这里还有另外一种解释是我们使用程序来模拟浏览器向服务器发送请求,来获取响应信息2、爬虫的核心:(1)、爬取网页:爬取整个网页,包含网页中的所有内容(2)、解......
  • opencv-python 卷积操作
    1图像卷积图像卷积就是卷积核在图像上按行滑动遍历像素时不断的相乘求和的过程,卷积可以用来提取特征,去噪,平滑等。如下图: ......
  • opencv-python霍夫变换
    1霍夫线检测原理霍夫变换常用来提取图像中的直线和圆等几何形状。在笛卡尔坐标系中,直线可以表示为y=kx+q 也就是说通过变量k,q可以确定一条直线,把直线写成关于k,q的函数,进行空间转换,转换后的空间称为霍夫空间。也就是说:笛卡尔坐标系中的一条线对应了霍夫空间的一个点。反过......
  • python argparse—用于命令行选项、参数和子命令的解析器
    参考:https://docs.python.org/3/library/argparse.htmlargparse.ArgumentParser:创建Parser对象语法格式class argparse.ArgumentParser(prog=None, usage=None, description=None, epilog=None, parents=[], formatter_class=argparse.HelpFormatter, prefix_chars='-......
  • 【es】elasticsearch生产数据备份和恢复方案
    https://huaweicloud.csdn.net/637f7ae4dacf622b8df859b1.html?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Eactivity-2-119985243-blog-126509989.235%5Ev38%5Epc_relevant_sort&depth_1-utm_source=distr......
  • python的内置方法
     类型判断issubclass首先,我们先看issubclass()这个内置函数可以帮我们判断x类是否是y类型的子类classBase:passclassFoo(Base):passclassBar(Foo):passprint(issubclass(Bar,Foo))#Trueprint(issubclass(Foo,Bar))#False......