【理解】SparkSQL执行流程
-
接收到查询,既可以是SQL语句,也可以是DSL语法,以一个SQL语句为例:
1、Parser,第三方类库Antlr实现。将sql字符串切分成Token,根据语义规则解析成一颗AST语法树,称为Unresolved Logical Plan;
如果没有语法错误,则解析成下面的语法树。否则返回语法错误信息。
简单来说就是判断SQL语句是否符合规范,比如select from where 这些关键字是否写对。就算表名字段名写错也无所谓。
2、Unresolved Logical Plan经过Analyzer,借助于表的真实数据元数据,比如元数据被委托存在第三方mysql某个库比如叫hive3库下面的TBL表,columns_v2表中,进行数据类型绑定和函数绑定,解析为resolved Logical Plan;
下面为字段编号,防止同名字段冲突。
简单来说就是判断SQL语句的表名,字段名是否真的在元数据库里存在。
3、Optimizer,基于各种优化规则(【谓词下推】,【列值裁剪】,【常量折叠】), 将上面的resolved Logical Plan进一步转换为语法树Optimized Logical Plan。这个过程称作基于规则的优化(Rule Based Optimizer) 【RBO】。
简单来说就是把SQL调整一下,以便跑得更快。
4、query planner,基于planning,将逻辑计划转换成多个物理计划,再根据代价模型cost model,哪个耗时最少,耗内存最小,筛选出代价最小的物理计划。这个过程称之为【CBO】(Cost Based Optimizer)
上面2-3-4步骤合起来,就是【Catalyst】优化器。
5、最后依据最优的物理计划,将SQL转换为RDD操作,再划分为DAG,再将DAG的stage的task发送到WorkerNode的Executor的Core上执行。。 -
2大优化
-
RBO:基于规则的优化,比如【常量折叠】,【谓词下推】,【列裁剪】。
-
常量折叠举例
-
select 1+1 as id from table1
-
上面的会优化为
select 2 as id from table1
-
会提前将【1+1】计算(折叠)成【2】,再赋给id列的每行,不用每行都计算一次1+1
-
-
谓词下推举例
-
大白话讲:在join前,分别过滤数据量
-
select * from table1 a join table2 b on a.id=b.id where a.age>20 and b.cid=1
-
会优化为
select * from (select * from table1 where age>20) a join (select * from table2 where b.cid=1) b on a.id=b.id
-
在子查询阶段就提前将数据进行【过滤】,后期join的数据量就大大【减少】。
-
-
列裁剪举例
-
select a.name, a.age, b.cid from ( select * from table1 where a.age>20) a join ( select * from table2 where b.cid=1) b on a.id=b.id
-
会优化为
select a.name, a.age, b.cid from ( select id,name,age from table1 where a.age>20) a join ( select id,cid from table2 where b.cid=1) b on a.id=b.id
-
提前将需要的列查询出来,其他不需要的列【裁减掉】。
-
-
-
CBO:多种物理计划基于cost model,选取最优的执行耗时最少的那个物理计划
-
-
查看计划(逻辑计划+物理计划)
-
DSL方式
df_x=spark.sql('select * from temp_t') df_x.explain(True)
-
SQL方式
spark.sql('explain extended select * from temp_t').show(truncate=False)
-
【理解】Spark On Hive
在Linux的Spark中集成hive
-
为什么要集成?
- Spark加载的数据源中,就离线场景来讲,只有hive表数据是最有价值的。
- 因为在Linux中的Spark安装包,默认是不能直接读取hive的表的,需要集成hive才能读取hive的库和表。
-
怎么做?
-
如果没配置hive的环境变量,最好配置一下
vim /etc/profile,加上
export HIVE_HOME=/export/server/hive export PATH=$HIVE_HOME/bin:$PATH
source /etc/profile
-
启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore 2>&1 > /tmp/hive-metastore.log &
-
将metastore的进程端口号和warehouse所在的HDFS路径告诉给Spark
-
在spark/conf/的hive-site.xml文件中(可以从hive/conf/hive-site.xml文件拷贝过来即可),里面需要包括下面的内容
<!-- 默认数仓的路径 --> <!-- spark保存数据的路径的配置名叫spark.sql.warehouse.dir 如果SparkSQL找到了hive.metastore.warehouse.dir,那么 就用hive.metastore.warehouse.dir的值作为 spark.sql.warehouse.dir 如果找不到hive.metastore.warehouse.dir配置,就用默认的路径名 /root/spark-warehouse/ --> <property> <name>hive.metastore.warehouse.dir</name> <value>hdfs://node1:8020/user/hive/warehouse/</value> </property> <!-- 远程模式部署metastore metastore地址 --> <property> <name>hive.metastore.uris</name> <value>thrift://node1:9083</value> </property>
-
-
-
如何验证?
-
在hive客户端建库建表,然后再spark-sql中也能查到
进入/export/server/hive/bin/hive create database test_db; use test_db; create table test_table(id int,name string); insert into test_table values(1,'zs');
-
进入/export/server/spark/bin/spark-sql界面
-
进入/export/server/spark/bin/spark-sql show databases;--查询的是hive的库 show tables;--查询的hive的表
-
-
或者进入pyspark会话
/export/server/spark/bin/pyspark
spark.sql('show databases').show() spark.sql('show tables').show()
-
在spark代码中集成HIVE
-
在SparkSession中写上3句话
.config('hive.metastore.warehouse.dir','hdfs:///user/hive/warehouse/')\ .config('hive.metastore.uris','thrift://node1:9083')\ .enableHiveSupport()\
-
测试代码
import os from pyspark.sql import SparkSession from pyspark.sql.types import * os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': # 1-将HDFS真实数据的地址告诉给Spark # 2-将hive的元数据地址告诉给Spark # 3-开启hive的支持 spark=SparkSession\ .builder\ .appName('test')\ .master('local[*]')\ .config('hive.metastore.warehouse.dir','hdfs:///user/hive/warehouse/')\ .config('hive.metastore.uris','thrift://node1:9083')\ .enableHiveSupport()\ .getOrCreate() #上面做了spark集成hive之后,下面查询的默认就是hive的库和表。 spark.sql('show databases').show() spark.sql('use bigdata').show() spark.sql('select * from test1').show() spark.stop()
【了解】Spark的分布式JDBC服务
-
本章节的前提是先把《Linux的Spark中集成hive章节》集成成功。再做下面的事。
-
Spark Thriftserver与hiveserver2、metastore、beeline等的关系
-
注意上面红色的hiveserver2架构和绿色的Spark Thriftserver架构中间有一些东西是重叠共用的。
-
需要启动hive的metastore服务
nohup /export/server/hive/bin/hive --service metastore 2>&1 > /tmp/hive-metastore.log &
metastore进程会占用【9083】端口
-
Spark Thriftserver【不需要】启动hiveserver2进程。
-
启动Spark的thriftserver服务
-
注意不要选取10000端口,因为10000端口通常作为【hiveserver2】进程的端口。比如选用10001
-
如果在企业中可以申请很多资源。比如向yarn申请200C:
/export/server/spark/sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=10001 \ --hiveconf hive.server2.thrift.bind.host=node1 \ --master yarn \ --deploy-mode cluster \ --num-executors 50 \ --executor-cores 4 \ --executor-memory 12G \
-
上面的作用
- 在实际大数据分析项目中,使用SparkSQL时,往往启动一个【Spark的thriftserver】服务,分配较多资源(Executor数目和内存、CPU),不同的用户启动客户端比如【beeline】连接他,编写SQL语句分析数据。方便集中管理。
-
此处如果自测练习,就少一点:
SPARK_HOME=/export/server/spark $SPARK_HOME/sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=10001 \ --hiveconf hive.server2.thrift.bind.host=node1 \ --master local[*]
-
-
启动客户端beeline去连接上面的Spark Thriftserver
-
进入beeline客户端:
输入beeline !connect jdbc:hive2://node1:10001 Enter username for jdbc:hive2://node1:10001: root Enter password for jdbc:hive2://node1:10001: 123456
-
连接数据库需要用户名和密码,就是Linux的用户名和密码。
-
【了解】如果不输入用户名和密码,则是以匿名用户anonymous登录的,后期有时写表数据时会权限不足。
-
最好用root和密码登录,不会有权限限制问题。
-
-
在Pycharm中配置客户端连接Spark的thriftserver服务
- 见资料的《如何在IDEA(或Pycharm)中配置Hive和SparkSQL数据源.doc》
-
【了解】如何查看mestore进程是否占用了9083端口号?
- 1-先 jps -m | grep metastore
- 2-再【netstat -antp | grep 进程号】 ,就会返回端口号。