[root@host135 config]# more sync_multi_table_mysql2es.conf #logstash输入配置 input { #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期 jdbc { jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl" jdbc_user => root jdbc_password => mysql # 是否开启分页 jdbc_paging_enabled => true # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件 use_column_value => true #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名 tracking_column => "unix_ts_in_secs" # tracking_column 对应字段的类型 tracking_column_type => "numeric" # 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次 schedule => "*/5 * * * * *" #同步数据的查询sql语句 statement => "select id,name,f_int,f_dou,f_flo,date_format(create_time,'%Y-%m-%d %H:%i:%S') as create_time,date_fo rmat(update_time,'%Y-%m-%d %H:%i:%S') as update_time, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where (unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC" type => "tb_es" last_run_metadata_path =>"/opt/logstash-6.8.5/logs/tb_es.txt" } jdbc { jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl" jdbc_user => root jdbc_password => mysql # 是否开启分页 jdbc_paging_enabled => true # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件 use_column_value => true #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名 tracking_column => "id" # tracking_column 对应字段的类型 tracking_column_type => "numeric" # 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次 schedule => "*/5 * * * * *" #同步数据的查询sql语句 statement => "select * from tb_es_notime where id>:sql_last_value" type =>"tb_es_notime" last_run_metadata_path => "/opt/logstash-6.8.5/logs/tb_es_notime.txt" } } #logstash输入数据的字段匹配和数据过滤 filter { mutate { copy => { "id" => "[@metadata][_id]"} remove_field => ["@version","@timestamp", "unix_ts_in_secs"] } } #logstash输出配置 output { #采用stdout可以将同步数据输出到控制台,主要是调试阶段使用 stdout { codec => "rubydebug"} #指定输出到ES的具体索引 if[type] == "tb_es" { elasticsearch { hosts => ["http://192.168.1.134:19200"] user => "elastic" password => "elastic" index => "index_tb_es" document_id => "%{[@metadata][_id]}" } } if[type] == "tb_es_notime" { elasticsearch { hosts => ["http://192.168.1.134:19200"] user => "elastic" password => "elastic" index => "index_tb_es_notime" document_id => "%{[@metadata][_id]}" } } }
标签:同步,column,id,jdbc,mysql,tb,logstash,es From: https://www.cnblogs.com/hxlasky/p/17804922.html