首页 > 数据库 >SparkSQL

SparkSQL

时间:2022-10-22 12:56:44浏览次数:52  
标签:val df age DataFrame SparkSQL spark User

DataFrame

  • DataFrame 是一种以 RDD 为基础的分布式数据集,类似于二维表格。与 RDD 的区别在于,前者带有 schema 元信息,即 DataFrame。

  • DataFrame 也是懒执行的,但性能上比 RDD 要高。因为优化了执行计划,查询计划通过 Spark catalyst optimiser 进行了优化。

  • DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表活着生成 SQL 表达式。

  • DataFrame API 既有 transfomation 操作也有 action 操作。

创建 DataFrame

在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame 有三种方式:

  • 通过 Spark 的数据源进行创建。
  • 从一个存在的 RDD 进行转换。
  • 从 Hive Table 进行查询返回。

基本操作

// DataFrame
val df: DataFrame = spark.read.json("datas/user.json")
df.show()

// DataFrame => SQL 风格语法
// 创建视图
df.createTempView("user")
spark.sql("select * from user").show()
spark.sql("select age from user").show()
spark.sql("select avg(age) from user").show()

// DataFrame => DSL 风格语法
// 特定领域语言(domain-specific language),使用 DSL 不必去创建临时视图
df.select("age", "username").show()

// RDD => DataFrame => DataSet 转换需要引入隐式转换规则
// spark 不是包名,是上下文环境对象名
// 涉及到运算时,每列都必须使用 $,或者采用引号表达式:单引号+字段名
import spark.implicits._
df.select($"age" + 1).show()
df.select('age + 1).show()

DataSet

  • DataSet 是分布式数据集合, 是 DataFrame 的一个扩展。
  • DataSet 是强类型,可以有 DataSet[Person]DataSet[Car]
  • DataFrame 是 DataSet 的特例,DataFrame = DataSet[Row]Row 是一个跟 Person 一样的类型,所有的表结构信息都用 Row 表示。

基本操作

// 使用基本类型的序列创建 DataSet
val seq: Seq[Int] = Seq(1, 2, 3, 4)
val ds: Dataset[Int] = seq.toDS()
ds.show()

// 使用样例类序列创建 DataSet
case class Person(name: String, age:Int)
val ds: Dataset[Person] = Seq(Person("zhangsan", 20)).toDS()
ds.show()

三者之间的关系

共性

  • 都是分布式弹性数据集。
  • 都有惰性机制,在进行创建、转换操作时,不会立即执行,只有遇到 action 操作才会执行。
  • 都会根据 Spark 的内存情况自动缓存运算。
  • 都有 Partition 的概念。

相互转换

转换关系

// RDD <=> DataFrame
val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhangsan", 20), (2, "lisi", 30)))
val df: DataFrame = rdd.toDF("id", "name", "age")
val rowRDD: RDD[Row] = df.rdd

// DataFrame <=> DataSet
val ds: Dataset[User] = df.as[User]
val df1: DataFrame = ds.toDF()

// RDD <=> DataSet
val ds1: Dataset[User] = rdd.map {
  case (id, name, age) => {
    User(id, name, age)
  }
}.toDS()
val userRDD: RDD[User] = ds1.rdd

case class User(id: Int, name: String, age:Int)

用户自定义函数

UDF

用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能

val df = spark.read.json("datas/user.json")
df.createOrReplaceTempView("user")

// 自定义一个函数,函数名为prefixName,传入参数为name:String
spark.udf.register("prefixName", (name:String) => {
  "Name: " + name
})

// 使用自定义的函数
spark.sql("select age, prefixName(username) from user").show

UDAF

强类型的 DS 和弱类型的 DF 都提供了相关的聚合函数:count()、avg() 等等。用户可以自定义聚合函数,通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。从 Spark3.0 开始,UserDefinedAggregateFunction 就不推荐使用了。可以统一采用强类型聚合函数 Aggregator

object UDAF {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val df = spark.read.json("datas/user.json")

    // 早期版本中,spark不能在sql中使用强类型UDAF操作
    // SQL & DSL
    // 早期的 UDAF 强类型聚合函数使用 DSL 语法操作
    val ds: Dataset[User] = df.as[User]

    // 将UDAF函数转换为查询的列对象
    val udafCol: TypedColumn[User, Long] = new MyAvgUDAF().toColumn

    ds.select(udafCol).show

    spark.close()
  }

  /*
     自定义聚合函数类:计算年龄的平均值
     1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
         IN : 输入的数据类型 User
         BUF : 缓冲区的数据类型 Buff
         OUT : 输出的数据类型 Long
     2. 重写方法(6)
  */
  case class User(username:String, age:Long)
  case class Buff( var total:Long, var count:Long )
  class MyAvgUDAF extends Aggregator[User, Buff, Long]{
    // z & zero : 初始值或零值
    // 缓冲区的初始化
    override def zero: Buff = {
      Buff(0L,0L)
    }

    // 根据输入的数据更新缓冲区的数据
    override def reduce(buff: Buff, in: User): Buff = {
      buff.total = buff.total + in.age
      buff.count = buff.count + 1
      buff
    }

    // 因为是分布式计算有多个缓冲区,需要合并每个缓冲区数据(即合并每个分区的计算结果)
    override def merge(buff1: Buff, buff2: Buff): Buff = {
      buff1.total = buff1.total + buff2.total
      buff1.count = buff1.count + buff2.count
      buff1
    }

    // 根据最后的结果,再执行具体的业务计算逻辑
    override def finish(buff: Buff): Long = {
      buff.total / buff.count
    }

    // 缓冲区的编码操作
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    // 输出的编码操作
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }
}

标签:val,df,age,DataFrame,SparkSQL,spark,User
From: https://www.cnblogs.com/fireonfire/p/16815903.html

相关文章

  • SparkSQL参数
    SparkSQL参数<1>表分区类参数--是否允许动态生成分区sethive.exec.dynamic.partition=true;--是否容忍指定分区全部动态生成sethive.exec.dynamic.partition.mode=......
  • SparkSQL on K8s 在网易传媒的落地实践
    随着云原生技术的发展和成熟,大数据基础设施积极拥抱云原生是业内发展的一大趋势。网易传媒在2021年成功将SparkSQL部署到了K8s集群,并实现与部分在线业务的混合部署,......
  • (4)SparkSQL中如何定义UDF和使用UDF
    SparkSQL中用户自定义函数,用法和SparkSQL中的内置函数类似;是saprkSQL中内置函数无法满足要求,用户根据业务需求自定义的函数。首先定义一个UDF函数:packagecom.udf;import......
  • 关于sparksql调优的一些操作
    1、查看执行计划 1、直接sql查看:explainselect...from... 2、ds.explain()2、执行计划的处理流程 sql代码->未决断的逻辑执行计划->根据元数据生成已决断的逻辑......
  • sparksql 优化
    最近把spark文档里面配置那一页看了一下,在这记录一些可用的配置,免得后续再去查文档地址:https://spark.apache.org/docs/3.0.1/configuration.htmlSpark文档运行环境......
  • sparksql 函数大全
    数学函数函数简介用法acosh反双曲余弦值SELECTacosh(0.5);0.9624236501192069SELECTacosh(3.5);1.9248473002384139asinh反双曲正弦SELECTasinh(1.45);......
  • sparksql概念补充
    Spark-sql概念补充基本概念        SparkSQL是基于RDD的,可以通过Schema信息来访问其中某个字段        RDD处理的不是结构化数据,所以不能进行类似HIve......
  • 创建SparkSQL的项目
    创建项目方式和前面一样pom依赖不一样无需导入spark_core包,因为spark_sql中包含了spark_corepom.xml文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="h......
  • SparkSQL支持的数据源
    1.SparkSQL支持的数据源HiveScala内存中数据--集合支持从RDD读取数据作SQL操作支持从外部存储文件读取数据json,csv,普通结构文本文件支持从关系型数据库读取数据处理......