首页 > 数据库 >05_sparkSQL

05_sparkSQL

时间:2024-07-23 21:21:18浏览次数:18  
标签:val 05 df DataFrame DataSet sparkSQL spark user

SparkSQL简介

为什么需要 SparkSQL?

  • Spark 的 RDD有一定局限性,无法处理结构化数据(比如 json 格式等等);
  • SparkSQL 提供了两种编程的抽象,DataFrame(关心数据结构不关心类型),DataSet(关心面向对象的数据);

RDD、DataFrame、DataSet

  • DataFrame

DataFrame 是一种类似于 RDD 的分布式数据集,类似于传统数据库的二维表格;
DataFrame 与 RDD 的区别在于,DataFrame 带有表元数据信息,每一列都带有名称和类型;
SparkSQL 性能比 RDD 要高,因为可以针对结构化数据进行针对性优化。

  • DataSet

DataFrame API的一个扩展,是 Spark 最新的数据抽象,提供 RDD 的优势以及 SparkSQL 优化执行引擎的优点。
DataSet 是强类型的,比如可以有 DataSet[Car]、DataSet[User],它具有类型安全检查;

三者共性:

  • 全部为 Spark 下的分布式弹性数据集;
  • 惰性机制,遇见转换算子(如 map)时不会立刻执行,而是遇到动作算子(如 collect)时开始计算;
  • 在对 DataFrameDataSet 进行操作时依赖包 import spark.implicits._
  • 根据 Spark 的内存情况进行自动缓存运算,即使数据量很大,也不用担心内存溢出问题;

三者区别:

  • RDD:
    • 不支持 sparkSql 操作;
  • DataFrame:
    • 每一行类型固定为 Row,每一列值无法直接访问,需要解析才能获取到各个字段值;
    • 支持 SparkSQL 操作,比如 select、groupBy 操作;
  • DataSet:
    • DataFrame 只是 DataSet 的一个特例:type DataFrame = DataSet[Row]
    • DataFrame 中每一行的类型是 Row,每一行有哪些字段、字段又有什么类型无从得知,只能通过特定方法拿取模式匹配字段;DataSet 中每一行类型不固定,只有在定义了 case class 之后才可以自由获取每一行信息。

SparkSQL 介绍

SparkSQL 是专门为了处理结构化数据而设计的 Spark 模块,不仅仅是简单的 SQL查询引擎,还提供了多种编程接口,包括 SQL、DataFrame、DataSet API,以支持不同类型的数据处理请求。
SparkSQL 设计理念在于将 SQL 强大功能与 Spark 高性能计算能力结合。

SparkSQL 核心特征:

  • 集成性:Spark SQL与Spark紧密集成,可以通过SQL、DataFrame或Dataset API处理结构化数据。
  • 统一数据访问:Spark SQL提供了统一的数据访问接口,支持连接多种数据源,包括Hive、Avro、Parquet、ORC、JSON和JDBC等。
  • Hive集成:Spark SQL可以在现有的Hive数据仓库上运行,支持Hive的语法,并允许操作现有的Hive表。
  • 标准接口:Spark SQL提供了标准的JDBC和ODBC接口,使得商业智能(BI)工具能够方便地连接和使用Spark集群。

SparkSQL 核心组件包括:

  • SQL 解析器:负责接收前端用户输入的 SQL/Hive QL,并将其转换为 Spark 内部的执行计划;
  • 逻辑计划器:负责将解析后的 SQL 语句转化为逻辑执行计划,其中包括数据源选择过滤转换等操作;
  • 物理计划器:负责将逻辑执行计划转化为物理执行计划,包括如何分配任务如何分区如何执行操作等信息。
  • 执行引擎:负责执行物理执行计划,并将结果返回给用户;
  • Catalyst优化器:负责对 逻辑/物理 执行计划进行优化,以提高查询性能;
  • 数据源:SparkSQL 支持多种数据源,比如 Hive、JSON、Parquet、CSV 等;
  • DataFrame、DataSet:SparkSQL 中核心概念,提供一种强类型的、面向列的数据结构,并支持类似关系型数据库操作。

SparkSQL Shell 编程

SparkSession:

  • 老版本中 SparkSQL 提供两种 SQL 查询起始点,SparkContext、HiveContext;SparkSession 内部实际上封装了 SparkContext。

DataFrame

SQL 风格语法:

// 创建 DataFrame
val df = spark.read.json("/tmp/spark_data/user.json")

// 创建临时视图
df.createOrReplaceTempView("user")

// SQL语句查询全表
val sqlDF = spark.sql("SELECT * FROM user")
sqlDF.show()

+---+--------+
|age|    name|
+---+--------+
| 20|qiaofeng|
| 19|   xuzhu|
| 18|  duanyu|
+---+--------+

// 求年龄平均值
val sqlDF = spark.sql("SELECT avg(age) from user")
+--------+                                                                      
|avg(age)|
+--------+

// 临时视图只对当前 Session 有效,对其他 Session 需要创建全局视图
spark.newSession().sql("SELECT avg(age) from user ").show()



// 创建全局视图
df.createOrReplaceGlobalTempView("user2")

// 通过 SQL 语句实现查询全表
spark.sql("SELECT * FROM global_temp.user2").show()
+---+--------+
|age|    name|
+---+--------+
| 20|qiaofeng|
| 19|   xuzhu|
| 18|  duanyu|
+---+--------+

DSL 语法:

DSL 为 Spark 特定的语言去管理结构化数据,不需要创建临时视图。

// 创建 DataFrame
val df = spark.read.json("/opt/module/spark-local/user.json")

// 查询指定列数据,注意列名用双引号或者只在前面的一个单引号来指定
df.select("name").show()
+--------+
|  name|
+--------+
|qiaofeng|
|  xuzhu|
| duanyu|
+--------+

// 条件查询
df.select("age","name").where("age>18").show
+---+--------+
|age|  name|
+---+--------+
| 20|qiaofeng|
| 19|  xuzhu|
+---+--------+

// 查询并操作列,注意每列都必须用 $ 来指定
df.select($"name",$"age" + 1).show
+--------+---------+
| name  |(age + 1)|
+--------+---------+
|qiaofeng|    21|
|  xuzhu|    20|
| duanyu|    19|
+--------+---------+

// 分组查询
df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 19|    1|
| 18|    1|
| 20|    1|
+---+-----+

// 求平均值、总和值
df.agg(avg("age")).show
+--------+
|avg(age)|
+--------+
|   19.0|
+--------+

df.agg(max("age")).show
+--------+
|max(age)|
+--------+
|     20|
+--------+

DataSet

// 创建样例类
case class User(name: String, age: Long)

// 将集合转换为 DataSet
val caseClassDS = Seq(User("wangyuyan",18)).toDS()

// 查看 DataSet 的值
caseClassDS.show()
+---------+---+
|     name|age|
+---------+---+
|wangyuyan|  18|
+---------+---+

img

SparkSQL IDEA 编程

创建 Maven 项目,导入如下依赖:

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.1</version>
    </dependency>

  </dependencies>

  <build>
    <finalName>SparkSQLTest</finalName>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

RDD 与 DataFrame 转换

  • 手动转换 RDD.toDF("column1", "column2)
  • 通过样例类反射转换 UserRdd.map {x => User(x._1, x._2)}.toDF()
  def test02(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val sc = new SparkContext(conf)
    val lineRdd = sc.textFile("input/user.txt")

    val rdd = lineRdd.map {
      line => {
        val fields = line.split(",")
        (fields(0), fields(1).toLong)
      }
    }
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // RDD 与 DataFrame 转换必须导入的包
    import spark.implicits._
    val df = rdd.toDF("name", "age")
    df.show()
    // 对象类型转化为 DataFrame
    val userRdd = rdd.map {
      t=> {
        User(t._1, t._2)
      }
    }
    val userDF = userRdd.toDF()
    userDF.show()

    // DataFrame 转化为 RDD
    val rdd1 = df.rdd
    val userRdd2 = userDF.rdd
    rdd1.collect().foreach(println)
    userRdd2.collect().foreach(println)

    // 获取转换后 RDD 中 ROW 类型的内部数据
    val rdd2 = rdd1.map {
      row => {
        (row.getString(0), row.getLong(1))
      }
    }
    rdd2.collect().foreach(println)

    sc.stop()
  }

RDD 与 DataSet 转换

  • 手动转换:RDD.map{x => User(x._1, x._2)}.toDS()
  def test03(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val sc = new SparkContext(conf)
    val lineRdd = sc.textFile("input/user.txt")
    val rdd = lineRdd.map {
      line => {
        val fields = line.split(",")
        (fields(0), fields(1).toLong)
      }
    }

    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    // RDD 转化为 DataSet
    val ds = rdd.toDS()
    ds.show()

    // 对象类型RDD 转化为 DataSet
    val userRdd = rdd.map {
      t => {
        User(t._1, t._2)
      }
    }
    val userDs = userRdd.toDS()
    userDs.show()

    // DataSet 转化为 RDD
    val rdd1 = ds.rdd
    val userRdd2 = userDs.rdd
    sc.stop()
  }

DataSet 与 DataFrame 转换

  def test04(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val df = spark.read.json("input/user.json")

    import spark.implicits._

    // DataFrame 转换为 DataSet
    val ds = df.as[User]
    ds.show()

    // DataSet 转换为 DataFrame
    val convertDf = ds.toDF()
    convertDf.show()

    spark.stop()
  }

用户自定义函数 UDF、UDAF

如果在执行 SparkSQL 时需要执行特殊的函数,可以实现自定义 UDF、UDAF 函数注册后使用。

UDF 函数只允许一个入参、一个出参:

  def test05(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val df = spark.read.json("input/user.json")
    // 创建临时视图
    df.createOrReplaceTempView("user")
    // 通过匿名函数注册自定义 UDF 函数,处理一个输入
    spark.udf.register("addName", (name:String)=>"Name:" + name)
    // 调用自定义 UDF 函数
    spark.sql("SELECT addName(name) as name,age FROM user").show()
    spark.stop()
  }

UDAF 函数允许输入多行,但是只能返回一行数据,Spark3.x 通过 extends Aggregator 自定义 UDAF:

  def test06(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val df = spark.read.json("input/user.json")
    // 创建临时视图
    df.createOrReplaceTempView("user")
    // 注册自定义 UDAF 函数,处理多个输入
    spark.udf.register("myAvg", functions.udaf(new MyAvgUDAF()))
    // 调用自定义 UDAF 函数
    spark.sql("SELECT myAvg(age) FROM user").show()
    spark.stop()
  }

  // 输入数据类型
  case class Buff(var sum:Long, var count:Long)

    // 自定义 UDAF 函数,处理多个输入
  class MyAvgUDAF extends Aggregator[Long, Buff, Double] {
    // 初始化缓冲区
    override def zero: Buff = Buff(0L, 0L)

    // 聚合输入年龄、总人数
    override def reduce(b: Buff, a: Long): Buff = {
        b.sum = b.sum + a
        b.count = b.count + 1
        b
    }

    // 多个缓冲区数据合并
    override def merge(b1: Buff, b2: Buff): Buff = {
        b1.sum = b1.sum + b2.sum
        b1.count = b1.count + b2.count
        b1
    }

    // 聚合操作完成,获取最终结果
    override def finish(reduction: Buff): Double = {
        reduction.sum.toDouble / reduction.count
    }

    // spark 对传输对象的序列化操作
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  }

Spark 数据加载与保存

更加通用的数据加载方式为 spark.read.format("…")[.option("…")].load("…")
更加通用的数据保存方式为 df.write.format("…")[.option("…")].save("…")

  def test01(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.read.json("input/user.json").show()

    spark.read.format("json").load("input/user.json").show()
    spark.stop()
  }

某些场景需要实现文件追加,可以通过 df.write.mode("...").文件格式(文件路径)

  def test02(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val df = spark.read.json("input/user.json")
    // 默认保存为 Parquet 格式文件
    df.write.save("output")
    spark.read.load("output").show()

    // 指定保存数据类型
    df.write.format("json").save("output2")
    // 追加文件
    df.write.mode("append").json("output2")
    // 文件已存在则忽略,不存在就创建
    df.write.mode("ignore").json("output2")
    // 文件已存在就覆盖
    df.write.mode("overwrite").json("output2")
    // 文件已存在则报异常
//    df.write.mode("error").json("output2")
    spark.stop()
  }

与 MySQL 交互

  // 从 MySQL 读取数据
  def test03(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // 从 MySQL 加载数据
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.56.152:3306/gmall")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "DBa2020*")
      .option("dbtable", "user_info")
      .load()

    df.createOrReplaceTempView("user")
    spark.sql("select id,name from user").show()
    spark.stop()
  }


  // 向 MySQL 写入数据
  case class User(name : String, id : Int)

  def test04(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_demo")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // DataSet 数据准备
    val rdd = spark.sparkContext.makeRDD(List(User("zhaoliu", 6)))
    import spark.implicits._
    val ds = rdd.toDS()

    // 写入数据库
    ds.write.format("jdbc")
      .option("url", "jdbc:mysql://192.168.56.152:3306/gmall")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "DBa2020*")
      .option("dbtable", "user_info")
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }

与 Hive 交互

标签:val,05,df,DataFrame,DataSet,sparkSQL,spark,user
From: https://www.cnblogs.com/istitches/p/18319633

相关文章

  • P2294 [HNOI2005] 狡猾的商人
    原题链接题解先看成前缀和,这样就是维护\(pre[r],pre[l-1]\)两点之间的权值如果是false,代表存在矛盾,且矛盾出现在回路我们可以把这个回路之前的元素看成一个集合,如果新加入的边使得原先两点间的权值不等便失效而对于一个集合里的元素,由于相加具有矢量特性,所以我们维护集合内......
  • [米联客-安路飞龙DR1-FPSOC] FPGA基础篇连载-05 FPGA流水灯实验
    软件版本:Anlogic-TD5.9.1-DR1_ES1.1操作系统:WIN1064bit硬件平台:适用安路(Anlogic)FPGA实验平台:米联客-MLK-L1-CZ06-DR1M90G开发板板卡获取平台:https://milianke.tmall.com/登录"米联客"FPGA社区http://www.uisrc.com视频课程、答疑解惑!1概述流水灯以及Helloworld实验是......
  • 安川伺服驱动器 SGDB-05ADG
    安川伺服驱动器维修经验总结:1、示波器检查驱动器的电流监控输出端时,发现它全为噪声,无法读出;故障原因:电流监控输出端没有与交流电源相隔离(变压器)。处理方法:可以用直流电压表检测观察。2、电机在一个方向上比另一个方向跑得快;(1)故障原因:无刷电机的相位搞错。处理......
  • 题解:P10717「KDOI-05」简单的树上问题
    \(\text{Link}\)题意给你一颗\(n\)个结点的树,有\(k\)次操作,第\(i\)次操作:每个点初始都处于未激活状态;以\(p_{i,j}\)的概率激活点\(j\);对于每个未激活的点\(i\),如果存在激活的结点\(j,k\)且\(i\)在\(j\)到\(k\)的路径上,则\(i\)也会被激活。给出\(v_{i......
  • 0054_Spiral-Matrix【M】
    JY:矩阵螺旋式遍历(一圈圈螺旋式、从外到里)参考:0054_Spiral-Matrix【M】·语雀1、基于矩阵4个边界指针实现顺时针顺序一层层遍历,共需遍历math.ceil(min(m,n)/2)圈fromtypingimportList,DictclassSolution:defspiralOrder(self,matrix:List[Lis......
  • 0059_Spiral-Matrix-ii【M】
    JY:矩阵的螺旋遍历相似题:0054_Spiral-Matrix【M】参考:0059_Spiral-Matrix-ii【M】 1、基于4个边界指针参考0054_Spiral-Matrix【M】中的解法2classSolution:defgenerateMatrix(self,n:int)->[[int]]:left,right,top,bottom=0,n-1,0,n......
  • 05内存情况
    documentReader类型DefaultBeanDefinitionDocumentReaderdelegate类型BeanDefinitionParserDelegate,临时对象属性|-Set<String>usedNames|-ParseStateparseState|-beanNameGenerator(DefaultBeanNameGenerator)BeanDefinition类型GenericBeanDefinition......
  • UNS0874A | UNC4672AV1 HIEE205012R1 间接励磁系统
    产品型号:UNS0874A产品类别:间接励磁系统产品成色:全新、非全新质量保障:365天原产地;美国库存;有货品牌;ABB定义:UNS0874A驱动控制器是电动车或混动车等设备的核心控制部件之一,负责将电池组提供的高压直流电转化为适合驱动电机工作的交流电信号,从而控制电机的旋转速度和......
  • 05document转为BeanDefinition并注册过程
    接着之前的文章4继续分析,文章4里的步骤七里xmlReader(XmlBeanDefinitionReader)的registerBeanDefinitions(doc,resource)方法里最终走到调用他的好友documentReader.registerBeanDefinitions(doc,createReaderContext(resource))方法Ⅰ、代码流程一、程序入口位于documentRea......
  • 嵌入式学习day05
    以下几个位置不允许创建文件和文件夹/:整个linux核心目录/home这里管理家目录,会危害用户数据/mnt/hgfs这个是挂载目录,以后所有的共享和外设都是挂载在这里虚拟机网络模式1.桥接模式网络相关ping语法:ping[选项]目标IP/目标网址说明:测试网络是否畅通执行者:所......