How to synchronize Elasticsearch with MySQL

Posted: 2024-12-18 23:09:37
Tags: index, MySQL, How, books, Elasticsearch, mysql, data

Using Logstash to create a data pipe linking Elasticsearch to MySQL in order to build an index from scratch and to replicate any changes occurring on the database records into Elasticsearch

https://towardsdatascience.com/how-to-synchronize-elasticsearch-with-mysql-ed32fc57b339

 

Technology Stack

  • MySQL as a main database (version 8.0.22)
  • Elasticsearch as a text search engine (version 7.9.3)
  • Logstash as a connector or data pipe from MySQL to Elasticsearch (version 7.9.3)
  • Kibana for monitoring and data visualization (version 7.9.3)
  • JDBC Connector/J (version 8.0.22)

Content

  • Problem Statement
  • Solution
    - Benefits from using Logstash
    - Use Cases
  • Implementing a proof of concept
    - Creating a host directory for the project
    - Set up a MySQL database
    - Set up Elasticsearch and Kibana
    - Set up Logstash to pipe data from MySQL to Elasticsearch:
    * First Scenario — Creating an Elasticsearch index from scratch
    * Second Scenario — Replicating changes on the database records to Elasticsearch
  • Conclusion
  • Resources

Photo by Sébastien Jermer on Unsplash

Problem Statement

A common scenario for tech companies is to start building their core business functionalities around one or more databases, and then start connecting services to those databases to perform searches on text data, such as searching for a street name in the user addresses column of a users table, or searching for book titles and author names in the catalog of a library.

So far everything is fine. The company keeps growing and acquires more and more customers. But then searches start to become slower and slower, which tends to irritate their customers and makes it hard to convince new prospects to buy in.

Looking under the hood, the engineers realize that MySQL’s FULLTEXT indexes are not ideal for text look-ups on large datasets, and in order to scale up their operations, the engineers decide to move on to a more dedicated and battle-tested text search engine.

Enter Elasticsearch and its underlying Lucene search engine. Elasticsearch indexes data using an inverted index, which maps each term to the documents that contain it, and this results in very fast full-text search.
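To make the idea of an inverted index concrete, here is a minimal sketch in Python. It is only an illustration of the data structure, not how Lucene is implemented: the whitespace tokenizer, the function names, and the sample titles are all assumptions made for this example.

```python
from collections import defaultdict


def build_inverted_index(docs):
    """Map each lowercase token to the set of document IDs containing it."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for token in text.lower().split():
            index[token].add(doc_id)
    return index


def search(index, term):
    """Look up a single term and return the matching document IDs, sorted."""
    return sorted(index.get(term.lower(), set()))


# Hypothetical sample titles, not taken from the actual data set
titles = {
    1: "The Magic Mountain",
    2: "A Brief History of Time",
    3: "Magic Hour",
}

index = build_inverted_index(titles)
print(search(index, "magic"))
```

A lookup is a single dictionary access on the term, whatever the number of documents, which is why this structure suits text search better than scanning every row.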

A new challenge then comes in: How to get the data that is in a MySQL database into an Elasticsearch index, and how to keep the latter synchronized with the former?

Solution

The modern data plumber’s toolkit contains a plethora of software for any data manipulation task. In this article, we’ll focus on Logstash from the ELK stack to periodically fetch data from MySQL and mirror it on Elasticsearch. This takes into account any changes to the MySQL data records, such as create, update, and delete, and replicates them on the Elasticsearch documents.

Benefits from using Logstash

The problem we are trying to solve here, sending data periodically from MySQL to Elasticsearch to keep the two in sync, can be solved with a Shell or Python script run by a cron job or any job scheduler, but this would deprive us of the benefits otherwise gained by configuring Logstash and its plugin-based setup.

Use Cases

We will cover two scenarios in the following steps:

  1. Creating an Elasticsearch index and indexing database records from scratch.
  2. Incremental update of the Elasticsearch index based on changes occurring on the database records (creation, update, deletion).

Implementing a proof of concept

Imagine you are opening an online library where avid readers can search your catalog of books to find their next read. Your catalog contains millions of titles, ranging from scientific literature volumes to pamphlets about exotic adventures.

We will create an initial database table books with a few thousand records containing book titles, authors, ISBN and publication date. We’ll use the Goodreads book catalog that can be found on Kaggle. This initial table will serve to prototype use case (1), building an index from scratch.

We will create triggers on the table books that will populate a journal table books_journal with all changes to the books table (e.g. create, update, delete). This means that whenever a record in the books table is created, updated, or deleted, this action will be recorded in the books_journal table, and the same action will be performed on the corresponding Elasticsearch document. This will serve to prototype use case (2), incremental update of the Elasticsearch index.
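The journal mechanism can be tried out in isolation before touching MySQL. The sketch below uses Python's built-in sqlite3 module as a stand-in, so the trigger syntax is SQLite's and not the one in the project's MySQL dump files. The trigger names and the reduced column set are assumptions made for illustration; only the table names and the journal_id, action_type and isbn columns come from this article.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE books (isbn TEXT PRIMARY KEY, title TEXT);

CREATE TABLE books_journal (
    journal_id  INTEGER PRIMARY KEY AUTOINCREMENT,
    action_type TEXT NOT NULL,
    isbn        TEXT NOT NULL
);

-- One trigger per kind of change; each appends a row to the journal
CREATE TRIGGER books_after_insert AFTER INSERT ON books
BEGIN
    INSERT INTO books_journal (action_type, isbn) VALUES ('create', NEW.isbn);
END;

CREATE TRIGGER books_after_update AFTER UPDATE ON books
BEGIN
    INSERT INTO books_journal (action_type, isbn) VALUES ('update', NEW.isbn);
END;

CREATE TRIGGER books_after_delete AFTER DELETE ON books
BEGIN
    INSERT INTO books_journal (action_type, isbn) VALUES ('delete', OLD.isbn);
END;
""")

# Exercise the three kinds of change on a single record
conn.execute("INSERT INTO books VALUES ('9780060598891', 'The Rocky Road to Romance')")
conn.execute("UPDATE books SET title = 'The Rocky what ??' WHERE isbn = '9780060598891'")
conn.execute("DELETE FROM books WHERE isbn = '9780060598891'")

journal = conn.execute(
    "SELECT journal_id, action_type, isbn FROM books_journal ORDER BY journal_id"
).fetchall()
for entry in journal:
    print(entry)
```

Each change leaves one journal row with an increasing journal_id, which is what lets a downstream reader replay the changes in order.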

We’ll prototype and test the above concepts by defining a micro-services architecture using docker-compose.

Architecture of this project — Image by Author

Prerequisites

Steps

Full source code can be found on GitHub at sync-elasticsearch-mysql (https://github.com/r13i/sync-elasticsearch-mysql).

1. Start by creating a directory to host this project (named e.g. sync-elasticsearch-mysql) and create a docker-compose.yaml file inside that directory with the following initial setup:

version: "3"
services:
  ...
  # Here will come the services definition
  ...

2. Set up a MySQL database: Create a directory data/ where we’ll store the MySQL dump files. These contain the pre-cleaned books data for the books table, the triggers for the books table (on create, update, and delete), the table books_journal, and a new_arrival table with books we will add to our catalog to simulate new records.

You can find these dump files in the project repository. Please copy them to the data/ directory so that the MySQL container will add them to the books database during the startup process.

version: "3"
services:
  # add this:
  mysql:
    image: mysql:8
    container_name: sem_mysql
    ports:
      - 3306:3306
    environment:
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
      MYSQL_DATABASE: books
      MYSQL_USER: avid_reader
      MYSQL_PASSWORD: i_love_books
    volumes:
      # Dump files for initiating tables
      - ./data/:/docker-entrypoint-initdb.d/

Note that storing usernames and passwords in plain text in your source code is not advised. The above is done for the sake of prototyping this project only and is not in any case a recommended practice.

To start MySQL and check that data has been added successfully, run the following in a terminal from your project’s directory:

docker-compose up -d mysql # -d is for detached mode

# Once the container has started, log into the MySQL container
docker exec -it sem_mysql bash

# Start a MySQL prompt
mysql -uavid_reader -pi_love_books

# Check that tables have been loaded from dump files
use books;
show tables;

Listing of the available tables and a sample of the data — Image by Author

To show the triggers in a vertical, one-field-per-line layout, use the command:

SHOW TRIGGERS \G

# To exit the MySQL prompt, press CTRL+D
# To log out of the MySQL container, press CTRL+D

Now that we have our database properly set up, we can move on to the meat and potatoes of this project.

3. Set up Elasticsearch and Kibana: To set up Elasticsearch (without indexing any documents yet), add this to your docker-compose.yaml file:

version: "3"
services:
  ...
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
    container_name: sem_elasticsearch
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - ./volumes/elasticsearch:/usr/share/elasticsearch/data
    logging:
      driver: "json-file"
      options:
        max-size: "10k"
        max-file: "10"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.9.3
    container_name: sem_kibana
    environment:
      # Kibana 7.x reads ELASTICSEARCH_HOSTS (ELASTICSEARCH_URL was the 6.x name)
      - "ELASTICSEARCH_HOSTS=http://elasticsearch:9200"
      - "SERVER_NAME=127.0.0.1"
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch

Note that the volumes definition is recommended so that the Elasticsearch index data is stored in a directory on your host file system (./volumes/elasticsearch) and survives container restarts.

To get Elasticsearch and Kibana started, run the following from your project directory:

docker-compose up -d elasticsearch kibana # -d is for detached mode

# To check if everything so far is running as it should, run:
docker-compose ps

After executing the above commands, 3 containers should be up and running — Image by Author

Let’s check if there is any index in our Elasticsearch node so far:

# On your terminal, log into the Elasticsearch container
docker exec -it sem_elasticsearch bash

# Once logged in, use curl to request a listing of the indexes
curl localhost:9200/_cat/indices

If Kibana is up and running, you’ll see a list of indices used for metrics and visualization, but nothing related to our books yet — and if Kibana hasn’t been started, the list of indices will be empty.

List of indexes created by Kibana — Image by Author

4. Set up Logstash to pipe data from MySQL to Elasticsearch:

To connect Logstash to MySQL, we will use the official JDBC driver, MySQL Connector/J.

Let’s create a Dockerfile (named Dockerfile-logstash in the same directory) to pull a Logstash image, download the JDBC connector, and start a Logstash container. Add these lines to your Dockerfile:

FROM docker.elastic.co/logstash/logstash:7.9.3

# Download JDBC connector for Logstash
RUN curl -L --output "mysql-connector-java-8.0.22.tar.gz" "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz" \
    && tar -xf "mysql-connector-java-8.0.22.tar.gz" "mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar" \
    && mv "mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar" "mysql-connector-java-8.0.22.jar" \
    && rm -r "mysql-connector-java-8.0.22" "mysql-connector-java-8.0.22.tar.gz"

ENTRYPOINT ["/usr/local/bin/docker-entrypoint"]

Then add the following snippet to your docker-compose.yaml file:

version: "3"
services:
  ...
  logstash:
    build:
      context: .
      dockerfile: Dockerfile-logstash
    container_name: sem_logstash
    depends_on:
      - mysql
      - elasticsearch
    volumes:
      # We will explain why and how to add volumes below

Logstash uses defined pipelines to know where to get data from, how to filter it, and where it should go. We will define two pipelines: one for creating an Elasticsearch index from scratch (first scenario), and one for incremental updates on changes to the database records (second scenario).

Please check the Logstash documentation for the jdbc input, mutate filter, and elasticsearch output plugins for an explanation of each field used in the pipeline definitions.

4.a. First Scenario — Creating an Elasticsearch index from scratch:

In your project directory, create a volumes folder (if not already created) then create a directory to host our Logstash configuration:

mkdir -p volumes/logstash/config

Then in this config directory, create a file pipelines.yml containing:

- pipeline.id: from-scratch-pipeline
  path.config: "/usr/share/logstash/pipeline/from-scratch.conf"

We then create a folder pipeline to host our pipelines definitions:

mkdir volumes/logstash/pipeline

and there create a file from-scratch.conf with the following content:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306"
    jdbc_user => "avid_reader"
    jdbc_password => "i_love_books"
    clean_run => true
    record_last_run => false
    statement_filepath => "/usr/share/logstash/config/queries/from-scratch.sql"
  }
}

filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  # stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "books"
    action => "index"
    document_id => "%{isbn}"
  }
}

We define where to find the JDBC connector in jdbc_driver_library, set where to find MySQL in jdbc_connection_string, instruct the plugin to run from scratch with clean_run, and define where to find the SQL statement that fetches and formats the data records in statement_filepath.

The filter simply removes extra fields added by the plugin.

In the output, we define where to find the Elasticsearch host, set the name of the index to books (it can be a new or an existing index), define which action to perform (index, create, update, or delete; see the docs), and set which field will serve as a unique ID in the books index. Here we use the ISBN, an internationally unique ID for books.
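One practical consequence of using the ISBN as document_id is that re-running the pipeline overwrites existing documents instead of duplicating them. The sketch below imitates that behaviour with a plain Python dictionary standing in for the index. The function name and the sample rows are hypothetical, and a dictionary is of course only a rough analogy for an Elasticsearch index.

```python
def index_rows(store, rows, id_field="isbn"):
    """Write each row under its ID; an existing ID is overwritten, not duplicated."""
    for row in rows:
        store[str(row[id_field])] = row
    return store


# Hypothetical rows standing in for the result of the SQL statement
rows = [
    {"isbn": "9780060598891", "title": "The Rocky Road to Romance"},
    {"isbn": "9780000000002", "title": "Placeholder Title"},
]

store = {}
index_rows(store, rows)
index_rows(store, rows)  # a second run of the same pipeline
print(len(store))
```

Without a stable ID, each run would be expected to add a fresh copy of every record, because Elasticsearch would generate a new ID per document.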

To add the SQL query referenced by statement_filepath, let’s create a folder queries:

mkdir volumes/logstash/config/queries/

Then add file from-scratch.sql with as little as:

SELECT * FROM books.books

to get our index built with all the records available so far in our books table. Note that the query should NOT end with a semi-colon (;).

Now to mount the configuration files and definitions created above, add the following to your docker-compose.yaml file:

version: "3"
services:
  ...
  logstash:
    ...
    volumes:
      - ./volumes/logstash/pipeline/:/usr/share/logstash/pipeline/
      - ./volumes/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml
      - ./volumes/logstash/config/queries/:/usr/share/logstash/config/queries/

Note that a volumes directive tells Docker to mount a directory or file from your machine or server (left-hand side of the :) onto a directory or file inside the container (right-hand side of the :). Check out the official Docker Compose documentation on volumes.

Testing

In order to test the implementation for this first scenario, run on your terminal:

docker-compose up logstash

If the run is successful, you will see a message similar to Logstash shut down and the container will exit with code 0.

Now head to Kibana in your browser (for example at http://localhost:5601, the port published in docker-compose.yaml) and let’s start playing around to see if we have a books index and how we can search for books.

The books index has been successfully created and contains 1k documents — Image by Author

In the Dev Tools panel, you can run custom queries to fetch documents by field value. For example, let’s search for books with the word “magic” in their title. Paste this query on the Dev Tools console:

GET books/_search
{
  "query": {
    "match": {
      "title": "magic"
    }
  }
}

We get 9 documents:

Searching for books containing the word “magic” in their title — Image by Author

 

4.b. Second Scenario — Replicating changes on the database records to Elasticsearch:

Most of the configuring and tweaking has been done in the previous part. We will simply add another pipeline that will take charge of the incremental update (replication).

In the file volumes/logstash/config/pipelines.yml, add these two lines:

- pipeline.id: incremental-pipeline
  path.config: "/usr/share/logstash/pipeline/incremental.conf"

In the same file, you may want to comment out the two previous lines related to the “from-scratch” part (first scenario) in order to instruct Logstash to run this incremental pipeline only.

Let’s create a file incremental.conf in the directory volumes/logstash/pipeline/ with the following content:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306"
    jdbc_user => "avid_reader"
    jdbc_password => "i_love_books"
    statement_filepath => "/usr/share/logstash/config/queries/incremental.sql"
    use_column_value => true
    tracking_column => "journal_id"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
  }
}

filter {
  if [action_type] == "create" or [action_type] == "update" {
    mutate { add_field => { "[@metadata][action]" => "index" } }
  } else if [action_type] == "delete" {
    mutate { add_field => { "[@metadata][action]" => "delete" } }
  }

  mutate {
    remove_field => ["@version", "@timestamp", "action_type"]
  }
}

output {
  # stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "books"
    action => "%{[@metadata][action]}"
    document_id => "%{isbn}"
  }
}

We see a few extra parameters compared to the previous pipeline definition.

The schedule parameter has a cron-like syntax (with a resolution down to the second when adding one more value on the left), and uses the Rufus Scheduler behind the scenes. Here we instruct Logstash to run this pipeline every 5 seconds with */5 * * * * *.

The website crontab.guru can help you read crontab expressions.
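To see how the six-field form reads, here is a small Python sketch that checks whether such a schedule would fire at a given moment. It is a deliberately reduced illustration, not the Rufus Scheduler's parser: it only understands *, */n and plain integers, it requires every field to match (real cron treats the day and weekday fields specially), and the function names are made up for this example.

```python
from datetime import datetime


def field_matches(expr, value):
    """Support only '*', '*/n' and a plain integer; real cron syntax has more."""
    if expr == "*":
        return True
    if expr.startswith("*/"):
        return value % int(expr[2:]) == 0
    return int(expr) == value


def fires_at(schedule, moment):
    """Check a six-field schedule: second minute hour day month weekday."""
    second, minute, hour, day, month, weekday = schedule.split()
    cron_weekday = (moment.weekday() + 1) % 7  # cron counts Sunday as 0
    return all([
        field_matches(second, moment.second),
        field_matches(minute, moment.minute),
        field_matches(hour, moment.hour),
        field_matches(day, moment.day),
        field_matches(month, moment.month),
        field_matches(weekday, cron_weekday),
    ])


every_five_seconds = "*/5 * * * * *"
print(fires_at(every_five_seconds, datetime(2024, 1, 1, 12, 0, 10)))
print(fires_at(every_five_seconds, datetime(2024, 1, 1, 12, 0, 12)))
```

The leftmost field is the seconds field, so */5 in that position means seconds 0, 5, 10 and so on of every minute.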

During each periodic run, the parameter tracking_column instructs Logstash to store the journal_id value of the last record fetched somewhere on the filesystem (see the documentation for last_run_metadata_path). In the following run, Logstash will fetch records starting from journal_id + 1, where journal_id is the value stored in the current run. That stored value is what the :sql_last_value placeholder in the SQL query resolves to.
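The tracking-column mechanism amounts to remembering a high-water mark between runs. The following Python sketch reproduces that loop against an in-memory SQLite table. It is a simplified model of the behaviour described above, not Logstash's own code: the function name, the sample rows, and keeping the last value in a variable instead of a file are all assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE books_journal "
    "(journal_id INTEGER PRIMARY KEY, action_type TEXT, isbn TEXT)"
)


def fetch_new_rows(conn, last_value):
    """Return journal rows newer than last_value, plus the updated tracking value."""
    rows = conn.execute(
        "SELECT journal_id, action_type, isbn FROM books_journal "
        "WHERE journal_id > ? ORDER BY journal_id",
        (last_value,),
    ).fetchall()
    if rows:
        last_value = rows[-1][0]  # journal_id of the last record fetched
    return rows, last_value


# Two changes happen before the first scheduled run
conn.executemany(
    "INSERT INTO books_journal VALUES (?, ?, ?)",
    [(1, "create", "9780060598891"), (2, "update", "9780060598891")],
)
first_batch, last_value = fetch_new_rows(conn, 0)

# One more change happens before the second run
conn.execute("INSERT INTO books_journal VALUES (3, 'delete', '9780060598891')")
second_batch, last_value = fetch_new_rows(conn, last_value)

# Nothing changed before the third run
third_batch, last_value = fetch_new_rows(conn, last_value)

print(len(first_batch), len(second_batch), len(third_batch), last_value)
```

Because each run only asks for rows above the stored value, no journal entry is processed twice, and an idle period costs one cheap query per tick.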

In the filter section, when the database action type is “create” or “update”, we set the Elasticsearch action to “index” so that new documents get indexed, and existing documents get re-indexed to update their value. Another approach to avoid re-indexing existing documents is to write a custom “update” script and to use the “upsert” action. For “delete” actions, the document on Elasticsearch gets deleted.
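The mapping performed by the filter can also be written out as a plain function, which may help when reasoning about edge cases. The Python sketch below turns one journal row into the lines an Elasticsearch bulk request would carry. It is an illustration of the mapping only: the function name is hypothetical, the sample row is made up, and nothing is sent to a server.

```python
import json


def to_bulk_lines(row, index="books"):
    """Translate one journal row into Elasticsearch bulk-API lines (NDJSON)."""
    action_type = row["action_type"]
    doc_id = str(row["isbn"])
    if action_type in ("create", "update"):
        # Both map to "index": a new document is added, an existing one replaced
        doc = {key: value for key, value in row.items() if key != "action_type"}
        return [
            json.dumps({"index": {"_index": index, "_id": doc_id}}),
            json.dumps(doc),
        ]
    if action_type == "delete":
        # A delete carries only the action line, no document body
        return [json.dumps({"delete": {"_index": index, "_id": doc_id}})]
    raise ValueError(f"unknown action_type: {action_type!r}")


# Hypothetical journal row, shaped like the result of the incremental query
sample = {
    "journal_id": 2,
    "action_type": "update",
    "isbn": "9780060598891",
    "title": "The Rocky what ??",
}
for line in to_bulk_lines(sample):
    print(line)
```

Unlike the Logstash filter, this sketch raises on an unexpected action_type, which makes a malformed journal row visible instead of passing an unresolved action on to the output.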

Let’s create and populate the file incremental.sql in directory volumes/logstash/config/queries/ with this query:

SELECT
  j.journal_id, j.action_type, j.isbn,
  b.title, b.authors, b.publication_date
FROM books.books_journal j
LEFT JOIN books.books b ON b.isbn = j.isbn
WHERE j.journal_id > :sql_last_value
  AND j.action_time < NOW()
ORDER BY j.journal_id

Testing

Let’s have a look at the table books.new_arrival. It contains books that were just delivered to us and that we haven’t had time to add to our main books.books table.

Result of “SELECT * FROM books.new_arrival;” — Image by Author

As we can see, none of the books in that table are in our Elasticsearch index. Let’s try with the third book in the table above, “The Rocky Road to Romance (Elsie Hawkins #4)”, ISBN 9780060598891:

Search for ISBN 9780060598891 returns empty result — Image by Author

Now let’s transfer that book from the new arrivals table to our main books table:

INSERT INTO books
SELECT * FROM new_arrival WHERE isbn = 9780060598891;

Running the same search on Elasticsearch, we are happy to see that the document is now available:

Now ISBN 9780060598891 is available on the Elasticsearch index — Image by Author

The sync is working!

Let’s test with an update of the same ISBN:

UPDATE books
SET title = "The Rocky what ??"
WHERE isbn = 9780060598891;
Updates on the database records are successfully replicated to Elasticsearch documents — Image by Author

…and a delete of the same ISBN:

DELETE from books WHERE isbn = 9780060598891;
A record deleted from the database will also be removed from the Elasticsearch index - Image by Author

Conclusion

We managed to get a proof of concept up and running in a short amount of time of how to index data into Elasticsearch from a MySQL database, and how to keep Elasticsearch in sync with the database.

The technologies we used are widely adopted in the industry and many businesses rely on them daily to serve their customers, and it’s very likely that many have encountered or will encounter the same problem we solved in this project.

 

Integration of ElasticSearch With MySQL

https://www.baeldung.com/ops/elasticsearch-mysql

 

 

References

https://github.com/rayjasson98/docker-mysql-elk

https://github.com/Colibri-code/ElasticSearch-Docker-MySql

https://github.com/zhangjunjie6b/mysql2elasticsearch

https://github.com/r13i/sync-elasticsearch-mysql

 

From: https://www.cnblogs.com/lightsong/p/18616032
