首页 > 数据库 >使用logstash同步mysql到ES

使用logstash同步mysql到ES

时间:2023-11-01 15:25:21浏览次数:37  
标签:jdbc tb mysql update id time type logstash ES

环境:
OS:Centos 7
es:6.8.5
logstash:6.8.5
mysql:5.7

 

1.mysql创建表

create table tb_es (
  id bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
  name varchar(32) not null,
  f_int int,
  f_dou double(10,2),
  f_flo float(9,2),
  create_time timestamp not null default current_timestamp,
  update_time timestamp not null default current_timestamp on update current_timestamp,
  primary key (id)
);

 

2.写入测试数据

insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name1',100,123.12,16.26,now(),now());
insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name2',200,323.12,26.46,now(),now());
insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name3',300,423.12,36.36,now(),now());
insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name4',400,623.12,46.56,now(),now());
insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name5',500,723.12,56.66,now(),now());

 

3.logstash配置文件

vi sync_mysql2es.conf

input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"
    jdbc_user => root
    jdbc_password => mysql
    # 是否开启分页
    jdbc_paging_enabled => true
    # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "unix_ts_in_secs"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
    schedule => "*/5 * * * * *"
    #同步数据的查询sql语句
    statement => "select *, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where (unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}

#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version", "@timestamp","unix_ts_in_secs"]
  }
}

#logstash输出配置
output {
  #采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  #指定输出到ES的具体索引
  elasticsearch {
      hosts => ["http://192.168.1.134:19200"]
      user => "elastic"
      password => "elastic"
      index => "index_tb_es"
      document_id => "%{[@metadata][_id]}"
  }
}

 

4.执行同步
/opt/logstash-6.8.5/bin/logstash -f /opt/logstash-6.8.5/config/sync_mysql2es.conf

 

5.查看同步的数据

查看index

[root@host135 soft]# curl -u elastic:elastic -H "Content-Type: application/json" -X GET 'http://192.168.1.134:19200/_cat/indices?v'
health status index              uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   index_tb_es        fLGPfjfWSfCH0WLsn0176w   5   1          5            0     20.3kb         20.3kb
yellow open   vaccination_report mD_OEHpHQNeT7-S7wMhP9A   5   1     147693            0     52.7mb         52.7mb
green  open   .security-6        KUv3Qpw5Qg-bDQ6kE0csHQ   1   0          6            0     19.5kb         19.5kb

 

查看index数据

curl -u elastic:elastic -H "Content-Type: application/json" -XPOST '192.168.1.134:19200/index_tb_es/_search?pretty' -d '
{
"query": { "match_all": {} },
"size":10
}'

 

查看mapping

[root@host135 logstash-6.8.5]# curl -u elastic:elastic -H "Content-Type: application/json" -XGET "http://192.168.1.134:19200/index_tb_es/_mappings?pretty=true"
{
  "index_tb_es" : {
    "mappings" : {
      "doc" : {
        "properties" : {
          "create_time" : {
            "type" : "date"
          },
          "f_dou" : {
            "type" : "float"
          },
          "f_flo" : {
            "type" : "float"
          },
          "f_int" : {
            "type" : "long"
          },
          "id" : {
            "type" : "long"
          },
          "name" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "update_time" : {
            "type" : "date"
          }
        }
      }
    }
  }
}

 

6.重新做全量同步

每次做完同步后,同步的时间点会记录到文件.logstash_jdbc_last_run,该文件是隐藏文件,文件位置使用find查找
[root@host135 config]# find / -name *logstash_jdbc_last_run*
/root/.logstash_jdbc_last_run

删除该文件重新同步

 

##################提前创建好mapping的同步#########################

1.提前创建好空的index和mapping

curl -u elastic:elastic -X PUT "192.168.1.134:19200/index_tb_es01?pretty" -H 'Content-Type: application/json' -d'
{}
'

 

2.创建mapping

6.8.5版本的type默认值为doc,所有我们这里使用doc

curl -u elastic:elastic -H 'Content-Type: application/json' -XPOST "http://192.168.1.134:19200/index_tb_es01/doc/_mapping?pretty" -d ' 
{
        "properties" : {
          "create_time" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
          },
          "f_dou" : {
            "type" : "float"
          },
          "f_flo" : {
            "type" : "float"
          },
          "f_int" : {
            "type" : "long"
          },
          "id" : {
            "type" : "long"
          },
          "name" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "update_time" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
          }
        }
}'

 

执行同步发现报错误:

"reason"=>"failed to parse field [create_time] of type [date] in document with id '8'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Invalid format: \"2023-11-01T01:19:38.000Z\" is malformed at \"T01:19:38.000Z\""}}}}}

 

解决办法:
使用date_format格式化日期字段,如下:
date_format(create_time,'%Y-%m-%d %H:%i:%S') AS createTime

 

修改同步的配置文件:

[root@host135 config]# more sync_mysql2es.conf 
#logstash输入配置
input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"
    jdbc_user => root
    jdbc_password => mysql
    # 是否开启分页
    jdbc_paging_enabled => true
    # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "unix_ts_in_secs"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
    schedule => "*/5 * * * * *"
    #同步数据的查询sql语句
    statement => "select id,name,f_int,f_dou,f_flo,date_format(create_time,'%Y-%m-%d %H:%i:%S') as create_time,date_fo
rmat(update_time,'%Y-%m-%d %H:%i:%S') as update_time, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where 
(unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}

#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version","@timestamp", "unix_ts_in_secs"]
  }
}

#logstash输出配置
output {
  #采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  #指定输出到ES的具体索引
  elasticsearch {
      hosts => ["http://192.168.1.134:19200"]
      user => "elastic"
      password => "elastic"
      index => "index_tb_es01"
      document_id => "%{[@metadata][_id]}"
  }
}

 

标签:jdbc,tb,mysql,update,id,time,type,logstash,ES
From: https://www.cnblogs.com/hxlasky/p/17803187.html

相关文章

  • linux安装MySQL数据库初始化报错
    在使用如下初始化命令进行数据库初始化时报错,./bin/mysqld--user=mysql--basedir=/usr/local/mysql/mysql/--datadir=/usr/local/mysql/mysql/data/--initialize--lower-case-table-names=1; 权限不足导致,修改命令为:./bin/mysqld--user=root--basedir=/usr/local......
  • WCF restful 上传文件 返回413 request entity too large
    网上各种加binding都不行最后找到了在配置文件中加 webHttpBinding1<system.serviceModel>2<bindings>3<webHttpBinding>4<binding5maxBufferPoolSize="2048576000"6......
  • doris FE启动异常:org.yaml.snakeyaml.representer.Representer: method <init>()V not
    dorisFF启动异常,异常信息如下:  2023-11-0109:53:22,691INFO(main|1)[PaloFe.start():124]PaloFEstarting...2023-11-0109:53:22,699INFO(main|1)[FrontendOptions.analyzePriorityCidrs():107]configuredprior_cidrsvalue:10.252.226.5/242023-11-0109:5......
  • mysql主从复制
    一、什么是Binlog?Mysql的二进制日志可以是Mysql最重要的日志,记录了所有的DDL和DML语句(除了数据查询语句之外的语句)语句,以事件形式记录,还包含语句所执行的消耗时间,Mysql的二进制日志是事务安全型的。二进制日志包含两类文件:1、二进制日志索引文件(文件后缀为".index")用于记录有......
  • ZEGO 即构科技首发适配鸿蒙系统的 Express SDK 1.0 版本
    ​ 2019年8月,华为在开发者大会上正式发布鸿蒙系统。HarmonyOS鸿蒙系统是一款“面向未来”、面向全场景(移动办公、运动健康、社交通信、媒体娱乐等)的分布式操作系统。在传统的单设备系统能力的基础上,HarmonyOS提出了基于同一套系统能力、适配多种终端形态的分布式理念,能够支持......
  • Exception in thread "main" java.net.BindException: Cannot assign requested addre
    两种情况1.端口号被占用,导致地址无法绑定#windows查看端口pidnetstat-aon|findstr8080(端口号)#linux查看端口占用netstat-anp|grep80802.ip地址与本机地址不匹配,导致地址无法绑定#windows查看ipipconfig#linux查看ipifconfig......
  • 线性代数 · 矩阵 · Matlab | Cholesky 分解代码实现
    (搬运外网的代码,非原创;原网址)(其实是专业课作业,但感觉国内博客没有合适的代码实现,所以就搬运到自己博客了)背景-Cholesky分解:若A为n阶实对称正定矩阵,则存在非奇异下三角矩阵L,使得A=LL^T。是特殊的LU分解(下三角上三角分解)。若限定L的对角元素为正,则这种分解......
  • Oracle转为Mysql的数据结构差别
     Oracle的表空间相关函数TABLESPACE"SYSTEM"LOGGINGNOCOMPRESSPCTFREE10INITRANS1STORAGE(INITIAL65536NEXT1048576MINEXTENTS1MAXEXTENTS2147483645FREELISTS1FREELISTGROUPS1BUFFER_POOLDEFAULT)PARALLEL1NOCACHEDISABLE......
  • WhatsApp Business为什么会被封号?该如何解决
    目前,作为全球即时通讯领域的重要平台之一的WhatsApp已成为企业在营销和与客户沟通时的首选工具。但是长时间、高强度的营销行为很容易导致WhatsApp Business账户突然被封禁,无法再使用账号。即使后续再去进行申诉,要求官方解封该账户,也仍然需要等待一段时间。在这段时间里账号仍然是......
  • mysql安装步骤(windows版zip包)
    1.官网下载安装包https://cdn.mysql.com//Downloads/MySQL-8.2/mysql-8.2.0-winx64.zip2.在下载后的目录下找到Mysql压缩包并将其解压至自己创建的一个文件夹内(注意:目录名不可以是中文)3.my.ini内容如下,注意路径改成自己的,第5行basedir=、第7行datadir=[mysqld]#设置3306端口port......