HiveSQL 原理
join
join 分为 MapJoin、ReduceJoin 两种,其中 MapJoin 思想是将小表存内存,然后大表分片,与小表完成连接操作。
MapJoin
- Map 阶段分为两个操作:
- 将小表数据读入内存,生成分片文件后存储到分布式存储系统中;
- 每个 Mapper 从分布式存储系统中读取文件分片到内存,然后顺序扫描大表,在 Map 阶段完成 join;
这种方案只适用于一份小数据表和大表的联合。
ReduceJoin
- Map 阶段以关联键作为 Key,查询后的列作为 Value 输出;
- shuffle 阶段按照 Key 值hash 将数据发送到不同 Reducer;
- Reduce 阶段按照 Key 值执行联合操作。
ReduceJoin 存在几个问题:
- Map 阶段没有对数据瘦身,导致 Shuffle 阶段传输数据量很大;
- Reduce 阶段需要对 Value 做乘积运算,容易导致 OOM。
GroupBy
GroupBy
执行过程可以分为三个阶段:
- map阶段:将 groupby 后的字段作为 key(多个就多个列看出整体),将 groupby 之后要进行聚合的字段作为值,如果要进行
count(xxx)
,则值为1;如果sum(xxx)
,则值为该字段;该阶段能够将数据平分给各个节点,总数据量维持恒定,所以不存在数据倾斜; - shuffle阶段:按照 key 排序并将不同数据分发到指定 Reducer,shuffle 阶段取决于 Map 的结果,无法控制。
- reduce阶段:如果是 count 就统计各个 Key 的值累加之和,否则按照需要的聚合操作执行;Reduce 阶段会因为 Shuffle 后各个节点的数据量不同产生数据倾斜问题,最终计算时间取决于数据量多的那个 Reduce。
with tmp1 as (
select
'a' as pro,
'1' as city
union all
select
'a',
'1'
union all
select
'a',
'1'
union all
select
'b',
'0'
)
select
pro,
city,
count(*)
from
tmp1
group by
pro,
city
如何优化?
- 针对 Key 做处理:
- 将 Key 替换为随机数;
- 提前挑选出大数量级的 Key 单独处理;
- 参数配置:
set hive.map.aggr = true
:开启 Map 端的聚合操作,减少传送到 Reducer 的数据量,同时需要设置hive.groupby.mapaggr.checkinterval
规定 Map 端进行聚合操作的数目;mapred.reduce.tasks
:执行 Reduce 任务的数量,配置为较大值来减少每个 Reducer 处理的数量;set hive.groupby.skewindata = true
:开启自动的负载均衡,它会生成两个 MR Job,第一个会随机分布 Map 的结果到不同 Reduce 中,这样导致按照相同 Key 分组的数据会被发送到不同的 Reduce 中;第二个 MR Job 按照 Key 进行正确分组;
Distinct 单字段原理
- map阶段:Distinct 执行时,Hive 会将操作转换为一个 MapReduce 任务,并按照
groupBy
+distinct
字段组合作为 Key,value 设置为 1 来作为 Map 任务输出; - shuffle阶段:将 GroupBy 的字段作为分区键进行数据分区,发送到不同 Reduce 任务;
- reduce阶段:由于 Map 阶段的排序,输入天然就是按组合 Key 排好序的,此时按照顺序取出组合键中的 distinct 字段,依次遍历 distinct 字段找到每一个不同的值,如果是 count 操作,计数器就自增1,最终得到输出。
with tmp1 as (
select
'a' as pro,
'1' as city,
'张三' as userid
union all
select
'a',
'1',
'张三'
union all
select
'a',
'1',
'张三'
union all
select
'b',
'0',
'张三'
)
select
pro,
count(distinct userid)
from
tmp1
group by
pro
count(distinct xxx)
会导致大量的数据倾斜,因为 Map 阶段没有完成数据去重操作,而是将 Map 输出的结果交给一个 Reducer 处理执行 Count(xxx)
合并操作,这个操作会成为整个作业的 IO瓶颈。
如何优化?
在 Map 阶段完成对数据的去重,可以嵌套子查询,子查询完成对 id字段的去重(可并发执行),第二阶段对去重后的 id 值合并计数,这样 Map 阶段输出数据更少,Reduce 阶段即使指定一个 Reducer 也不会成为性能瓶颈。
SELECT
COUNT(*)
FROM
(
SELECT
DISTINCT id
FROM
table_name
WHERE
....
) t;
Distinct 多字段原理
SQL 调优
计算资源调优
针对 Hive On MR
,计算资源调整主要涉及 Yarn、MR。
YARN 配置: 主要包括 Container执行内存、CPU核数
yarn.nodemanager.resource.memory-mb
:每个NodeManager
节点分配给Container
使用的内存。yarn.nodemanager.resource.cpu-vcores
:每个NodeManager
节点分配给Container
使用的 CPU 核心数。yarn.scheduler.maximum-allocation-mb
:每个 Container 所能使用的最大内存。yarn.scheduler.minimum-allocation-mb
:每个 Container 所能使用的最小内存。
MapReduce 配置: 主要包括 MapTask 的内存和 CPU 核数、ReduceTask 的内存和 CPU 核数
mapreduce.map.memory.mb
:每个 MapTask 所使用的 Container 内存;mapreduce.map.cpu.vcores
:每个 MapTask 所使用的 Container CPU 核心数;mapreduce.reduce.memory.mb
:每个 ReduceTask 所使用的 Container 内存;mapreduce.reduce.cpu.vcores
:每个 ReduceTask 所使用的 Container CPU 核心数。
执行计划调优
Explain 执行计划:
- 由一系列 Stage 组成,每个 Stage 对应一个 MapReduce Job 或者文件系统操作,各个 Stage 之间有先后顺序。
- Map/Reduce 端计算逻辑分别由 Map操作树、Reduce操作树组成,一个操作代表某个阶段的单一逻辑操作,比如
TableScan Operator
、Select Operator
、Join Operator
; - hive 中常见 Operator:
- TableScan:扫描表操作;
- Select Operator:选取;
- Group By Operator:分组;
- Reduce Output Operator:输出;
- Filter Operator:过滤;
- Join Operator:连接;
- File Output Operator:文件输出;
- Fetch Operator:客户端获取数据操作。
Explain 语句模板: EXPLAIN [FORMATTED | EXTENDED | DEPENDENCY] query-sql
- FORMATTED:将执行计划以 JSON 字符串形式输出;
- EXTENDED:输出执行计划中额外信息,通常是读写的文件名等信息;
- DENPENDENCY:输出计划中额外信息,通常是读写文件名等信息。
分组聚合优化
未优化执行过程: Hive 未经优化的分组聚合,是通过一个 MapReduce 任务完成的。Map 端负责读取数据,并且按照分组字段分区,通过 shuffle
,然后将数据发送到 Reduce 端,各组数据在 Reduce 端完成最终的聚合运算。
优化后的执行过程: 优化主要围绕减少 shuffle
数据量进行,具体做法是 map-side
聚合。在 Map 端内存中维护一个哈希表,利用哈希表完成部分结果的聚合,然后按照分组字段分区,将结果发送至 Reduce 端,完成最终聚合操作。map-side
的聚合操作能有效减少 shuffle 的数据量。
如何配置:
# 开启 map-side
set hive.map.aggr=true
# 检测是否有必要执行 map-side,若聚合后的条数和聚合前的条数比值小于该值,则认为适合做聚合,否则不适合
set hive.map.aggr.hash.min.reduction=0.5;
# 检测源表是否适合 map-side 聚合的条数
set hive.groupby.mapaggr.checkinterval=100000;
# map-side 聚合使用的哈希表占用 Map端内存的最大比例,若超过该值执行一次哈希表 flush 操作
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
Join 优化
Hive 包括的 join 算法:Common Join、Map Join、Bucket Map Join、Sort Merge Bucket Map Join;
Join 算法介绍
Common Join:
- 最稳定的 Join 算法,通过一个 MapReduce 任务完成。Map 端负责读取 Join 操作所需要的表数据,并且按照关联字段分区,通过
shuffle
,然后将数据发送到 Reduce 端,相同 Key 的数据最终在 Reduce 端完成最终的 Join 操作。 - 如果多表联合 Join,关联字段一致,那么只会通过一个 Commmon Join 任务完成,先读取 Join 操作需要的表数据,然后通过
shuffle
将相同 Key 的数据发送到 Reduce 端;如果关联字段不一致,则必须要通过多个 Common Join 任务。
SELECT
a.val,
b.val,
c.val
FROM a
JOIN b ON (a.key = b.key1)
JOIN c ON (c.key = b.key1)
两个 join 操作均关联到相同的字段(b 表 key1),所以可以进行一次 shuffle,由一个 Common Join 任务完成。
SELECT
a.val,
b.val,
c.val
FROM a
JOIN b ON (a.key = b.key1)
JOIN c ON (c.key = b.key2)
两个 join 操作关联字段不一致,所以必须两次 Join 完成。
Map Join:
- 通过两个只有 Map 阶段的 Job 完成一个 join 操作,适用场景为大表 join 小表;
- 第一个 Job 会读取小表数据,然后根据关联字段计算哈希值,上传到分布式缓存的不同节点中(HDFS);
- 第二个 Job 会从分布式缓存中读取对应小表数据,并缓存在 MapTask 内存中,然后扫描大表数据,完成联合操作。
- 整个过程不需要 Reduce 操作,通过两次 Map 操作减少了 shuffle 开销。
Bucket Map Join:
- 如果参与 join 的表均为分桶表,且关联字段为分桶字段,且一张表的分桶数量是另一张表分桶数量的整数倍,此时就可以在两表的分桶间进行 Map Join 操作。这样第二个 Job 的 Map 端就不需要缓存小表的全表数据了,而只需要缓存其所需的分桶即可。分桶的目的是为了解决表数据无法缓存到内存的问题。
- 第一个 Job 会读取小表分桶后的数据,然后上传到分布式缓存(HDFS);
- 第二个 Job 首先会根据大表的分桶数启动对应数量的 MapTask,每个 MapTask 从分布式缓存读取数据,只需要缓存自己需要的小表桶数据。
- Bucket Map Join 适用于大表与大表的联合操作,通过分治思想将其中较小的表数据分桶。同时每次处理大表的一个桶数据,启动一个 MapTask 任务,加载需要的小表桶数据到内存中,然后执行联合操作,整个过程无需 Reduce 操作,避免了 shuffle 开销。
Sort Merge Bucket Map Join:
- 要求参与联合的表均为分桶表,且分桶内的数据均为有序的,且分桶字段、排序字段和关联字段为相同字段,且一张表的分桶数是另一张表的分桶数的整数倍。
Bucket Map Join
的联合原理是通过Hash Join
算法;而Sort Merge Bucket Map Join
的原理是通过Sort Merge Join
算法。Sort Merge Join
由于两张表都是按照关联字段排好序的,所以无需将整张表的数据加载到内存,而只需要顺序读两张表,对相同的数据值的数据行执行联合操作。- Sort Merge Bucket Map Join 不需要缓存桶数据到内存,所以对内存没有要求,另外因为数据是按照关联字段排序的,只需要顺序读取即可。
Map Join 优化
- Hive 在编译 SQL 语句阶段,起初所有 join 操作均采用
Common join
实现。之后在执行阶段,会根据每个Common join
任务涉及的表大小判断能够转化为Map join
任务。 - 但是存在一些特殊场景在编译时不能确定涉及表大小,比如对子查询执行 join 操作。针对这种情况,hive 会在编译阶段生成一个条件任务,它下面会包含一个计划列表,计划列表中包含转换后的
Map join
任务以及原有的Common join
任务。
Map Join
核心判断逻辑:
hive.auto.convert.join
:是否自动优化 Common Join 任务;- 寻找大表候选人,根据 join 类型判断能否做 Map Join。比如
a left join b
,只能将 b 作为缓存表,这样才能全量输出 a 表数据,在此基础上判断此时能否进行 Map Join 优化。 hive.auto.convert.join.noconditionaltask = false
:需要开启条件任务。条件任务会以每个大表候选人作为大表,生成 Map join 计划,若某大表候选人大小已知,且其之外的已知的表大小总和大于hive.mapjoin.smalltable.filesize
,则不生成对应的 Map Join 计划。hive.auto.convert.join.noconditionaltask = true
:不需要开启条件任务,则除开大表候选人之外其余表大小必须已知,且总和小于hive.auto.convert.join.noconditionaltask.size
,如果满足生成最优的 Map Join 计划。若不满足上述条件,还是会生成条件任务。- 若已经生成最优 Map Join 计划,此时子任务也是 Map Join,并且子任务和当前任务的所有小表大小均已知,并且总和小于
hive.auto.convert.join.noconditionaltask.size
,则会将当前 Map Join 和子任务的 Map Join 合并为一个任务。即使此时出现多表联合的情况,比如 a、b、c 三表联合,如果是 Common Join,一定为生成两个 Join 任务执行,同时过程中含有 shuffle;如果是 Map Join,则变成 a-b、b-c 两个 Map Join 任务,最终可以合并为一个 Map Join 任务(a 和 b 先联合,然后根据结果和 c 再联合)。
案例:
现有以下三张表实现联合操作:
select
*
from order_detail od
join product_info product on od.product_id = product.id
join province_info province on od.province_id = province.id;
- 方案一:MapJoin 优化
# 配置 MapJoin 自动转换开启
set hive.auto.convert.join=true;
# 不使用无条件转换 MapJoin
set hive.auto.convert.join.noconditionaltask=false;
# 调整内存大小,使其大于 product_info 表大小
set hive.mapjoin.smalltable.filesize=25285707;
如图每一步操作都会生成多个执行计划,不同的执行计划采取不同措施。
- 方案二:无条件 MapJoin 优化
# 配置 MapJoin 自动转换
set hive.auto.convert.join=true;
# 使用无条件转换
set hive.auto.convert.join.noconditionaltask=true;
# 调整内存大小,使其大于等于 product_info、province_info 表大小之和
set hive.auto.convert.join.noconditionaltask.size=25286076;
上述配置会转化为两个 MapJoin Operator,并且由于两个 MapJoin Operator 小表之和小于内存大小,所以可以合并为一个 MapJoin 任务。这种方式效率最高,但是占用内存最多。
Bucket Map Join 优化
SELECT
*
FROM (
SELECT
*
FROM order_detail where dt = '2020-06-14'
) od
JOIN
(
SELECT
*
FROM payment_detail
WHERE dt = '2020-06-14';
) pd
ON od.id = pd.order_detail_id;
分析:
第二张表大小为 300M,按一般压缩率为 10:1 来算,需要至少 3G 内存才能存储哈希表数据。因此 Map Join 优化不现实,考虑 Bukcet Map Join 优化。
考虑将order_detail
拆分为 16 个桶,payment_detail
拆分为 8 个桶,两个表的桶数成倍数关系。
- 建表语句
# 删除分桶表
drop table if exists order_detail_bucketed;
# 重新创建分桶表
create table order_detail_bucketed (
id string comment '订单ID',
user_id string comment '用户ID',
product_id string comment '商品ID',
province_id string comment '省份ID',
create_time string comment '创建时间',
product_number int comment '商品数量',
total_amount decimal(16, 2) comment '订单金额'
)
clustered by (id) into 16 buckets
row format delimited fields terminated by '\t';
drop table if exists payment_detail_bucketed;
create table payment_detail_bucketed(
id string comment '支付id',
order_detail_id string comment '订单明细id',
user_id string comment '用户id',
payment_time string comment '支付时间',
total_amount decimal(16, 2) comment '支付金额'
)
clustered by (order_detail_id) into 8 buckets
row format delimited fields terminated by '\t';
- 导入数据到各个分桶表
insert overwrite table order_detail_bucketed
select
id,
user_id,
product_id,
province_id,
create_time,
product_num,
total_amount
from order_detail
where dt='2020-06-14';
insert overwrite table payment_detail_bucketed
select
id,
order_detail_id,
user_id,
payment_time,
total_amount
from payment_detail
where dt='2020-06-14';
- 开启 Bucket Map Join 配置
# 关闭 cbo 优化,cbo 导致 Bucket Map Join 失效
set hive.cbo.enable=false;
# 启用 Bucket Map Join
set hive.optimize.bucketmapjoin=true;
- 重写 SQL 如下
select /*+ mapjoin(pd) */
*
from order_detail_bucketd od
join payment_detail_bucketed pd on od.id = pd.order_detail_id;
Sort Merge Bucket Map Join 优化
如何触发 sort merge bucket map join
:
set hive.optimize.bucketmapjoin.sortedmerge = true;
:启动 Sort Merge Bucket Map Join 优化;set hive.auto.convert.sortmerge.join = true;
:开启自动转换 Sort Merge Bucket Map Join 优化;
优化前:
select
*
from (
select * from order_detail
where dt = '2020-06-14'
) od
join (
select * from payment_detail
where dt = '2020-06-14'
) pd
on od.id = pd.order_detail_id;
包含一个 join 语句,因此执行一个 Common Join
任务,通过一个 MapReduce Job
实现。
参考上述数据,最小的数据大小为 319M,预估需要 3G 的内存才能成功执行。因此最好使用 Bucket Map Join
,但是 Sort Merge Bucket Map Join
优化后对分桶大小没有要求,占用内存更小。
依旧是按照 Bucket Map Join
优化方式先创建分桶表、导入数据,然后配置如下参数开启优化:
set hive.optimize.bucketmapjoin.sortedmerge = true;
:开启 Sort Merge Bucket Map Join 优化;set hive.auto.convert.sortmerge.join = true;
:开启自动转换 Sort Merge Bucket Map Join 优化;
最后重写 SQL:
select
*
from order_detail_sorted_bucketed od
join payment_detail_sorted_bucketed pd
on od.id = pd.order_detail_id;
数据倾斜调优
分组聚合导致数据倾斜
比如执行 groupby
等操作会导致 Map 操作后,相同 key 的数据发送到同一个 Reduce 节点,从而导致数据倾斜。
方案一:Map端聚合
原理:数据分区后,Map端再执行数据聚合,减少了发送到 Reduce 节点的数据量。
# 开启 Map 端聚合
set hive.map.aggr = true;
# 检测是否有必要执行 map-side,聚合后的数据条数/聚合前的数据条数比值小于该值,认为适合聚合
set hive.map.aggr.hash.min.reduction = xxx;
# MapTask 内存大于该值会执行一次 flush
set hive.map.hash.force.flush.memory.threshold = xxx;
方案二:Skew-GroupBy聚合
原理:启动两个 MR 任务,一个按照随机数分区,将数据随机发送到 Reduce;一个按照分组字段分区,完成最终聚合。
set hive.groupby.skewindata = true;
Join 导致的数据倾斜
前文提到的 join 实际上是通过一个 MapReduce 任务完成的,默认是使用 Common Join 算法,Map端负责读取目标字段并按照关联字段分区,然后通过 shuffle 将数据发送到 Reduce,最终相同的 Key 完成联合操作。这样就可能出现某个关联字段值的数据量非常大,都会 shuffle 发送到同一个 Reduce节点上。
方案一:Map Join
原理:适合大小表联合,Join 操作在 Map 端完成,没有 shuffle 的性能开销。
# 开启 MapJoin 自动转换
set hive.auto.convert.join = true;
# 内存限制,判断是否可以生成 MapJoin 任务
set hive.mapjoin.smalltable.filesize = xxx;
# 开启无条件转换 MapJoin,否则会生成多个可能的执行计划
set hive.auto.convert.join.noconditionaltask = true;
# 无条件转换为 MapJoin 打开后,内存大小阈值,若小于该阈值,不会再生成备用计划
set hive.auto.convert.join.noconditionaltask.size = xxx;
方案二:Skew Join
原理:为倾斜的大 Key 单独启动一个 Map Join 任务计算,其余 Key 进行正常的 Common Join。
# 启动 Skew Join 优化
set hive.optimize.skewjoin = true;
# 触发 Skew Join 的阈值,若某个 Key 行数超过该值则触发
set hive.skewjoin.key = 10000;
方案三:调整 SQL 语句
假设存在数据倾斜问题如下:
select * from A join B on A.id = B.id;
调整后的 sql 语句如下:
select
*
from (
select // 按照id字段打散
concat(id, '_', cast(rand() * 2 as int)) id,
value
from A
) ta
join (
select // 小表数据扩容
concat(id, '_', 0) id,
value
from B
union all
select
concat(id, '_', 1) id,
value
from B
) tb
on ta.id = tb.id;
分别对存在数据倾斜的表的值加随机数打散(concat),再对小表数据进行扩容(union all),完成对 SQL 的优化。
任务并行度优化
Map 端并行度
Map 端并行度是由 Map 的个数决定的,取决于输入文件的切片数,一般情况下,Map端的并行度无需手动调整。
- 小文件优化
如果 Map 端存在过多的小文件,会导致启动大量的 MapTask,所以需要对小文件进行合并。通过配置 set hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat;
控制 MapTask 的个数。
- 增加MapTask数量
在计算资源充足的前提下,可以增大 Map 端并行度(MapTask 数量),每个 MapTask 计算数据减少一些。通过配置 set mapreduce.input.fileinputformat.split.maxsize = xxx;
Reduce 端并行度
# 指定 Reduce 端并行度,如果值为-1,表示用户未指定
set mapreduce.job.reduces = xxx;
# Reduce 端并行度最大值
set hive.exec.reducers.max = xxx;
# 单个 Reduce Task 计算的数据量
set hive.exec.reducers.bytes.per.reducer = xxx;
参考:
标签:Map,set,Join,hive,SQL,join,hive06,优化,id From: https://www.cnblogs.com/istitches/p/18348545Hive_On_Spark 和 Spark_On_Hive 区别:https://www.cnblogs.com/liugp/p/16209394.html;