-- Source table: reads the MySQL table test.orgcode_info through the mysql-cdc connector.
-- The primary key is declared NOT ENFORCED: Flink records it as metadata but does not validate it.
CREATE TABLE source_orgcode_info (
    ID                BIGINT,
    ORGANIZATION_NAME VARCHAR(64),
    ORG_CODE          VARCHAR(8),
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = '192.168.10.100',
    'port'          = '3700',
    'username'      = 'test',
    'password'      = 'test123456',
    'database-name' = 'test',
    'table-name'    = 'orgcode_info'
);
-- Sink table: writes to the MySQL table test.orgcode_info over JDBC.
-- Drop any previously registered definition first so the DDL below always takes effect.
DROP TABLE IF EXISTS orgcode_info;
CREATE TABLE IF NOT EXISTS orgcode_info (
    ID                BIGINT,
    ORGANIZATION_NAME VARCHAR(64),
    ORG_CODE          VARCHAR(8),
    -- A mysql-cdc source emits UPDATE/DELETE changes. The JDBC sink only accepts
    -- those (upsert mode) when a primary key is declared; without one it runs in
    -- append-only mode and rejects the changelog.
    -- NOTE(review): upsert assumes the physical MySQL table has a primary/unique
    -- key on ID -- confirm against the target schema.
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    -- Current-style JDBC connector options (replacing the legacy 'connector.type' keys),
    -- consistent with the 'connector' = 'mysql-cdc' style used by the source table.
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.10.100:3600/test',
    'table-name' = 'orgcode_info',
    'username'   = 'test',
    'password'   = 'test123456'
);
-- Copy data from the MySQL CDC source table into the MySQL sink table.
INSERT INTO orgcode_info
SELECT
    ID,
    ORGANIZATION_NAME,
    ORG_CODE
FROM source_orgcode_info;
source表设置
MySQL 表有主键时,source 表定义中需要加 PRIMARY KEY(ID) NOT ENFORCED 关键字。
如果 MySQL 表没有设置主键,WITH 子句里要加 'scan.incremental.snapshot.enabled' = 'false',否则会报错。
参考原文链接:https://blog.csdn.net/lbship/article/details/114839662
标签:info,cdc,flink,connector,source,orgcode,mysql,ID From: https://www.cnblogs.com/whiteY/p/16937477.html