spark sql
Apache Spark SQL 是 Apache Spark 中用于结构化数据处理的模块。它允许在大规模数据集上运行 SQL 查询,提供数据查询、分析和转换的能力。Spark SQL 与 Spark 核心集成,允许你将 SQL 查询与其他 Spark 函数结合使用。
主要特点:
- DataFrame 和 Dataset:
- DataFrame 是一种以命名列组织的分布式数据集合,类似于关系数据库中的表。
- Dataset 是类型化的分布式数据集合,提供编译时类型安全性。
- SQL 查询:
- 可以使用标准 SQL 语法在 DataFrame 和 Dataset 上执行查询。
- 支持与 Hive 集成,能够查询 Hive 表。
- 数据源支持:
- 支持多种数据源,如 JSON、Parquet、Avro、ORC、JDBC 等。
- 可以轻松地将数据从各种存储系统(如 HDFS、Cassandra、HBase、S3 等)加载到 Spark 中。
- Catalyst 优化器:
- Spark SQL 使用 Catalyst 优化器来自动优化查询,生成高效的执行计划,从而提高查询性能。
- 统一的数据访问接口:
- 提供统一的 API 来访问不同的数据源和格式,简化数据操作和处理流程。
spark sql处理数据的步骤:
- 读取数据源
- 将读取到的DF注册成一个临时视图
- 使用sparkSession的sql函数,编写sql语句操作临时视图,返回的依旧是一个DataFrame
- 将结果写道hdfs上
如何在spark sql中使用rdd编程
spark sql 是spark core的上层api,如果要想使用rdd的编程,可以直接通过sparkSession获取SparkContext对象
在使用sparkSession获取SparkContext对象后使用println打印不出来结果,需要使用show函数展示结果
可以使用printSchema函数来查看表的结构
sql语句是无法直接作用在DataFrame上面的,需要提前将要使用sql分析的DataFrame注册成一张表(临时视图)
可以编写sql语句作用在临时视图上,sql语法是完全兼容hive语法
spark DSL & API
如果要想使用DSL语法编写spark sql的话,需要导入两个隐式转换
// 将sql中的函数,封装成spark程序中的一个个的函数直接调用,以传参的方式调用
import org.apache.spark.sql.functions._
// 主要作用是,将来可以在调用的函数中,使用$函数,将列名字符串类型转成一个ColumnName类型,而ColumnName是继承自Column类的
import sparkSession.implicits._
读取json数据文件,转成DF
读取json数据的时候,是不需要指定表结构,可以自动的根据json的键值来构建DataFrame
使用show()函数打印时,默认显示20行数据,可以在括号内赋予numRows值,返回想要得到的行数
select函数
类似于纯sql语法中的select关键字,传入要查询的列
selectExpr函数
与select功能差不多的查询函数,如果要以传字符串的形式给到select的话,并且还想对列进行表达式处理的话,可以使用selectExpr函数
where函数
===:类似于sql中的=等于某个值
=!=:类似于sql中!=或者<> 不等于某个值
groupBy函数
非分组字段使无法出现在select查询语句中的
orderBy函数
按照某个字段进行排序
开窗函数
无论是在纯sql中还是在DSL语法中,开窗是不会改变原表条数
sparksql数据源读写
可以读写csv格式的数据:
可以直接使用.csv文件的函数读取
也可以使用format的形式读取,可以设置表的结构
读取json文件
使用.json的函数读取json文件,json文件会自动根据键值对创建列和列名
也可以用.json函数写出json文件
去写parquet格式文件
parquet格式的文件存储,是由【信息熵】决定的,极大的减小存储空间
DataFrame与RDD的相互转换
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("rdd与df之间的转换")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
//通过SparkSession获取sparkContext对象
val sparkContext: SparkContext = sparkSession.sparkContext
//作用1:使用$函数
//作用2:可以在不同的数据结构之间转换
import sparkSession.implicits._
/**
* spark core的核心数据结构是:RDD
* spark sql的核心数据结构是DataFrame
*/
// RDD->DataFrame .toDF
val linesRDD: RDD[String] = sparkContext.textFile("spark/data/students.txt")
val stuRDD: RDD[(String, String, String, String, String)] = linesRDD.map((line: String) => {
line.split(",") match {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
(id, name, age, gender, clazz)
}
})
val resRDD1: RDD[(String, Int)] = stuRDD.groupBy(_._5)
.map((kv: (String, Iterable[(String, String, String, String, String)])) => {
(kv._1, kv._2.size)
})
val df1: DataFrame = resRDD1.toDF
val df2: DataFrame = df1.select($"_1" as "clazz", $"_2" as "counts")
df2.printSchema()
// DataFrame->RDD .rdd
val resRDD2: RDD[Row] = df2.rdd
// resRDD2.map((row:Row)=>{
// val clazz: String = row.getAs[String]("clazz")
// val counts: Integer = row.getAs[Integer]("counts")
// s"班级:$clazz, 人数:$counts"
// }).foreach(println)
resRDD2.map {
case Row(clazz:String, counts:Integer)=>
s"班级:$clazz, 人数:$counts"
}.foreach(println)
spark-sql 写代码方式
-
idea里面将代码编写好打包上传到集群中运行,上线使用
--conf spark.sql.shuffle.partitions=1 -- 设置spark sqlshuffle之后分区数据马,和代码里面设置是一样的,代码中优先级高 spark-submit提交 spark-submit --master yarn-client --class com.shujia.sql.Demo8SubmitYarn --conf spark.sql.shuffle.partitions=1 spark-1.0.jar //新版本spark提交yarn的命令 spark-submit --master yarn --deploy-mode client --class xxx.xxx.xxx --conf spark.sql.shuffle.partitions=100 spark-1.0.jar
-
spark shell (repl) 里面使用sqlContext 测试使用,简单任务使用
-
spark-sql spark-sql --master yarn --deploy-mode client 不能使用yarn-cluster 和hive的命令行一样,直接写sql