首页 > 其他分享 >ES中摄取管道详解

ES中摄取管道详解

时间:2022-12-07 17:42:27浏览次数:37  
标签:pipeline description ingest field 管道 详解 摄取 my ES


一、什么是摄取管道

摄取管道​Ingest pipelines​

摄取管道主要用来在数据被索引之前对数据执行常见的转换。
例如,您可以使用管道来移除字段、从文本中提取值以及丰富数据。

管道由一系列称为处理器的可配置任务组成。每个处理器按顺序运行,对传入的文档进行特定的更改。在处理器运行之后,Elasticsearch 将转换后的文档添加到数据流或索引中。

管道的工作流程图如下:

ES中摄取管道详解_管道

二、摄取管道使用

1.创建管道

方式一:在kibana中创建

Stack Management > Ingest Pipelines

ES中摄取管道详解_字段_02


方式二:采用API创建

下面的 create pipeline API 请求创建一个包含两个 set 处理器和一个小写处理器的管道。处理器按指定的顺序顺序运行。

PUT _ingest/pipeline/my-pipeline
{
"description": "My optional pipeline description",
"processors": [
{
"set": {
"description": "My optional processor description",
"field": "my-long-field",
"value": 10
}
},
{
"set": {
"description": "Set 'my-boolean-field' to true",
"field": "my-boolean-field",
"value": true
}
},
{
"lowercase": {
"field": "my-keyword-field"
}
}
]
}

2.测试管道

方式一:在kibana中测试

选择创建的管道,打开编辑页面,测试管道——》添加文档

ES中摄取管道详解_elasticsearch_03


ES中摄取管道详解_elasticsearch_04


ES中摄取管道详解_ide_05

方式二:采用_simulate的API测试
1、在请求URL中指定管道

POST _ingest/pipeline/my-pipeline/_simulate
{
"docs": [
{
"_source": {
"my-keyword-field": "FOO"
}
},
{
"_source": {
"my-keyword-field": "BAR"
}
}
]
}

2、在请求body中指定管道

POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"lowercase": {
"field": "my-keyword-field"
}
}
]
},
"docs": [
{
"_source": {
"my-keyword-field": "FOO"
}
},
{
"_source": {
"my-keyword-field": "BAR"
}
}
]
}

3.在索引请求中使用管道

说明:在向索引my-data-stream添加数据时,使用管道y-pipeline

POST my-data-stream/_doc?pipeline=my-pipeline
{
"@timestamp": "2099-03-07T11:04:05.000Z",
"my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

在使用​​_update_by_query​​​和​​_reindex​​时使用管道:

POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
"source": {
"index": "my-data-stream"
},
"dest": {
"index": "my-new-data-stream",
"op_type": "create",
"pipeline": "my-pipeline"
}
}

4.给索引设置默认管道

通过​​index.default_pipeline​​属性,可以给索引设置默认的管道。

5.索引模板中设置默认管道

PUT _component_template/logs-my_app-settings
{
"template": {
"settings": {
"index.default_pipeline": "logs-my_app-default",
"index.lifecycle.name": "logs"
}
}
}

6.管道异常处理

PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Index document to 'failed-<index>'",
"field": "_index",
"value": "failed-{{{ _index }}}"
}
}
]
}

三、管道功能演示

1、字段重命名

PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"ignore_failure": true
}
}
]
}

2、删除特定记录

这里采用​​if​​配置管道处理函数的触发条件。

PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents with 'network.name' of 'Guest'",
"if": "ctx?.network?.name == 'Guest'"
}
}
]
}

更复杂的条件可以采用scripts脚本:

PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents that don't contain 'prod' tag",
"if": """
Collection tags = ctx.tags;
if(tags != null){
for (String tag : tags) {
if (tag.toLowerCase().contains('prod')) {
return false;
}
}
}
return true;
"""
}
}
]
}

注意⚠️:
尽量避免使用复杂或昂贵的条件脚本,昂贵的条件脚本会降低索引速度。

3、给字段赋值

PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"field": "_source.my-long-field",
"value": 10
}
}
]
}

采用元数据赋值

PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Index the ingest timestamp as 'event.ingested'",
"field": "event.ingested",
"value": "{{{_ingest.timestamp}}}"
}
}
]
}

总结

本文主要介绍了ES中摄取管道pipeline的使用。
摄取管道主要用来在数据被索引之前对数据执行常见的转换。
可以使用管道来​​​移除字段、从文本中提取值以及丰富数据​​。


标签:pipeline,description,ingest,field,管道,详解,摄取,my,ES
From: https://blog.51cto.com/u_15905482/5919819

相关文章

  • ES聚合查询详解(一)
    前言本文主要介绍ES中的聚合查询。一、聚合查询简介聚合查询可以将数据汇总为度量、统计或其他分析。聚合查询主要分为三个类别:Metric指标聚合Bucket桶聚合Pipeline管道......
  • ES聚合查询详解(三):指标聚合
    前言ES聚合查询主要分为3类:指标聚合、桶聚合和管道聚合。本文主要是介绍其中指标聚合的相关使用。一、简介指标聚合​​MetricsAggregations​​如果说​​桶聚合主要是......
  • ES中复杂DSL查询语句不会写怎么办?
    前言刚接触ES不久的同学一定都遇到过这样的问题,复杂的查询场景下不知道怎么写DSL查询语句,今天教大家一个小妙招,解决复杂DSL语句不会写的问题。一、来吧,展示巧用大招:ES7以......
  • Mysql连接查询详解
    前言记得刚工作几年的时候,就写了一篇关于Mysql连接查询的博客文章,里面简单的认为先对关联表的记录进行笛卡尔积,然后再根据where条件过滤,现在看来有点贻笑大方了。正好最近看......
  • logback性能优化详解
    前言不正确的日志打印不但会降低程序运行性能,还会占用大量IO资源和硬盘存储空间。本文主要总结一些能提高日志打印性能的手段。一、通过AsyncAppender异步输出日志我们通常......
  • logback异步输出日志详解
    前言logback应该是目前最流行的日志打印框架了,毕竟SpringBoot中默认的集成的日志框架也是logback。在实际项目开发过程中,常常会遇到由于打印大量日志而导致程序并发降低,QPS......
  • 通过Logstash实现mysql数据定时增量同步到ES
    文章目录​​前言​​​​一、系统配置​​​​二、同步步骤整体概览​​​​三.logstash数据同步实战​​​​1、新建mysql表​​​​2、ES中新建索引​​​​3、Logstash......
  • mybatis一级缓存和二级缓存使用详解
    文章目录​​一、概念说明​​​​1、一级缓存​​​​2、二级缓存​​​​3、比较​​​​二、mybatis缓存的生命周期​​​​三、一级缓存的使用​​​​四、二级缓存的使......
  • mybatis-plus雪花算法生成Id使用详解
    文章目录​​前言​​​​一、mybatis-plus官网​​​​二、雪花算法实战​​​​1.建表​​​​2.新建测试工程​​​​3.单元测试​​​​三、实现分析​​​​四、为什么......
  • Mybatis-Plus字段策略FieldStrategy详解
    文章目录​​前言​​​​一、官方文档​​​​二、字段策略介绍​​​​1、FieldStrategy作用​​​​2、FieldStrategy类型​​​​3、FieldStrategy配置​​​​全局策略......