一、MySQL 数据准备
# Connect to the MySQL server as root. Set MYSQL_HOST to the server address first.
# A bare `-p` makes the client prompt for the password, so it does not end up in
# shell history or in the process list the way `-p<password>` does.
mysql -h "${MYSQL_HOST}" -u root -p
-- Database holding the table that Flink CDC captures ('database-name' = 'flink' in the Flink DDL).
-- IF NOT EXISTS lets this setup be re-run without failing.
CREATE DATABASE IF NOT EXISTS flink;
USE flink;
-- MySQL table captured by the mysql-cdc source ('table-name' = 'user').
-- `user` is also an SQL keyword, so identifiers are backtick-quoted defensively.
CREATE TABLE `user` (
    `id`           INTEGER       NOT NULL PRIMARY KEY,
    `name`         VARCHAR(255)  NOT NULL DEFAULT 'flink',
    `address`      VARCHAR(1024),
    `phone_number` VARCHAR(512),
    `email`        VARCHAR(255)
);
-- Seed rows for the CDC snapshot phase.
-- The explicit column list keeps the insert correct if the table's column order changes.
-- Single-quoted literals are standard SQL; double-quoted "strings" break when sql_mode
-- includes ANSI_QUOTES, because they are then parsed as identifiers.
INSERT INTO user (id, name, address, phone_number, email) VALUES
    (110, 'user_110', 'Shanghai', '123567891230', 'user_110@foo.com'),
    (111, 'user_111', 'Shanghai', '123567891231', 'user_111@foo.com'),
    (112, 'user_112', 'Shanghai', '123567891232', 'user_112@foo.com');
查看数据
-- Verify the seed rows. ORDER BY makes the row order deterministic (it matches the output below).
SELECT id, name, address, phone_number, email
FROM user
ORDER BY id;
+-----+----------+----------+--------------+------------------+
| id  | name     | address  | phone_number | email            |
+-----+----------+----------+--------------+------------------+
| 110 | user_110 | Shanghai | 123567891230 | user_110@foo.com |
| 111 | user_111 | Shanghai | 123567891231 | user_111@foo.com |
| 112 | user_112 | Shanghai | 123567891232 | user_112@foo.com |
+-----+----------+----------+--------------+------------------+
二、Flink环境准备
1、Flink 版本:1.14.5
2、配置修改
cp /usr/local/service/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core*.jar /usr/local/service/flink/lib/ cp /usr/local/service/hive/lib/hive-exec-2.3.7.jar /usr/local/service/flink/lib/ flink-sql-connector-mysql-cdc-2.4.1.jar flink-sql-connector-hive-2.3.6_2.12-1.14.5.jar flink-sql-connector-mysql-cdc-2.4.1.jar iceberg-flink-runtime-1.14-0.13.2.jar 3、开启flink yarn,需要切换到hadoop用户,不能用root。 yarn-session.sh -s 1 -jm 1024 -tm 2048 4、新打开终端,使用flink客户端连接集群 sql-client.sh embedded -s yarn-session 5、设置checkpoint,每3秒一次 SET execution.checkpointing.interval = 3s; 6、创建source表use default_catalog;
-- Flink CDC source over MySQL table flink.user, created in default_catalog.default_database.
-- database_name / table_name are VIRTUAL metadata columns: read from the connector and
-- never written back.
-- NOTE(review): `id` is INTEGER in MySQL but DECIMAL(20, 0) here (and in user_sink);
-- INT would be the direct mapping — confirm this type is intended.
CREATE TABLE user_source (
    database_name STRING         METADATA FROM 'database_name' VIRTUAL,
    table_name    STRING         METADATA FROM 'table_name' VIRTUAL,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'ip地址',
    'port'          = '3306',
    'username'      = 'root',
    'password'      = '密码',
    'database-name' = 'flink',
    'table-name'    = 'user'
);
7、创建catalog
-- Iceberg catalog backed by the Hive Metastore ('catalog-type' = 'hive'); later statements
-- refer to it as hive_catalog even though its 'type' is 'iceberg'.
-- IF EXISTS keeps the drop from failing when the catalog has not been created yet
-- (for example on the first run in a fresh SQL client session).
-- NOTE(review): 'hive-version' is an option of Flink's own 'hive' catalog (see myhive);
-- the Iceberg catalog presumably ignores it — confirm before relying on it.
DROP CATALOG IF EXISTS hive_catalog;
CREATE CATALOG hive_catalog WITH (
    'type'             = 'iceberg',
    'catalog-type'     = 'hive',
    'uri'              = 'thrift://172.21.0.80:7004',
    'clients'          = '5',
    'property-version' = '1',
    'hive-conf-dir'    = '/usr/local/service/hive/conf',
    'hive-version'     = '2.3.7',
    'default-database' = 'default',
    'warehouse'        = 'hdfs://HDFS8002254/usr/hive/warehouse/hive_catalog'
);
-- Smoke test: create and write a tiny table through the Iceberg catalog.
-- IF NOT EXISTS keeps a re-run from failing on the CREATE. Because the table has no
-- primary key, each re-run appends another (1, 'a') row.
CREATE TABLE IF NOT EXISTS `hive_catalog`.`default`.`sample` (
    id   BIGINT COMMENT 'unique id',
    data STRING
);
INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
-- Flink's native Hive catalog, reading the same hive-conf-dir as hive_catalog.
-- It is not referenced by any later statement shown here.
CREATE CATALOG myhive
WITH (
    'type'             = 'hive',
    'hive-version'     = '2.3.7',
    'hive-conf-dir'    = '/usr/local/service/hive/conf',
    'default-database' = 'default'
);
8、创建iceberg table
-- Iceberg sink table, created in hive_catalog's default database
-- ('default-database' = 'default'), i.e. `hive_catalog`.`default`.`user_sink`.
-- The primary key makes the Iceberg sink apply the CDC changelog by key: MySQL updates
-- and deletes become row-level (equality) deletes. Only format v2 tables can store
-- row-level deletes, so the table is created with 'format-version' = '2'.
USE CATALOG hive_catalog;
CREATE TABLE user_sink (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'format-version' = '2'
);
9、流式写入数据
-- Preview the CDC stream (snapshot rows first, then live changes) before starting the sync job.
SELECT
    database_name,
    table_name,
    `id`,
    name,
    address,
    phone_number,
    email
FROM `default_catalog`.`default_database`.`user_source`;
-- Continuous sync job from the MySQL CDC source into the Iceberg sink.
-- Columns are projected explicitly in user_sink's column order. The positional mapping
-- then no longer depends on user_source's declared column order, or on any columns
-- later added to it.
INSERT INTO `hive_catalog`.`default`.`user_sink`
SELECT
    database_name,
    table_name,
    `id`,
    name,
    address,
    phone_number,
    email
FROM `default_catalog`.`default_database`.`user_source`;
10、查看同步后的数据(以下查询未开启流式读取,每次执行读取表的当前内容;重复执行可看到最新同步结果)
-- Inspect the synced rows.
-- NOTE(review): no streaming-read option is set, so this is presumably a one-off scan of
-- the current snapshot rather than a continuous query; re-run it to see new changes.
SELECT
    database_name,
    table_name,
    `id`,
    name,
    address,
    phone_number,
    email
FROM `hive_catalog`.`default`.`user_sink`;
参考文档:
1、基于 Flink CDC 同步 MySQL 分库分表构建实时数据湖
https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/build-real-time-data-lake-tutorial-zh.html#icebergh
2、腾讯伙伴支持
标签:flink,STRING,default,Flink,hive,catalog,user,操作,客户端 From: https://www.cnblogs.com/robots2/p/17797197.html