SparkSQL简介
为什么需要 SparkSQL?
- Spark 的 RDD有一定局限性,无法处理结构化数据(比如 json 格式等等);
- SparkSQL 提供了两种编程的抽象,DataFrame(关心数据结构不关心类型),DataSet(关心面向对象的数据);
RDD、DataFrame、DataSet
- DataFrame
DataFrame 是一种类似于 RDD 的分布式数据集,类似于传统数据库的二维表格;
DataFrame 与 RDD 的区别在于,DataFrame 带有表元数据信息,每一列都带有名称和类型;
SparkSQL 性能比 RDD 要高,因为可以针对结构化数据进行针对性优化。
- DataSet
DataFrame API的一个扩展,是 Spark 最新的数据抽象,提供 RDD 的优势以及 SparkSQL 优化执行引擎的优点。
DataSet 是强类型的,比如可以有 DataSet[Car]、DataSet[User],它具有类型安全检查;
三者共性:
- 全部为 Spark 下的分布式弹性数据集;
- 惰性机制,遇见转换算子(如 map)时不会立刻执行,而是遇到动作算子(如 collect)时开始计算;
- 在对
DataFrame
、DataSet
进行操作时依赖包import spark.implicits._
; - 根据 Spark 的内存情况进行自动缓存运算,即使数据量很大,也不用担心内存溢出问题;
三者区别:
- RDD:
- 不支持 sparkSql 操作;
- DataFrame:
- 每一行类型固定为 Row,每一列值无法直接访问,需要解析才能获取到各个字段值;
- 支持 SparkSQL 操作,比如
select、groupBy
操作;
- DataSet:
- DataFrame 只是 DataSet 的一个特例:
type DataFrame = DataSet[Row]
; - DataFrame 中每一行的类型是 Row,每一行有哪些字段、字段又有什么类型无从得知,只能通过特定方法拿取模式匹配字段;DataSet 中每一行类型不固定,只有在定义了
case class
之后才可以自由获取每一行信息。
- DataFrame 只是 DataSet 的一个特例:
SparkSQL 介绍
SparkSQL 是专门为了处理结构化数据而设计的 Spark 模块,不仅仅是简单的 SQL查询引擎,还提供了多种编程接口,包括 SQL、DataFrame、DataSet API,以支持不同类型的数据处理请求。
SparkSQL 设计理念在于将 SQL 强大功能与 Spark 高性能计算能力结合。
SparkSQL 核心特征:
- 集成性:Spark SQL与Spark紧密集成,可以通过SQL、DataFrame或Dataset API处理结构化数据。
- 统一数据访问:Spark SQL提供了统一的数据访问接口,支持连接多种数据源,包括Hive、Avro、Parquet、ORC、JSON和JDBC等。
- Hive集成:Spark SQL可以在现有的Hive数据仓库上运行,支持Hive的语法,并允许操作现有的Hive表。
- 标准接口:Spark SQL提供了标准的JDBC和ODBC接口,使得商业智能(BI)工具能够方便地连接和使用Spark集群。
SparkSQL 核心组件包括:
- SQL 解析器:负责接收前端用户输入的 SQL/Hive QL,并将其转换为 Spark 内部的执行计划;
- 逻辑计划器:负责将解析后的 SQL 语句转化为逻辑执行计划,其中包括数据源选择、过滤转换等操作;
- 物理计划器:负责将逻辑执行计划转化为物理执行计划,包括如何分配任务、如何分区、如何执行操作等信息。
- 执行引擎:负责执行物理执行计划,并将结果返回给用户;
- Catalyst优化器:负责对 逻辑/物理 执行计划进行优化,以提高查询性能;
- 数据源:SparkSQL 支持多种数据源,比如 Hive、JSON、Parquet、CSV 等;
- DataFrame、DataSet:SparkSQL 中核心概念,提供一种强类型的、面向列的数据结构,并支持类似关系型数据库操作。
SparkSQL Shell 编程
SparkSession:
- 老版本中
SparkSQL
提供两种 SQL 查询起始点,SparkContext、HiveContext;SparkSession 内部实际上封装了 SparkContext。
DataFrame
SQL 风格语法:
// 创建 DataFrame
val df = spark.read.json("/tmp/spark_data/user.json")
// 创建临时视图
df.createOrReplaceTempView("user")
// SQL语句查询全表
val sqlDF = spark.sql("SELECT * FROM user")
sqlDF.show()
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
// 求年龄平均值
val sqlDF = spark.sql("SELECT avg(age) from user")
+--------+
|avg(age)|
+--------+
// 临时视图只对当前 Session 有效,对其他 Session 需要创建全局视图
spark.newSession().sql("SELECT avg(age) from user ").show()
// 创建全局视图
df.createOrReplaceGlobalTempView("user2")
// 通过 SQL 语句实现查询全表
spark.sql("SELECT * FROM global_temp.user2").show()
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
DSL 语法:
DSL 为 Spark 特定的语言去管理结构化数据,不需要创建临时视图。
// 创建 DataFrame
val df = spark.read.json("/opt/module/spark-local/user.json")
// 查询指定列数据,注意列名用双引号或者只在前面的一个单引号来指定
df.select("name").show()
+--------+
| name|
+--------+
|qiaofeng|
| xuzhu|
| duanyu|
+--------+
// 条件查询
df.select("age","name").where("age>18").show
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
+---+--------+
// 查询并操作列,注意每列都必须用 $ 来指定
df.select($"name",$"age" + 1).show
+--------+---------+
| name |(age + 1)|
+--------+---------+
|qiaofeng| 21|
| xuzhu| 20|
| duanyu| 19|
+--------+---------+
// 分组查询
df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 19| 1|
| 18| 1|
| 20| 1|
+---+-----+
// 求平均值、总和值
df.agg(avg("age")).show
+--------+
|avg(age)|
+--------+
| 19.0|
+--------+
df.agg(max("age")).show
+--------+
|max(age)|
+--------+
| 20|
+--------+
DataSet
// 创建样例类
case class User(name: String, age: Long)
// 将集合转换为 DataSet
val caseClassDS = Seq(User("wangyuyan",18)).toDS()
// 查看 DataSet 的值
caseClassDS.show()
+---------+---+
| name|age|
+---------+---+
|wangyuyan| 18|
+---------+---+
SparkSQL IDEA 编程
创建 Maven 项目,导入如下依赖:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.1</version>
</dependency>
</dependencies>
<build>
<finalName>SparkSQLTest</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
RDD 与 DataFrame 转换
- 手动转换
RDD.toDF("column1", "column2)
; - 通过样例类反射转换
UserRdd.map {x => User(x._1, x._2)}.toDF()
def test02(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val sc = new SparkContext(conf)
val lineRdd = sc.textFile("input/user.txt")
val rdd = lineRdd.map {
line => {
val fields = line.split(",")
(fields(0), fields(1).toLong)
}
}
val spark = SparkSession.builder().config(conf).getOrCreate()
// RDD 与 DataFrame 转换必须导入的包
import spark.implicits._
val df = rdd.toDF("name", "age")
df.show()
// 对象类型转化为 DataFrame
val userRdd = rdd.map {
t=> {
User(t._1, t._2)
}
}
val userDF = userRdd.toDF()
userDF.show()
// DataFrame 转化为 RDD
val rdd1 = df.rdd
val userRdd2 = userDF.rdd
rdd1.collect().foreach(println)
userRdd2.collect().foreach(println)
// 获取转换后 RDD 中 ROW 类型的内部数据
val rdd2 = rdd1.map {
row => {
(row.getString(0), row.getLong(1))
}
}
rdd2.collect().foreach(println)
sc.stop()
}
RDD 与 DataSet 转换
- 手动转换:
RDD.map{x => User(x._1, x._2)}.toDS()
;
def test03(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val sc = new SparkContext(conf)
val lineRdd = sc.textFile("input/user.txt")
val rdd = lineRdd.map {
line => {
val fields = line.split(",")
(fields(0), fields(1).toLong)
}
}
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
// RDD 转化为 DataSet
val ds = rdd.toDS()
ds.show()
// 对象类型RDD 转化为 DataSet
val userRdd = rdd.map {
t => {
User(t._1, t._2)
}
}
val userDs = userRdd.toDS()
userDs.show()
// DataSet 转化为 RDD
val rdd1 = ds.rdd
val userRdd2 = userDs.rdd
sc.stop()
}
DataSet 与 DataFrame 转换
def test04(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val spark = SparkSession.builder().config(conf).getOrCreate()
val df = spark.read.json("input/user.json")
import spark.implicits._
// DataFrame 转换为 DataSet
val ds = df.as[User]
ds.show()
// DataSet 转换为 DataFrame
val convertDf = ds.toDF()
convertDf.show()
spark.stop()
}
用户自定义函数 UDF、UDAF
如果在执行 SparkSQL 时需要执行特殊的函数,可以实现自定义 UDF、UDAF 函数注册后使用。
UDF
函数只允许一个入参、一个出参:
def test05(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val spark = SparkSession.builder().config(conf).getOrCreate()
val df = spark.read.json("input/user.json")
// 创建临时视图
df.createOrReplaceTempView("user")
// 通过匿名函数注册自定义 UDF 函数,处理一个输入
spark.udf.register("addName", (name:String)=>"Name:" + name)
// 调用自定义 UDF 函数
spark.sql("SELECT addName(name) as name,age FROM user").show()
spark.stop()
}
UDAF
函数允许输入多行,但是只能返回一行数据,Spark3.x
通过 extends Aggregator
自定义 UDAF:
def test06(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val spark = SparkSession.builder().config(conf).getOrCreate()
val df = spark.read.json("input/user.json")
// 创建临时视图
df.createOrReplaceTempView("user")
// 注册自定义 UDAF 函数,处理多个输入
spark.udf.register("myAvg", functions.udaf(new MyAvgUDAF()))
// 调用自定义 UDAF 函数
spark.sql("SELECT myAvg(age) FROM user").show()
spark.stop()
}
// 输入数据类型
case class Buff(var sum:Long, var count:Long)
// 自定义 UDAF 函数,处理多个输入
class MyAvgUDAF extends Aggregator[Long, Buff, Double] {
// 初始化缓冲区
override def zero: Buff = Buff(0L, 0L)
// 聚合输入年龄、总人数
override def reduce(b: Buff, a: Long): Buff = {
b.sum = b.sum + a
b.count = b.count + 1
b
}
// 多个缓冲区数据合并
override def merge(b1: Buff, b2: Buff): Buff = {
b1.sum = b1.sum + b2.sum
b1.count = b1.count + b2.count
b1
}
// 聚合操作完成,获取最终结果
override def finish(reduction: Buff): Double = {
reduction.sum.toDouble / reduction.count
}
// spark 对传输对象的序列化操作
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
Spark 数据加载与保存
更加通用的数据加载方式为
spark.read.format("…")[.option("…")].load("…")
更加通用的数据保存方式为df.write.format("…")[.option("…")].save("…")
def test01(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val spark = SparkSession.builder().config(conf).getOrCreate()
spark.read.json("input/user.json").show()
spark.read.format("json").load("input/user.json").show()
spark.stop()
}
某些场景需要实现文件追加,可以通过 df.write.mode("...").文件格式(文件路径)
def test02(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val spark = SparkSession.builder().config(conf).getOrCreate()
val df = spark.read.json("input/user.json")
// 默认保存为 Parquet 格式文件
df.write.save("output")
spark.read.load("output").show()
// 指定保存数据类型
df.write.format("json").save("output2")
// 追加文件
df.write.mode("append").json("output2")
// 文件已存在则忽略,不存在就创建
df.write.mode("ignore").json("output2")
// 文件已存在就覆盖
df.write.mode("overwrite").json("output2")
// 文件已存在则报异常
// df.write.mode("error").json("output2")
spark.stop()
}
与 MySQL 交互
// 从 MySQL 读取数据
def test03(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val spark = SparkSession.builder().config(conf).getOrCreate()
// 从 MySQL 加载数据
val df = spark.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.56.152:3306/gmall")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "DBa2020*")
.option("dbtable", "user_info")
.load()
df.createOrReplaceTempView("user")
spark.sql("select id,name from user").show()
spark.stop()
}
// 向 MySQL 写入数据
case class User(name : String, id : Int)
def test04(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
val spark = SparkSession.builder().config(conf).getOrCreate()
// DataSet 数据准备
val rdd = spark.sparkContext.makeRDD(List(User("zhaoliu", 6)))
import spark.implicits._
val ds = rdd.toDS()
// 写入数据库
ds.write.format("jdbc")
.option("url", "jdbc:mysql://192.168.56.152:3306/gmall")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "DBa2020*")
.option("dbtable", "user_info")
.mode(SaveMode.Append)
.save()
spark.stop()
}