首页 > 数据库 >flink-cdc实时同步(oracle to mysql)

flink-cdc实时同步(oracle to mysql)

时间:2024-08-13 15:54:43浏览次数:13  
标签:__ GRANT flink oracle flinkuser mysql cdc SELECT

Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。

Flink下载地址

https://flink.apache.org/downloads/

其他必需的jar包(cdc、jdbc、mysq和oracle等驱动包)

 下载Flink后,直接解压到指定目录下即可;

tar zxvf flink-1.20.0-bin-scala_2.12.tgz

 将所有必须的jar包放在lib目录下,我这边的目录为/u01/flink-1.20.0/lib;

启动flink:

[root@gcv-b-test-gmes-oracle bin]# /u01/flink-1.20.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host gcv-b-test-gmes-oracle.
Starting taskexecutor daemon on host gcv-b-test-gmes-oracle.

 

如果需要web登录查看flink,需要修改配置文件(/u01/flink-1.20.0/conf/config.yaml)

address: 10.240.12.219
bind-address: 0.0.0.0

 

登录web界面:

 

配置Oracle:

必须开启归档(步骤查资料);

测试用户及表
create user flink identified by "123456";
grant connect ,resource to flink;
create table flink.user_info(id number primary key,name varchar2(100),age number);

##开启数据库级别补充日志
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
##开启该表的列附加日志
ALTER TABLE flink.user_info ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

创建用于cdc解析的表空间
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/oradata/sharedb/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

创建flinkuser复制用户
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT ANALYZE ANY TO flinkuser;

  GRANT CREATE TABLE TO flinkuser;
  -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

  

 

 

启动sql-client(SQL 客户端 的目的是提供一种简单的方式来编写、调试和提交表程序到 Flink 集群上,而无需写一行 Java 或 Scala 代码。SQL 客户端命令行界面(CLI) 能够在命令行中检索和可视化分布式应用中实时产生的结果)

我的理解就是帮你智能生成java代码,不需要自己写代码。

[root@gcv-b-test-gmes-oracle conf]# /u01/flink-1.20.0/bin/sql-client.sh

                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /root/.flink-sql-history

Flink SQL>

  

配置Oracle连接器:

CREATE TABLE user_info (
     ID INT NOT NULL,
     NAME STRING,
     AGE int,
     PRIMARY KEY(ID) NOT ENFORCED
     ) WITH (
     'connector' = 'oracle-cdc',
     'hostname' = '10.240.12.219',
     'port' = '1521',
     'username' = 'flinkuser',
     'password' = 'flinkpw',
     'database-name' = 'sharedb',
     'schema-name' = 'FLINK',
     'table-name' = 'USER_INFO',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'
);
##这里有个坑,字段必须大写啊(因为oracle默认都是大写,这里是严格区分大小写的)
##如果大小写不一致,会识别不到字段。查询的时候报错如下:
##org.apache.flink.table.api.TableException: Column 'id' is NOT NULL,
##however, a null value is being written into it.)

  

 查看数据:

Flink SQL> select * from user_info;
[INFO] Result retrieval cancelled.

  

 

mysql数据库同步的表:

create database flink;

CREATE TABLE `user_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(100) NOT NULL,
  `age` bigint(20) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

 

sql-client配置mysql:

CREATE TABLE user_info_mysql (
     id INT NOT NULL,
     name STRING,
     age int,
     PRIMARY KEY(id) NOT ENFORCED
 ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://10.251.93.3:3306/flink',
	'driver' = 'com.mysql.cj.jdbc.Driver',
     'username' = 'rdsroot',
     'password' = '757mmc%#%',
     'table-name' = 'user_info');


Insert into user_info_mysql select * from user_info;

  

 

验证:

oracle:插入数据

 mysql验证:

 

感觉上手难度不大,有些jar包容易漏,导致异常。

 

参考文档:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/tutorials/oracle-tutorial/

                  https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/oracle-cdc/

 

标签:__,GRANT,flink,oracle,flinkuser,mysql,cdc,SELECT
From: https://www.cnblogs.com/muzisanshi/p/18356836

相关文章

  • 【华为云MySQL技术专栏】MySQL 8.0事务提交原理解析!
    摘要:当多个引擎/节点同时访问和修改数据时,如何保证数据在各个引擎/节点之间的一致性成为了一项挑战。本文将深入探讨MySQL集群在保持数据一致性的解决方案。本文分享自华为云社区《【华为云MySQL技术专栏】MySQL8.0事务提交原理解析!》,作者:GaussDB数据库。 1.概述MySQL是一......
  • prometheus监控mysql数据库
    监控需要安装一个工具mysqld_exportermysqld_exporter-0.14.0.linux-386.tar.gz上传到服务器后,解压压缩包tar-zxvf mysqld_exporter-0.14.0.linux-386.tar.gzcd mysqld_exporter-0.14.0.linux-386touchmy.cnfvimy.cnf加入有权限的用户名,密码,数据库地址等信息。[cli......
  • 【原创】java+swing+mysql校园表白墙系统设计与实现
    个人主页:程序员杨工个人简介:从事软件开发多年,前后端均有涉猎,具有丰富的开发经验博客内容:全栈开发,分享Java、Python、Php、小程序、前后端、数据库经验和实战开发背景:昨天七夕,大家都去约会了,趁着有时间写了一个校园表白墙系统。在校园环境中,学生们正处于青春期,情感丰富且......
  • 【原创】java+swing+mysql简单图书信息管理系统设计与实现
    个人主页:程序员杨工个人简介:从事软件开发多年,前后端均有涉猎,具有丰富的开发经验博客内容:全栈开发,分享Java、Python、Php、小程序、前后端、数据库经验和实战开发背景:编程小白们刚入门,尤其在学了一点java的基础之后,想通过自己动手来实现一个比较基础的小项目,由于编程经验......
  • mysql: 用户权限的操作
    一,查看mysql内置的权限有哪些?SHOWPRIVILEGES;如图:二,管理给用户的权限1,授予权限:mysql>GRANTSELECT,INSERT,DELETE,UPDATEONnews.*TO'laoliu'@'127.0.0.1';QueryOK,0rowsaffected(0.01sec)2,查询指定用户有哪些权限?mysql>showgrantsfor'laoliu'......
  • pbootcms网站是使用sqlite数据库好还是使用mysql数据库好?
    众多周知pbootcms程序支持sqlite数据库和mysql数据库,目前默认常用最多的是sqlite数据库,有需要转成mysql数据库的可以联系我们。pbootcms数据库sqlite无缝转换mysql数据库 本人从接触pbootcms开始一直都是使用mysql数据库,很少出现被黑和各种不明原因报错。建议有条件的朋友尽量......
  • mysql: auth_socket登录
    一,默认安装的mysql用户root是auth_socket方式登录root@localhost的authentication_string为空,   它的plugin为auth_socket二,如何登录?1,从命令行正常登录会报错:liuhongdi@lhdpc:/data/site/gsapi$mysql-uroot-hlocalhost-pEnterpassword:ERROR1698(28000):......
  • Flink1.19 JobSubmitHandler源码解析
    文章目录概要整体架构流程概要JobGraph在客户端生成后,需要发送到服务端,首先会被JobSubmitHandler(WebMonitor内处理http请求的处理类)接收处理,然后会发送到Dispatcher进一步处理整体架构流程首先会进入JobSubmitHandler对象的handleRequest方法有两个参数:request:封......
  • mysql: 用户管理
    一,新建用户CREATEUSER'laoliu'@'127.0.0.1'IDENTIFIEDBY'laoliupassword'; 二,修改用户密码8.0.3及以后SETPASSWORDFOR'laoliu'@'127.0.0.1'='laoliupass1';8.0.3之前ALTERUSER'laoliu'@'127.......
  • mysql: Usage权限
    一,Usage权限的功能1,官方的解释可以看到官方的说明:无权限,只允许连接到数据库2,Usage是连接(登陆)权限,当建立一个用户时,就会自动授予其usage权限(默认授予)。该权限只能用于数据库登陆,不能执行任何操作;且usage权限不能被回收,也即REVOKE用户并不能删除用户。 二,测试:创建用户后......