一、Hive基础知识
Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能(hive的HQL语法设计实际模仿Mysql的语法)。其本质是将SQL转换为MapReduce的任务进行运算,底层由HDFS来提供数据的存储,说白了hive可以理解为一个将SQL转换为MapReduce的任务的工具,甚至更进一步可以说hive就是一个MapReduce的客户端。对于客户端提交的HQL语句,Hive解析器(主要包括编译器,优化器和执行器)对其进行优化,并编译为一个MR任务提交给具体的hive引擎执行。
hive接口、引擎架构图上传失败,后续优化
Hive引擎介绍:MapReduce(注意:本段中的MapReduce指的是hive的MapReduce引擎,不是具体的HQL解析成的MapReduce代码,该引擎在hive2中将被弃用)将一个算法抽象成Map和Reduce两个阶段进行处理;Tez引擎将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等依赖DAG:Mapreduce没有DAG一说,Tez将map和reduce阶段拆分成多个阶段,分解后的元操作可以任意灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业落地磁盘:MapReduce会有多次落地磁盘;Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少。Spark更像是一个通用的计算引擎,提供内存计算,实时流处理,机器学习等多种计算方式,适合迭代计算。Hive三种引擎实现核心原理和对比参考:https://www.jianshu.com/p/357fceaa4042
备注:培训内容不针对于Spark作为Hive引擎的情况。
通过参数声明方式设置三种引擎的方式:
1)、配置mapreduce计算引擎
set hive.execution.engine=mr;
2)、配置tez计算引擎
set hive.execution.engine=tez;
3)、配置spark计算引擎
set hive.execution.engine=spark;
可以通过命令:set hive.execution.engine 查看当前hive的引擎信息。
Hive参数大全:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
二、Wordcount功能的MapReduce代码和HQL语句对比
在hadoop1.x中提出MapReduce计算框架,到hadoop2.x中保留了这种计算框架,运行在yarn上。MapReduce的核心思想是分而治之,数据量越大优势越大。概括起来包括三个阶段,八大步骤(简称天龙八部)。三个阶段包括:
Map阶段:把复杂的任务分解为若干个“简单的任务”来处理,可以拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。一般一个map任务可以处理一个block块(128MB)中的数据。
Shuffle阶段:对Map阶段的输出数据进行重新洗牌,主要包括:分区,排序,规约,分组几个步骤,这部分由MapReduce框架自己实现,属于隐藏的细节。
Reduce阶段:对Map阶段的结果进行全局汇总并输出。
reduceTask运行机制图上传失败
天龙八部:
第一步:读取文件,解析成key-value对,即key1,value1。
第二步:Map分:接收第一步读取 的key1和value1,解析成新的key2,value2输出。
第三到六步:Shuffle:编程不涉及。分区:相同key的数据发送到同一个reduce里边去,形成一个集合,这里的key指的是key2;排序:对数据进行字典序排序;规约:在Map端对数据做一次聚合,减少输出的key2数据量;分组:将相同的数据发送到同一个组里面去调用一次reduce逻辑。
第七步:Reduce合:MapReduce框架简单的抽象模型,
第八步:结果输出,整体步骤如图所示:
天龙八部图上传失败
MapReduce代码偏模板化编程,非常死板,只需要自定义Map代码和Reduce代码,甚至于可以不用自定义reduce代码
map方法会被反复调用,每一行数据都会调用一次。如果是有多个Mapper,则每一行记录可以分配到不同的Mapper中去,增加并行性。
reduce方法也会被反复调用,每一个记录来了之后,都会执行一次reduce方法,如果开了多个Reducer,多个记录可以分配到不同的Reduce中去。代码中任务主类被省略,包括完整的定义天龙八部的八个步骤,指定输入文件位置,设置map类,shuffle过程可以省略,设置reduce类,设置输出(输出的路径需要设置为不存在的路径,这一点不确认新版是否有优化)。Map和Reduce任务都是单独启动的进程,在源码实现中,需要单独启动JVM虚拟机进程。这个设计主要目的是保证资源隔离,同时也是一种安全机制,避免用户提交的Map或Reduce代码产生问题后,导致TaskTracker进程死掉(详细内容可参考:https://blog.csdn.net/wawmg/article/details/8744455)。
HQL语句实现:
CREATE TABLE docs(line STRING);
LOAD DATA INPATH 'docs' OVERWRITE INTO TABLE docs;
CREATE TABLE word_counts AS
SELECT word,count(1) AS count FROM
(SELECT explode(split(line,'\s')) AS word FROM docs) w
GROUP BY word
ORDER BY word;
备注:如果有GROUP BY语句,那么SELECT语句后边的具体字段一定要和GROUP BY对应上(这一点MySQL做了优化),不支持select *。
在上边HQL语句建表之后,将hdfs路径上的docs文件导入到这张表中,然后创建一张表,并将select的数据导入到这张表中,Hive的作用就是将HQL语句转换成Mapreduce代码,交给引擎进行执行。通过MapReduce的实现可以看出,每一个reduce都是独立处理自己的任务,reduce和reduce之间处理的数据相互之间没有关联,如果涉及到order by这样一个全局排序操作,则只能有一个reduce任务,数据量大的情况下会非常耗时,第三节会介绍具体的调优方案。
三、Hive通用的技术调优方案(本节不针对于Spark引擎)
不结合实际业务的调优都是耍流氓。
导出hive配置
beeline -e 'set'> /tmp/hive
Fetch抓取(Hive可以避免进行MapReduce)
能够避免使用mr的,就尽量不要用mr,因为mr太慢了,设置方式:set hive.fetch.task.conversion=more ,表示我们的全局查找,字段查找,limit查找都不走mr
select * from employees;
select * from employees where id = 1 and name = 'zhangsan';
select id from employees;
select * from employees limit 3;
参数取值范围和含义:
none:关闭fetch task优化,所有任务都要走mr程序;
minimal:只在select *、使用分区列过滤、带有limit的语句上进行优化;
more:在minimal的基础上更加强大了,select不仅仅可以是*,还可以单独选择几列,并且filter也不再局限于分区字段,同时支持虚拟列(别名);
Fetch抓取完全不适用于聚合函数。
本地模式
hive的本地模式,即小任务模式,比如hdfs上有四个block块,每个block中有2kb数据,默认会启四个MapTask去执行,这种情况就导致了给每个MapTask分配资源的时间远大于任务执行时间,设置本地模式之后,在单台服务器上或者某些情况下在单个进程中执行Map任务,就避免了这个问题,开启本地模式命令为:
set hive.exec.mode.local.auto=true 开启本地模式,解决多个小文件输入的时候,分配资源时间超过数据的计算时间。
set hive.exec.mode.local.auto.inputbytes.max=51234560; 设置输入的数据临界值(这里设置48MB左右),如果小于这值都认为是小任务模式,启动本地模式来执行
set hive.exec.mode.local.auto.input.files.max=10; 设置输入文件个数的临界值,如果小于这个数量,那么也认为是小任务模式
表和HQL语句的优化
1.去重的优化:
select count(distinct s_id) from score;这种写法所有的去重数据都会在一个reduce当中去执行,造成数据处理比较慢,可进行优化:
select count(1) from (select s_id from score group by s_id) bysid; 这种写法,使用了一个嵌套子查询,先对数据进行group by去重,然后再进行统计
2.大SQL拆分:尽量避免大sql,可以将一个很大的sql拆成多段,分步的去执行。在产生问题的时候能方便定位具体问题。
3.空key的过滤:两张表执行join关联,on的字段可能包含多个空值的情况,造成笛卡尔乘积,导致速度变慢。
不过滤:
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;
结果:
No rows affected (152.135 seconds)
过滤:过滤掉我们所有的为null的id,使得我们的输入数据量变少
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
结果:
No rows affected (141.585 seconds)
4.空key的转换:
如果规定这些空key过滤不掉,那么我们可以对空key进行转换
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN 'hive' ELSE a.id END = b.id;
如果空key比较多,那么就会将大量的空key转换成 hive,那么就会遇到一个问题,数据倾斜
数据倾斜的表现形式:有一个reduce处理的数据量远远比其他reduce处理的数据量要大,造成其他的reduce数据都处理完了,这个还没处理完
5.空key的打散:
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;
通过将空key打散成不同的随记字符串,就可以解决我们hive的数据倾斜的问题
6.mapjoin:
hive已经开启了自动的map端的join功能,不管是我们的大表join小表,还是小表join大表,都会将我们的小表加载到内存当中来。
执行命令:set hive.auto.convert.join = true; 默认为true
实现原理:首先是启动Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache(分布式缓存)中。接下来是启动Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。
7.map端聚合优化:
默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。开启Map端聚合参数设置:
(1)是否在Map端进行聚合,默认为True
set hive.map.aggr=true;
(2)在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
(3)有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。
8.避免笛卡尔积:任何时候都要避免笛卡尔积,避免无效的on条件,避免使用where替代on。
select from A left join B -- on A.id = B.id
-- where A.id = B.id
9.使用分区裁剪,列裁剪:
分区裁剪:如果是我们的分区表,那么查询的时候,尽量带上我们的分区条件。
列裁剪:尽量避免使用select * ,需要查询哪些列,就选择哪些列。
10.使用动态分区动态的添加数据:
分区表的数据加载有两种方式:
(1)从本地目录或者hdfs目录通过文件方式加载数据到分区表制定分区
load data local inpath '/export/servers/hivedatas/score.csv' into table score partition (month='201806'); //加载数据到单分区表
load data local inpath '/export/servers/hivedatas/score.csv' into table score2 partition(year='2018',month='06',day='01'); //加载数据到多分区表
(2)通过insert 分区表 select 具体表信息的方式插入
insert overwrite table xxx partition (month = 'xxx') select xxx
hive提供了动态分区机制,说白了就是以第一个表的分区规则,来对应第二个表的分区规则,将第一个表的所有分区,全部拷贝到第二个表中来,第二个表在加载数据的时候,不需要指定分区了,直接用第一个表的分区即可。
开启动态分区参数设置
(1)开启动态分区功能(默认true,开启)
set hive.exec.dynamic.partition=true;
(2)设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
set hive.exec.dynamic.partition.mode=nonstrict;
(3)在所有执行MR的节点上,最大一共可以创建多少个动态分区。
set hive.exec.max.dynamic.partitions=1000;
(4)在每个执行MR的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
set hive.exec.max.dynamic.partitions.pernode=100
(5)整个MR Job中,最大可以创建多少个HDFS文件。
在linux系统当中,每个linux用户最多可以开启1024个进程,每一个进程最多可以打开2048个文件,即持有2048个文件句柄,下面这个值越大,就可以打开文件句柄越大
set hive.exec.max.created.files=100000;
(6)当有空分区生成时,是否抛出异常。一般不需要设置。
set hive.error.on.empty.partition=false;
案例:
创建一张原始分区表(案例中的源表):
create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string)
PARTITIONED BY (p_time bigint) row format delimited fields terminated by '\t';
从本地文件目录加载两个文件到分区表指定分区中
load data local inpath '/export/servers/hivedatas/small_data' into table ori_partitioned partition (p_time='20111230000010');
load data local inpath '/export/servers/hivedatas/small_data' into table ori_partitioned partition (p_time='20111230000011');
创建目标分区表:
create table ori_partitioned_target(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string)
PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t';
在开启了动态分区之后,直接通过指定分区字段名称,不指定具体分区的方式动态插入数据到新的分区表中
INSERT overwrite TABLE ori_partitioned_target PARTITION (p_time)
SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time FROM ori_partitioned;
注意:如果要使用动态分区添加数据,最后一个字段一定要是我们的分区字段,如果是多个分区字段可以并列写在后边。
备注:查询分区HQL:show partitions ori_partitioned_target;
11.分桶表:分区是分文件夹,分桶是分文件,根据具体字段的值将数据分成多个文件。执行两张表join计算时,以分桶字段为on 条件字段能够优化查询。
数据倾斜
主要的点在于合理控制map个数和reduce个数,大量map任务处理多个小文件,单个map或reduce任务处理数据量过大都属于数据倾斜的表现形式,需要结合业务实际情况进行分析。需要综合分析原始输入文件数据量,数据大小,数据内容。
在map任务前可以执行小文件合并,可参考:
set mapred.max.split.size=112345600;
set mapred.min.split.size.per.node=112345600;
set mapred.min.split.size.per.rack=112345600;
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
这个参数表示执行前进行小文件合并,前面三个参数确定合并文件块的大小,大于文件块大小128m(按照实际block块大小,可能是256MB或者其他值)的,按照128m来分隔,小于128m,大于100m的,按照 100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并。
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理 的拆分成多个,这样就可以用多个map任务去完成。可以通过如下语句对a表进行拆分:
set mapred.reduce.tasks=10;
create table a_1 as
select * from a
distribute by rand(123); -- distribute by的作用是根据表的具体字段分配到不同的reduce任务中,distribute by rand这种方式 -- 能够使数据分配均匀
这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。
reduce任务数量可参考设置:
(1)每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256123456
(2)每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
(3)计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
使用EXPLAIN(执行计划)
基本语法:EXPLAIN[EXTENDED | DEPENDENCY | AUTHORIZATION] query
EXPLAIN关键字之后可以接中括号中的任意一个关键字,后边接具体的HQL语句(query参数)
(1)查看下面这条语句的执行计划
hive (default)> explain select * from course;
hive (default)> explain select s_id ,avg(s_score) avgscore from score group by s_id;
(2)查看详细执行计划
hive (default)> explain extended select * from course;
hive (default)> explain extended select s_id ,avg(s_score) avgscore from score group by s_id;
详细执行信息会以目录树的格式进行输出。
并行执行
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段,或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。通过设置参数hive.exec.parallel值为true,就可以开启并发执行。在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加。
执行命令:
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
严格模式
设置严格模式会严格限制以下三种类型的HQL语句执行:
1、分区表查询需要带上分区字段。
2、order by 必须使用limit。
3、笛卡尔积不能执行。主要体现在:JOIN语句必须正确使用ON关键字,不能不加ON或者使用where替代。
执行命令:hive.mapred.mode=strict,命令默认值是nonstrict。
JVM重用
JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。
我们的container的里面的任务执行完成之后,不要马上释放资源,留着资源给下一个任务执行。比如在同一个yarn节点上,要执行同一个job的map任务和reduce任务,reduce任务需要等待map任务执行完成,那么在给map任务分配的container容器执行完map任务之后,可以不释放资源,而是留给接下来的reduce任务继续执行。这样就避免了同时分配两个container,同时reduce等待map执行的情况。执行命令:
set mapred.job.reuse.jvm.num.tasks=10;
这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
推测执行
在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。关于调优这些推测执行变量,还很难给一个具体的建议。如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的map或者Reduce task的话,那么启动推测执行造成的浪费是非常巨大大。maptask的推测执行以及reducetask的推测执行命令如下:
一般都直接关闭maptask以及reducetask的推测执行
set mapreduce.map.speculative=false; //关闭map端的推测执行,设置为true为开启
set mapreduce.reduce.speculative=false; //关闭reduce端的推测执行,设置为true为开启
压缩
压缩:snappy 源始数据存储:TextFile
处理之后的数据存储:ORC,PARQUET
其他调优参考
MRS官网:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_0983.html
阿里云博客园:https://www.cnblogs.com/SpeakSoftlyLove/p/6063908.html
标签:set,分区,reduce,hive,学习,任务,调优,id From: https://www.cnblogs.com/duoduomu/p/17428369.html