- 运行 docker-compose.yml 搭建数据库源,官方 mysql 样例数据源无法启动,改用其他 mysql 镜像
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=1234
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: mysql
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
elasticsearch:
image: elastic/elasticsearch:7.6.0
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
ports:
- "9200:9200"
- "9300:9300"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
kibana:
image: elastic/kibana:7.6.0
ports:
- "5601:5601"
-
在 maven 仓库中没有找到 flink-1.8.0 版本的 elasticsearch 包,改用下载 flink-1.7.0 版本,bin 目录没有 bat 文件,需要上传到 linux 环境执行
-
服务器安装 java-11-openjdk,java 8 版本运行会报错,java 17 版本运行 flink-1.7.0 也会报反射相关的错误,所有用官方推荐的 java 11 版本
-
默认安装的 mysql 需要先修改时区,不然同步数据时会报错。时区永久生效需要写入 my.cnf
SET GLOBAL time_zone = 'Asia/Shanghai';
- 修改 flink 配置 conf/flink-conf.yaml,web 端默认监听 127.0.0.1,改成 0.0.0.0
rest.bind-address: 0.0.0.0
- 下载 jar 包,放入到 flink lib 目录下
flink-sql-connector-postgres-cdc-2.4.2.jar
flink-sql-connector-mysql-cdc-2.4.2.jar
flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
- 启动 flink
./start-cluster.sh
./sql-client.sh
-
录入官方数据库样例数据
-
flink 创建表,官方给的 shipments 样例数据会报错, shipments 需要添加一行
'slot.name' = 'flink'
- 其他步骤参考官方教程,可以顺利同步数据到 elasticsearch
参考链接:
基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL