Method 1: add data via the Kibana console (the console limits the amount of data per request; curl is recommended for bulk imports)
This method requires Kibana to be installed. After starting it, open the Dev Tools console:
http://<Kibana-host-IP>:5601/app/dev_tools#/console
POST _bulk
{"index":{"_index":"test_goods","_type":"goods","_id":10001}}
{"code":"1","price":10,"name":"商品1"}
{"index":{"_index":"test_goods","_type":"goods","_id":10002}}
{"code":"2","price":20,"name":"商品2"}
{"index":{"_index":"test_goods","_type":"goods","_id":10003}}
{"code":"3","price":30,"name":"商品3"}
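To confirm the documents were indexed, a count query can be run in the same console (an optional sanity check, not part of the original post):

GET test_goods/_count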
Method 2: bulk import with curl; 100k+ records take roughly 3 s (curl can be downloaded from its official site)
# Tool: curl.exe; dataset: goods.json
curl -H "Content-Type: application/json" -XPOST "<ES-server-IP>:9200/test_goods/goods/_bulk?refresh" --data-binary "@goods.json"
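The original post does not show goods.json; a minimal sketch of the NDJSON format the _bulk endpoint expects (one action line, then one source line, with a trailing newline at the end of the file) would be:

{"index":{"_id":1}}
{"code":"1","price":10,"name":"商品1"}
{"index":{"_id":2}}
{"code":"2","price":20,"name":"商品2"}

Because the index and type are already in the request URL (/test_goods/goods/_bulk), the action lines only need the _id.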
Method 3: custom import with Logstash
3.1 Export from MySQL and import into ES
A sample pipeline configuration (credentials and hostnames are redacted in the original):

input {
  jdbc {
    jdbc_driver_library => "./mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxxxxx.mysql.singapore.rds.aliyuncs.com:3306/fle_staging"
    jdbc_user => "xxxx"
    jdbc_password => "xxxx"
    schedule => "* * * * *"
    statement => "SELECT * FROM parcxxxnfo WHERE created_at >= :sql_last_value order by created_at limit 200000"
    use_column_value => true
    #tracking_column_type => "numeric"
    tracking_column_type => "timestamp"
    tracking_column => "created_at"
    last_run_metadata_path => "syncpoint_table_parcel_info"
    # fix garbled Chinese characters
    codec => plain { charset => "UTF-8" }
    # to track by another column instead of a timestamp:
    #use_column_value => true
    # column to track
    #tracking_column => src_phone
    record_last_run => true
    # path of the file holding the previous sql_last_value; the initial value must be set in the file
    #last_run_metadata_path => "mysql/station_parameter.txt"
    jdbc_default_timezone => "Asia/Shanghai"
  }
}

output {
  elasticsearch {
    hosts => ["172xxxx2.83"]
    user => ""
    password => ""
    index => "parcxxxnfo"
    document_id => "%{pno}"
  }
  file {
    path => "/tmp/%{+YYYY.MM.dd}-file.txt"
  }
}
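The schedule "* * * * *" runs the statement once a minute; each run pulls rows whose created_at is newer than :sql_last_value, and the latest value is persisted in syncpoint_table_parcel_info so the sync resumes incrementally after a restart. A minimal way to launch the pipeline (the config filename mysql-to-es.conf is hypothetical; on Windows use bin\logstash.bat instead):

bin/logstash -f mysql-to-es.conf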
3.2 Import data into ES from a file

Load the configuration file with the command:

logstash.bat -f F:\logstash-7.13.2-windows-x86_64\logstash-7.13.2\config\logstash.conf

The configuration file is:
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
# input {
#   beats {
#     port => 5044
#   }
# }
#
# output {
#   elasticsearch {
#     hosts => ["http://localhost:9200"]
#     index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#     #user => "elastic"
#     #password => "changeme"
#   }
# }

input {
  file {
    path => "F:/logstash-data-movie-latest/ml-latest/movies.csv"
    start_position => "beginning"
    sincedb_path => "F:/logstash-data-movie-latest/ml-latest/movies.stash.log"
  }
  beats {
    port => 5044
  }
}

filter {
  csv {
    separator => ","
    columns => ["id","content","genre"]
  }
  mutate {
    split => { "genre" => "|" }
    remove_field => ["path","host","@timestamp","message"]
  }
  mutate {
    split => ["content","("]
    add_field => { "title" => "%{[content][0]}" }
    add_field => { "year" => "%{[content][1]}" }
  }
  mutate {
    convert => { "year" => "integer" }
    strip => ["title"]
    remove_field => ["path","host","@timestamp","message","content"]
  }
}

output {
  elasticsearch {
    hosts => "http://11.1.217.245:9200"
    index => "movies"
    document_id => "%{id}"
  }
  stdout {}
}
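After the pipeline runs, a quick way to spot-check the result (an optional check, not part of the original post; the host comes from the output section above):

curl "http://11.1.217.245:9200/movies/_search?q=title:Jumanji&pretty"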
The CSV data format is as follows:
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
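For reference (not shown in the original post): the csv filter renames the columns to id/content/genre, the first mutate splits genre on "|", the second splits content on "(" into title and year, and the last converts year to an integer and strips whitespace from title, so each row should produce a document shaped roughly like:

{
  "id": "2",
  "title": "Jumanji",
  "year": 1995,
  "genre": ["Adventure", "Children", "Fantasy"]
}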
Original article: https://blog.csdn.net/yunzhonghefei/article/details/11835415