首页 > 数据库 >flink-cdc同步mysql数据到elasticsearch

flink-cdc同步mysql数据到elasticsearch

时间:2023-08-10 20:47:09浏览次数:39  
标签:product cdc int flink 120 elasticsearch id view

1,什么是cdc

CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2,flink的cdc

项目地址:https://github.com/ververica/flink-cdc-connectors

项目文档:https://ververica.github.io/flink-cdc-connectors/master/

 

3,环境准备

  • mysql
  • elasticsearch
  • flink on yarn

说明:如果没有安装hadoop,那么可以不用yarn,直接用flink standalone环境吧。

本例使用版本如下:

 下面两个地址下载flink的依赖包,放在lib目录下面。 

  下载地址:

  1、https://repo.maven.apache.org/maven2/com/alibaba/ververica/

  flink-sql-connector-mysql-cdc-1.4.0.jar

  此仓库提供的最新版本为1.4.0,如需新版本可自行编译或者去https://mvnrepository.com/下载。

  2、https://repo.maven.apache.org/maven2/org/apache/flink/

  flink-sql-connector-elasticsearch7_2.11-1.13.5.jar

  小坑:此处使用的是es7,由于本地环境是es8导致无法创建索引,又重新安装es7测试成功。

 

4,启动flink

启动flink集群

./start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

 启动flink sql client

./sql-client.sh

启动成功后,可以看到如下的页面:

 

5,数据同步初始化

1)mysql数据库原始表

CREATE TABLE `product_view` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
`server_id` int(11) NOT NULL,
`duration` int(11) NOT NULL,
`times` varchar(11) NOT NULL,
`time` datetime NOT NULL,
PRIMARY KEY (`id`),
KEY `time` (`time`),
KEY `user_product` (`user_id`,`product_id`) USING BTREE,
KEY `times` (`times`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 样本数据
INSERT INTO `product_view` VALUES ('1', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('2', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('3', '1', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('4', '1', '1', '2', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('5', '8', '1', '1', '120', '120', '2020-05-14 13:14:00');
INSERT INTO `product_view` VALUES ('6', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
INSERT INTO `product_view` VALUES ('7', '8', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('8', '8', '1', '3', '120', '120', '2020-04-23 13:14:00');
INSERT INTO `product_view` VALUES ('9', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');

2)flink 创建source数据库关联表

CREATE TABLE product_view_source (
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.34.100.209',
'port' = '3306',
'username' = 'root',
'password' = '123',
'database-name' = 'flinkcdc_test',
'table-name' = 'product_view',
'server-id' = '5401' );

这样,我们在flink-sql client操作这个表相当于操作mysql里面的对应表。

3)flink 创建sink,数据库关联表elasticsearch

CREATE TABLE product_view_sink(
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://10.34.100.156:9200',
'index' = 'product_view_index'
);

这样,es里的product_view_index这个索引在数据同步时会被自动创建,如果想指定一些属性,可以提前手动创建好索引。往product_view_sink里面插入数据,可以发现es中已经有数据了。

查看flink创建的表

 查看flink表数据

select * from product_view_source;

 

select * from product_view_sink;

 由此可见,sink不能直接使用sql查询。

4)建立同步任务

insert into product_view_sink select * from product_view_source;

这个时候是可以退出flink sql-client的,然后进入flink web-ui,可以看到mysql表数据已经同步到elasticsearch中了,对mysql进行插入删除更新,elasticsearch都是同步更新的。

查看任务

 查看es数据

 

6,数据实时同步

1)新增记录

mysql数据库插入一条记录

INSERT INTO `product_view` VALUES ('10', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');

查询es,新增一条记录

 

2)删除记录

mysql数据库删除一条记录

DELETE FROM `product_view` where id=10;

查询es,减少一条记录

 3)更新记录

es原始记录

 mysql更新一条记录

UPDATE `product_view` SET user_id=100,product_id=101 WHERE id=2;

变更后es记录

 

7,遇到的问题

1)资源不足

flink默认taskmanager.numberOfTaskSlots=1即只能运行一个子任务,一般设置为机器的CPU核心数。

 2)重复server-id

 

前提:每个insert语句就会形成一个job,就是一个同步任务。

结论:通过实践可知,不同的job无法共享相同server-id的source表,一个job中也不能存在相同server-id的source表。

场景:假如source1和source2表具有相同的server-id,如果job1中使用source1或source2,那其他job就不能在用source1、source2了。

分析:先提交一个job1并且已经在同步了,此时如果提交的job2中有source表与job1中source表有相同的server-id,或job2中使用和job1中重复的source表,那job2也从job1已经读到的binlog位置开始读就会有问题,导致丢失数据,报如下错误。

最佳实践:一张source表对应一个server-id,如相同的source表需要提交到多个job中,可以在每个job中设置不同的表名及server-id。

举例说明:如order表需要在3个job中使用,job1中name=order1,server-id=5401;job2中name=order2,server-id=5402;job3中name=order3,server-id=5403;这样3个job就会各自维护各自的binlog状态

标签:product,cdc,int,flink,120,elasticsearch,id,view
From: https://www.cnblogs.com/wangjiayu/p/17621443.html

相关文章

  • 1.Elasticsearch集群包安装、二进制安装与Docker安装
    ElasticsearchElasticsearch是一个实时的全文搜索,存储库和分析引擎https://www.elastic.co/cn/what-is/elasticsearchElasticsearch在速度和可扩展性方面都表现出色,而且还能够索引多种类型的内容,可用于多种场景:应用程序搜索网站搜索企业搜索日志处理和分析基础设施指标和......
  • 2.Elasticsearch单节点安装脚本
    #!/bin/bashES_VERSION=7.17.5#ES_VERSION=7.9.3#ES_VERSION=7.6.2UBUNTU_URL="https://mirrors.tuna.tsinghua.edu.cn/elasticstack/7.x/apt/pool/main/e/elasticsearch/elasticsearch-${ES_VERSION}-amd64.deb"RHEL_URL="https://mirrors.tuna.tsingh......
  • 3.Elasticsearch集群安装脚本
    #!/bin/bashES_VERSION=7.17.5#ES_VERSION=7.9.3#ES_VERSION=7.6.2UBUNTU_URL="https://mirrors.tuna.tsinghua.edu.cn/elasticstack/7.x/apt/pool/main/e/elasticsearch/elasticsearch-${ES_VERSION}-amd64.deb"RHEL_URL="https://mirrors.tuna.tsinghu......
  • 4.Elasticsearch插件Head和Serebro实现Elasticsearch的图形化管理
    Elasticsearch访问Elasticsearch支持各种语言使用RESTfulAPI通过端口9200与之进行通信,可以用你习惯的web客户端访问Elasticsearch可以用三种方式和Elasticsearch进行交互curl命令和其它浏览器:基于命令行,操作不方便插件:在node节点上安装head,Cerebro等插件,实现图形操......
  • 5.Elasticsearch内存优化建议
    Elasticsearch内存优化建议内存优化建议:为了保证性能,每个ES节点的JVM内存设置具体要根据node存储的数据量来估算,建议符合下面约定1、在内存和数据量有一个建议的比例:对于一般日志类文件,1G内存能存储48G~96GB数据2、JVM堆内存最大不要超过30GB3、单个分片控制在30-50GB,......
  • 6.Filebeat的安装及收集日志到Elasticsearch并使用自定义索引
    利用Filebeat收集日志Filebeat是用于转发和集中日志数据的轻量级传送程序.作为服务器上的代理安装,Filebeat监视指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或Logstash进行索引.Logstash也可以直接收集日志,但需要安装JDK并且会占用至少500M以上的内存生产......
  • 8.利用Filebeat收集Nginx的Json格式访问日志和错误日志到Elasticsearch
    生产环境中经常需要获取Web访问用户的信息,比如:网站的PV,UV,状态码,用户来自哪个地区,访问时间等可以通过收集的Nginx的访问日志实现默认Nginx的每一次访问生成的访问日志是一行文本,ES没办法直接提取有效信息,不利于后续针对特定信息的分析可以将Nginx访问日志转换为JSON......
  • k8s部署 elasticsearch7集群,其中一台节点报错无法域名解析 :resolving host...
    部署es7集群其中一个节点一直报错resolvinghost考虑有两点,要么是coredns组件出问题了,无法解析,要么是calico网络组件出问题了,首先我就去看网络组件了,果然发现问题,我这台机器有两个网卡,一个是enp9s0(172.16.2.30地址),一个是enp11s0(172.16.2.25地址),我加入的节点是30机器,但是calico绑定......
  • Elasticsearch笔记
    拉呱,无论是当作全文检索工具,还是仅仅当作NOSQL,Elasticsearch的性能,牛的没法说!!!奈何和它相见恨晚点击进入官网中文文档一.使用场景全文检索-像淘宝京东类似的网上商城,当我们在在搜索框搜索某个商品名称时,网络没有问题的话,获取响应的速度,几乎和我们键盘起落的速度是一致的......
  • 监控Elasticsearch的关键指标
    Elasticsearch的核心职能就是对外提供搜索服务,所以搜索请求的吞吐和延迟是非常关键的,搜索是靠底层的索引实现的,所以索引的性能指标也非常关键,Elasticsearch由一个或多个节点组成集群,集群自身是否健康也是需要我们监控的。lasticSearch的架构非常简单,一个节点就可以对外提供服务,......