首页 > 数据库 >SparkSQL(二)

SparkSQL(二)

时间:2022-10-27 20:57:29浏览次数:33  
标签:-- hive metastore SparkSQL Spark spark select

【理解】SparkSQL执行流程

  • 接收到查询,既可以是SQL语句,也可以是DSL语法,以一个SQL语句为例:

    1、Parser,第三方类库Antlr实现。将sql字符串切分成Token,根据语义规则解析成一颗AST语法树,称为Unresolved Logical Plan;

    如果没有语法错误,则解析成下面的语法树。否则返回语法错误信息。

    简单来说就是判断SQL语句是否符合规范,比如select from where 这些关键字是否写对。就算表名字段名写错也无所谓。

    2、Unresolved Logical Plan经过Analyzer,借助于表的真实数据元数据,比如元数据被委托存在第三方mysql某个库比如叫hive3库下面的TBL表,columns_v2表中,进行数据类型绑定和函数绑定,解析为resolved Logical Plan;

    下面为字段编号,防止同名字段冲突。

    简单来说就是判断SQL语句的表名,字段名是否真的在元数据库里存在。

    3、Optimizer,基于各种优化规则(【谓词下推】,【列值裁剪】,【常量折叠】), 将上面的resolved Logical Plan进一步转换为语法树Optimized Logical Plan。这个过程称作基于规则的优化(Rule Based Optimizer) 【RBO】。

    简单来说就是把SQL调整一下,以便跑得更快。

    4、query planner,基于planning,将逻辑计划转换成多个物理计划,再根据代价模型cost model,哪个耗时最少,耗内存最小,筛选出代价最小的物理计划。这个过程称之为【CBO】(Cost Based Optimizer)
    上面2-3-4步骤合起来,就是【Catalyst】优化器。
    5、最后依据最优的物理计划,将SQL转换为RDD操作,再划分为DAG,再将DAG的stage的task发送到WorkerNode的Executor的Core上执行。。

  • 2大优化

    • RBO:基于规则的优化,比如【常量折叠】,【谓词下推】,【列裁剪】。

      • 常量折叠举例

        • select 1+1 as id 
            from table1
          
        • 上面的会优化为

          select 2 as id 
            from table1
          
        • 会提前将【1+1】计算(折叠)成【2】,再赋给id列的每行,不用每行都计算一次1+1

      • 谓词下推举例

        • 大白话讲:在join前,分别过滤数据量

        • select * 
          from table1 a
          join table2 b on a.id=b.id
          where a.age>20
          and b.cid=1
          
        • 会优化为

          select * 
          from (select * from table1 where age>20) a
          join (select * from table2 where  b.cid=1) b on a.id=b.id
          
          
        • 在子查询阶段就提前将数据进行【过滤】,后期join的数据量就大大【减少】。

      • 列裁剪举例

        • select a.name,
                 a.age,
                 b.cid
          from ( select * from table1 where a.age>20) a
          join ( select * from table2 where b.cid=1) b on a.id=b.id
          
        • 会优化为

          select a.name,
                 a.age,
                 b.cid
          from ( select id,name,age from table1 where a.age>20) a
          join ( select id,cid from table2 where b.cid=1) b on a.id=b.id
          
        • 提前将需要的列查询出来,其他不需要的列【裁减掉】。

    • CBO:多种物理计划基于cost model,选取最优的执行耗时最少的那个物理计划

  • 查看计划(逻辑计划+物理计划)

    • DSL方式

      df_x=spark.sql('select * from temp_t')
      df_x.explain(True)
      
      
    • SQL方式

      spark.sql('explain extended select * from temp_t').show(truncate=False)
      
      

【理解】Spark On Hive

在Linux的Spark中集成hive

  • 为什么要集成?

    • Spark加载的数据源中,就离线场景来讲,只有hive表数据是最有价值的。
    • 因为在Linux中的Spark安装包,默认是不能直接读取hive的表的,需要集成hive才能读取hive的库和表。
  • 怎么做?

    • 如果没配置hive的环境变量,最好配置一下

      vim /etc/profile,加上

      export HIVE_HOME=/export/server/hive
      export PATH=$HIVE_HOME/bin:$PATH
      
      

      source /etc/profile

    • 启动hive的metastore

      nohup /export/server/hive/bin/hive --service metastore  2>&1 > /tmp/hive-metastore.log &
      
      
    • 将metastore的进程端口号和warehouse所在的HDFS路径告诉给Spark

      • 在spark/conf/的hive-site.xml文件中(可以从hive/conf/hive-site.xml文件拷贝过来即可),里面需要包括下面的内容

            <!-- 默认数仓的路径 -->
            <!-- spark保存数据的路径的配置名叫spark.sql.warehouse.dir
             如果SparkSQL找到了hive.metastore.warehouse.dir,那么
             就用hive.metastore.warehouse.dir的值作为
             spark.sql.warehouse.dir
             如果找不到hive.metastore.warehouse.dir配置,就用默认的路径名
             /root/spark-warehouse/
             -->
        
            <property>
            	<name>hive.metastore.warehouse.dir</name>
            	<value>hdfs://node1:8020/user/hive/warehouse/</value>
            </property>
            <!-- 远程模式部署metastore metastore地址 -->
            <property>
            	<name>hive.metastore.uris</name>
            	<value>thrift://node1:9083</value>
            </property>
        
        
        
  • 如何验证?

    • 在hive客户端建库建表,然后再spark-sql中也能查到

      进入/export/server/hive/bin/hive
      create database test_db;
      use test_db;
      create table test_table(id int,name string);
      insert into test_table values(1,'zs');
      
    • 进入/export/server/spark/bin/spark-sql界面

      • 进入/export/server/spark/bin/spark-sql
        
        show databases;--查询的是hive的库
        show tables;--查询的hive的表
        
        
    • 或者进入pyspark会话

      /export/server/spark/bin/pyspark

      spark.sql('show databases').show()
      spark.sql('show tables').show()
      

在spark代码中集成HIVE

  • 在SparkSession中写上3句话

            .config('hive.metastore.warehouse.dir','hdfs:///user/hive/warehouse/')\
            .config('hive.metastore.uris','thrift://node1:9083')\
            .enableHiveSupport()\
    
  • 测试代码

    import os
    
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    
    os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    if __name__ == '__main__':
        # 1-将HDFS真实数据的地址告诉给Spark
        # 2-将hive的元数据地址告诉给Spark
        # 3-开启hive的支持
        spark=SparkSession\
            .builder\
            .appName('test')\
            .master('local[*]')\
            .config('hive.metastore.warehouse.dir','hdfs:///user/hive/warehouse/')\
            .config('hive.metastore.uris','thrift://node1:9083')\
            .enableHiveSupport()\
            .getOrCreate()
        #上面做了spark集成hive之后,下面查询的默认就是hive的库和表。
        spark.sql('show databases').show()
        spark.sql('use bigdata').show()
        spark.sql('select * from test1').show()
        spark.stop()
    
    
    

【了解】Spark的分布式JDBC服务

  • 本章节的前提是先把《Linux的Spark中集成hive章节》集成成功。再做下面的事。

  • Spark Thriftserver与hiveserver2、metastore、beeline等的关系

    • 注意上面红色的hiveserver2架构和绿色的Spark Thriftserver架构中间有一些东西是重叠共用的。

  • 需要启动hive的metastore服务

    nohup /export/server/hive/bin/hive --service metastore  2>&1 > /tmp/hive-metastore.log &
    

    metastore进程会占用【9083】端口

  • Spark Thriftserver【不需要】启动hiveserver2进程。

  • 启动Spark的thriftserver服务

    • 注意不要选取10000端口,因为10000端口通常作为【hiveserver2】进程的端口。比如选用10001

    • 如果在企业中可以申请很多资源。比如向yarn申请200C:

      /export/server/spark/sbin/start-thriftserver.sh \
      --hiveconf hive.server2.thrift.port=10001 \
      --hiveconf hive.server2.thrift.bind.host=node1 \
      --master yarn \
      --deploy-mode cluster \
      --num-executors 50 \
      --executor-cores 4 \
      --executor-memory 12G \
      
      
    • 上面的作用

      • 在实际大数据分析项目中,使用SparkSQL时,往往启动一个【Spark的thriftserver】服务,分配较多资源(Executor数目和内存、CPU),不同的用户启动客户端比如【beeline】连接他,编写SQL语句分析数据。方便集中管理。
    • 此处如果自测练习,就少一点:

      SPARK_HOME=/export/server/spark
      $SPARK_HOME/sbin/start-thriftserver.sh \
      --hiveconf hive.server2.thrift.port=10001 \
      --hiveconf hive.server2.thrift.bind.host=node1 \
      --master local[*]
      
      
  • 启动客户端beeline去连接上面的Spark Thriftserver

    • 进入beeline客户端:

      输入beeline
      !connect jdbc:hive2://node1:10001
      Enter username for jdbc:hive2://node1:10001: root
      Enter password for jdbc:hive2://node1:10001: 123456
      
      
    • 连接数据库需要用户名和密码,就是Linux的用户名和密码。

    • 【了解】如果不输入用户名和密码,则是以匿名用户anonymous登录的,后期有时写表数据时会权限不足。

    • 最好用root和密码登录,不会有权限限制问题。

  • 在Pycharm中配置客户端连接Spark的thriftserver服务

    • 见资料的《如何在IDEA(或Pycharm)中配置Hive和SparkSQL数据源.doc》
  • 【了解】如何查看mestore进程是否占用了9083端口号?

    • 1-先 jps -m | grep metastore
    • 2-再【netstat -antp | grep 进程号】 ,就会返回端口号。

标签:--,hive,metastore,SparkSQL,Spark,spark,select
From: https://www.cnblogs.com/nanguyhz/p/16833684.html

相关文章

  • SparkSQL
    DataFrame创建DataFrame1.转换为DataFrame方式1将RDD[元组或列表]转换为DataFrame定义RDD,每个元素是Row类型将上面的RDD[Row]转换成DataFrame,df=spark.createDat......
  • SparkSQL
    DataFrameDataFrame是一种以RDD为基础的分布式数据集,类似于二维表格。与RDD的区别在于,前者带有schema元信息,即DataFrame。DataFrame也是懒执行的,但性能上比......
  • SparkSQL参数
    SparkSQL参数<1>表分区类参数--是否允许动态生成分区sethive.exec.dynamic.partition=true;--是否容忍指定分区全部动态生成sethive.exec.dynamic.partition.mode=......
  • SparkSQL on K8s 在网易传媒的落地实践
    随着云原生技术的发展和成熟,大数据基础设施积极拥抱云原生是业内发展的一大趋势。网易传媒在2021年成功将SparkSQL部署到了K8s集群,并实现与部分在线业务的混合部署,......
  • (4)SparkSQL中如何定义UDF和使用UDF
    SparkSQL中用户自定义函数,用法和SparkSQL中的内置函数类似;是saprkSQL中内置函数无法满足要求,用户根据业务需求自定义的函数。首先定义一个UDF函数:packagecom.udf;import......
  • 关于sparksql调优的一些操作
    1、查看执行计划 1、直接sql查看:explainselect...from... 2、ds.explain()2、执行计划的处理流程 sql代码->未决断的逻辑执行计划->根据元数据生成已决断的逻辑......
  • sparksql 优化
    最近把spark文档里面配置那一页看了一下,在这记录一些可用的配置,免得后续再去查文档地址:https://spark.apache.org/docs/3.0.1/configuration.htmlSpark文档运行环境......
  • sparksql 函数大全
    数学函数函数简介用法acosh反双曲余弦值SELECTacosh(0.5);0.9624236501192069SELECTacosh(3.5);1.9248473002384139asinh反双曲正弦SELECTasinh(1.45);......
  • sparksql概念补充
    Spark-sql概念补充基本概念        SparkSQL是基于RDD的,可以通过Schema信息来访问其中某个字段        RDD处理的不是结构化数据,所以不能进行类似HIve......
  • 创建SparkSQL的项目
    创建项目方式和前面一样pom依赖不一样无需导入spark_core包,因为spark_sql中包含了spark_corepom.xml文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="h......