写在前面
今天继续学习hive部分的知识。
Hive中如何实现行列转换
-
一行变多行
可以对表使用LATERAL VIEW EXPLODE()
,也可以直接使用EXPLAIN()
函数来处理一行数据。SELECT name, col1 FROM testarray2 LATERAL VIEW EXPLODE(weight) t1 AS col1;
-
多行变一行
使用GROUP BY
+COLLECT_SET
/COLLECT_LIST
:GROUP BY
用于分组,分组后可以使用COLLECT_SET
或COLLECT_LIST
对每组数据进行聚合。- 最终会得到
ARRAY
类型的数据,可以使用CONCAT_WS
转成字符串。 COLLECT_SET
会去重,COLLECT_LIST
不会。
SELECT COLLECT_LIST(col1) FROM table GROUP BY name;
Hive中的自定义函数分类及实现方法
Hive 中的自定义函数分为三类:UDF (用户自定义函数),UDAF (用户自定义聚合函数) 和 UDTF (用户自定义表生成函数)
UDF
- 一进一出
- 创建
Maven
项目,并加入依赖:<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.3</version> </dependency>
- 编写代码,继承
org.apache.hadoop.hive.ql.exec.UDF
,实现evaluate()
方法。import org.apache.hadoop.hive.ql.exec.UDF; public class HiveUDF extends UDF { public String evaluate(String col1) { return "#" + col1 + "$"; } }
- 将代码打成
JAR
包并上传至 Linux 虚拟机。 - 使用
ADD JAR
在 Hive 中加载 JAR:ADD JAR /path/to/your/jarfile.jar;
- 注册临时函数:
CREATE TEMPORARY FUNCTION myfunction AS 'com.shujia.HiveUDF';
- 使用函数处理数据:
SELECT myfunction(name) AS myfunction FROM students LIMIT 10;
UDTF
- 一进多出
案例一:
-
转换前:
"key1:value1,key2:value2,key3:value3"
-
转换后:
key1 value1 key2 value2 key3 value3
-
方法一:使用
EXPLAIN
+SPLIT
SELECT SPLIT(t.col1, ":")[0], SPLIT(t.col1, ":")[1] FROM (SELECT EXPLODE(SPLIT("key1:value1,key2:value2,key3:value3", ",")) AS col1) t;
-
方法二:自定义
UDTF
import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.util.ArrayList; public class HiveUDTF extends GenericUDTF { @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { ArrayList<String> fieldNames = new ArrayList<>(); ArrayList<ObjectInspector> fieldObj = new ArrayList<>(); fieldNames.add("col1"); fieldObj.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldNames.add("col2"); fieldObj.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObj); } public void process(Object[] objects) throws HiveException { String col = objects[0].toString(); String[] splits = col.split(","); for (String str : splits) { String[] cols = str.split(":"); forward(cols); } } public void close() throws HiveException {} }
-
使用 SQL:
SELECT my_udtf("key1:value1,key2:value2,key3:value3");
案例二:
-
数据表:
id, col1, col2, col3, ..., col12
共 13 列 -
数据:
a, 1, 2, 3, 4, 5, ..., 12
-
转换结果:
a, 0时, 1 a, 2时, 2 a, 4时, 3 ...
-
自定义
UDTF
示例:public class HiveUDTF2 extends GenericUDTF { @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { ArrayList<String> filedNames = new ArrayList<>(); ArrayList<ObjectInspector> fieldObj = new ArrayList<>(); filedNames.add("col1"); fieldObj.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); filedNames.add("col2"); fieldObj.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(filedNames, fieldObj); } public void process(Object[] objects) throws HiveException { int hours = 0; for (Object obj : objects) { hours = hours + 1; String col = obj.toString(); ArrayList<String> cols = new ArrayList<>(); cols.add(hours + "时"); cols.add(col); forward(cols); } } public void close() throws HiveException {} }
-
添加
JAR
文件:ADD JAR /path/to/HiveUDF2-1.0.jar;
-
注册
UDTF
:CREATE TEMPORARY FUNCTION my_udtf AS 'com.shujia.HiveUDTF2';
-
使用 SQL:
SELECT id, hours, value FROM udtfData LATERAL VIEW my_udtf(col1, col2, col3, ..., col12) t AS hours, value;
UDAF
- 多进一出
- 有
GROUP BY
等开窗函数,较少自行编写。
Hive操作方式
-
Hive CLI (命令行接口)
Hive 提供命令行界面 (CLI) 执行 HiveQL 查询和管理 Hive 数据库。
启动命令:hive
-
Beeline
Beeline 是 Hive 的 JDBC 客户端,支持连接 Hive Server 2,推荐用于生产环境中。
启动 Beeline:beeline -u jdbc:hive2://<hive-server>:10000
-
Hive JDBC 接口
通过 JDBC 连接 Hive,执行 SQL 查询并获取结果。
示例:String url = "jdbc:hive2://<hive-server>:10000/default"; Connection con = DriverManager.getConnection(url, "username", "password"); Statement stmt = con.createStatement(); ResultSet rs = stmt.executeQuery("SELECT * FROM table_name");
-
Apache Spark 与 Hive 集成
使用
spark-hive
模块,允许使用 Spark SQL 查询 Hive 数据。
配置:--conf spark.sql.warehouse.dir=/user/hive/warehouse \ --conf spark.sql.catalogImplementation=hive
使用 Spark SQL 查询 Hive 数据:
val spark = SparkSession.builder() .appName("Hive Integration") .enableHiveSupport() .getOrCreate() spark.sql("SELECT * FROM table_name").show()
-
Hive命令中的-e/-f参数
-e
:执行一条 Hive 的 SQL 命令。-f
:执行一个 SQL 脚本。
-
Crontab 中定时策略
在 crontab
中,定时策略通过指定一组时间字段来设置任务的执行时间。每个字段代表了一个时间单位,组合起来形成一个定时任务的执行规则。
crontab
的基本格式如下:
* * * * * command_to_execute
- - - - -
| | | | |
| | | | +---- 星期几 (0 - 7) (0 和 7 都代表星期天)
| | | +------ 月份 (1 - 12)
| | +-------- 月中的某一天 (1 - 31)
| +---------- 小时 (0 - 23)
+------------ 分钟 (0 - 59)
时间字段含义
- 分钟(0 - 59):表示任务在每小时的哪一分钟执行。
- 小时(0 - 23):表示任务在每天的哪个小时执行。
- 日期(1 - 31):表示任务在每月的哪一天执行。
- 月份(1 - 12):表示任务在哪个月执行。
- 星期(0 - 7):表示任务在哪一天的星期执行(
0
和7
都代表星期天,1
代表星期一,以此类推)。
特殊字符
- 星号
*
:表示“每”或“任意”。例如*
在分钟字段中表示每一分钟都执行任务。 - 逗号
,
:用于指定多个值。例如5,10,15
表示在第 5、10 和 15 分钟执行。 - 连字符
-
:用于指定范围。例如1-5
表示从第 1 天到第 5 天。 - 斜杠
/
:表示步进值。例如*/5
表示每 5 个单位执行一次。
详细解释
-
每周一到周五的中午 12 点执行:
0 12 * * 1-5 command_to_execute
0
:在每个小时的第 0 分钟执行(即整点)。12
:在每天的中午 12 点执行。*
:每天都执行。1-5
:仅在星期一至星期五执行(星期一为1
,星期五为5
)。
-
每个月的 1 号和 15 号凌晨 3 点执行:
0 3 1,15 * * command_to_execute
0
:在每小时的第 0 分钟执行。3
:在每天的 3 点执行。1,15
:仅在每月的 1 号和 15 号执行。*
:每个月都执行。
-
每天的 1:30 AM 执行:
30 1 * * * command_to_execute
30
:在每个小时的第 30 分钟执行。1
:在每天的 1 点执行。
-
每 10 分钟执行一次:
*/10 * * * * command_to_execute
*/10
:表示每隔 10 分钟执行一次任务。
-
每个星期天的下午 2 点 30 分执行任务:
30 14 * * 0 command_to_execute
30
:在每个小时的第 30 分钟执行。14
:在下午 2 点执行。0
:仅在星期天(0
或7
)执行。
Hive 建表时注意事项
-
分区,分桶
一般是按照业务日期进行分区,每天的数据放在一个分区里。 -
一般使用外部表,避免数据误删
使用外部表时,数据存储在 HDFS 上,不会因为删除 Hive 表而删除实际数据。 -
选择适当的文件储存格式及压缩格式
选择合适的文件格式,如 Parquet、ORC 等,这些格式支持列式存储并且压缩效率高。 -
命名要规范
避免表名、列名使用空格、特殊字符,遵循统一的命名规范。 -
数据分层,表分离,但也不要分的太散
对数据进行合理的分层设计,避免数据过于分散,影响查询性能。
Hive 优化
1. 处理数据倾斜
数据倾斜原因
- Key 分布不均匀
- 触发了 Shuffle 操作
GROUP BY
或DISTINCT
JOIN
或UNION
数据倾斜的表现
- 任务进度长时间停滞在 99%(或 100%)。
- 单一 reduce 处理的数据量远大于其他 reduce。
解决方案
- 检查数据源头,过滤无效数据。
- 对重复的
key
值进行拆分或哈希。 - 避免不必要的
Shuffle
操作。
2. 建表(分区分桶)
-
分区,分桶
一般按业务日期进行分区,每天的数据放在一个分区里。 -
使用外部表
使用外部表避免数据误删。 -
选择适当的文件存储格式及压缩格式
使用 Parquet、ORC 等列式存储格式,适当压缩数据,减少磁盘占用。 -
命名要规范
表名和列名应符合统一命名规范。 -
数据分层,表分离
采用合理的数据分层设计,避免表过多或过少,避免分得过散。
3. SQL 规范
-
合理分区分桶
查询时进行分区裁剪,避免扫描无关数据。 -
WHERE 过滤
先进行过滤,再进行JOIN
或计算。 -
MapJoin
在 Hive 1.2 之后,MapJoin
默认启用。如果没有自动启用,可以手动指定:SELECT /*+mapjoin(b)*/ a.xx, b.xxx FROM a LEFT OUTER JOIN b ON a.id = b.id;
左连接时,通常大表放在左边,小表放在右边。
-
合并小文件
合并小文件以提升性能。 -
适当的子查询
使用子查询时,确保子查询数量合理,避免嵌套过深影响性能。 -
适当的排序方式
ORDER BY
:全局排序,只能有一个 reduce。SORT BY
:每个 reduce 内部排序。DISTRIBUTE BY
:对指定字段进行分区。CLUSTER BY
:类似于DISTRIBUTE BY
+SORT BY
。
-
适当的执行引擎
- 使用
MR
(MapReduce)、Tez
或Spark
等引擎,根据不同情况选择最合适的执行引擎。
- 使用
4. 开启相关参数,调整参数
-
set mapred.reduce.tasks;
设置 Reduce 的数量。 -
set hive.auto.convert.join=true;
开启MapJoin
,提高JOIN
性能。 -
set hive.mapjoin.smalltable.filesize=20000000;
设置小表的文件大小为 20M,默认值为 25M。 -
set hive.groupby.skewindata=true;
开启数据倾斜优化。 -
set hive.map.aggr = true;
开启预聚合功能。 -
set hive.exec.parallel=true;
开启并行执行。 -
set hive.exec.parallel.thread.number=16;
设置 SQL 允许的最大并行度,默认为 8。
什么是事实表、维度表?
-
事实表
事实表用于存储数据仓库中的业务过程量化数据,通常包含数字型数据(如销售额、订单数量、利润等)。事实表与维度表通过外键连接,提供更多上下文信息。特点:
- 包含度量值,如销售量、收入、支出等。
- 每行代表一个具体的业务事件。
- 外键字段与维度表连接。
-
维度表
维度表存储描述事实表中数据的相关信息,通常是文本型数据(如产品名称、客户姓名、商店地址、时间等)。特点:
- 包含描述性数据,如客户名称、产品类别、时间等。
- 主键字段与事实表中的外键关联。
- 较小的数据量。
星型模型与雪花模型的区别?
-
星型模型:
事实表在中心,周围是维度表。简洁、易于理解,但会导致数据冗余。 -
雪花模型:
维度表进一步拆分成子维度表,规范化存储,减少冗余,但复杂度较高,查询性能较差。
数据仓库的分层设计
为什么要设计数据分层?
数据分层有助于数据在流转过程中保持秩序,使数据生命周期清晰、可控。避免层级混乱或复杂依赖结构。
数据分层的好处:
- 清晰数据结构:每一层有明确的职责和作用,便于使用和理解。
- 减少重复开发:通用数据和中间层可以减少重复计算。
- 统一数据口径:统一数据出口,确保输出数据的一致性。
- 复杂问题简单化:将复杂任务分解成多个层次完成。
分层设计
ODS 层(Operational Data Store 数据运营层)
ODS 层接近数据源,通常不做过多的数据清洗,原始数据直接存入该层。主要用于追溯原始数据问题。
DW 层(Data Warehouse 数据仓库层)
DW 层是核心层,数据从 ODS 层抽取后,进行进一步的清洗、聚合,并按主题建立数据模型。DW 层可分为以下几层:
-
DWD 层(Data Warehouse Detail 数据明细层):
保持与 ODS 层相同的数据粒度,做数据清洗和部分聚合。 -
DWM 层(Data Warehouse Middle 数据中间层):
对 DWD 层数据进行轻度聚合,生成中间表,提升指标复用性。 -
DWS 层(Data Warehouse Service 数据服务层):
又称数据集市或宽表。按照业务划分,如流量、订单、用户等,生成字段比较多的宽表,用于提供后续的业务查询,OLAP分析,数据分发等。
一般来讲,该层的数据表会相对比较少,一张表会涵盖比较多的业务内容,由于其字段较多,因此一般也会称该层的表为宽表。在实际计算中,如果直接从DWD或者ODS计算出宽表的统计指标,会存在计算量太大并且维度太少的问题,因此一般的做法是,在DWM层先计算出多个小的中间表,然后再拼接成一张DWS的宽表。由于宽和窄的界限不易界定,也可以去掉DWM这一层,只留DWS层,将所有的数据在放在DWS亦可。
ADS/APP/DM层(Application Data Store/Application/DataMarket 数据应用层/数据集市):
在这里,主要是提供给数据产品和数据分析使用的数据,一般会存放在 ES、PostgreSql、Redis等系统中供线上系统使用,也可能会存在 Hive 或者 Druid 中供数据分析和数据挖掘使用。比如我们经常说的报表数据,一般就放在这里。
DIM层(Dimension 维表层)
维表层主要包含两部分数据:
-
高基数维度数据:一般是用户资料表、商品资料表类似的资料表。数据量可能是千万级或者上亿级别。
-
低基数维度数据:一般是配置表,比如枚举值对应的中文含义,或者日期维表。数据量可能是个位数或者几千几万。