Flink version: 1.16.0
Hive version: 3.1.2
sqlSubmit commit
- commit 0de42fabc1a639c3256ad7d074af72f8f2453be5 changes where the `set` commands in a SQL file (e.g. set table.sql-dialect=hive;) are executed: instead of being applied all at once in TableConfUtil, each `set` is now executed at its position in the SQL file (a code sketch of the dialect switch follows the example below)
Reason: statements in the same SQL file may need different SQL dialects, especially the Hive-related ones.
For example, the following SQL reads the Hive table test.user_log and writes to another Hive table, test.user_log_1:
set execution.runtime-mode=BATCH;
set table.sql-dialect=hive;
drop table if exists myHive.test.user_log_1;
CREATE TABLE myHive.test.user_log_1 (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
) PARTITIONED BY (ds STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$ds 00:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 min',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
set table.sql-dialect=default;
insert into myHive.test.user_log_1
select * from myHive.test.user_log;
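Under the hood, this per-statement switch corresponds to changing the SQL dialect on the TableEnvironment config before each statement runs. A minimal sketch of the idea (not sqlSubmit's exact code; the myHive catalog is assumed to be registered as shown in the next section):

import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}

val tabEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())

// Hive-syntax DDL (STORED AS, TBLPROPERTIES) must be parsed with the Hive dialect
tabEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tabEnv.executeSql("drop table if exists myHive.test.user_log_1")
tabEnv.executeSql(
  """CREATE TABLE myHive.test.user_log_1 (
    |  user_id STRING, item_id STRING, category_id STRING, behavior STRING
    |) PARTITIONED BY (ds STRING) STORED AS parquet""".stripMargin)

// switch back so the INSERT is parsed with Flink's default dialect
tabEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tabEnv.executeSql("insert into myHive.test.user_log_1 select * from myHive.test.user_log")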
Configure the Hive catalog
With a Hive catalog registered, Flink can query Hive tables directly:
// create a HiveCatalog from the catalog name, default database, and the directory containing hive-site.xml
val catalog = new HiveCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME), paraTool.get(Constant.HIVE_DEFAULT_DATABASE), paraTool.get(Constant.HIVE_CONFIG_PATH))
// register the catalog and make it the current one
tabEnv.registerCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME), catalog)
tabEnv.useCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME))
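For reference, the same setup with literal values. The catalog name, database, and hive-site.xml directory below are placeholders, and the flink-connector-hive plus Hadoop dependencies are assumed to be on the classpath:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

val tabEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
// "myHive" is the catalog name used throughout this post; "/opt/hive/conf" must contain hive-site.xml
val catalog = new HiveCatalog("myHive", "test", "/opt/hive/conf")
tabEnv.registerCatalog("myHive", catalog)
tabEnv.useCatalog("myHive")
// once the catalog is current, Hive tables resolve without the catalog prefix
tabEnv.executeSql("show tables").print()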
Read Hive
Hive preparation
Log in to the Hive client, create the database and table, and insert some data:
create database test;
use test;
CREATE TABLE user_log (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
) PARTITIONED BY (ds STRING)
STORED AS parquet;
insert into user_log
select '001', '001', '001', 'by', '2023-02-02' union all
select '002', '001', '001', 'by', '2023-02-02' union all
select '003', '001', '001', 'by', '2023-02-02' union all
select '004', '001', '001', 'by', '2023-02-02' union all
select '005', '001', '001', 'by', '2023-02-02' union all
select '006', '001', '001', 'by', '2023-02-02'
;
Flink job
A demo job that just prints the result:
-- read hive, write to print -- in batch mode the job finishes once the read completes
-- sink
drop table if exists read_hive_sink;
CREATE TABLE read_hive_sink (
user_id VARCHAR
,item_id VARCHAR
,category_id VARCHAR
,behavior VARCHAR
,ds VARCHAR
) WITH (
'connector' = 'print'
);
-- set streaming-source.enable = false;
-- set execution.runtime-mode = batch;
insert into read_hive_sink
select user_id, item_id, category_id, behavior, ds
from myHive.test.user_log;
Output:
+I[001, 001, 001, by, 2023-02-02]
+I[002, 001, 001, by, 2023-02-02]
+I[003, 001, 001, by, 2023-02-02]
+I[004, 001, 001, by, 2023-02-02]
+I[005, 001, 001, by, 2023-02-02]
+I[006, 001, 001, by, 2023-02-02]
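The two commented-out settings are the key here: streaming-source.enable controls whether the Hive source keeps monitoring the table for new partitions, and in batch runtime mode the job finishes once the current data is read. A minimal sketch of running the same read in batch mode from code (catalog setup as in the previous section):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// batch mode: the job reads the current contents of the Hive table, prints them, and finishes
val tabEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
// ... register and use the Hive catalog as shown earlier ...
val result = tabEnv.executeSql("select user_id, item_id, category_id, behavior, ds from myHive.test.user_log")
result.print()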
Flink writes to Hive tables (non-partitioned and partitioned)
Hive tables
-- non-partitioned table
CREATE TABLE user_log_no_partition (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ds STRING
)
STORED AS parquet;
-- partitioned table
CREATE TABLE user_log (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
) PARTITIONED BY (ds STRING)
STORED AS parquet;
Flink job
drop table if exists user_log;
CREATE TABLE user_log
(
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR
) WITH (
'connector' = 'datagen'
,'rows-per-second' = '200'
,'number-of-rows' = '10000'
,'fields.user_id.kind' = 'random'
,'fields.item_id.kind' = 'random'
,'fields.category_id.kind' = 'random'
,'fields.behavior.kind' = 'random'
,'fields.user_id.length' = '20'
,'fields.item_id.length' = '10'
,'fields.category_id.length' = '10'
,'fields.behavior.length' = '10'
);
-- insert into myHive.test.user_log_no_partition
insert into myHive.test.user_log
select user_id, item_id, category_id, behavior, DATE_FORMAT(now(), 'yyyy-MM-dd')
from user_log;
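One thing to watch when streaming into Hive: files written by the sink are only committed (and, for partitioned tables, partitions published per the sink.partition-commit options) when a checkpoint completes, so the job needs checkpointing enabled. A sketch of the surrounding setup (the 60 s interval is just an example value):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Hive sink files become visible only when a checkpoint completes;
// without checkpointing a streaming INSERT never publishes data
env.enableCheckpointing(60 * 1000L)
val tabEnv = StreamTableEnvironment.create(env)
// ... register the Hive catalog, create the datagen source, and run the INSERT as above ...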
Data in the non-partitioned Hive table:
hive> select * from test.user_log_no_partition limit 10;
OK
user_id item_id category_id behavior ds
001 001 001 by 2023-02-02
002 001 001 by 2023-02-02
003 001 001 by 2023-02-02
004 001 001 by 2023-02-02
005 001 001 by 2023-02-02
006 001 001 by 2023-02-02
e3cf01a03b8703a83503 c1dea1d0b1 0dde257867 f44be1f275 2023-02-02
a81fac74643718fb0224 c303ab08a3 0c2afab7e8 c38ea617de 2023-02-02
02335798857badc2c713 91b0103a93 0f2362982c e4d9c6a9cf 2023-02-02
e13d14a06dbe64fde111 718630b459 4bee13742b 2eb9ee92c9 2023-02-02
Time taken: 0.485 seconds, Fetched: 10 row(s)
Data in the partitioned Hive table:
hive> select * from user_log limit 10;
OK
user_id item_id category_id behavior ds
19dae5cd466f644935ff b84d6771b4 8dde856e3c 51dce16207 2023-02-02
2da4b0746a3d6b20793b 1c8f23cfea 969ab8ddc8 36fc80cf42 2023-02-02
cb460f98110d2bed28ac 592206f590 4f64d002b2 bb6359a7ff 2023-02-02
2fe0d99fb0a1fbd6f9b1 7a01902c67 e1c56a93b9 14ca4a6e09 2023-02-02
28ffd05b1fe84a363711 de57409f49 536fd35dc6 2c9c0880db 2023-02-02
304736147f0fb968fa7a 6868c7776d 52615aab5d 8a59f0284f 2023-02-02
a7391ba0aab507bb28a3 a1529947ac 0f228939d5 5905efec83 2023-02-02
973f74170e66714c7221 1b6273b98a c191d93bb4 0cbcbf4832 2023-02-02
2637ca991c4a6f090106 fb7810dffd 61067a6c5b f90e7b3553 2023-02-02
be3f2df8f9210cf092c0 9322fbfc57 a0fa4a467f beeeb657ca 2023-02-02
Time taken: 0.467 seconds, Fetched: 10 row(s)