首页 > 数据库 >使用Logstash同步Mysql到Easysearch

使用Logstash同步Mysql到Easysearch

时间:2023-08-15 10:01:11浏览次数:35  
标签:jdbc Easysearch mysql id Mysql table Logstash es

从 Mysql 同步数据到 ES 有多种方案,这次我们使用 ELK 技术栈中的 Logstash 来将数据从 Mysql 同步到 Easysearch 。

方案前提

  1. Mysql 表记录必须有主键,比如 id 字段。通过该字段,可将 Easysearch 索引数据与 Mysql 表数据形成一对一映射关系,支持修改。
  2. Mysql 表记录必须有时间字段,以支持增量同步。

如果上述条件具备,便可使用 logstash 定期同步新写入或修改后的数据到 Easysearch 中。

方案演示

版本信息

Mysql: 5.7
Logstash: 7.10.2
Easysearch: 1.5.0

MySQL 设置

创建演示用的表。

CREATE DATABASE es_db;
USE es_db;
DROP TABLE IF EXISTS es_table;
CREATE TABLE es_table (
id BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY unique_id (id),
client_name VARCHAR(32) NOT NULL,
modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

说明

  • id 字段: 主键、唯一键,将作为 Easysearch 索引中的 doc id 字段。
  • modification_time 字段: 表记录的插入和修改都会记录在此。
  • client_name: 代表用户数据。
  • insertion_time: 可省略,用来记录数据插入到 Mysql 数据的时间。
插入数据
INSERT INTO es_table (id, client_name) VALUES (1, 'test 1');
INSERT INTO es_table (id, client_name) VALUES (2, 'test 2');
INSERT INTO es_table (id, client_name) VALUES (3, 'test 3');

Logstash

配置文件

input {
  jdbc {
    jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db"
    jdbc_user => "root"
    jdbc_password => "password"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    last_run_metadata_path => "./.mysql-es_table-sql_last_value.yml"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
  jdbc {
    jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "*/5 * * * * *"
    statement => "SELECT count(*) AS count,'es_table' AS table_name  from es_table"
  }

}
filter {
if ![table_name] {
        mutate {
           copy => { "id" => "[@metadata][_id]"}
           remove_field => ["@version", "unix_ts_in_secs","@timestamp"]
           add_field => { "[@metadata][target_index]" => "mysql_es_table" } }
      } else {
        mutate {
           add_field => { "[@metadata][target_index]" => "table_counts" }
           remove_field => ["@version"]
               }
        uuid {
           target  => "[@metadata][_id]"
           overwrite => true
             }
      }
}
output {
#  stdout { codec =>  rubydebug { metadata => true } }
  elasticsearch {
      hosts => ["https://localhost:9200"]
      user => "admin"
      password => "f0c6fc61fe5f7b084c00"
      ssl_certificate_verification => "false"
      index => "%{[@metadata][target_index]}"
      manage_template => "false"
      document_id => "%{[@metadata][_id]}"
  }
}
  • 每 5 秒钟同步一次 es_table 表的数据到 mysql_sync_idx 索引。
  • 每 5 秒统计一次 es_table 表的记录条数到 table_counts 索引,用于监控。
启动 logstash
./bin/logstash -f sync_es_table.conf

查看同步结果, 3 条数据都已同步到索引。

使用Logstash同步Mysql到Easysearch_Easysearch

Mysql 数据库新增记录

INSERT INTO es_table (id, client_name) VALUES (4, 'test 4');

Easysearch 确认新增

使用Logstash同步Mysql到Easysearch_Easysearch_02

Mysql 数据库修改记录

UPDATE es_table SET client_name = 'test 0001' WHERE id=1;

Easysearch 确认修改

使用Logstash同步Mysql到Easysearch_Logstash_03

删除数据

Logstash 无法直接删除操作到 ES ,有两个方案:

  1. 在表中增加 is_deleted 字段,实现软删除,可达到同步的目的。查询过滤掉 is_deleted : true 的记录,后续通过脚本等方式定期清理 is_deleted : true 的数据。
  2. 执行删除操作的程序,删除完 Mysql 中的记录后,继续删除 Easysearch 中的记录。

同步监控

数据已经在 ES 中了,我们可利用 INFINI Console 的数据看板来监控数据是否同步,展示表记录数、索引记录数及其变化。

使用Logstash同步Mysql到Easysearch_Easysearch_04

标签:jdbc,Easysearch,mysql,id,Mysql,table,Logstash,es
From: https://blog.51cto.com/u_15963473/7085420

相关文章

  • mysql添加索引的方法(Navicat可视化加索引和sql语句加索引)
    mysql添加索引的方法(Navicat可视化加索引和sql语句加索引) 使用索引的场景:阿里云日志里出现了慢sql 然后发现publish_works_id字段会经常用于一些关联,所以决定把这个字段加上索引,优化sql可视化navicat操作字段加索引,选择字段所在的表,第一步:右键->设计表第二步......
  • Zabbix监控Mysql主从
    一、主机规划服务器IPzabbix-server192.168.131.12mysql-master,zabbix-agent192.168.131.13mysql-slave,zabbix-agent192.168.131.14二、部署&配置zabbix-server2.1部署zabbix#安装Zabbix仓库。这里部署zabbix6.4wgethttps://repo.zabbix.com/zabbix/6.4/ubuntu/pool/main/z/zabb......
  • Mysql配置文件设置与了解
    [client]port=3306[mysql]default-character-set=gbk[mysqld]port=3306socket=/tmp/mysql.sock#设置mysql的安装目录basedir=F:\\HzqSoft\\MySqlServer51GA#设置mysql数据库的数据的存放目录,必须是data,或者是\\xxx-datadatadir=F:\\HzqSoft\\MyS......
  • WEEK08:MYSQL备份及恢复
    ......
  • 开源数据库Mysql_DBA运维实战 (DCL/日志)
    SQL(StructuredQueryLanguage即结构化查询语言)a.DDL语句 数据库定义语言:数据库,表,视图,索引,存储过程,函数,创建删除ALTER(CREATEDROPALTER) b.DML语句数据库操纵语言:插入数据INSERT、删除数据DELETE、更新数据UPDATEc.DQL语句 数据库查询语言:查询数据SELECTd.DCL语句数......
  • MySQL 使用表的自联结,lag,lead得到该行记录所在连续段长度
    目录题目地址代码题目地址https://leetcode.cn/problems/human-traffic-of-stadium/description/代码##WriteyourMySQLquerystatementbelow##本质上就是连续签到问题呗#SELECTVersion()#8.0.33,用户变量编程用不了witht1as(SELECT*fromstadium......
  • MySQL 可重复读边查边插,边删边查
    测试1:边查边插1--会话1查询select*fromt_sjq--431576--会话2INSERT一行,没有阻塞--会话1再次查询,多了一条select*fromt_sjq--431577测试1:边查边插2--会话1查询select*fromt_sjq--423577(会执行十几秒)--会话2会话1执行开始后马上插入1000条数据-......
  • MySQL数据库不可不学的一个数据库福利来了
    Spring常用注解redis视频集合,看完这些别说不会redis代码资料.zip代码资料解压密码:wosn.net第1章数据库简介-8-1[wosn.net].mp4第3章SQL语句规范-8-3[wosn.net].mp4第2章数据库的安装及配置-8-2.mp4第4章数据库的相关操作-8-4.mp4第10章测试字符串类型-8-10[wosn.net].mp4第11......
  • mysql 加索引
    1.PRIMARY  KEY(主键索引)    mysql>ALTER  TABLE  `table_name`  ADD  PRIMARY  KEY(  `column`  ) 2.UNIQUE(唯一索引)      mysql>ALTER  TABLE  `table_name`  ADD  UNIQUE(`column`) 3.INDEX(普通索引)    mysql>ALTER ......
  • 一文玩转MQTT (ESP8266 DHT11 MQTT MYSQL方案)
    本文我们来聊一聊esp8266利用mqtt协议进行通信。并将数据数据存入数据库的操作。关于MQTTMQTT(消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。搭建MQTT服务器......