首页 > 其他分享 >flink 1.16 读写 hive 表

flink 1.16 读写 hive 表

时间:2023-02-02 17:25:12浏览次数:42  
标签:1.16 02 flink hive 001 user 2023 id

flink 版本: 1.6.0
hive 版本: 3.1.2

sqlSubmit commit

  1. commit 0de42fabc1a639c3256ad7d074af72f8f2453be5 修改了sql 文件中的 set 命令(set table.sql-dialect=hive;)执行位置, 从 TableConfUtil 统一执行改为按照sql 文件位置执行
    原因: 同一sql 文件中的sql 可能需要不同的 sql dialect, 特别是hive 相关的

比如如下SQL, 读取 hive test.user_log,写另一张 hive 表 test.user_log_1

set execution.runtime-mode=BATCH;
set table.sql-dialect=hive;
drop table if exists myHive.test.user_log_1;
CREATE TABLE myHive.test.user_log_1 (
    user_id STRING
    ,item_id STRING
    ,category_id STRING
    ,behavior STRING
) PARTITIONED BY (ds STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

set table.sql-dialect=default;
insert into myHive.test.user_log_1
select * from myHive.test.user_log;

配置 hive catalog

flink 启用 hive catalog 后,可以直接查询 hive 表

val catalog = new HiveCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME), paraTool.get(Constant.HIVE_DEFAULT_DATABASE), paraTool.get(Constant.HIVE_CONFIG_PATH))
tabEnv.registerCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME), catalog)
tabEnv.useCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME))

读 hive

hive 准备

登录 hive 客户端,创建数据库、表,写入数据

create database test;
use test;

CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
) PARTITIONED BY (ds STRING) 
STORED AS parquet;


insert into user_log 
select '001', '001', '001', 'by', '2023-02-02' union all 
select '002', '001', '001', 'by', '2023-02-02' union all 
select '003', '001', '001', 'by', '2023-02-02' union all 
select '004', '001', '001', 'by', '2023-02-02' union all 
select '005', '001', '001', 'by', '2023-02-02' union all 
select '006', '001', '001', 'by', '2023-02-02'
;

demo 任务,print 结果了

-- read hive, write to print -- batch when read complete, job finish
-- sink
drop table if exists read_hiv_sink;
CREATE TABLE read_hiv_sink (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ds VARCHAR
) WITH (
  'connector' = 'print'
);

-- set streaming-source.enable = false;
-- set execution.runtime-mode = batch;
insert into read_hiv_sink
select user_id, item_id, category_id, behavior, ds
from myHive.test.user_log;

输出结果:

+I[001, 001, 001, by, 2023-02-02]
+I[002, 001, 001, by, 2023-02-02]
+I[003, 001, 001, by, 2023-02-02]
+I[004, 001, 001, by, 2023-02-02]
+I[005, 001, 001, by, 2023-02-02]
+I[006, 001, 001, by, 2023-02-02]

hive 表

-- 普通表
CREATE TABLE user_log_no_partition (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ds STRING
)
STORED AS parquet;

-- 分区表
CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
) PARTITIONED BY (ds STRING) 
STORED AS parquet;

drop table if exists user_log;
CREATE TABLE user_log
(
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR
) WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '200'
      ,'number-of-rows' = '10000'
      ,'fields.user_id.kind' = 'random'
      ,'fields.item_id.kind' = 'random'
      ,'fields.category_id.kind' = 'random'
      ,'fields.behavior.kind' = 'random'
      ,'fields.user_id.length' = '20'
      ,'fields.item_id.length' = '10'
      ,'fields.category_id.length' = '10'
      ,'fields.behavior.length' = '10'
      );

-- insert into myHive.test.user_log_no_partition
insert into myHive.test.user_log
select user_id, item_id, category_id, behavior, DATE_FORMAT(now(), 'yyyy-MM-dd')
from user_log;

hive 普通表数据

hive> select * from test.user_log_no_partition limit 10;
OK
user_id	item_id	category_id	behavior	ds
001	001	001	by	2023-02-02
002	001	001	by	2023-02-02
003	001	001	by	2023-02-02
004	001	001	by	2023-02-02
005	001	001	by	2023-02-02
006	001	001	by	2023-02-02
e3cf01a03b8703a83503	c1dea1d0b1	0dde257867	f44be1f275	2023-02-02
a81fac74643718fb0224	c303ab08a3	0c2afab7e8	c38ea617de	2023-02-02
02335798857badc2c713	91b0103a93	0f2362982c	e4d9c6a9cf	2023-02-02
e13d14a06dbe64fde111	718630b459	4bee13742b	2eb9ee92c9	2023-02-02
Time taken: 0.485 seconds, Fetched: 10 row(s)

hive 分区表数据

hive> select * from user_log limit 10;
OK
user_id	item_id	category_id	behavior	ds
19dae5cd466f644935ff	b84d6771b4	8dde856e3c	51dce16207	2023-02-02
2da4b0746a3d6b20793b	1c8f23cfea	969ab8ddc8	36fc80cf42	2023-02-02
cb460f98110d2bed28ac	592206f590	4f64d002b2	bb6359a7ff	2023-02-02
2fe0d99fb0a1fbd6f9b1	7a01902c67	e1c56a93b9	14ca4a6e09	2023-02-02
28ffd05b1fe84a363711	de57409f49	536fd35dc6	2c9c0880db	2023-02-02
304736147f0fb968fa7a	6868c7776d	52615aab5d	8a59f0284f	2023-02-02
a7391ba0aab507bb28a3	a1529947ac	0f228939d5	5905efec83	2023-02-02
973f74170e66714c7221	1b6273b98a	c191d93bb4	0cbcbf4832	2023-02-02
2637ca991c4a6f090106	fb7810dffd	61067a6c5b	f90e7b3553	2023-02-02
be3f2df8f9210cf092c0	9322fbfc57	a0fa4a467f	beeeb657ca	2023-02-02
Time taken: 0.467 seconds, Fetched: 10 row(s)

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文
flink 菜鸟公众号

标签:1.16,02,flink,hive,001,user,2023,id
From: https://www.cnblogs.com/Springmoon-venn/p/17086583.html

相关文章

  • Hive的分区和分桶
    为了避免全表扫描,优化查询性能,我们可以使用分区和分桶表将数据细化,分桶表是分区表的进阶阶段,分桶表是使用表的字段进行进一步细分数据,分区则是指定外部的字段来分区分区表cr......
  • Hive提取小时内,分组排名前3的sql
    表的结构是这样的,时间的范围我是提取了几个小时内的数据createtable`alibaba.user_bea`(user_idbigint,item_idbigint,cate_idbigint,timesstring......
  • hive的Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
    早上起来去跑个hive的sql,稍微复杂点sql,就会报错如Causedby:org.apache.hadoop.hdfs.BlockMissingException:Couldnotobtainblock:BP-572947236等,经过一个一个小时......
  • flinksql的初始化
    Mavn的依赖<properties><java.version>1.8</java.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.targ......
  • 【Flink】详解StreamGraph
    【Flink】详解StreamGraph大家好,我们的gzh是朝阳三只大明白,满满全是干货,分享近期的学习知识以及个人总结(包括读研和IT),跪求一波关注,希望和大家一起努力、进步!!概述没有看上一......
  • Hive 刷题——查询每个用户登录日期的最大空档期
    需求描述从登录明细表(user_login_detail)中查询每个用户两个登录日期(以login_ts为准)之间的最大的空档期。统计最大空档期时,用户最后一次登录至今的空档也要考虑在内,假设今......
  • hive优化
    hive优化1Fetch抓取Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT*FROMemployees;在这种情况下,Hive可以简单地读取employee对应的存储目......
  • hive架构原理
    Hive架构原理1)用户接口:ClientCLI(command-lineinterface)、JDBC/ODBC(jdbc访问hive)、WEBUI(浏览器访问hive)2)元数据:Metastore元数据包括:表名、表所属的数据库(默认是default)、......
  • hive类型转化
    hive类型转化Hive的原子数据类型是可以进行隐式转换的,类似于Java的类型转换,例如某表达式使用INT类型,TINYINT会自动转换为INT类型,但是Hive不会进行反向转化,例如,某表达式使用T......
  • Hive 刷题——统计每日商品1和商品2销量的差值
    需求描述从订单明细表(order_detail)中统计每天商品1和商品2销量(件数)的差值(商品1销量-商品2销量)期望结果如下:create_date diff 2020-10-08-24......