首页 > 其他分享 >hive04_DQL操作

hive04_DQL操作

时间:2024-08-08 11:05:33浏览次数:6  
标签:HiveException throws Reduce public DQL hive04 操作 order id

注意点

全局排序 OrderBy

SELECT <select_expression>, <select_expression>, ...
    FROM <table_name>
    ORDER BY <col_name> [ASC|DESC] [,col_name [ASC|DESC], ...]
  • Hive 中使用全局排序时,会将所有数据交给一个 Reduce 任务进行计算,实现查询结果的全局排序。所以数据量大的情况下会耗费大量的时间。
  • Hive 适用于离线处理,执行全量计算任务时,一般不会用到全局排序。如果涉及到全局排序场景,需要将 Hive 处理后的数据存放到快速查询的产品中,比如 Presto、Impala、ClickHouse 等等。
  • 数据处理过程中的全局排序,最好使用 UDF 转换为局部排序。
    • 先预估数据的范围,将数据划分为多个批次;
    • 每个批次会分发到一个 Reducer 执行任务,然后在每个 Reduce 作业中进行局部排序。
    • 一般不涉及到全局排序,可以先通过子查询减小查询范围,然后再排序。

如果是 TOPN 的情况,先用子查询对每个 Reducer 排序,然后取前 N 个数据,最后对结果集进行全局排序。

select t.id, t.name from 
(
    select id, name from <table_name>
    distributed by length(name) sort by length(name) desc limit 10
) t
order by length(t.user_name) desc limit 10;

局部排序 SortBy

SELECT <select_expression>, <select_expression>, ...
    FROM <table_name>
    SORT BY <col_name> [ASC|DESC] [,col_name [ASC|DESC], ...]

局部排序操作,Hive 会在每个 Reduce 任务中对数据进行排序,当启动多个 Reduce 任务时,OrderBy 输出一个文件,SortBy 输出多个文件且局部有序。

聚合操作 GroupBy、DistributeBy、ClusterBy

  • GroupBy:按照某些字段的值进行分组,在底层 MapReduce 执行过程中,同一组的数据会发送到同一个 Reduce 任务中,意味着每个 Reduce 会包含多组数据,同一组的数据会单独进行聚合运算。

可以配置 Reducer 数量 mapred.reduce.tasks,或者配置 hive.groupby.skewindata=true 来优化数据倾斜问题。

select col1, [col2], count(1), sel_expr(聚合操作) from table
where condition                  -- Map端执行
group by col1 [,col2]            -- Reduce端执行
[having]                         -- Reduce端执行

img

  • DistributeBy:通过哈希取模的方式,将列值相同的数据发送到同一个 Reducer 任务,只是单纯的分散数据,不执行其他操作。
SELECT <select_expression>, <select_expression>, ...
    FROM <table_name>
    DISTRIBUTE BY <col_list>
    [SORT BY <col_name> [ASC|DESC] [, col_name [ASC|DESC], ...] ]

DistrubuteBy 通常和 SortBy 一起使用,实现先聚合后排序。并且可以指定升序 ASC 还是降序 DESC,但 DistributeBy 必须在 SortBy 之前。

  • ClusterBy:把相同值的数据聚合到一起并且排序,效果等价于 distribute by col sort by col
SELECT <select_expression>, <select_expression>, ...
    FROM <table_name>
    CLUSTER BY <col_list>

ClusterBy 没有 DistributeBy 那么灵活,并且不能自定义排序,当 DistributeBy 和 SortBy 列完全相同且按照升序排序时,等价于执行 ClusterBy。

Join优化

MySQL Join 优化

https://blog.csdn.net/norminv/article/details/108020102

MySQL JOIN 都是通过循环嵌套的方式实现,用小表驱动大表减少多次连接操作带来的性能开销。

  • left join:小表 left join 大表;
  • right join:大表 right join 小表;
  • 用子查询代替 JOIN 减少驱动表扫描行数。

举个例子,如下用子查询优化示例:

select
  o.no,s_order.no,sum(s_item.count),sum(after_sale_item.count)
  from
  buyer_order o
  left join seller_order s_order on o.id = s_order.buyer_order_id
  left join seller_order_item s_item on s_order.id = s_item.seller_order_id
  left join seller_order_after_sale after_sale on s_order.id = after_sale.seller_order_id
  left join seller_order_after_sale_item after_sale_item on after_sale.id = after_sale_item.after_sale_id
where o.add_time >='2019-05-01'
group by
  o.id,s_order.id
order by
  o.id
limit 0,10

用子查询优化后:

select
  o.id,o.no,s_order.no,
  (select sum(sot.count) from seller_order so
    left join seller_order_item sot on so.id = sot.seller_order_id
        where so.id =s_order.id ),
  (select sum(osat.count) from seller_order_after_sale osa
    left join seller_order_after_sale_item osat on osa.id = osat.after_sale_id
        where osa.seller_order_id = s_order.id )
  from
  buyer_order o
  left join seller_order s_order on o.id = s_order.buyer_order_id
where o.addTime >='2019-05-01'
order by
  o.id
limit 0,10
  • 通过子查询减少了 left join 次数,从而减少驱动表的数据量;
  • 减少了 groupby 的使用,方案一中先分组再取后 10 条;方案二先取后 10 条再执行聚合操作,效率更高。

SteamTable

Hive 执行 Join 操作时,默认会将前面的表直接加载进缓存,后一张表进行 stream 处理,即 shuffle 操作。这样可以减少 shuffle 过程,因为直接加载到缓存中的表,只需要等待后面 stream表的数据,不需要进行 shuffle。

使用时通过声明 /*+ STREAMTABLE(xxx) */ 来定义 stream 表:

SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

MapJoin

MapJoin 即将小表直接加载到 Map 作业中,减少 shuffle 开销。

SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key=b.key

数据倾斜

数据倾斜主要表现在,执行 map/reduce 时 reduce 大部分节点执行完毕,但是有一个或几个 reduce 节点运行很慢,导致程序整体处理时间很长;

数据倾斜发生原因:

  • join:使用 join 关键字处理的问题;
    • 小表驱动大表,但是 key 比较集中导致分发到某个 Reduce 上的数据远高于平均值;
    • 大表驱动大表,但是分桶判断字段空值过多,空值由一个 Reduce 处理;
  • group by:先分派后聚合,某个 Reduce 处理耗时很长;
  • count distinct:特殊值过多。

解决方案:

参数调节

  • Map 端部分聚合:配置 hive.map.aggr=true
  • 数据倾斜时进行负载均衡:配置 hive.groupby.skewindata=true;它生成的查询计划有两个 MR Job,一个 MR Job 会将 Map 的结果随机分不到 Reduce 中;另外一个 MR Job 则根据预处理结果按照 Key 值相同分布到同一个 Reduce 中,最后完成聚合操作。

SQL调节

  • 大小表Join:MapJoin 让小表先进内存,在 Map 端完成 Reduce 操作,减少 shuffle;
  • 大表大表Join:将空值变成字符串加上随机数,将倾斜数据分散到不同 Reduce,避免零值/空值分布到同一个 Reduce 导致倾斜;

用户自定义函数

Hive 除了支持内置函数外,还允许用户自定义函数来扩充函数的功能;
UDF 对每一行数据处理,输出一行数据;
UDAF 对多行数据处理,最终输出一行数据,一般用于聚合操作;
UDTF 对一行数据处理,输出多个结果,比如将一行字符串按照某个字符拆分后进行存储,表的行数会增加。

创建函数:

-- 临时创建
ADD JARS[S] <local_hdfs_path>;
CREATE TEMPORARY FUNCTION <function_name> AS <class_name>;
DROP TEMPORARY FUNCTION <function_name>;

--- 永久创建
CREATE PERMANENT FUNCTION <function_name> AS <class_name> [USING JAR|FILE <file_uri>];
DROP PERMANENT FUNCTION <function_name>;

UDF

实现方式有两种:继承UDF、继承 GenericUDF,其中 GenericUDF 处理起来更加灵活。

继承 GenericUDF 的步骤:

  • initialize 方法,检查输入数据并初始化;
  • evaluate 方法,执行数据处理过程,返回最终结果;
  • getDisplayString 方法,定义 explain 的返回内容。
@org.apache.hadoop.hive.ql.exec.Description(name = "AvgScore",
        extended = "示例:select AvgScore(score) from src;",
        value = "_FUNC_(col)-对Map类型保存的学生成绩进行平均值计算")
public class AvgScore extends GenericUDF {
    
    // 检查输入数据,初始化输入数据
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {…}
    
    // 数据处理,返回最终结果
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {…}
    
    // 函数执行 HQL Explain 展示的字符串内容
    @Override
    public String getDisplayString(String[] strings) {…}
}
  • 然后将函数打包为 jar 上传到服务器对应路径 /xxxx/xxx/xxx.jar
  • 将 jar 包添加到 hive 的 classpath:add jar /xxxx/xxx/xxx.jar
  • 创建临时函数与开发好的 java class 关联:create temporary function func_name as "xxxx.xxx.xxx.MyUDF"
  • hql 中使用临时函数:select func_name(col) from src;

UDAF

实现方式:继承 UDAF、AbstractGenericUDAFResolver,其中 AbstractGenericUDAFResolver 更加灵活。

使用 AbstractGenericUDAFResolver 的步骤:

  • 继承 AbstractAggregationBuffer,来保存中间结果;
  • 继承 GenericUDAFEvaluator,实现 UDAF 处理流程;
  • 继承 AbstractGenericUDAFResolver,注册 UDAF。
// UDAF 注册函数
public class FieldLength extends AbstractGenericUDAFResolver {
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return super.getEvaluator(info);
    }
}


// 保存中间结果
class FieldLengthAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {
    private Integer value = 0;

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    @Override
    public int estimate() {
        return JavaDataModel.PRIMITIVES1;
    }

    public void add(int addVal) {
        synchronized (value) {
            value += addVal;
        }
    }
}


// 数据处理函数
class FieldLengthUDAFEvaluator extends GenericUDAFEvaluator {
    // 输入
    private PrimitiveObjectInspector inputOI;
    // 输出
    private ObjectInspector outputOI;
    // 前一个阶段输出
    private PrimitiveObjectInspector integerOI;

    /**
     * 数据校验、数据初始化
     * 由于 UDAF 会执行 Map、Reduce 两个阶段任务,所以根据 Mode.xxx 区分具体阶段
     * @param m
     * @param parameters
     * @return
     * @throws HiveException
     */
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);
        // Map 阶段,输入为原始数据
        if (Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)) {
            inputOI = (PrimitiveObjectInspector) parameters[0];
        } else {
            // combiner、redeuce 阶段基于前一个阶段的返回值作为输入
            integerOI = (PrimitiveObjectInspector) parameters[0];
        }
        // 指定输出类型
        outputOI = ObjectInspectorFactory.getReflectionObjectInspector(
                Integer.class,
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA
        );
        return outputOI;
    }

    /**
     * 获取中间存放结果对象
     * @return
     * @throws HiveException
     */
    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new FieldLengthAggregationBuffer();
    }

    /**
     * 重置中间结果
     * @param aggregationBuffer
     * @throws HiveException
     */
    @Override
    public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
        ((FieldLengthAggregationBuffer)aggregationBuffer).setValue(0);
    }

    /**
     * Map 阶段
     * @param aggregationBuffer
     * @param objects
     * @throws HiveException
     */
    @Override
    public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
        if (objects == null || objects.length < 1) {
            return;
        }
        Object javaobj = inputOI.getPrimitiveJavaObject(objects[0]);
        ((FieldLengthAggregationBuffer)aggregationBuffer).add(String.valueOf(javaobj).length());
    }

    /**
     * 返回 Map、Combiner 阶段结果
     * @param aggregationBuffer
     * @return
     * @throws HiveException
     */
    @Override
    public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
        return terminate(aggregationBuffer);
    }

    /**
     * Reduce 阶段
     * @param aggregationBuffer
     * @param o
     * @throws HiveException
     */
    @Override
    public void merge(AggregationBuffer aggregationBuffer, Object o) throws HiveException {
        ((FieldLengthAggregationBuffer) agg).add((Integer)integerOI.getPrimitiveJavaObject(partial));
    }

    /**
     * 返回最终结果
     * @param aggregationBuffer
     * @return
     * @throws HiveException
     */
    @Override
    public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
        return ((FieldLengthAggregationBuffer)aggregationBuffer).getValue();
    }
}

UDTF

继承 GenericUDTF 类,并重写 initialize、process、close 方法:

  • initialize:初始化返回值类型;
  • process:具体数据处理过程;
  • close:清理收尾工作;
  • forward:传递输出给收集器;
public class JsonParser extends GenericUDTF {
    private PrimitiveObjectInspector stringOI = null;

    // 输入数据解析,初始化
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        if (argOIs.length != 1) {
            throw new UDFArgumentException("take only one argument");
        }
        // 输入必须为 PRIMITIVE 类型,且具体类型必须为 String
        if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE &&
                ((PrimitiveObjectInspector)argOIs[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("take only one string argument");
        }
        // 初始化输入
        stringOI = (PrimitiveObjectInspector) argOIs[0];
        // 定义输出类型
        List<String> fieldNames = new ArrayList<String>();
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("name");
        fieldNames.add("value");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    private ArrayList<Object[]> parseInputRecord(String feature) {
        ArrayList<Object[]> resultList = null;
        //......
        //.....
        return resultList;
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        final String feature = stringOI.getPrimitiveJavaObject(objects[0]).toString();
        ArrayList<Object[]> results = parseInputRecord(feature);
        Iterator<Object[]> it = results.iterator();
        while (it.hasNext()) {
            Object[] strs = it.next();
            // 结果 Key-Value 传递给收集器
            forward(strs);
        }
    }

    @Override
    public void close() throws HiveException {

    }
}

标签:HiveException,throws,Reduce,public,DQL,hive04,操作,order,id
From: https://www.cnblogs.com/istitches/p/18348540

相关文章

  • 如何把Connection 封装到工具类里面 调用工具类方法实现 增删改查操作 java JDBC
    如何把Connection封装到工具类里面调用工具类方法实现增删改查操作javaJDBC使用数据库连接池以HikariCP为例在JDBC中,使用数据库连接池是一个常见的做法,以提高数据库操作的效率和性能。连接池管理着一组数据库连接,这些连接可以被重用而不是每次需要时都创建新的连接。......
  • 如何把Connection 封装到工具类里面 调用工具类方法实现 增删改查操作 java JDBC使用
    如何把Connection封装到工具类里面调用工具类方法实现增删改查操作javaJDBC使用C3P0数据库连接池答:当使用C3P0作为数据库连接池时,你可以按照类似的模式来配置和使用它。以下是一个示例,展示了如何在Java项目中配置C3P0连接池,并创建一个工具类来管理数据库连接和执行基本的......
  • 文件目录的相关操作
    opendir函数包含头文件:确保在代码中包含必要的头文件。#include<stdio.h>#include<dirent.h>#include<sys/types.h>打开目录:使用opendir函数打开目录。该函数返回一个指向DIR类型的指针。DIR*dir=opendir("/path/to/directory");if(dir==NULL){......
  • 介绍一款新奇的开源操作系统:GodoOS
    在快节奏的现代办公环境中,一款高效、集成化的操作系统无疑是提升工作效率的利器。今天,我们要为您隆重介绍——GodoOS,一款专为内网办公环境设计的全能操作系统。它不仅仅是一个工具,更是您团队协作与文件管理的得力助手,将彻底改变您的工作方式,带来前所未有的便捷体验! 【全能......
  • 嵌入式实时操作系统(RT-Thread、FreeRTOS、UCOSIII)
    实时操作系统(RT-Thread、FreeRTOS、UCOSIII)文章目录`实时操作系统(RT-Thread、FreeRTOS、UCOSIII)``专有名词概念``1、什么是嵌入式``嵌入式系统的特点``2、什么是实时``3、什么是操作系统``操作系统主要功能和特性``常见的操作系统类型包括``4、嵌入式实时操作系统``关......
  • 使用触发器来审计表的DML、DDL操作
    最近帮客户排查某问题时,因为怀疑应用对某张配置表有变更,所以需要对这张表的所有操作进行审计。原本Oracle对某张表的审计是非常方便的,一条命令就可以实现,也不需要费心自定义审计表。--启用对表DEPT的插入、更新和删除操作的审计AUDITINSERT,UPDATE,DELETEONDEPTBYACCE......
  • Java数据结构 | 二叉树基础及基本操作
    二叉树一、树型结构1.1树的概念1.2关于树的一些常用概念(很重要!!!)1.3树的表示形式1.4树的应用二、二叉树2.1二叉树的概念2.2两种特殊的二叉树2.3二叉树的性质2.4二叉树的存储2.5二叉树的基本操作2.5.1代码说明2.5.2二叉树的遍历2.5.3二叉树的基本操作1、获取树......
  • 使用触发器来审计表的DML、DDL操作
    最近帮客户排查某问题时,因为怀疑应用对某张配置表有变更,所以需要对这张表的所有操作进行审计。原本Oracle对某张表的审计是非常方便的,一条命令就可以实现,也不需要费心自定义审计表。--启用对表DEPT的插入、更新和删除操作的审计AUDITINSERT,UPDATE,DELETEONDEPTBYACCE......
  • ShardingSphere之ShardingProxy实战操作、分布式事务
    文章目录简介基础使用部署ShardingProxy配置分库分表策略分布式事务机制介绍XA事务Demo使用另外两种XA事务管理器简介ShardingSphere的两个核心产品分别为ShardingJDBC和ShardingProxy。前文已经详细介绍了ShardingJDBC的具体使用,接下来介绍服务端的分库分表Shar......
  • 泳泳馆押金原路退回系统,一键操作秒到账 押金+手牌+电子押金单
     一、游泳馆手牌收押金必要性游泳馆手牌收押金有以下必要性:1.防止手牌丢失:手牌是顾客在游泳馆内存储个人物品和进出更衣室的重要凭证。收押金可以让顾客更加重视手牌,降低丢失的概率。比如说,有的顾客可能会因为粗心大意随手放置手牌,如果没有押金的约束,可能就不会太在意寻找......