首页 > 其他分享 >logstash同步多个表的配置(使用pipeline)

logstash同步多个表的配置(使用pipeline)

时间:2023-11-02 14:23:39浏览次数:36  
标签:pipeline jdbc column id 同步 mysql logstash metadata

 

说明:
我们这里每个表对应一个配置文件,当然也可以使用多个表使用一个配置文件(多个jdbc进行配置)

 

1.准备配置文件
表1:

[root@host135 config]# more sync_mysql2es.conf
#logstash输入配置
input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"
    jdbc_user => root
    jdbc_password => mysql
    # 是否开启分页
    jdbc_paging_enabled => true
    # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "unix_ts_in_secs"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
    schedule => "*/5 * * * * *"
    #同步数据的查询sql语句
    statement => "select id,name,f_int,f_dou,f_flo,date_format(create_time,'%Y-%m-%d %H:%i:%S') as create_time,date_fo
rmat(update_time,'%Y-%m-%d %H:%i:%S') as update_time, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where 
(unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
    last_run_metadata_path =>"/opt/logstash-6.8.5/logs/11.txt"
  }
}

#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version","@timestamp", "unix_ts_in_secs"]
  }
}

#logstash输出配置
output {
  #采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  #指定输出到ES的具体索引
  elasticsearch {
      hosts => ["http://192.168.1.134:19200"]
      user => "elastic"
      password => "elastic"
      index => "index_tb_es01"
      document_id => "%{[@metadata][_id]}"
  }
}

 

表2:

[root@host135 config]# more sync_mysql2es_notime.conf
#logstash输入配置
input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"
    jdbc_user => root
    jdbc_password => mysql
    # 是否开启分页
    jdbc_paging_enabled => true
    # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "id"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
    schedule => "*/5 * * * * *"
    #同步数据的查询sql语句
    statement => "select * from tb_es_notime where id>:sql_last_value"
    last_run_metadata_path =>"/opt/logstash-6.8.5/logs/22.txt"
  }
}

#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version","@timestamp"]
  }
}

#logstash输出配置
output {
  #采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  #指定输出到ES的具体索引
  elasticsearch {
      hosts => ["http://192.168.1.134:19200"]
      user => "elastic"
      password => "elastic"
      index => "index_tb_es_notime"
      document_id => "%{[@metadata][_id]}"
  }
}

 

2.编辑pipelines.yml文件
在最后面添加如下配置

- pipeline.id: table1
  path.config: "/opt/logstash-6.8.5/config/sync_mysql2es.conf"
- pipeline.id: table2
  path.config: "/opt/logstash-6.8.5/config/sync_mysql2es_notime.conf"

 

3.运行
[root@host135 bin]# cd /opt/logstash-6.8.5/bin
[root@host135 bin]#./logstash

 

标签:pipeline,jdbc,column,id,同步,mysql,logstash,metadata
From: https://www.cnblogs.com/hxlasky/p/17805285.html

相关文章

  • OpenFunction 1.2.0 发布:集成 KEDA http-addon 作为同步函数运行时
    OpenFunction是一个开源的云原生FaaS(FunctionasaService,函数即服务)平台,旨在帮助开发者专注于业务逻辑的研发。我们非常高兴地宣布OpenFunction又迎来了一次重要的更新,即v1.2.0版本的发布!本次更新中,我们继续致力于为开发者们提供更加灵活和强大的工具,并在此基础上加入了......
  • logstash同步多个表jdbc
     [root@host135config]#moresync_multi_table_mysql2es.conf#logstash输入配置input{#jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期jdbc{jdbc_driver_library=>"/soft/mysql-connector-java-5.1.49.jar"jdbc_driver_class=>"com.my......
  • 实时同步刷新excel数据到数据库
    实时同步刷新excel数据到数据库前面,我们已经讲了定时导入excel到数据库,我们只需要稍作修改就可以实现实时刷新。新增定时任务,定时设置全部填*,即代表实时运行可以在定时任务界面看到任务在不停的运行,当excel数据更新了,点击保存后,马上就可以在数据库看到更新的数据了因为实时运行刷......
  • Hbuilderx运行uni-app项目到Android Studio模拟器只显示“同步手机端程序文件完成”界
    如图,开发工具也显示同步文件,模拟器也显示同步文件完成,但是就是不展示页面,遇到这种情况,一般是2种情况,一个是项目本身有问题跑不起来,另一个就是创建的模拟器设备参数不支持当前app。一.连接真机调试,排除项目本身问题:如果连接真机都跑不起来,那么看下控制台日志,先解决项目本身的问......
  • Redis通过复制rdb文件方式同步线上数据到本地以及提示:Can't handle RDB format versi
    场景Redis的持久化机制-RDB方式和AOF方式:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/105052841Redis持久化机制导致服务自启动后恢复数据过长无法使用以及如何关闭:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/130237326以上对于redis持久化......
  • 钉钉同步基础数据(用户和部门)
    1、创建钉钉应用1.1、登录钉钉开放平台https://open-dev.dingtalk.com/1.2、创建钉钉应用点击开发者后台点击应用开发点击创建应用创建成功之后需要开通对应的权限2、java集成钉钉开发2.1、导入依赖<dependency><groupId>com.aliyun</groupId><artifactId>alibaba-d......
  • logstash采用了增量同步后想做全新的同步的方法
     1.文件同步到esNosincedb_pathset,generatingonebasedonthe"path"setting{:sincedb_path=>"/opt/logstash-6.8.5/data/plugins/inputs/file/.sincedb_f019a9f5e77dadb5d6981e37ca0b16f6",:path=>["/tmp/myfile/*.log"]}将.sincedb......
  • 使用logstash同步mysql到ES
    环境:OS:Centos7es:6.8.5logstash:6.8.5mysql:5.7 1.mysql创建表createtabletb_es(idbigint(20)unsignedNOTNULLAUTO_INCREMENTCOMMENT'主键id',namevarchar(32)notnull,f_intint,f_doudouble(10,2),f_flofloat(9,2),create_timet......
  • obsidian实现安卓、windowl同步(Remotely Save+腾讯云对象存储)
    1obsidian安卓端下载https://mobile.softpedia.com/apk/obsidian/2同步思路本文的同步方案并不是以下方案,个人没有采用以下方案。2.1方案1:坚果云网盘方案(没采用此方案)相当于把坚果云作为一个网盘。只需要在电脑端和手机端,同时安装坚果云软件,就可以实现文件在电脑端和手机端同步......
  • Linux时间校准、时间同步(ntpdate及C代码NTP客户端代码校准示例)
    背景机器每次机启后时间就会出现异常,因为机器无法访问外网,只能访问局域网的ntp服务,所以需要保证局域网内部有ntp服务,如何安装ntp服务,参考Ubuntu20.04Ntp服务安装及验证。网络时间协议NetworkTimeProtocol(NTP)是一种确保时钟保持准确的方法。如果可以访问互联网,只需安装ntp......