logstash配置文件如下:
# 将mysql数据库里面的增量数据发送个kafka,然后消费kafka的数据进行采集,在server05服务器运行 # 先执行一下source /etc/profile,让java的路径生效 # 具体的指令为:nohup /export/servers/logstash-6.7.0/bin/logstash -f /home/code/Spider_Projects/fanruan/logstash.conf > /dev/null 2>&1 & # 直接运行的指令:/export/servers/logstash-6.7.0/bin/logstash -f /home/code/Spider_Projects/fanruan/logstash.conf input{ jdbc{ jdbc_driver_library => "/export/servers/mysql-connector-java-5.1.38.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/database" jdbc_user => "user" jdbc_password => "123456" use_column_value => true tracking_column => "id" schedule => "*/5 * * * *"
# 重点在这里,mysql语句中查询的时候必须包含作为偏移量的字段(这里指id字段),如果不包含的话就不更新偏移量文件,然后就会重复的从之前保留的那个偏移量读取数据,踩坑日记~~~~~~~ statement => "select id, dora, username, method, link from crawl_seed where id > :sql_last_value" # record_last_run上次数据存放位置; record_last_run => true #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 last_run_metadata_path => "/home/code/Spider_Projects/fanruan/logstash_offset.txt" # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false; clean_run => false } } output{ stdout{codec => rubydebug} redis{ host => "x.x.x.x" port => "6379" db => "0" key => "key" data_type => "list" } }
标签:jdbc,last,偏移量,mysql,run,数据,logstash From: https://www.cnblogs.com/qiaoer1993/p/16640345.html