
Syncing ES data with logstash (create the mapping on the destination first)

Posted: 2023-10-26 11:14:48

Environment:
OS: CentOS 7
ES (source and destination): 6.8.5
logstash: 6.8.5

Notes:
1. The logstash version should match the ES version as closely as possible.
2. We create the mapping (the equivalent of a table structure) on the destination first. If it is not created in advance, logstash lets Elasticsearch infer each field type from the incoming data, and the resulting mapping may not match the source: a short field on the source can come out as long after the sync, and in some cases a text field can even be inferred as long, which ultimately causes the sync to fail.
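A quick way to see this dynamic inference in action (the index test_dynamic below is a throwaway name used only for this illustration) is to index a document into a non-existent index and read back the generated mapping; JSON integers are inferred as long, and plain strings become text with a keyword sub-field:

curl -u elastic:elastic -H 'Content-Type: application/json' -XPUT "http://192.168.1.108:19200/test_dynamic/_doc/1?pretty" -d '{"status": 1, "title": "hello"}'
curl -u elastic:elastic -XGET "http://192.168.1.108:19200/test_dynamic/_mapping?pretty"
# status is inferred as long, title as text with a keyword sub-field; drop the throwaway index afterwards:
curl -u elastic:elastic -XDELETE "http://192.168.1.108:19200/test_dynamic?pretty"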

 

1. Check the index on the source

[elasticsearch@localhost bin]$ curl -u elastic:elastic -X GET "192.168.1.108:19200/_cat/indices?v"
health status index           uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .security-6     TBWap6LdRY6ALA4vDIagew   1   0          6            0     19.5kb         19.5kb
yellow open   app_message_all C04l_PP1RQyFPNBSf1cjjg   5   1    1000000            0    766.2mb        766.2mb

 

2. Get the mapping of the source index

[elasticsearch@localhost bin]$ curl -u elastic:elastic -H "Content-Type: application/json" -XGET "http://192.168.1.108:19200/app_message_all/_mappings?pretty=true"
{
  "app_message_all" : {
    "mappings" : {
      "_doc" : {
        "properties" : {
          "create_time" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "deleted" : {
            "type" : "long"
          },
          "extra" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "massive_type" : {
            "type" : "long"
          },
          "message" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "message_id" : {
            "type" : "long"
          },
          "message_type" : {
            "type" : "long"
          },
          "send_date" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "sender_seq_no" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "status" : {
            "type" : "long"
          },
          "title" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "user_id" : {
            "type" : "long"
          }
        }
      }
    }
  }
}

 


Here the index name is: app_message_all
The type is: _doc
Note: before version 7.0 the type name could be customized; from 7.0 onward custom type names are no longer allowed and everything uses _doc.

 

3. Create the index and mapping on the destination
First create an empty index (with no field definitions):

[elasticsearch@localhost bin]$curl -u elastic:elastic123 -X PUT "192.168.1.109:19200/app_message_all?pretty" -H 'Content-Type: application/json' -d'
{}
'
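Optionally (a sketch, not part of the original steps), the shard and replica counts can be set explicitly at creation time so they match the source index (5 primaries, 1 replica, as shown in step 1) rather than relying on the 6.x defaults. Run this instead of the empty-body create above, since an index can only be created once:

curl -u elastic:elastic123 -X PUT "192.168.1.109:19200/app_message_all?pretty" -H 'Content-Type: application/json' -d'
{
  "settings" : {
    "number_of_shards" : 5,
    "number_of_replicas" : 1
  }
}
'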

 

If you query the mapping at this point, there is no mapping information yet:
[elasticsearch@localhost bin]$curl -u elastic:elastic123 -H "Content-Type: application/json" -XGET "http://192.168.1.109:19200/app_message_all/_mappings?pretty=true"

 

Add the mapping. Simply copy everything under properties from the source mapping output; make sure the braces stay balanced (the 4 trailing } belonging to the outer levels of the source output are dropped).
The type must be specified as _doc, the same as on the source; in version 6 the type name can still be customized.

curl -u elastic:elastic123 -H 'Content-Type: application/json' -XPOST "http://192.168.1.109:19200/app_message_all/_doc/_mapping?pretty" -d ' 
{
        "properties" : {
          "create_time" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "deleted" : {
            "type" : "long"
          },
          "extra" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "massive_type" : {
            "type" : "long"
          },
          "message" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "message_id" : {
            "type" : "long"
          },
          "message_type" : {
            "type" : "long"
          },
          "send_date" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "sender_seq_no" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "status" : {
            "type" : "long"
          },
          "title" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "user_id" : {
            "type" : "long"
          }
        }
}'
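As an alternative sketch for 6.x, the index and its mapping can also be created in a single request by nesting the type and properties under mappings when creating the index (only two fields are shown here for brevity; the remaining fields follow the same pattern as above). This only works if app_message_all does not already exist on the destination:

curl -u elastic:elastic123 -H 'Content-Type: application/json' -XPUT "http://192.168.1.109:19200/app_message_all?pretty" -d '
{
  "mappings" : {
    "_doc" : {
      "properties" : {
        "message_id" : { "type" : "long" },
        "title" : {
          "type" : "text",
          "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } }
        }
      }
    }
  }
}'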

 

At this point, compare the source and destination mappings to confirm they are identical.

Source:
curl -u elastic:elastic -H "Content-Type: application/json" -XGET "http://192.168.1.108:19200/app_message_all/_mappings?pretty=true"

Destination:
curl -u elastic:elastic123 -H "Content-Type: application/json" -XGET "http://192.168.1.109:19200/app_message_all/_mappings?pretty=true"
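A quick way to compare them (assuming both clusters are reachable from the same host and bash is available) is to diff the two responses directly:

diff <(curl -s -u elastic:elastic "http://192.168.1.108:19200/app_message_all/_mappings?pretty") \
     <(curl -s -u elastic:elastic123 "http://192.168.1.109:19200/app_message_all/_mappings?pretty")
# no output means the two mappings are identical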

 

4. logstash configuration file

[root@localhost config]# more sync_all_index.conf 
input {
    elasticsearch {
        hosts => ["http://192.168.1.108:19200"]
        index => "*,-.monitoring*,-.security*,-.kibana*" ## the system indices are excluded here; only app_message_all exists in this cluster, so * is used, but a specific index name can also be given
        user => "elastic"
        password => "elastic"
        size => 1000
        scroll => "1m"
        docinfo => true
    }
}
# The filter section is optional
filter {
  mutate {
    remove_field => ["@timestamp", "@version"]  # drop the fields that logstash adds on its own
  }
}

output {
    elasticsearch {
        hosts => ["http://192.168.1.109:19200"]
        user => "elastic"
        password => "elastic123"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
}

 

5. Run the sync
[root@localhost config]# /opt/logstash-6.8.5/bin/logstash -f /opt/logstash-6.8.5/config/sync_all_index.conf
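Before launching the actual sync, the configuration file syntax can be validated first; logstash 6.x supports a test-and-exit mode:

[root@localhost config]# /opt/logstash-6.8.5/bin/logstash -f /opt/logstash-6.8.5/config/sync_all_index.conf --config.test_and_exit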

 

Verification:
1. The field order of documents synced by logstash differs from the source.
For the document with _id eVpxaYsBeUT5Hwt-LE-o, the fields come back in a different order on the two clusters:
[elasticsearch@localhost bin]$ curl -u elastic:elastic -XGET 'http://192.168.1.108:19200/app_message_all/_doc/eVpxaYsBeUT5Hwt-LE-o?pretty'
[elasticsearch@localhost bin]$ curl -u elastic:elastic123 -XGET 'http://192.168.1.109:19200/app_message_all/_doc/eVpxaYsBeUT5Hwt-LE-o?pretty'
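Apart from spot-checking individual documents, the document counts on both sides can be compared to confirm everything was transferred (assuming the sync has already completed):

curl -u elastic:elastic -XGET 'http://192.168.1.108:19200/app_message_all/_count?pretty'
curl -u elastic:elastic123 -XGET 'http://192.168.1.109:19200/app_message_all/_count?pretty'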

 

2. The destination mapping can be created with fewer fields than the source; the missing fields will be added automatically during the sync. If you do not want a particular field to be synced at all, remove it with a filter:

filter {
  mutate {
    remove_field => ["@timestamp", "@version","user_id"]
  }
}

 

From: https://www.cnblogs.com/hxlasky/p/17788936.html

    3.1、环境搭建创建名为spring_mvc_demo的新module,过程参考2.1节3.1.1、创建SpringMVC的配置文件<?xmlversion="1.0"encoding="UTF-8"?><beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/......