Flink CDC (Change Data Capture) SQL 用于实现数据库的数据变更捕获,并通过 SQL 接口进行处理。以下是一个基本的示例,全量+增量数据mysql同步到clickhouse,展示如何使用 Flink CDC SQL 进行数据同步。 首先,确保你有 Flink 和 Flink CDC 的环境配置好。
1.mysql测试source表(准备)
CREATE TABLE `game_type` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`type_name` varchar(100) DEFAULT NULL COMMENT '游戏类型名称',
`name_json` text COMMENT '多语言名称json数据',
`home_icon` varchar(255) DEFAULT NULL COMMENT '首页图标',
`icon` varchar(255) DEFAULT NULL COMMENT '图标',
`icon_active` varchar(255) DEFAULT NULL COMMENT '选择图标',
`status` int DEFAULT NULL COMMENT '状态:0:启用 1:禁用',
`sort_no` int DEFAULT NULL COMMENT '排序',
`operator` varchar(255) DEFAULT NULL COMMENT '操作人',
`gmt_create` bigint DEFAULT NULL COMMENT '创建时间',
`gmt_update` bigint DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=42 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='三方游戏类型配置';
2.Clickhouse测试sink表(准备)
-- pplive_test.game_type_local definition 分布式表
CREATE TABLE pplive_test.game_type_local
(
`id` Int64 COMMENT '主键',
`type_name` Nullable(String) COMMENT '游戏类型名称',
`name_json` String COMMENT '多语言名称json数据',
`icon` Nullable(String) COMMENT '图标',
`icon_active` Nullable(String) COMMENT '选择图标',
`status` Nullable(Int64) COMMENT '状态:0:启用 1:禁用',
`sort_no` Nullable(Int32) COMMENT '排序'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/pplive_test.game_type_local/{shard}',
'{replica}')
ORDER BY id
SETTINGS index_granularity = 8192
COMMENT '三方游戏类型配置';
-- pplive_test.game_type_local definition 本地表
CREATE TABLE IF NOT EXISTS pplive_test.game_type ON CLUSTER default_cluster as pplive_test.game_type_local
ENGINE = Distributed(default_cluster, pplive_test, game_type_local, rand());
3.Demo代码
package org.example;
import org.apache.flink.connector.clickhouse.ClickHouseDynamicTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* Flink-SQL 方式
* 如果是相对简单的job,对数据不做任何处理,或者涉及表较少时,选择Flink-SQL/CLI 方式方式较为便捷
*/
public class FlinkCDC_Sql_MysqlToCk_demo2 {
public static void main(String[] args) throws Exception {
//1.创建flinkCDC执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().getConfiguration().setString("pipeline.name", FlinkCDC_Sql_MysqlToCk_demo2.class.getName());
// 配置MySQL CDC源
String sourceDDL = "" +
" CREATE TABLE game_type_source ( \n" +
" id BIGINT ,\n" +
" type_name STRING ,\n" +
" name_json STRING ,\n" +
" home_icon STRING ,\n" +
" icon STRING ,\n" +
" icon_active STRING ,\n" +
" status INT ,\n" +
" sort_no INT ,\n" +
" operator STRING ,\n" +
" gmt_create BIGINT ,\n" +
" gmt_update BIGINT ,\n" +
" PRIMARY KEY (`id`) NOT ENFORCED \n" +
") WITH ( \n" +
" 'connector' = 'mysql-cdc' ,\n" +
" 'hostname' = '127.0.0.1',\n" +
" 'port' = '3306' ,\n" +
" 'username' = 'root' ,\n" +
" 'password' = '123456' ,\n" +
" 'database-name' = 'live' ,\n" +
" 'table-name' = 'game_type'\n" +
")";
tableEnv.executeSql(sourceDDL);
// 配置ClickHouse sink
String sinkDDL = "" +
"CREATE TABLE game_type_sink (\n" +
" id BIGINT ,\n" +
" type_name STRING ,\n" +
" name_json STRING ,\n" +
" icon STRING ,\n" +
" icon_active STRING ,\n" +
" status INTEGER ,\n" +
" sort_no INTEGER ,\n" +
" PRIMARY KEY (`id`) NOT ENFORCED \n" +
") WITH (" +
" 'connector' = 'clickhouse',\n" +
" 'database-name' = 'pplive_test',\n" +
" 'table-name' = 'game_type',\n" +
" 'url' = 'clickhouse://13.229.64.238:18123/',\n" +
" 'username' = 'testzone',\n" +
" 'password' = 'zck8aec1',\n" +
" 'sink.batch-size' = '10',\n" + //最大刷新大小,超过此大小将刷新数据。
" 'sink.flush-interval' = '1000',\n" + //Buffer刷新时间间隔,取值范围为 1000 ms~3600000 ms。
" 'sink.max-retries' = '3'\n" + //最大重试次数,取值范围为0~10。
")";
tableEnv.executeSql(sinkDDL);
// //数据打印 查询输出并转换流输出
// String query = "select cast(id as BIGINT),type_name,name_json,icon,icon_active,status,sort_no from game_type_source";
// tableEnv.executeSql(query).print();
// Table table = tableEnv.sqlQuery(query);
// 编写SQL查询 查询输出并转换流输出
String transformSQL = "insert into game_type_sink select cast(id as BIGINT),type_name,name_json,icon,icon_active,status,sort_no from game_type_source";
TableResult tableResult = tableEnv.executeSql(transformSQL);
// 等待flink-cdc完成快照
tableResult.print();
env.execute("sync-flink-cdc");
}
}
4.POM.XML
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink-version>1.18.1</flink-version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.31</version>
</dependency>
<!-- clickhouse jdbc driver -->
<!--clickhouse jdbc连接-->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch11</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
</dependencies>
标签:COMMENT,name,org,flink,game,mysql,FlinkCDCSQL,type,clickhouse
From: https://www.cnblogs.com/kods/p/18278598