环境:
OS:Centos 7
es:6.8.5
logstash:6.8.5
mysql:5.7
1.mysql创建表
-- Demo table: f_dou/f_flo deliberately use DOUBLE/FLOAT to show how
-- Logstash/ES maps numeric types; update_time auto-bumps on every UPDATE
-- so it can drive incremental sync.
CREATE TABLE tb_es (
    id          BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键id',
    name        VARCHAR(32)         NOT NULL,
    f_int       INT,
    f_dou       DOUBLE(10,2),
    f_flo       FLOAT(9,2),
    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
);
2.写入测试数据
-- Seed five test rows; a single multi-row INSERT inserts the same rows
-- as five separate statements.
INSERT INTO tb_es (name, f_int, f_dou, f_flo, create_time, update_time)
VALUES
    ('name1', 100, 123.12, 16.26, now(), now()),
    ('name2', 200, 323.12, 26.46, now(), now()),
    ('name3', 300, 423.12, 36.36, now(), now()),
    ('name4', 400, 623.12, 46.56, now(), now()),
    ('name5', 500, 723.12, 56.66, now(), now());
3.logstash配置文件
vi sync_mysql2es.conf
# Logstash pipeline: incremental sync from MySQL tb_es into ES index_tb_es.
input {
  # JDBC input: defines the MySQL query to sync and the polling schedule.
  jdbc {
    jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"
    # Credentials must be quoted strings, not bare words.
    jdbc_user => "root"
    jdbc_password => "mysql"
    # Page the result set instead of loading it all at once.
    jdbc_paging_enabled => true
    # Persist the last-seen tracking value (to last_run_metadata_path,
    # default ~/.logstash_jdbc_last_run) and expose it as :sql_last_value.
    use_column_value => true
    # Incremental column: must be the ALIAS produced by the SELECT below.
    tracking_column => "unix_ts_in_secs"
    # Type of the tracking column value.
    tracking_column_type => "numeric"
    # Standard 5-field cron (minute hour day month weekday): every 5 minutes.
    # NOTE: the original 6-field "*/5 * * * * *" has a leading SECONDS field
    # in rufus-scheduler, so it actually fired every 5 seconds.
    schedule => "*/5 * * * *"
    # Query to sync; update_time < NOW() avoids racing rows still being
    # written in the current second.
    statement => "select *, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where (unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}
# Field mapping and cleanup before indexing.
filter {
  mutate {
    # Use the MySQL primary key as the ES document id (idempotent upserts).
    copy => { "id" => "[@metadata][_id]" }
    remove_field => ["@version", "@timestamp", "unix_ts_in_secs"]
  }
}
output {
  # stdout is for debugging only; remove it in production.
  stdout { codec => "rubydebug" }
  # Target ES index.
  elasticsearch {
    hosts => ["http://192.168.1.134:19200"]
    user => "elastic"
    password => "elastic"
    index => "index_tb_es"
    document_id => "%{[@metadata][_id]}"
  }
}
4.执行同步
/opt/logstash-6.8.5/bin/logstash -f /opt/logstash-6.8.5/config/sync_mysql2es.conf
5.查看同步的数据
查看index
[root@host135 soft]# curl -u elastic:elastic -H "Content-Type: application/json" -X GET 'http://192.168.1.134:19200/_cat/indices?v' health status index uuid pri rep docs.count docs.deleted store.size pri.store.size yellow open index_tb_es fLGPfjfWSfCH0WLsn0176w 5 1 5 0 20.3kb 20.3kb yellow open vaccination_report mD_OEHpHQNeT7-S7wMhP9A 5 1 147693 0 52.7mb 52.7mb green open .security-6 KUv3Qpw5Qg-bDQ6kE0csHQ 1 0 6 0 19.5kb 19.5kb
查看index数据
curl -u elastic:elastic -H "Content-Type: application/json" -XPOST '192.168.1.134:19200/index_tb_es/_search?pretty' -d ' { "query": { "match_all": {} }, "size":10 }'
查看mapping
[root@host135 logstash-6.8.5]# curl -u elastic:elastic -H "Content-Type: application/json" -XGET "http://192.168.1.134:19200/index_tb_es/_mappings?pretty=true" { "index_tb_es" : { "mappings" : { "doc" : { "properties" : { "create_time" : { "type" : "date" }, "f_dou" : { "type" : "float" }, "f_flo" : { "type" : "float" }, "f_int" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "update_time" : { "type" : "date" } } } } } }
6.重新做全量同步
每次做完同步后,同步的时间点会记录到文件.logstash_jdbc_last_run,该文件是隐藏文件,文件位置使用find查找
[root@host135 config]# find / -name '*logstash_jdbc_last_run*'
/root/.logstash_jdbc_last_run
删除该文件重新同步
##################提前创建好mapping的同步#########################
1.提前创建好空的index和mapping
curl -u elastic:elastic -X PUT "192.168.1.134:19200/index_tb_es01?pretty" -H 'Content-Type: application/json' -d' {} '
2.创建mapping
6.8.5版本的type默认值为doc,所以我们这里使用doc
curl -u elastic:elastic -H 'Content-Type: application/json' -XPOST "http://192.168.1.134:19200/index_tb_es01/doc/_mapping?pretty" -d ' { "properties" : { "create_time" : { "type" : "date", "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "f_dou" : { "type" : "float" }, "f_flo" : { "type" : "float" }, "f_int" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "update_time" : { "type" : "date", "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } }'
执行同步发现报错误:
"reason"=>"failed to parse field [create_time] of type [date] in document with id '8'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Invalid format: \"2023-11-01T01:19:38.000Z\" is malformed at \"T01:19:38.000Z\""}}}}}
解决办法:
使用date_format格式化日期字段,如下:
date_format(create_time,'%Y-%m-%d %H:%i:%S') AS create_time
修改同步的配置文件:
[root@host135 config]# more sync_mysql2es.conf
# Logstash pipeline v2: dates are pre-formatted with DATE_FORMAT so they
# match the explicit mapping of index_tb_es01.
input {
  # JDBC input: defines the MySQL query to sync and the polling schedule.
  jdbc {
    jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"
    # Credentials must be quoted strings, not bare words.
    jdbc_user => "root"
    jdbc_password => "mysql"
    # Page the result set instead of loading it all at once.
    jdbc_paging_enabled => true
    # Persist the last-seen tracking value and expose it as :sql_last_value.
    use_column_value => true
    # Incremental column: must be the ALIAS produced by the SELECT below.
    tracking_column => "unix_ts_in_secs"
    # Type of the tracking column value.
    tracking_column_type => "numeric"
    # Standard 5-field cron (minute hour day month weekday): every 5 minutes.
    # NOTE: the original 6-field "*/5 * * * * *" has a leading SECONDS field
    # in rufus-scheduler, so it actually fired every 5 seconds.
    schedule => "*/5 * * * *"
    # FIX: the original statement contained "date_fo rmat" (a line-wrap
    # artifact splitting the function name), which is invalid SQL.
    statement => "select id,name,f_int,f_dou,f_flo,date_format(create_time,'%Y-%m-%d %H:%i:%S') as create_time,date_format(update_time,'%Y-%m-%d %H:%i:%S') as update_time, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where (unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}
# Field mapping and cleanup before indexing.
filter {
  mutate {
    # Use the MySQL primary key as the ES document id (idempotent upserts).
    copy => { "id" => "[@metadata][_id]" }
    remove_field => ["@version", "@timestamp", "unix_ts_in_secs"]
  }
}
output {
  # stdout is for debugging only; remove it in production.
  stdout { codec => "rubydebug" }
  # Target ES index with the pre-created mapping.
  elasticsearch {
    hosts => ["http://192.168.1.134:19200"]
    user => "elastic"
    password => "elastic"
    index => "index_tb_es01"
    document_id => "%{[@metadata][_id]}"
  }
}
标签:jdbc,tb,mysql,update,id,time,type,logstash,ES From: https://www.cnblogs.com/hxlasky/p/17803187.html