首页 > 数据库 >FlinkCDCSQL数据同步mysql->clickhouse

FlinkCDCSQL数据同步mysql->clickhouse

时间:2024-07-01 18:31:30浏览次数:21  
标签:COMMENT name org flink game mysql FlinkCDCSQL type clickhouse

Flink CDC (Change Data Capture) SQL 用于实现数据库的数据变更捕获,并通过 SQL 接口进行处理。以下是一个基本的示例,全量+增量数据mysql同步到clickhouse,展示如何使用 Flink CDC SQL 进行数据同步。 首先,确保你有 Flink 和 Flink CDC 的环境配置好。

1.mysql测试source表(准备)

CREATE TABLE `game_type` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `type_name` varchar(100) DEFAULT NULL COMMENT '游戏类型名称',
  `name_json` text COMMENT '多语言名称json数据',
  `home_icon` varchar(255) DEFAULT NULL COMMENT '首页图标',
  `icon` varchar(255) DEFAULT NULL COMMENT '图标',
  `icon_active` varchar(255) DEFAULT NULL COMMENT '选择图标',
  `status` int DEFAULT NULL COMMENT '状态:0:启用 1:禁用',
  `sort_no` int DEFAULT NULL COMMENT '排序',
  `operator` varchar(255) DEFAULT NULL COMMENT '操作人',
  `gmt_create` bigint DEFAULT NULL COMMENT '创建时间',
  `gmt_update` bigint DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=42 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='三方游戏类型配置';

2.Clickhouse测试sink表(准备)

-- pplive_test.game_type_local definition  分布式表

CREATE TABLE pplive_test.game_type_local  
(

    `id` Int64 COMMENT '主键',

    `type_name` Nullable(String) COMMENT '游戏类型名称',

    `name_json` String COMMENT '多语言名称json数据',

    `icon` Nullable(String) COMMENT '图标',

    `icon_active` Nullable(String) COMMENT '选择图标',

    `status` Nullable(Int64) COMMENT '状态:0:启用 1:禁用',

    `sort_no` Nullable(Int32) COMMENT '排序'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/pplive_test.game_type_local/{shard}',
 '{replica}')
ORDER BY id
SETTINGS index_granularity = 8192
COMMENT '三方游戏类型配置';


-- pplive_test.game_type_local definition  本地表

CREATE TABLE IF NOT EXISTS  pplive_test.game_type  ON CLUSTER default_cluster as pplive_test.game_type_local
ENGINE = Distributed(default_cluster, pplive_test, game_type_local, rand());

3.Demo代码

package org.example;

import org.apache.flink.connector.clickhouse.ClickHouseDynamicTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


/**
 * Flink-SQL 方式
 * 如果是相对简单的job,对数据不做任何处理,或者涉及表较少时,选择Flink-SQL/CLI 方式方式较为便捷
 */
public class FlinkCDC_Sql_MysqlToCk_demo2 {
    public static void main(String[] args) throws Exception {
        //1.创建flinkCDC执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.getConfig().getConfiguration().setString("pipeline.name", FlinkCDC_Sql_MysqlToCk_demo2.class.getName());

        // 配置MySQL CDC源
        String sourceDDL = "" +
                " CREATE TABLE game_type_source  ( \n" +
                "  id BIGINT ,\n" +
                "  type_name STRING ,\n" +
                "  name_json STRING ,\n" +
                "  home_icon STRING ,\n" +
                "  icon STRING ,\n" +
                "  icon_active STRING ,\n" +
                "  status INT ,\n" +
                "  sort_no INT ,\n" +
                "  operator STRING ,\n" +
                "  gmt_create BIGINT ,\n" +
                "  gmt_update BIGINT ,\n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED \n" +
                ")  WITH ( \n" +
                "   'connector' = 'mysql-cdc' ,\n" +
                "   'hostname' = '127.0.0.1',\n" +
                "   'port' = '3306' ,\n" +
                "   'username' = 'root' ,\n" +
                "   'password' = '123456' ,\n" +
                "   'database-name' = 'live' ,\n" +
                "   'table-name' = 'game_type'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);


        // 配置ClickHouse sink
        String sinkDDL = "" +
                "CREATE TABLE game_type_sink  (\n" +
                "  id BIGINT ,\n" +
                "  type_name STRING ,\n" +
                "  name_json STRING ,\n" +
                "  icon STRING ,\n" +
                "  icon_active STRING ,\n" +
                "  status INTEGER ,\n" +
                "  sort_no INTEGER ,\n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED \n" +
                ") WITH (" +
                "   'connector' = 'clickhouse',\n" +
                "   'database-name' = 'pplive_test',\n" +
                "   'table-name' = 'game_type',\n" +
                "   'url' = 'clickhouse://13.229.64.238:18123/',\n" +
                "   'username' = 'testzone',\n" +
                "   'password' = 'zck8aec1',\n" +
                "   'sink.batch-size' = '10',\n" +        //最大刷新大小,超过此大小将刷新数据。
                "   'sink.flush-interval' = '1000',\n" +   //Buffer刷新时间间隔,取值范围为 1000 ms~3600000 ms。
                "   'sink.max-retries' = '3'\n" +          //最大重试次数,取值范围为0~10。
                ")";
        tableEnv.executeSql(sinkDDL);

//        //数据打印  查询输出并转换流输出
//        String query = "select cast(id as BIGINT),type_name,name_json,icon,icon_active,status,sort_no from game_type_source";
//        tableEnv.executeSql(query).print();
//        Table table = tableEnv.sqlQuery(query);

        // 编写SQL查询  查询输出并转换流输出
        String transformSQL = "insert into game_type_sink select cast(id as BIGINT),type_name,name_json,icon,icon_active,status,sort_no from game_type_source";
        TableResult tableResult = tableEnv.executeSql(transformSQL);

        // 等待flink-cdc完成快照
        tableResult.print();
        env.execute("sync-flink-cdc");
    }
}

4.POM.XML


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink-version>1.18.1</flink-version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>


<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>
        <!-- clickhouse jdbc driver -->

        <!--clickhouse jdbc连接-->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.2-patch11</version>
        </dependency>

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1.1-jre</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.16.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

标签:COMMENT,name,org,flink,game,mysql,FlinkCDCSQL,type,clickhouse
From: https://www.cnblogs.com/kods/p/18278598

相关文章

  • MySQL 使用 ALTER TABLE 语句修改表结构的方法
    MySQL使用ALTERTABLE语句修改表结构的方法基本概念与作用作用说明完整代码示例示例一:添加新列示例二:修改列的数据类型示例三:删除列示例四:重命名列示例五:添加约束示例六:删除约束示例七:更改表名示例八:更改表的存储引擎功能使用思路实际工作开发技巧在数据库开发......
  • MySQL Public Key Retrieval is not allowed 解决指南
    MySQLPublicKeyRetrievalisnotallowed解决指南基本概念与作用说明完整代码示例与解决方案示例一:检查用户权限示例二:检查KMS配置示例三:检查加密列定义示例四:重置密钥功能使用思路与最佳实践实际工作开发技巧在现代数据库管理中,加密和密钥管理是保障数据安全......
  • MySQL主从复制与读写分离
    一、MySQL主从复制概述1.MySQL主从复制原理MySQL的主从复制和读写分离紧密相连,首先部署主从复制,才能在此基础上进行读写分离。2.MySQL支持的复制类型基于语句的复制:在主服务器上执行的语句,在从服务器上执行同样语句。MySQL默认采用该语句,效率较高。基于行的复制:把改变的......
  • MySQL中update语法的使用(超详细)
    在MySQL中,UPDATE 语句用于修改已存在的表中的记录。以下是对 UPDATE 语句的详细解释和使用方法:语法UPDATEtable_nameSETcolumn1=value1,column2=value2,...WHEREcondition;table_name:要更新的表名。SET:用于指定要更新的列和新的值。column1,column2,.........
  • mysql用户
    一、数据库用户管理DCL:数据控制语言,用于设置或者更改数据库用户或角色权限1.新建用户createuser'用户名'@'源地址'identifiedby'密码';'用户名'指定将创建的用户名'来源地址'指定新创建的用户可在哪些主机上登录可使用IP地址、网段、主机名的形式本地用户可用l......
  • 深入MySQL锁机制:原理、死锁解决及Java防范技巧
    引言在数据库系统中,锁机制是为了保证数据一致性和完整性的重要手段。MySQL作为广泛使用的关系型数据库管理系统,其锁机制尤为重要。本文将详细介绍MySQL的锁机制原理及实现,并说明在生产环境中如何解决死锁问题,以及在后续开发中如何编写Java代码避免死锁。MySQL锁机制概述MySQ......
  • 大厂面试官问我:在同步binlog的时候主库是一个时间,从库是一个时间,底层是怎么解决的?【后
    本文为【Mysql日志八股文合集(2)】初版,后续还会进行优化更新,欢迎大家关注交流~大家第一眼看到这个标题,不知道心中是否有答案了?在面试当中,面试官经常对项目亮点进行深挖,来考察你对这个项目亮点的理解以及思考!这个时候,你如果可以回答出面试官的问题,甚至是主动说出自己的思考,那在......
  • MySQL索引怎么优化
    索引优化无非就是两点:把SQL的写法进行优化,对于无法应用索引,或导致出现大数据量检索的语句,改为精准匹配的语句。对于合适的字段上建立索引,确保经常作为查询条件的字段,可以命中索引去检索数据。连接查询时尽量不关联太多表关联太多会导致执行效率变慢多表查询时一定要以大驱......
  • MySQL中的网络命名空间支持
    NetworkNamespaceSupport(网络命名空间支持)提供了在Linux系统中创建和管理多个隔离网络空间的能力。网络命名空间是来自主机系统的网络堆栈的逻辑副本。网络命名空间对于设置容器或虚拟环境非常有用。每个名称空间都有自己的IP地址、网络接口、路由表等等。默认命名空间或全......
  • clickhouse集群及单节点库表占用存储
    1、单节点查询库表存储占用‘system’:库名SELECT  databaseAS`库名`,  tableAS`表名`,  sum(rows)AS`总行数`,  formatReadableSize(sum(data_uncompressed_bytes))AS`原始大小`,  formatReadableSize(sum(data_compressed_bytes))AS`压......