1.下载logstash 要和 es的版本一致 用的是 7.17.6
下载地址:https://www.elastic.co/cn/downloads/past-releases#logstash
2.配置 LS_JAVA_HOME 系统环境变量
logstash 7版本之后 用的是这个环境变量 6的 直接用原来的JAVA_HOME就可以 ,7版本的 java jdk用8 或者11都可以,我用的是11
3. 在xx\logstash-7.17.6-windows-x86_64\logstash-7.17.6\ 和bin的的同级目录下创建一个mysql文件夹
a.在mysql文件夹里创建一个 jdbc.conf 文件, student.sql,student_last_value.txt,以及添加mysql连接的jar包 mysql-connector-java-5.1.47.jar
jdbc.conf 里边配置 jdbc 以及输出es的相关信息
student_last_value.txt 用来处理增量数据同步时记录最新数据值的
student.sql 写入查询的sql语句
b.配置jdbc.conf 文件 一定要注意 文件保存的时候是 utf8, 别选utf8-bom这种的,可以用notepad++ 查看编码
input{ jdbc{ # mysql相关jdbc配置 注意 serverTimezone 时区的设置 jdbc_connection_string => "jdbc:mysql://ip:port/test?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false" # 数据库连接账号 jdbc_user => "账号" # 数据库连接密码 jdbc_password => "密码" # jdbc连接mysql驱动的文件 jdbc_driver_library => "D:\xxxx\mysql\mysql-connector-java-5.1.47.jar" # mysql驱动程序类的名称 jdbc_driver_class => "com.mysql.jdbc.Driver" # 是否启用分页 jdbc_paging_enabled => "true" # 分页大小 jdbc_page_size => "10000" # mysql文件, 也可以直接写SQL语句在此处,如下: statement_filepath => "D:\xxxxx\logstash-7.17.6\mysql\student.sql" # 同步sql语句 #statement => "SELECT * FROM `student`" # 这里类似crontab,可以定制定时操作,比如每10分钟执行一次同步(分 时 天 月 年) schedule => "* * * * * *" # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false) lowercase_column_names => false # 处理中文乱码问题 codec => plain { charset => "UTF-8" } # 使用其它字段追踪,而不是用时间 use_column_value => true # 追踪的字段 这个字段只有在上面的lowercase_column_names配置为false的时候才会区分大小写 因为不配置为false的话默认是true 查询结果字段默认会变成全小写; 我这里使用的是更新时间 tracking_column => "up_time" # 这个就是追踪字段的类型,只有数值和时间两个类型(numeric和timestamp,默认numeric) 这个值会记录到last_run_metadata_path 配置的文件中 如果配置是numeric 那么默认值为0 如果配置为timestamp 那么默认值为1970年 tracking_column_type => "timestamp" # 记录上一次运行记录 record_last_run => true # 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 这个就是增量数据同步的关键 last_run_metadata_path => "D:\xxxxx\logstash-7.17.6\mysql\student_last_value.txt" # 是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录 clean_run => false # 多表同步时,表类型区分,建议命名为“库名_表名”,每个jdbc模块需对应一个type type => "student" } } filter{ } output{ # 根据input中type进行不同操作,多表同步会用到,可能每一个表输出不同索引 if[type] == "student"{ elasticsearch{ # es地址 hosts => ["ip:port"] user => "账号" password => "密码" # 索引名称 index => "test" # es的id生成策略。不写es自己分配,我这里使用和mysql一致的id document_id => "%{id}" } } stdout{ codec => json_lines } }
c. 在student.sql 文件中写入自己 的sql语句 (:sql_last_value 这个是student_last_value.txt中的值 )
SELECT id,name,age,tel,up_time FROM `student` where up_time>:sql_last_value order by up_time asc
d.student_last_value.txt 文件里边写入时间 (他会自动改 每次查询的最后一个时间点会更新进去)
如果设置的自动增长列 不是时间类型的 是numeric 类型 那么 就吧该文件里边设置为数字0
1970-00-00 00:00:00
4.在es中创建对应的索引库 ,es创建索引库的方法 百度搜索
在kibana的dev tools 执行
PUT /test/ { "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "properties":{ "id":{ "type":"long" }, "name":{ "type":"keyword" }, "age":{ "type":"keyword" }, "tel":{ "type":"keyword" }, "up_time" : { "type" : "date" } } } }
5.定时任务配置 jdbc schedule的配置规则,类似linux的crontab的写法,具体语法规则如下:
a. * * * * *
分 时 天 月 星期
6.执行logstash
a.在bin目录下 cmd
b. 输入 logstash -f ../mysql/jdbc.conf
标签:jdbc,last,elasticsearch,student,mysql,type,logstash From: https://www.cnblogs.com/kaikaichao/p/16892431.html