一、项目背景
由于项目升级需要将es索引迁移,从es版本看是从elasticsearch-5.6.6版本迁移到elasticsearch-7.17.5版本中,因之前其他项目采用elasticdump工具迁移,有过成功经验,所以首先借鉴其经验采用elasticdump工具来实现。
注意:由于网络、服务器性能等的关系,elasticdump工具在索引doc数超过5千万可能会发生中断现象,并且在elasticsearch 6.0版本之后不支持offset参数进行断点续传,可以采用ES自带接口reindex(推荐指数:★★★★★)方式进行迁移。
如果各位读者工作中项目比较着急,建议可以直接跳转【最佳实践】进行阅读实践
二、elasticdump工具介绍及实践
(一)elasticdump工具简介
首先先来简单认识一下elasticdump工具:
Elasticdump 是一个用于导入和导出 Elasticsearch 数据的命令行工具。它提供了一种方便的方式来在不同的Elasticsearch 实例之间传输数据,或者进行数据备份和恢复。
使用 Elasticdump,你可以将 Elasticsearch 索引中的数据导出为 JSON 文件,或者将 JSON 文件中的数据导入到 Elasticsearch 索引中。它支持各种选项和过滤器,用于指定源和目标,包括索引模式、文档类型、查询过滤器等等。
主要特征包括
- 支持在Elasticsearch实例或者集群之间传输和备份数据。可以将数据从一个集群复制到另一个集群。
- 支持不同格式的数据传输,包括JSON、NDJSON、CSV、备份文件等。
- 可以通过命令行或者程序化的方式使用。命令行方式提供了便捷的操作接口。
- 支持增量式同步,只复制目标集群中不存在的文档。
- 支持各种认证方式连接Elasticsearch,如basic auth、Amazon IAM等。
- 支持多线程操作,可以加快数据迁移的速度。
- 开源免费,代码托管在GitHub上。
(二)elasticdump工具安装
1、迁移工具离线准备
a.准备elasticdump工具
-
下载node安装包(有公网的服务器A)
wget https://nodejs.org/dist/v16.14.0/node-v16.14.0-linux-x64.tar.xz
b.安装node
-
解压:
tar -xvf node-v16.14.0-linux-x64.tar.xz -C /usr/local
c.建立软链接
ln -s /usr/local/node-v16.14.0-linux-x64/bin/node /usr/bin/node
ln -s /usr/local/node-v16.14.0-linux-x64/bin/npm /usr/bin/npm
d.确认安装成功
node -v
npm -v
e.安装打包工具 npm-pack-all
npm install -g npm-pack-all
f.安装elasticdump
npm install elasticdump -g
g.打包elasticdump
cd /usr/local/node-v16.3.0-linux-x64/lib/node_modules/elasticdump
npm-pack-all --output build/elasticdump.tgz
#当前目录build/下生成elasticdump.tgz
2、迁移工具离线安装
a.将node安装包和elasticdump安装包复制到离线安装的服务器B
node-v16.14.0-linux-x64.tar.xz、elasticdump.tgz
b.在服务器B上部署elasticdump
tar -xvf node-v16.14.0-linux-x64.tar.xz -C /usr/local
tar -zxvf elasticdump.tgz -C /usr/local
ln -s /usr/local/node-v16.14.0-linux-x64/bin/node /usr/bin/node
ln -s /usr/local/node-v16.14.0-linux-x64/bin/npm /usr/bin/npm
ln -s /usr/local/package/bin/elasticdump /usr/bin/elasticdump
c.核对elasticdump是否安装成功
node -v
npm -v
elasticdump --version
d.验证elasticdump工具是否能正常使用
- 库对文件
elasticdump --limit=10000 --input=http://elastic:elastic@10.5.3.5:9200/demo_2023-02 --output=/home/esjson/demo_2023-02.json
- 库对库
elasticdump --input= --input=http://elastic:elastic@10.5.3.5:9200/demo_2023-02 --output= --input=http://elastic:elastic@10.5.3.6:9200/demo_2023-02
(三)参数说明
● --input 源位置(必填)
● --input-index 来源索引和类型(默认值: all,示例: index/type)
● --output 目标位置(必填)
● --output-index 目标索引和类型(默认值: all,示例: index/type)
● --limit 每次操作批量移动多少个对象(docs)限制是文件流的近似值(默认值:100 )
● --size 要检索多少个对象(默认值:- 1 ->无限制)
● --debug 显示正在使用的elasticsearch命令debug日志(默认值:false)
● --quiet 禁止显示除错误之外的所有消息(默认值:false)
● --type 指定迁移的索引类型 (默认值: data, 选项: [data, settings, analyzer, mapping, alias])
● --delete 当文档被移动时,从输入中逐个删除它们,不会删除源索引 (默认值:false)
● --overwrite 如果存在,覆盖输出文件(默认:false)
● --bulkaction 设置准备发送给elasticsearch的请求体时使用的操作类型。(default:索引,选项:[索引,更新,删除,创建])
● --filtersystemtemplates 是否删除metrics--和logs--系统模板(默认:true)
● --templateRegex 用正则表达式在输出传输之前过滤模板(默认:(metrics|logs|..+)(-.+)?)
● --delete-with-routing 将路由查询参数传递给delete函数,实现将操作路由到特定的分片。(默认值:false)
● --skip-existing 跳过已经存在的异常资源,并成功退出(默认:false)
● --headers 为Elastisearch 请求添加自定义头(当 Elasticsearch 实例位于代理之后很有用)(默认值: ' {"User-Agent": "elasticdump"} ' )
● --params 添加自定义参数到 Elastisearch 请求 uri。例如:当希望使用elasticsearch首选项(默认值:null )
● --searchBody 根据搜索结果执行部分提取 (当 ES为输入时,默认值为: if ES > 5 '{"query": { "match_all": {} }, "stored_fields": ["*"], "_source": true }' else '{"query": { "match_all": {} }, "fields": ["*"], "_source": true }' [As of 6.68.0] 如果searchBody前面有@符号,elasticdump将在执行文件指定的位置查找。注意:文件必须包含有效的JSON
● --searchWithTemplate 启用---searchBody时使用搜索模板。如果使用Search Template,则searchBody必须由"id"字段和"params"对象组成。如果在Search Template中定义了"size"字段,则它将被--size parameter覆盖。
● --searchBodyTemplate 一个方法/函数,可以调用到searchBody。可以多次使用。 doc.searchBody = { query: { match_all: {} }, stored_fields: [], _source: true };
● --sourceOnly 仅输出document_source中包含的json(默认值:false) Normal: {"index":"","type":"","_id":"", "__source":{SOURCE}} sourceOnly: {SOURCE}
● --ignore-errors 写错误时是否继续读/写循环 (默认值: false)
● --ignore-es-write-errors 将对elasticsearch的写错误继续进行读写循环(默认值:true)
● --scrollId 从elasticsearch返回的最后一个滚动Id。允许使用最后一个滚动id并且scrollTime没有过期来恢复转储。
● --scrollTime 节点将按顺序保存请求的搜索时间(默认值:10m)
● --scroll-with-post 使用HTTP POST方法来执行滚动,而不是默认的GET方式(默认值:false)
● --maxSockets 可以同时处理多少个 HTTP 请求 (默认值:5 [node <= v0.10.x] |无限 [node >= v0.11.x] )
● --timeout 整数,包含在放弃请求之前等待请求响应的毫秒数,直接到请求库。当不太关心导入时是否丢失一些数据,而是希望有速度时使用。
● --offset 整数,包含要跳过的行数,在input传输之前。当导入大的索引,事情可能会出错,无论是连接性、系统崩溃或者忘记“筛选”等。这可以让你从最后写入的已知行再次开始转储(由output中的“offset”记录) 。由于最初转储是最初创建的,没有真正的方法可以保证跳过的行已经写入或解析,可以使用该参数(默认值:0 )
● --noRefresh 禁用输入索引刷新。优点:1 . 大大提高索引速度;2 .硬件要求少得多。缺点:1 . 最近添加的数据可能不会被索引,建议与大数据索引一起使用,速度和系统健康状况具有更高的优先级。
● --inputTransport 提供自定义js文件用作输入传输
● --outputTransport 提供自定义js文件用作输出传输
● --toLog 使用自定义outputTransport输出传输时,应记录行附加到输出流 (默认值: true,'$'除外)
● --transform一个javascript,可以在将其写入目的地之前调用它来修改文档。全局变量“ doc ”可用。 示例脚本:用于计算双倍的新字段“ f2 ” 字段“ f1 ” 的值: doc.source[ " f2 " ] = doc.source.f1 * 2 ;
● --httpAuthFile 使用http身份验证时,在ini文件中提供凭据 user=<username> password=<password>
● --support-big-int 支持大整型数据
● --big-int-fields 指定一个逗号分割的字段列表,检查是否支持大整型数据
● --retryAttempts 整数,当连接失败时,出现以下错误之一:ECONNRESET, ENOTFOUND, ESOCKETTIMEDOUT,指示请求在失败之前应该自动重新尝试的次数。(默认值: 0)
● --retryDelay 整数,表示重试期间的回退/中断时间(默认值 : 5000)
● --maxRows 支持文件拆分,文件按照指定的行数分割。
● --fileSize 支持文件拆分,该值必须是bytes模块支持的字符串。必须使用以下缩写来表示单位大小。 b for bytes kb for kilobytes mb for megabytes gb for gigabytes tb for terabytes 分区通过有效地对文件进行分段,有助于缓解内存溢出的状况。分成更小的块,如果需要可以合并。
● --fsCompress 在发送输出到文件之前,gzip压缩数据,在导入时,该参数用于解压gzip文件。
● --tlsAuth 启用TLS X509客户端身份验证。
● --cert, --input-cert, --output-cert 客户端证书文件,如果源和目标相同,则使用--cert,否则根据需要使用--input-cert和--output-cert。
● --key, --input-key, --output-key 私钥文件。如果源和目标相同,则使用--key,否则根据需要使用--input-key和--output-key。
● --pass, --input-pass, --output-pass 私钥的传递短语。如果源和目标相同,则使用--pass,否则根据需要使用--input-pass和--output-pass。
● --ca, --input-ca, --output-ca CA证书。如果源和目标相同,则使用--ca,否则根据需要使用--input-ca和--output-ca。
● --inputSocksProxy, --outputSocksProxy Socks5主机地址
● --inputSocksPort, --outputSocksPort Socks5主机端口
● --force-os-version 强制使用elasticdump使用的OpenSearch版本。(默认值:7.10.2)
● --help 帮助页面
(四)最佳实践
库对库
nohup
/usr/bin/elasticdump
--input=http:
//10
.5.4.119:9200
/demo_2023-06
--output=http:
//elastic
:elastic@10.5.3.126:9200/demo_2023-06
--support-big-int --
type=data --timeout=3600000 --noRefresh > demo_2023-06.log
库到文件
nohup
/usr/bin/elasticdump
--input=http:
//10
.5.4.119:9200/demo_2023-07
--output=/home/demo_2023-07.json --support-big-int ----
type=data --timeout=3600000 --noRefresh > demo_2023-07.log
文件到库
nohup
/usr/bin/elasticdump
--input=/home/demo_2023-08.json --output=http:
//10
.5.4.125:9200/demo_2023-08
--support-big-int --
type=data --timeout=3600000 --noRefresh > demo_2023-08.log
因为es版本在V6.0之后,且该项目迁移数据较大,所以经常出现迁移中断现象,还必须重新创建任务进行迁移,所以采用另外一种迁移方式:elasticsearch自带接口 reindex 进行数据迁移。
三、reindex API简介及实践
(一)reindex简介
reindex ,是elasticdump本身自带的API,是将数据从一个 index 移动到另一个 index 的过程。是ES 5.X版本之后提供的数据迁移功能,不需要额外安装,支持同集群索引迁移和跨集群索引迁移。
(二)reindex实践
不同场景下该如何正确的使用 reindex
1、基本使用方式
POST _reindex
{
"source": {
"index": "product_index"
},
"dest": {
"index": "new_product_index"
}
}
适用场景:源索引的文档数量极少,reindex 的时候不用去考虑效率、不用做其它特别处理的情况。
2、使用分片进行reindex
POST _reindex?slices=auto&refresh
{
"source": {
"index": "product_index"
},
"dest": {
"index": "new_product_index"
}
}
适用场景:源索引的文档数量较多,为了提高 reindex 效率,采用设置slices参数进行并行加速处理,当值设置为auto时,ES 会合理的选择切片数量进行处理,建议使用auto。但是从远程集群重新创建索引不支持手动或自动切片。
3、指定部分字段进行reindex
POST _reindex?refresh
{
"source": {
"index": "product_index",
"_source":["productId","productName","updateTime","amuontTotal"]
},
"dest": {
"index": "new_product_index"
}
}
适用场景:如果只需要把源索引的部分字段进行 reindex 到目标索引,在请求体的 source 中设置 _source 参数指定这些字段即可。
4、指定部分文档进行reindex
POST _reindex?refresh
{
"max_docs": 50000,
"source": {
"index": "product_index",
"query":{
"bool":{
"must":[
{"wildcard":{"productName":"康师傅*"}}
]
}
}
},
"dest": {
"index": "new_product_index"
}
}
适用场景:使用 query DSL 语句查询到文档集,进行 reindex 的时候设置 max_docs 最大文档数量不超过5W个。当然,请求体不设置 max_docs 参数也是可以的,将查询到的所有文档集进行 reindex 。
5、指定速率进行 reindex
POST _reindex?requests_per_second=600
{
"source": {
"index": "product_index",
"size": 600
},
"dest": {
"index": "new_product_index"
}
}
适用场景:如果 reindex 操作过快,可能会给 ES 集群造成写入压力,严重的话会导致集群的崩溃。为此,通过请求参数可以设置 requests_per_second 参数限制处理的速率,而 size 用于批量读写操作的文档数,此参数是可选的,缓冲区最大为 200 MB,默认 100 M。可在集群性能允许的情况下,通过调大 size 参数值来提升迁移速度,默认 size 大小为 1000。
6、使用 script 进行 reindex
POST _reindex?refresh
{
"source": {
"index": "product_index"
},
"dest": {
"index": "new_product_index"
},
"script": {
"source": "ctx._source.lastupdatetime = ctx._source.remove(\"updataTime\")"
}
}
适用场景:ES script 是一个强大的存在,可以轻松帮我们实现很多对文档修改的需求,比如,把文档中的 updataTime 字段名称改为 lastupdatetime ;又比如,在文档中新增一个字段并赋默认值等。
例子1:字段名重命名
原有的数据字段名称不合理,重新按照新字段命名,ES字段名称原始是不允许修改的,但通过脚本可以操作
POST _reindex
{
"source":[
"index": [
"kibana_sample_data"
],
"_source":["FlightNum"."DestCountry","DestCityName","OriginAirportID"]
},
"dest":{
"index": "kibana_sample_data_001"
},
"script":{
ctx._source.FlightNum01=ctx._source.FlightNum;
ctx._source.remove("FlightNum");
"lang": "painless"
}
}
原索引
POST test/_doc/1?refresh
{
"text": "words words",
"flag": "foo"
}
重建索引,将原索引中的flag字段重命名为tag字段
POST _reindex
{
"source": {
"index": "test"
},
"dest": {
"index": "test2"
},
"script": {
"source": "ctx._source.tag = ctx._source.remove(\"flag\")"
}
}
结果
GET test2/_doc/1
{
"found": true,
"_id": "1",
"_index": "test2",
"_type": "_doc",
"_version": 1,
"_seq_no": 44,
"_primary_term": 1,
"_source": {
"text": "words words",
"tag": "foo"
}
}
例子2:基于脚本修改原始文档数据
POST _reindex
{
"source":[
"index": [
"kibana_sample_data"
],
"_source":["FlightNum"."DestCountry","DestCityName","OriginAirportID"]
},
"dest":{
"index": "kibana_sample_data_001"
},
"script":{
ctx._source.FlightNum=ctx._source.FlightNum+"_123";
"lang": "painless"
}
}
7、多个源索引进行 reindex 到一个目标索引
POST _reindex?refresh
{
"source": {
"index": ["product_index","product_index_1","product_index_2"]
},
"dest": {
"index": "new_product_index"
}
}
也支持*号来匹配多个索引
POST _reindex
{
"source": {
"index": "product_index*"
},
"dest": {
"index": "new_product_index"
}
}
适用场景:多个源索引向同一个目标索引进行 reindex,但需要注意多个源索引的文档id有可能是一样的,reindex 到目标索引时无法保证是哪个源索引的文档id,最终覆盖只保留一个文档id。
8、覆盖更新索引
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "internal"
}
}
"version_type": "internal",internal表示内部的,省略version_type或version_type设置为 internal 将导致 Elasticsearch 盲目地将文档转储到目标中,覆盖任何具有相同类型和 ID 的文件。
9、创建丢失的文档并更新旧版本的文档
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "external"
}
}
"version_type": "external",external表示外部的,将 version_type 设置为 external 将导致 Elasticsearch 保留源中的版本,创建任何丢失的文档,并更新目标索引中版本比源索引中版本旧的任何文档。
id不存在的文档会直接更新;id存在的文档会先判断版本号,只会更新版本号旧的文档。
10、仅创建丢失的文档
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"op_type": "create"
}
}
要创建的 op_type 设置将导致 _reindex 仅在目标索引中创建丢失的文档,所有存在的文档都会引起版本冲突。只要两个索引中存在id相同的记录,就会引起版本冲突。
· 冲突处理
默认情况下,版本冲突会中止 _reindex 进程。 “冲突”请求正文参数可用于指示 _reindex 继续处理有关版本冲突的下一个文档。 需要注意的是,其他错误类型的处理不受“冲突”参数的影响。当"conflicts": "proceed"在请求正文中设置时,_reindex 进程将继续处理版本冲突并返回遇到的版本冲突计数。
POST _reindex
{
"conflicts": "proceed",
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"op_type": "create"
}
}
11、reindex超时情况
es中的请求超时时间默认是1分钟,当重建索引的数据量太大时,经常会出现超时。这种情况可以增大超时时间,也可以添加wait_for_completion=false参数将请求转为异步任务,同时增加socket和连接超时时间:
POST _reindex?wait_for_completion=false
{
"source": {
"remote":
{"host":"http://10.5.3.119:9200",
"socket_timeout":"60m",
"connect_timeout":"3600s"},
"index":"demo_2023-06",
"size":10000
},
"dest": {
"index": "demo_2023-06_NEW"
}
}
12、从远程 ES 集群进行 reindex
注意:要保证源索引与目的索引的表结构信息一致,否则可能导致源索引与目的索引字段类型等信息不一致
1、查询出源索引的表结构信息,并根据此表结构提前在目的集群中创建出目的索引
2、若源索引有对应的索引模版,可提前将该索引模版在目的集群中创建出来
白名单配置
从一个远程的 Elasticsearch 的服务器上进行 reindex,需要在目的 es 集群中一个节点配置上源 es 集群的白名单信息,如下:
vim es_master/config/elasticsearch.yml
reindex.remote.whitelist: "10.5.3.119:9200,10.5.3.120:9200"
索引迁移,需要在请求体的 remote 参数填写连接信息:
POST _reindex?wait_for_completion=false&refresh
{
"source": {
"remote": {
"host": "http://10.5.3.119:9200",
"username": "user",
"password": "xxxx"
},
"size":10000,
"index": "product_index",
},
"dest": {
"index": "new_product_index"
}
}
当开启 reindex 时,我们可以使用 task API 查看进度,使用以下命令:
GET _tasks?detailed=true&actions=*reindex
可以中途取消 reindex,使用以下命令:
# 从查看进度信息里找到具体的task_id
GET _tasks?detailed=true&actions=*reindex
# 取消reindex任务
POST _tasks/task_id:xxx/_cancel
(三)最佳实践
将10.5.3.119 ES的索引demo_2023-06迁移到10.5.3.126
/usr/bin/curl
-u elastic:elastic -XPOST
'http://10.5.3.126:9200/_reindex?wait_for_completion=false&refresh'
-H
"Content-Type: application/json"
-d
'{"conflicts":"proceed","source":{"remote":{"host":"http://10.5.3.119:9200","username":"elastic","password":"elastic","socket_timeout":"60m","connect_timeout":"3600s"},"index":"demo_2023-06","size":10000},"dest":{"index":"demo_2023-06","op_type":"create"}}'
当开启 reindex 时,可以使用 task API 查看进度,使用以下命令:
curl -u elastic:elastic -XGET
'http://10.5.3.126:9200/_tasks?detailed=true&actions=*reindex'
或者根据任务id查看任务:
curl -u elastic:elastic -XGET
'http://10.5.3.126:9200/_tasks/W29Va7J_Tj--sUYS6fSWlg:280879028'
取消任务举例:
curl -u elastic:elastic -XGET 'http:
//10
.5.3.126:9200
/_tasks/W29Va7J_Tj--sUYS6fSWlg
:280879028
/_cancel
(四)Reindex 优化问题
reindex 是一个很耗时的操作,当 ES 索引的文档数量很大时,不得不去面对和思考效率的问题了,有以下几个方面可以参考:
创建目标索引
1、设置刷新间隔时间
settings.index.refresh_interval,增加refresh间隔,最好设置为-1,即不刷新,等 reindex 结束后可以重新设置该参数;
如果搜索结果不需要接近实时的准确性,考虑先不要急于索引刷新refresh。可以将每个索引的refresh_interval到30s。
如果正在进行大量数据导入,可以通过在导入期间将此值设置为-1来禁用刷新。完成后不要忘记重新启用
设置方法:
PUT /my_logs/_settings { "refresh_interval": -1 }
2、设置副本数量
settings.index.number_of_replicas:副本数量,设置为0,副本在索引创建之后也是可以动态调整的,reindex 没必要设置;
如果要进行大量批量导入,请考虑通过设置index.number_of_replicas来禁用副本:0。
主要原因在于:
复制文档时,将整个文档发送到副本节点,并逐字重复索引过程。 这意味着每个副本都将执行分析,索引和潜在合并过程。
相反,如果您使用零副本进行索引,然后在提取完成时启用副本,则恢复过程本质上是逐字节的网络传输。 这比复制索引过程更有效。
设置方法:
PUT /my_logs/_settings { "number_of_replicas": 0 }
3、异步刷新translog
translog默认的持久化策略为:request。这个非常影响 ES 写入速度。但是这样写操作比较可靠。如果系统可以接受一定概率的数据丢失(例如:数据写入主分片成功,尚未复制到副分片时,主机断电。由于数据既没有刷到Lucene,translog也没有刷盘,恢复时translog中没有这个数据,数据丢失),则调整translog持久化策略。在每一个索引,删除,更新或批量请求之后是否进行fsync和commit操作。此设置接受以下参数:
request:(默认)在每次请求后fsync并commit。如果发生硬件故障,所有已确认的写入将已经提交到磁盘。
async:每隔sync_interval段时间进行fsync并commit。如果发生故障,则自上次自动提交以来所有已确认的写入将被丢弃。
设置方法:
PUT /my_logs/_settings { "index.translog.durability": "async" }
调用 Reindex API
- 请求参数使用自动切片方式,即 slices=auto,但是从远程集群重新创建索引不支持手动或自动切片;
- 请求参数适当限制处理速度,避免ES集群出现崩溃,即 requests_per_second;
- 请求体使用参数 conflicts 并修改为 proceed,因为 conflicts 默认的是 abort,即默认遇到版本冲突的时候会退出 reindex;
- 请求体使用参数 size 并根据当前文档的情况评估一个合理的批量处理的数值;
总之,对于大数据量的索引,reindex 是一个耗时而漫长的操作,一定要注意这些优化点。