首页 > 其他分享 >Flink客户端操作

Flink客户端操作

时间:2023-10-30 10:37:01浏览次数:22  
标签:flink STRING default Flink hive catalog user 操作 客户端

一、mysql数据准备

mysql -hip -uroot -p密码
CREATE DATABASE flink;
USE flink;
CREATE TABLE user (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user VALUES (110,"user_110","Shanghai","123567891230","[email protected]");
INSERT INTO user VALUES (111,"user_111","Shanghai","123567891231","[email protected]");
INSERT INTO user VALUES (112,"user_112","Shanghai","123567891232","[email protected]");
查看数据
select * from user;
+-----+----------+----------+--------------+------------------+
| id | name | address | phone_number | email |
+-----+----------+----------+--------------+------------------+
| 110 | user_110 | Shanghai | 123567891230 | [email protected] |
| 111 | user_111 | Shanghai | 123567891231 | [email protected] |
| 112 | user_112 | Shanghai | 123567891232 | [email protected] |

二、Flink环境准备

1、版本1.14.5

2、配置修改

cp /usr/local/service/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core*.jar /usr/local/service/flink/lib/ cp /usr/local/service/hive/lib/hive-exec-2.3.7.jar /usr/local/service/flink/lib/ flink-sql-connector-mysql-cdc-2.4.1.jar flink-sql-connector-hive-2.3.6_2.12-1.14.5.jar flink-sql-connector-mysql-cdc-2.4.1.jar iceberg-flink-runtime-1.14-0.13.2.jar 3、开启flink yarn,需要切换到hadoop用户,不能用root。 yarn-session.sh -s 1 -jm 1024 -tm 2048 4、新打开终端,使用flink客户端连接集群 sql-client.sh embedded -s yarn-session 5、设置checkpoint,每3秒一次 SET execution.checkpointing.interval = 3s; 6、创建source表
use default_catalog;
CREATE TABLE user_source (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'ip地址',
'port' = '3306',
'username' = 'root',
'password' = '密码',
'database-name' = 'flink',
'table-name' = 'user'
);
7、创建catalog
drop catalog hive_catalog;
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://172.21.0.80:7004',
'clients'='5',
'property-version'='1',
'hive-conf-dir' = '/usr/local/service/hive/conf',
'hive-version' = '2.3.7',
'default-database' = 'default',
'warehouse'='hdfs://HDFS8002254/usr/hive/warehouse/hive_catalog'
);

CREATE TABLE `hive_catalog`.`default`.`sample` (
id BIGINT COMMENT 'unique id',
data STRING
);

INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');

CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/usr/local/service/hive/conf',
'hive-version' = '2.3.7'
);

8、创建iceberg table

use catalog hive_catalog;

CREATE TABLE user_sink (
database_name STRING,
table_name STRING,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (`id`) NOT ENFORCED);

9、流式写入数据

select * from `default_catalog`.`default_database`.`user_source`;

INSERT INTO `hive_catalog`.`default`.`user_sink` select * from
`default_catalog`.`default_database`.`user_source`;

10、实时查看数据

SELECT * FROM `hive_catalog`.`default`.`user_sink`;

 

 

 

 

参考文档:

1、基于 Flink CDC 同步 MySQL 分库分表构建实时数据湖

https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/build-real-time-data-lake-tutorial-zh.html#icebergh

2、腾讯伙伴支持

标签:flink,STRING,default,Flink,hive,catalog,user,操作,客户端
From: https://www.cnblogs.com/robots2/p/17797197.html

相关文章

  • Python 利用pymysql和openpyxl操作MySQL数据库并插入Excel数据
    1.需求分析本文将介绍如何使用Python连接MySQL数据库,并从Excel文件中读取数据,将其插入到MySQL数据库中。2.环境准备在开始本文之前,请确保您已经安装好了以下环境:Python3.xPyMySQL库openpyxl库MySQL数据库3.连接MySQL数据库我们可以使用pymysql库来连接MySQL数据库......
  • EDA工具使用+GIT操作+python编程+C语言编程+Riscv相关+TCL操作
    EDA工具使用Verdi覆盖率转网页urg-full64-dirsimv.vdbVerdi加载sessionverdi-ssrsessionFileVcs分部编译额外选项-partcomp:自动分块编译。-fastpartcomp:使用多核计算系统并行部分编译。-pcmakeprof:查看每部分编译占用的时间,方便对时间更久的进行拆分。-partc......
  • bs64的相关操作
    ......
  • Linux操作系统学习3
    上周学的是Linux操作系统中的文件权限中的基本权限,也就是UGO。这一周学的是基本权限后面的一些知识。主要是文件权限中的高级权限。有以下几个内容:SUID权限,SGID权限,Sticky权限。/usr/bin/passwd/,在这个文件中,第一行的第四个字符为“s”,这个s代的就是特殊权限,也就是SUID权限。任何......
  • 形态学操作--4.梯度运算
    ......
  • python文件操作
    课程目标掌握文本写入的语法掌握文本打开、读取的语法核心知识首先在当前目录下放一个test.txt文件文件读取f=open('test.txt','r',encoding='utf-8')print(f.read())f.close()文件写入除了write()写入语法,还有writelines()直接写入一个列表f=open('test.txt......
  • ensp 简单配置路由添加ip操作小实例
    ensp简单配置路由添加ip操作displaycu#查看路由器配置displayiprouting-table[x.x.x.x]#查看路由表【和x.x.x.x相关的条】案例一、2个路由器配置ip地址给2个路由器配合ip地址,方法一样ipaddressx.x.x.x.x掩码24(255.225.255.0)1.启动2个路由器,有......
  • Linux操作系统 no.2
    一.用户/组:1.创建用户、组:  useradd  groupadd2.删除用户:  userdel3.修改用户密码:  passwd 二.文件权限:chowm:修改文件属主,属组。chgrp:修改文件属组。chmod: 修改文件权限。 chowm:改变文件属性chmod:改变文件访问方式  r(读取)  w(写入)  x(执......
  • Win10 ssh客户端 scp 传输linux文件到windows 端
    1Window10安装openssh服务2启动windowssshd服务netstartsshd3scplinux向windows传输文件Win10中打开powershell,登录ssh把linux下的shell01.sh传到window10的桌面上[email protected]:/C:/Users/admin/Desktop......
  • 操作系统随笔
    1.内核空间和用户空间内核空间定义:内核空间是操作系统内核运行的区域,它包括了操作系统内核代码、数据结构和设备驱动程序等。内核空间通常是操作系统中的一块保护内存区域,只有操作系统内核才能够访问这个区域。用户空间定义:用户空间是指用户应用程序运行的区域,包括用户应用程序......