SparkSession
在老的版本中,SparkSQL 提供两种 SQL 查询起始点:
一个叫SQLContext,用于Spark 自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。
从2.0开始, SparkSession作为 Spark 最新的 SQL 查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的 API 在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。
当使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext
DataFrame编程
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式.
DataFrame API 既有 transformation操作也有action操作. DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API
1 DataFrame创建
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
有了 SparkSession 之后, 通过 SparkSession有 3 种方式来创建DataFrame:
1 通过 Spark 的数据源创建
2 通过已知的 RDD 来创建
3 通过查询一个 Hive 表来创建.
1.1 通过 Spark 的数据源创建
Spark支持的数据源:
// 读取 json 文件
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
// 展示结果
scala> df.show
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
1.2 通过已知的 RDD 来创建
注意:
涉及到RDD, DataFrame, DataSet之间的操作时, 需要导入:import spark.implicits._
这里的spark不是包名, 而是表示SparkSession 的那个对象. 所以必须先创建SparkSession对象再导入.
implicits是一个内部object
// 首先创建一个RDD
scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24
1.2.1 手动转换
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26
// 转换为 DataFrame 的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
1.2.2 通过样例类反射转换
// 1.创建样例类
scala> case class People(name :String, age: Int)
defined class People
// 2.使用样例把 RDD 转换成DataFrame
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28
scala> rdd2.toDF.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
1.2.3 通过API方式进行转换
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object DataFrameDemo2 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("Word Count")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
// 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]
val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
// 创建 StructType 类型
val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
val df: DataFrame = spark.createDataFrame(rowRdd, types)
df.show
}
}
1.3 通过查询一个 Hive 表来创建
Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQ L编译时可以包含 Hive 支持,也可以不包含。
包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。
需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。
一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。
如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。
若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。
即使没有部署好 Hive,Spark SQL 也可以运行。
需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。
此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,
这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中
(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。
1.3.1 使用内嵌Hive
// 如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.
// Hive 的元数据存储在 derby 中, 仓库地址:$SPARK_HOME/spark-warehouse
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
scala> spark.sql("create table aa(id int)")
19/02/09 18:36:10 WARN HiveMetaStore: Location: file:/opt/module/spark-local/spark-warehouse/aa specified for non-external table:aa
res2: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| aa| false|
+--------+---------+-----------+
// 向表中加载本地数据数据
scala> spark.sql("load data local inpath './ids.txt' into table aa")
res8: org.apache.spark.sql.DataFrame = []
scala> spark.sql("select * from aa").show
+---+
| id|
+---+
|100|
|101|
|102|
|103|
|104|
|105|
|106|
+---+
// 然而在实际使用中, 几乎没有任何人会使用内置的 Hive
1.3.2 使用外置Hive
1.3.2.1 配置准备
1.Spark 要接管 Hive 需要把 hive-site.xml copy 到conf/目录下.
2.把 Mysql 的驱动 copy 到 jars/目录下.
3.如果访问不到hdfs, 则需要把core-site.xml和hdfs-site.xml 拷贝到conf/目录下.
1.3.2.2 启动 spark-shell
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| emp| false|
+--------+---------+-----------+
scala> spark.sql("select * from emp").show
19/02/09 19:40:28 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
+-----+-------+---------+----+----------+------+------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+-------+---------+----+----------+------+------+------+
| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| null| 20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0| 30|
| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0| 30|
| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| null| 20|
| 7654| MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0| 30|
| 7698| BLAKE| MANAGER|7839| 1981-5-1|2850.0| null| 30|
| 7782| CLARK| MANAGER|7839| 1981-6-9|2450.0| null| 10|
| 7788| SCOTT| ANALYST|7566| 1987-4-19|3000.0| null| 20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000.0| null| 10|
| 7844| TURNER| SALESMAN|7698| 1981-9-8|1500.0| 0.0| 30|
| 7876| ADAMS| CLERK|7788| 1987-5-23|1100.0| null| 20|
| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| null| 30|
| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| null| 20|
| 7934| MILLER| CLERK|7782| 1982-1-23|1300.0| null| 10|
| 7944|zhiling| CLERK|7782| 1982-1-23|1300.0| null| 50|
+-----+-------+---------+----+----------+------+------+------+
1.3.2.3 启动 spark-sql cli
在spark-shell执行 hive 方面的查询比较麻烦.spark.sql("").show
Spark 专门给我们提供了书写 HiveQL 的工具: spark-sql cli
1.3.2.4 使用hiveserver2 + beeline
# spark-sql 得到的结果不够友好, 所以可以使用hiveserver2 + beeline
1.启动 thrift服务器
sbin/start-thriftserver.sh \
--master yarn \
--hiveconf hive.server2.thrift.bind.host=hadoop201 \
-–hiveconf hive.server2.thrift.port=10000 \
2.启动beeline客户端
bin/beeline
# 然后输入
!connect jdbc:hive2://hadoop201:10000
# 然后按照提示输入用户名和密码
2 DataFrame语法风格
2.1 SQL语法风格
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询.
这种风格的查询必须要有临时视图或者全局视图来辅助
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select * from people").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
注意:
1.临时视图只能在当前 Session 有效, 在新的 Session 中无效.
2.可以创建全局视图. 访问全局视图需要全路径:如global_temp.xxx
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createGlobalTempView("people")
scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res31.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.newSession.sql("select * from global_temp.people")
res33: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res33.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
2.2 DSL语法风格
DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据.
可以在 Scala, Java, Python 和 R 中使用 DSL;
使用 DSL 语法风格不必去创建临时视图了.
scala> df.select($"name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
标签:scala,编程,Hive,DataFrame,SparkSQL,sql,spark,Spark
From: https://blog.csdn.net/weixin_44872470/article/details/139316379