首页 > 数据库 >logstash 同步mysql 到elasticsearch

logstash 同步mysql 到elasticsearch

时间:2022-11-15 15:22:45浏览次数:47  
标签:jdbc last elasticsearch student mysql type logstash

1.下载logstash 要和 es的版本一致  用的是 7.17.6  

     下载地址:https://www.elastic.co/cn/downloads/past-releases#logstash

2.配置  LS_JAVA_HOME    系统环境变量

  logstash  7版本之后 用的是这个环境变量 6的  直接用原来的JAVA_HOME就可以 ,7版本的 java  jdk用8 或者11都可以,我用的是11

3. 在xx\logstash-7.17.6-windows-x86_64\logstash-7.17.6\  和bin的的同级目录下创建一个mysql文件夹

  a.在mysql文件夹里创建一个 jdbc.conf 文件, student.sql,student_last_value.txt,以及添加mysql连接的jar包 mysql-connector-java-5.1.47.jar

     

     jdbc.conf  里边配置 jdbc 以及输出es的相关信息

  student_last_value.txt 用来处理增量数据同步时记录最新数据值的

     student.sql 写入查询的sql语句 

   b.配置jdbc.conf 文件  一定要注意 文件保存的时候是 utf8,  别选utf8-bom这种的,可以用notepad++ 查看编码

input{
    jdbc{
        # mysql相关jdbc配置 注意 serverTimezone 时区的设置
        jdbc_connection_string => "jdbc:mysql://ip:port/test?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false"
        # 数据库连接账号
        jdbc_user => "账号"
        # 数据库连接密码
        jdbc_password => "密码"
        
        # jdbc连接mysql驱动的文件
        jdbc_driver_library => "D:\xxxx\mysql\mysql-connector-java-5.1.47.jar"
        # mysql驱动程序类的名称
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # 是否启用分页
        jdbc_paging_enabled => "true"
        # 分页大小
        jdbc_page_size => "10000"
        
        
        # mysql文件, 也可以直接写SQL语句在此处,如下:
        statement_filepath => "D:\xxxxx\logstash-7.17.6\mysql\student.sql"
        # 同步sql语句
        #statement => "SELECT * FROM `student`"
        
        # 这里类似crontab,可以定制定时操作,比如每10分钟执行一次同步(分 时 天 月 年)
        schedule => "* * * * * *"
        # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
        lowercase_column_names => false
        # 处理中文乱码问题
        codec => plain { charset => "UTF-8" }
        # 使用其它字段追踪,而不是用时间
        use_column_value => true
        # 追踪的字段  这个字段只有在上面的lowercase_column_names配置为false的时候才会区分大小写  因为不配置为false的话默认是true  查询结果字段默认会变成全小写;   我这里使用的是更新时间
        tracking_column => "up_time"
        # 这个就是追踪字段的类型,只有数值和时间两个类型(numeric和timestamp,默认numeric) 这个值会记录到last_run_metadata_path 配置的文件中 如果配置是numeric 那么默认值为0 如果配置为timestamp 那么默认值为1970年
        tracking_column_type => "timestamp"
        # 记录上一次运行记录
        record_last_run => true
        # 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值  这个就是增量数据同步的关键
        last_run_metadata_path => "D:\xxxxx\logstash-7.17.6\mysql\student_last_value.txt"
        # 是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
        # 多表同步时,表类型区分,建议命名为“库名_表名”,每个jdbc模块需对应一个type
        type => "student"
    }
}

filter{

}

output{
    # 根据input中type进行不同操作,多表同步会用到,可能每一个表输出不同索引
    if[type] == "student"{
        elasticsearch{
            # es地址
            hosts => ["ip:port"]
            user => "账号"
            password => "密码"
            # 索引名称
            index => "test"
            # es的id生成策略。不写es自己分配,我这里使用和mysql一致的id
            document_id => "%{id}"
        }
    }

    stdout{
        codec => json_lines
    }
}

  c. 在student.sql 文件中写入自己 的sql语句  (:sql_last_value  这个是student_last_value.txt中的值 )

SELECT id,name,age,tel,up_time FROM `student` where up_time>:sql_last_value  order by up_time asc

 d.student_last_value.txt 文件里边写入时间  (他会自动改 每次查询的最后一个时间点会更新进去)

    如果设置的自动增长列  不是时间类型的 是numeric 类型 那么 就吧该文件里边设置为数字0

1970-00-00 00:00:00

 

4.在es中创建对应的索引库 ,es创建索引库的方法 百度搜索

   在kibana的dev tools  执行

  

PUT /test/
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    
    "properties":{
      "id":{
        "type":"long"
      },
      "name":{
        "type":"keyword"
      },
      "age":{
        "type":"keyword"
      },
      "tel":{
        "type":"keyword"
      },
       "up_time" : {
            "type" : "date"
      }
    }
    
  } 
  
 }
 

5.定时任务配置  jdbc schedule的配置规则,类似linux的crontab的写法,具体语法规则如下: 

a. *    *   *   *  *            
   分   时  天   月 星期

6.执行logstash 

  a.在bin目录下 cmd

  b. 输入 logstash  -f   ../mysql/jdbc.conf 

 

标签:jdbc,last,elasticsearch,student,mysql,type,logstash
From: https://www.cnblogs.com/kaikaichao/p/16892431.html

相关文章

  • Elasticsearch-head安装
    1.安装node要安装elasticsearch-head插件,需要先安装node.js。官网下载地址:https://nodejs.org/en/download/或者使用命令行安装wgethttps://npm.taobao.org/mirrors......
  • mysql详细学习笔记
    连接与断开服务器mysql-h地址-P端口-u用户名-p密码mysql>SHOWPROCESSLIST;显示哪些线程正在运行+----+------+-----------+------+---------+------+----......
  • 通过fluentd配置输出到elasticsearch,启动服务报错 Using Elasticsearch client 8.4.0
    问题描述 通过fluentd的配置文件,将fluentd的输出定向到elasticsearch中,配置文件如下: <matchsyslog.**>@typeelasticsearchhost172.20.58.152port1920......
  • mysqld_safe Directory '/tmp/mysql' for UNIX socket file don't exists.
    报错版本:mysql-5.7.351、报错完整提示信息;[root@localhostbin]#2022-11-15T04:04:43.122905Zmysqld_safeLoggingto'/var/log/mysql.log'.2022-11-15T04:04:43.1......
  • elasticsearch安装(Linux)
    1.下载安装包官网下载,下载地址:https://www.elastic.co/cn/downloads/past-releases#elasticsearch将安装包上传到服务器#创建文件mkdirelasticsearch  2.创建......
  • (初学)记一次dockercompose安装mysql8 以及问题
    docker-compose文件如下:1version:'3'2services:3#mysql服务4service-mysql:5image:docker.io/mysql:8.0.276command:7--def......
  • ElasticSearch深度分页详解
    1前言ElasticSearch是一个实时的分布式搜索与分析引擎,常用于大量非结构化数据的存储和快速检索场景,具有很强的扩展性。纵使其有诸多优点,在搜索领域远超关系型数据库,但依......
  • MySql小知识
    int类型​设计表的时候int(3)并不能限制取值范围和占用空间这个是int类型本身限制的例:int(3)在插入数据的时候1000也能插入成功(一版设计表的时候只填写类型,后面不加(X))......
  • Mysql (数学相关函数机日期函数)
    一、数学相关函数(一)abs绝对值(二)ceiling(number2)向上取整,得到比num2大的最小整数(三)BIN(decimal_number)十进制转二进制(四)conv(number2,from_base,to_base)进制......
  • mysql 配置远程访问
    先使用root用户进入mysqlmysql-uroot-p创建新用户createuser用户名identifiedby'密码';赋予所有权限GRANTALLPRIVILEGESON*.*TO'用户名'@'%'IDENTIFI......