首页 > 数据库 >Spark SQL快速入门

Spark SQL快速入门

时间:2023-11-20 10:33:40浏览次数:43  
标签:入门 val show SQL DataFrame emp sql spark Spark

Spark SQL快速入门

1、概述

spark SQL是Apache用于处理结构化数据的模块。其中包含SQL、DataFrame API、DataSet API,意味着开发人员可以在不同的API之间来回切换,从而使数据处理更加灵活。

image-20231118200813592

  • Spark SQL(Spark on hive)

数据兼容方面SparkSQL不但兼容HIve,还可以从RDD、Parquet文件、JSON文件,甚至支持获取RDBMS数据以及Cassandra、HBase等NoSQL数据。

性能优化方面除了采用In-Memory Columnar Storage,Byte-code Generation等优化技术外,还引进了Cost Model对查询进行动态评估、获取最佳物理计划等等,spark内部底层核心,有两种优化策略:

RBO:基于规则优化/Rule Based Optimizer

CBO:基于代价优化

组件扩展方面无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展。

特点:易整合、统一数据访问、兼容HIve、标准连接

image-20231118201452063

2、数据模型

  • DataFrame
 DataFrame 代表一个不可变的分布式数据集合(一种数据结构),是 Spark 对需要处理的数据的抽象。其核心目的是
让开发者面对数据处理时,只关心要做什么,而不用关心怎么去做,将一些优化的工作交由 Spark 框架本身去处理。
   DataFrame 是具有 Schema 信息的,也就是说可以被看做具有字段名称和类型的数据,类似于关系型数据库中的表,
但是底层做了许多的优化。创建了 DataFrame 之后,就可以使用 SQL 进行数据处理了。

image-20231118205209577

image-20231118205229491

左侧的 RDD[User] 虽然以 User 为类型参数,但 Spark 框架本身不了解 User 类的内部结构。而右侧的 DataFrame 却提 供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame 为数据提供了 Schema 视图,可以把它当做数据库中的一张表来对待。

  • DataSet

DataSet是一个DataFrame API的一个扩展,是Spark SQL最新的数据抽象

用户友好的API风格,既有类型安全检查也具有DataFrame的查询优化特性

用样例类来定义DataSet中数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的数据的字段名称。

DataSet是强类型的

DataFrame是DataSet的特列,DataFrame=DataSet[Row],所以可以通过as方法将DataFrame转换为DataSet。Row是一个类型。

实际项目中建议使用DataSet进行数据封装,这样数据分析和数据存储会更好。在 Scala 和 Java 中,Row 类型的 Dataset 代表 DataFrame,即 Dataset[Row] 等同于 DataFrame。

  • image-20231118205416996

image-20231118210311943

3、执行流程

解析:SQL parser将SQL通过词法、语法解析(检查表、字段是否存在)生成Unresolved Logical Plane(未绑定逻辑执行计划)

编译:分析器配合元数据将未绑定逻辑执行计划编译成编译后的逻辑计划。

优化:逻辑计划调优器使用一些基于规则优化(合并、列裁剪、谓词下推等),将编译后的逻辑计划优化成优化后的逻辑计划。

物理执行计划:物理逻辑计划生成器生成可执行的物理计划,根据过去的性能统计数据,利用代价模型,选择最佳的物理执行计划。

代码执行:最后通过代码生成器把SQL查询生成java字节码。最终执行。

image-20231118211125156

4、快速入门

使用服务器登录spark单机客户端,进入spark-xxx.hadoop3根目录下

bin/spark-shell

image-20231118211421675

sc是shell窗口集成的SparkContext对象,而Spark是Spark Session对象,可以用过sc对象创建RDD,spark对象创建DataFrame、DataSet,读取文件等。

  • 准备数据,在linux上本地创建数据
{"id":1,"name":"zhangsan","age":18,"gender":1}
{"id":2,"name":"lisi","age":19,"gender":0}
{"id":3,"name":"wangwu","age":20,"gender":1}
  • 在shell窗口加载文件数据(在文件目录下)
val df = spark.read.json("file:///opt/yjx/spark-local/data/user/user.json")
df.show  # 查看加载的文件数据
+---+------+---+--------+
|age|gender| id|   name|
+---+------+---+--------+
| 18|     1|  1|zhangsan|
| 19|     0|  2|   lisi|
| 20|     1|  3| wangwu|
+---+------+---+--------+

注意: DataFrame 也是懒执行的,此处的 show 就类似于 RDD 的行动算子。

  • SQL

由于我们读取数据时,首先返回的是DataFrame数据类型,然后我们最终是想通过SQL去操作数据,就需要创建模板视图也就是表,我们才能操作查询表!

 df.createOrReplaceTempView("t_user") #创建临时模板视图

image-20231118212202382

  • 执行SQL查询
spark.sql("SELECT id, name AS n, age, gender FROM t_user").show

5、DSL语法

DSL 为 Domain Specific Language 的缩写,翻译过来为领域特定语言。简单理解就是 Spark 独有的结构化数据 操作语法。

image-20231118212417152

  • 使用常见的几个语法
# "*" 查询所有列
scala> df.select("id", "name", "age", "gender").show
+---+--------+---+------+
| id|   name|age|gender|
+---+--------+---+------+
|  1|zhangsan| 18|     1|
|  2|   lisi| 19|     0|
|  3| wangwu| 20|     1|
+---+--------+---+------+
scala> df.selectExpr("id", "name as n", "age", "gender").show
+---+--------+---+------+
| id|       n|age|gender|
+---+--------+---+------+
|  1|zhangsan| 18|     1|
|  2|   lisi| 19|     0|
|  3| wangwu| 20|     1|
+---+--------+---+------+
scala> df.select('id, 'name).show
+---+--------+
| id|   name|
+---+--------+
|  1|zhangsan|
|  2|   lisi|
|  3| wangwu|
+---+--------+
scala> df.select('id, 'name).show(2)
+---+--------+
| id|   name|
+---+--------+
|  1|zhangsan|
|  2|   lisi|
+---+--------+
only showing top 2 rows
# df.filter("gender = 1").show
scala> df.where("gender = 1").show
+---+------+---+--------+
|age|gender| id|   name|
+---+------+---+--------+
| 18|     1|  1|zhangsan|
| 20|     1|  3| wangwu|
+---+------+---+--------+
scala> df.groupBy('gender).count.show
+------+-----+                                                                  
|gender|count|
+------+-----+
|     0|    1|
|     1|    2|
+------+-----+
scala> df.select("*").orderBy(-'age).show
+---+------+---+--------+
|age|gender| id|   name|
+---+------+---+--------+
| 20|     1|  3| wangwu|
| 19|     0|  2|   lisi|
| 18|     1|  1|zhangsan|
+---+------+---+--------+
# df.select($"age" + 1).show
scala> df.select('age + 1).show
+---------+
|(age + 1)|
+---------+
|       19|
|       20|
|       21|
+---------+
scala> df.groupBy("gender").avg("age").show
+------+--------+
|gender|avg(age)|
+------+--------+
|     0|    19.0|
|     1|    19.0|
+------+--------+

6、模型互转

  • RDD转DataFrame
case class User(id: Int, name: String, age: Int, gender: Int) #创建样例类
val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",20, 1))) #创建RDD
 val df = rdd.toDF#RDD转DataFrame

RDD[User] :表示 RDD 模型的数据类型为 User,这就是强数据类型的体现。

DataFrame = [id: int, name: string ... 2 more fields] :表示 DataFrame 模型的数据类型为一行数据 ( Row ),这是弱数据类型的体现。

  • DataFrame转RDD
val df = spark.read.json("file:///opt/yjx/spark-local/data/user/user.json") #创建DataFrame
val rdd = df.rdd #dataFrame转RDD

因为 DataFrame 模型的数据类型为 Row (弱数据类型),所以 RDD[org.apache.spark.sql.Row] 。

  • RDD转DataFrame
val ds = rdd.toDS

Dataset[User] :表示 Dataset 模型的数据类型为 User,这就是强数据类型的体现。

DataSet集成了RDD和DataFrame的优点,所以它能调用的方法DataSet都可以调用

scala> ds.show
+---+--------+---+------+
| id|   name|age|gender|
+---+--------+---+------+
|  1|zhangsan| 18|     1|
|  2|   lisi| 19|     0|
|  3| wangwu| 20|     1|
+---+--------+---+------+
scala> ds.createOrReplaceTempView("t_user")
scala> spark.sql("SELECT * FROM t_user ORDER BY age DESC").show
+---+--------+---+------+
| id|   name|age|gender|
+---+--------+---+------+
|  3| wangwu| 20|     1|
|  2|   lisi| 19|     0|
|  1|zhangsan| 18|     1|
scala> ds.select("*").orderBy(-'age).show
+---+--------+---+------+
| id|   name|age|gender|
+---+--------+---+------+
|  3| wangwu| 20|     1|
|  2|   lisi| 19|     0|
|  1|zhangsan| 18|     1|
+---+--------+---+------+
  • DataSet转RDD

SparkSession创建DataSet

val ds = spark.createDataset(1 to 5)
val ds = spark.createDataset(List((1,"zhangsan",18,1),(2,"lisi",19,0),(3,"wangwu",20,1)))

创建 Dataset。通过 spark.createDataset 加 RDD 创建 DataSet。

val ds = spark.createDataset(sc.textFile("file:///opt/yjx/sparklocal/data/wordcount/wd1.txt"))
val ds = spark.createDataset(sc.makeRDD(List((1,"zhangsan",18,1),(2,"lisi",19,0),
(3,"wangwu",20,1))))

创建 Dataset。通过集合加样例类创建 DataSet。

case class User(id: Int, name: String, age: Int, gender: Int)
val list = List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu", 20, 1))
val ds = list.toDS
  • DataFrame使用as[类型]转换为DataSet
val df = spark.read.json("file:///opt/yjx/spark-local/data/user/user.json")
case class User(id: Long, name: String, age: Long, gender: Long)
val ds = df.as[User]
  • DataSet转RDD
val rdd = ds.rdd

因为 Dataset 模型的数据类型为 User (强数据类型),所以 RDD[User] 。

  • DataSet转DataFrame
case class User(id: Int, name: String, age: Int, gender: Int)
val list = List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu", 20, 1))
val ds = list.toDS #集合配置样例类转DataSet
val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
df.show
+---+--------+---+------+
| id|   name|age|gender|
+---+--------+---+------+
|  1|zhangsan| 18|     1|
|  2|   lisi| 19|     0|
|  3| wangwu| 20|     1|
+---+--------+---+------+

7、IDEA开发SparkSQL

  • 添加Pom.xml依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.2</version>
</dependency>
  • 创建dataFrame
package com.yjxxt.dataframe
import com.yjxxt.dataset.DatasetDemo.User
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object DataFrameDemo {
  case class User(id: Int, name: String, age: Int, gender: Int)
  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrameDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
    import spark.implicits._
15
No. 15 / 55
6.2. Dataset
    // ==================== 业务处理 ====================
    // RDD 转 DataFrame
    val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",
20, 1)))
    val df1: DataFrame = rdd.toDF()
    df1.show()
    // 直接创建 DataFrame
    val df2 = spark.read.json("data/user/user.json")
    df2.show()
    // 创建临时表
    df2.createOrReplaceTempView("t_user")
    // 编写 SQL
    lazy val sql = "SELECT id, name, age, gender FROM t_user ORDER BY age DESC"
    // 执行 SQL
    spark.sql(sql).show()
    // ==================== 关闭连接 ====================
    spark.stop
 }
}
  • 创建DataSet
package com.yjxxt.dataset
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object DatasetDemo {
  case class User(id: Long, name: String, age: Long, gender: Long)
  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("DatasetDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
    import spark.implicits._
    // ==================== 业务处理 ====================
    // RDD 转 Dataset
    val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",
20, 1)))
    val ds1: Dataset[User] = rdd.toDS()
    ds1.show()
    // 创建 DataFrame
    val df: DataFrame = spark.read.json("data/user/user.json")
    // 通过 DataFrame 使用 as[类型] 转换为 DataSet
    val ds2: Dataset[User] = df.as[User]
    ds2.show()
    // 创建临时表
    ds2.createOrReplaceTempView("t_user")
    // 编写 SQL
    lazy val sql = "SELECT id, name, age, gender FROM t_user ORDER BY age DESC"
    // 执行 SQL
    spark.sql(sql).show
        // ==================== 关闭连接 ====================
    spark.stop
 }
}

8、DSL领域特定语言

  • emp.csv
EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7369,SMITH,CLERK,7902,1980-12-17,800,0,20
7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30
7566,JONES,MANAGER,7839,1981-04-02,2975,0,20
7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981-05-01,2850,0,30
7782,CLARK,MANAGER,7839,1981-06-09,2450,0,10
7788,SCOTT,ANALYST,7566,1987-07-13,3000,0,20
7839,KING,PRESIDENT,0,1981-11-17,5000,0,10
7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30
7876,ADAMS,CLERK,7788,1987-07-13,1100,0,20
7900,JAMES,CLERK,7698,1981-12-03,950,0,30
7902,FORD,ANALYST,7566,1981-12-03,3000,0,20
7934,MILLER,CLERK,7782,1982-01-23,1300,0,10
  • dept.csv
DEPTNO,DNAME,LOC
10,ACCOUNTING,NEWYORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
  • 环境准备
case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double, comm:Double, deptno: Int)
case class Dept(deptno: Int, dname: String, loc: String)
  • 创建DataFrame并转为DataSet
val e = spark.read.option("header", "true").option("sep",
",").option("inferSchema","true").csv("file:///opt/yjx/spark-local/data/scott/emp.csv")
val emp = e.as[Emp]
val d = spark.read.option("header", "true").option("sep",
",").option("inferSchema","true").csv("file:///opt/yjx/spark-local/data/scott/dept.csv")
val dept = d.as[Dept]
  • header:第一行是否为表头

  • seq:设置数据分割符,默认是逗号

  • inferschema:是否开启类型自动推断

DSL重分区:

emp.repartition(分区数)
dept.repartition(分区数)

 SQL 重分区。Spark 3.x 新特性

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;
  • 建表
emp.createOrReplaceTempView("emp")
dept.createOrReplaceTempView("dept")
  • Action操作
def show(): Unit :默认只显示前 20 条记录。
spark.sql("SELECT * FROM emp").show
def show(numRows: Int): Unit :显示 numRows 条。
def show(truncate: Boolean): Unit :是否最多只显示 20 个字符,默认为 true ;用于控制单个数据列字符
串的显示长度,并且所有列都会靠右对齐。
def show(numRows: Int,truncate: Int,vertical: Boolean): Unit :显示 numRows 条,并设置单个数据列字符串的显示长度,并设置竖状显示方式。
  • collect
def collect(): Array[org.apache.spark.sql.Row] :将表中所有的数据获取到 Array 对象返回。
val array = emp.collect()
def collectAsList(): java.util.List[org.apache.spark.sql.Row] :将表中所有的数据获取到 List 对
象返回(注意是 Java 的 List)
  • describe
def describe(cols: String*): org.apache.spark.sql.DataFrame :这个方法可以动态的传入一个或多个
String 类型的字段名,用于统计数值类型字段的统计值,比如 count(统计),mean(平均),stddev(标准偏差)min(最小),max(最大) 等。

scala> emp.describe("sal").show
+-------+------------------+
|summary|               sal|
+-------+------------------+
| count|                14|
|   mean| 2073.214285714286|
| stddev|1182.5032235162716|
|   min|               800|
|   max|              5000|
+-------+------------------+

提示:标准偏差(Std Dev,Standard Deviation)统计学名词。一种度量数据分布的分散程度之标准,用于衡量数据 值偏离算术平均值的程度。标准偏差越小,这些值偏离平均值就越少,反之亦然。标准偏差的大小可通过标准偏差与 平均值的倍率关系来衡量。 例如,A、B 两组各有 6 位学生参加同一次语文测验,A 组的分数为 95、85、75、65、55、45,B 组的分数为 73、 72、71、69、68、67。这两组的平均数都是 70,但 A 组的标准差应该是 17.078 分,B 组的标准差应该是 2.160 分,说 明 A 组学生之间的差距要比 B 组学生之间的差距大得多。

  • first & head & take(dataSet/dataFrame)
first :获取第一行记录。
head :获取第一行记录。
head(n: Int) :获取前 n 行记录,并放入 Array 中进行返回。
take(n: Int) :获取前 n 行数据,并放入 Array 中进行返回。
takeAsList(n: Int) :获取前 n 行数据,并放入 List 中进行返回(注意是 Java 的 List)。
  • 基本查询
select :获取指定字段值。根据传入的 String 类型字段名(逗号分隔多个),获取指定字段的值,以 DataFrame 类
型返回。emp.select("empno", "ename").show
selectExpr :可以对指定字段进行特殊处理,可以直接对指定字段调用 UDF 函数,或者指定别名等。传入 String
类型参数,得到 DataFrame 对象。emp.selectExpr("empno eno", "ename", "deptno dno").show
col :获取指定字段,只能获取一个字段,返回对象为 Column 类型。emp.col("ename")
apply :获取指定字段,只能获取一个字段,返回对象为 Column 类型。 emp.apply("ename")
  • 条件查询
where(conditionExpr: String) :类似 SQL 语言中 Where 关键字后的条件,可以配合 and 和 or 关键字一起使用。emp.where("sal > 2000 and deptno = 10").show
filter :根据字段进行筛选,和 where 方法效果一致。emp.filter("sal > 2000 and deptno = 10").show
  • 排序
orderBy 和 sort :按指定字段排序,默认为升序, - 表示降序排序。 sort 和 orderBy 使用方法相同。emp.orderBy(-$"sal").show
emp.orderBy(-'sal).show
sortWithinPartitions :和 sort 方法功能类似,区别在于 sortWithinPartitions 方法是按 Partition 来排序的。 val emp2 = emp.repartition(2)  emp2.sortWithinPartitions(-'sal).show
  • 分组
groupBy :根据字段进行分组操作。 groupBy 方法有两种调用方式,可以传入 String 类型的字段名,也可传入
Column 类型的对象。该方法返回的是 RelationalGroupedDataset 类型对象,在 RelationalGroupedDataset 的API 中提供了 groupBy 之后的操作: max , min , count , mean , avg , sum , agg 等。
emp.groupBy("deptno").count.show
emp.groupBy("deptno").max("sal").show
emp.groupBy("deptno").avg("sal").show
  • 聚合
聚合操作调用的是 agg 方法,该方法有多种调用方式。一般与 groupBy 方法配合使用。
emp.groupBy("deptno").agg(count("ename").alias("人数"), sum("sal").alias("总工资")).show
  • 去重
distinct :返回一个不包含重复记录的结果集。emp.select("mgr").distinct.show emp.select("*").distinct.show
dropDuplicates :根据指定字段去重,不传入字段时效果和 distinct 一致
emp.select("*").dropDuplicates.show
  • 分页
limit :SparkSQL 中的 limit 关键字只支持获取指定的前 n 行记录。limit 仅仅实现了非常简单的类似 TopN 的功能,还不能很好的查询某个区间范围的记录,比如分页显示。那有没有什么好的解决办法呢?
emp.limit(5).show
有,使用 monotonically_increasing_id 方法可以生成一个单调增长的列。然后使用该列实现分页查询。
emp.withColumn("rank", monotonically_increasing_id()).where("rank >= 0 and rank < 5").show
 emp.withColumn("rank", monotonically_increasing_id()).where("rank >= 5 and rank < 10").show
spark.sql("SELECT * FROM (SELECT *, monotonically_increasing_id() AS rank FROM emp) WHERE rank>= 0 AND rank < 5").show
  • Union
union 方法:对两个 Dataset 进行组合,类似于 SQL 中的 UNION ALL 操作。
emp.union(emp2).show
  • Join
 join :笛卡尔积。默认不允许执行会直接报错。要不使用允许执行的 crossJoin ,要不通过配置打开相关设置,提示:本文使用的 Spark 3.3.2 版本默认已打开笛卡尔积查询。
emp.crossJoin(emp2).show
emp.join(emp2).show
on :以下使用类型 SQL 的 JOIN ON 操作。
joinWith :支持各种复杂 JOIN('inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter',
'right', 'right_outer', 'leftsemi', 'left_semi', 'leftanti', 'left_anti', 'cross')。
emp.as("e").joinWith(dept.as("d"), col("e.deptno") === col("d.deptno"),
"left").show(truncate=false)

9、列操作

  • 添加列
def withColumn(colName: String, col: Column): DataFrame :当某列存在时替换值,不存在时添加此
列,返回一个新的 DataFrame 对象。
ds.withColumn("LEVEL", col("LEVEL").cast("Integer")) :更改某列的数据类型,返回一个新的
DataFrame 对象。
val dept2 = dept.withColumn("LEVEL", lit("1"))
def withColumnRenamed(existingName: String, newName: String): DataFrame :重命名某列,返回一个新的 DataFrame 对象。
val dept4 = dept3.withColumnRenamed("LEVEL", "L")
  • 删除列
drop :去除指定字段,保留其他字段,返回一个新的 DataFrame 对象,其中不包含去除的字段,一次只能去除一个字段。 val dept5 = dept4.drop("L")

10、WordCount案例

在项目根路径下创建 data 目录, data 目录下再创建 wordcount 目录, wordcount 目录下创建以下文件

  • wd1
Hello Hadoop
Hello ZooKeeper
Hello Hadoop Hive
  • wd2
Hello Hadoop HBase
Hive Scala Spark
  • 案例代码
package com.yjxxt.wordcount
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object WordCountDemo {
case class User(id: Long, name: String, age: Long, gender: Long)
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
 // 初始化配置对象并设置运行模式与 AppName
 val conf = new SparkConf().setMaster("local[*]").setAppName("WordCountDemo")
 // 根据配置对象初始化 SparkSession 对象
  val spark = SparkSession.builder().config(conf).getOrCreate()
 // 日志级别
  val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // ==================== 业务处理 ====================
    // 创建 DataFrame
    val df: DataFrame = spark.read.text("data/wordcount")
    // 创建临时表
    df.createOrReplaceTempView("t_wordcount")
    // SQL
    lazy val sql1 =
      """
        |WITH temp AS (
        |SELECT EXPLODE(SPLIT(value, '\\s+')) AS word
        |FROM t_wordcount
        |)
        |SELECT word, COUNT(*) AS cnt FROM temp
        |GROUP BY word ORDER BY cnt DESC
        |""".stripMargin
    lazy val sql2 =
      """
        |SELECT word, COUNT(*) cnt
        |FROM t_wordcount
        |LATERAL VIEW EXPLODE(SPLIT(value, '\\s+')) wordtable AS word
        |GROUP BY word ORDER BY cnt DESC
        |""".stripMargin
    spark.sql(sql2).show()
    // DSL
    import org.apache.spark.sql.functions._
    df.select(explode(split(col("value"), "\\s+")).as("word"))
     .groupBy(col("word"))
     .agg(count(col("word")).as("cnt"))
     .orderBy(col("cnt").desc).show()
    // ==================== 关闭连接 ====================
    spark.stop
 }
}

11、自定义函数

UDF(User Defined Function):普通函数,一进一出,比如 UPPER, LOWER;

UDAF(User Defined Aggregation Function):聚合函数,多进一出,比如 COUNT/MAX/MIN;

UDTF(User Defined Table Generating Function):表生成函数,一进多出。(explode/cocat_ws)

image-20231118221340438

  • UDF案例

导入依赖

         <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>

编写案例

package com.zwf.UDF

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * @author MrZeng
 * @date 2023-11-16 10:58
 * @version 1.0
 */
object UdfDemo {
  case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double,
                 comm: Double, deptno: Int)

  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDFDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
    import spark.implicits._
    val df: DataFrame = spark.read
      .option("header", "true")
      .option("sep", ",")
      .option("inferSchema", "true")
      .csv("data/csv/emp.csv")
    val emp: Dataset[Emp] = df.as[Emp]
    emp.createOrReplaceTempView("emp")
    import spark.implicits._
      //编写UDF自定义函数
    val prefix_name: UserDefinedFunction = spark.udf.register("prefix_name", (name: String) => {
      "Hello" + name
    })
      //使用自定义函数
      val sql=
        """
          |SELECT ename, prefix_name(ename) AS new_name FROM emp
          |""".stripMargin
     spark.sql(sql).show(5)
    spark.stop()

  }

}
  • UDTF案例(导入依赖看上面)

准备json数据(使用时,每行数据的换行符要删除)

{"movie": [{"movie_name": "肖申克的救赎", "movie_type": "犯罪" }, {"movie_name":"肖申克的救赎", "movie_type":"剧情" }]}
{"movie": [{"movie_name": "这个杀手不太冷", "movie_type": "剧情" }, {"movie_name":"这个杀手不太冷", "movie_type":"动作" }]}
{"movie": [{"movie_name": "勇敢的心", "movie_type": "动作" }, {"movie_name":"勇敢的心", "movie_type":"战争" }]}
{"movie": [{"movie_name": "东邪西毒", "movie_type": "武侠" }, {"movie_name":"东邪西毒", "movie_type":"剧情" }]}
{"movie": [{"movie_name": "霍比特人", "movie_type": "冒险" }, {"movie_name":"霍比特人", "movie_type":"奇幻" }]}

代码示范(自定义聚合函数)

package com.zwf.UDAF

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

/**
 * @author MrZeng
 * @date 2023-11-16 11:18
 * @version 1.0
 */

/**
 * 自定义UDAF聚合函数:计算薪资的平均值
 * IN: 输入数据的类型
 * BUFF:缓存区数据的类型
 * OUT:返回值的数据类型
 */

//缓存区的数据结构
case class Buff(var sum:Double,var count:Long)

object MyAvg  extends Aggregator[Double,Buff,Double]{
  // 初始化缓冲区 Buff(求和, 计数)
  override def zero: Buff = Buff(0D,0L)
  // 根据输入的数据更新缓冲区的数据

  /**
   *
   * @param b  缓冲区对象
   * @param a  入参数据
   * @return
   */
  override def reduce(b: Buff, a: Double): Buff = {
    //累加每次输入的数据
    b.sum+=a
    //计数器每次加一
    b.count+=1
    //返回缓冲对象
    b
  }

  override def merge(b1: Buff, b2: Buff): Buff = {
      //把每个分区的值归并
      b1.sum+=b2.sum
      b1.count+=b2.count
      b1
  }

     //返回的最终结果 返回数据类型是Double
  override def finish(reduction: Buff): Double = {
      reduction.sum/reduction.count
  }
   //缓冲区数据的编码处理
  override def bufferEncoder: Encoder[Buff] = Encoders.product
   //输出数据的编码处理
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

代码示范(注册自定义UDAF函数并使用)

package com.zwf.UDAF

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, functions}

/**
 * @author MrZeng
 * @date 2023-11-16 10:58
 * @version 1.0
 */
object UdAFDemo {
  case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double,
                 comm: Double, deptno: Int)

  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDFDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
    import spark.implicits._
    val df: DataFrame = spark.read
      .option("header", "true")
      .option("sep", ",")
      .option("inferSchema", "true")
      .csv("data/csv/emp.csv")
    val emp: Dataset[Emp] = df.as[Emp]
    emp.createOrReplaceTempView("emp")
    // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
     import spark.implicits._
    //自定义UDAF函数  主要是格式
val myAvg1: UserDefinedFunction = spark.udf.register("myAvg", functions.udaf(MyAvg))
      val sql=
        """
          |SELECT  myAvg(sal) AS new_name FROM emp
          |""".stripMargin
     spark.sql(sql).show()
    //在DSL中使用
    emp.select(myAvg1('sal).as("avg_sal")).show
    spark.stop()
  }
}
  • UDTF

代码示范 (UDTF还是沿用了hive语法,spark没有自己的语法)

package com.zwf.UDTF

import org.apache.hadoop.hive.ql.exec.{UDFArgumentLengthException, UDFArgumentTypeException}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.json.{JSONArray, JSONObject}

import java.util
//自定义UDTF对象
class MyUDTF extends GenericUDTF {
  // 实例化 UDTF 对象,判断传入参数的长度以及数据类型
  // 和 Hive 的自定义 UDTF 不一样的是,Spark 中用的是已经过时的 initialize(ObjectInspector[] argOIs)
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
    // 获取入参
    // 参数校验,判断传入参数的长度以及数据类型
    if (argOIs.length != 1) throw new UDFArgumentLengthException("参数个数必须为 1")
    if (ObjectInspector.Category.PRIMITIVE != argOIs(0).getCategory) {
      /*
      UDFArgumentTypeException(int argumentId, String message)
        异常对象需要传入两个参数:
          int argumentId:参数的位置,ObjectInspector 中的下标
          String message:异常提示信息
       */
      throw new UDFArgumentTypeException(0, "参数类型必须为 String")
    }
    // 自定义函数输出的字段和类型
    // 创建输出字段名称的集合
    val columNames = new util.ArrayList[String]
    // 创建字段数据类型的集合
    val columType = new util.ArrayList[ObjectInspector]
    columNames.add("movie_name")
    columType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
    columNames.add("movie_type")
    columType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
    ObjectInspectorFactory.getStandardStructObjectInspector(columNames, columType)
  }
  // 处理数据
  override def process(objects: Array[AnyRef]): Unit = {
    val outline = new Array[String](2)
    if (objects(0) != null) {
      val jsonObject = new JSONObject(objects(0).toString)
      val jsonArray: JSONArray = jsonObject.getJSONArray("movie")
      var i = 0
      while ( {
        i < jsonArray.length
      }) {
        outline(0) = jsonArray.getJSONObject(i).getString("movie_name")
        outline(1) = jsonArray.getJSONObject(i).getString("movie_type")
        // 将处理好的数据通过 forward 方法将数据按行写出
        forward(outline)
        i += 1
      }
    }
  }

  override def close(): Unit = {}
}

代码示范(注册并使用函数)

使用hive的SQL进行注册:create temporary function 自定义函数名称 as ' 函数实现类的完全限定名包括自定义函数的类名 直接在SQL中使用即可。

package com.zwf.UDTF

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}


object UDTFDemo {
  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDTFDemo")
    // 根据配置对象初始化 SparkSession 对象
    // enableHiveSupport() 开启内置 Hive 支持
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // ==================== 业务处理 ====================
    // 创建表并载入数据
    val movie: DataFrame = spark.read.text("data/UDTF/movie.json")
    movie.createOrReplaceTempView("t_movie")
    // 注册 UDTF 函数
    // 使用 sparkSession.sql("CREATE TEMPORARY FUNCTION 自定义函数名称 AS '函数实现类的完全限定名包括自定义函数的类名'") 进行 注册
    spark.sql("CREATE TEMPORARY FUNCTION MyUDTF AS 'com.zwf.UDTF.MyUDTF'")
    spark.sql("DESC FUNCTION EXTENDED MyUDTF").show()
    // 使用 UDTF 函数
    spark.sql("SELECT MyUDTF(value) FROM t_movie").show()
    // ==================== 关闭连接 ====================
    spark.stop
  }
}

12、数据的读取和保存

sparkSQL提供了通用的数据保存和读取方式。使用DataFrameReader.load()读取文件DataFrameWriter.save()写入文件 API,默认读取和保存文件格式为Parquet。

  • 保存数据(通用)
package com.yjxxt.write
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object WriteFileDemo01 {
  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDTFDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // ==================== 业务处理 ====================
    val df: DataFrame = spark.read.json("data/user/user.json")
    df.createOrReplaceTempView("t_user")
    val sql = "SELECT id, name, age, gender FROM t_user"
    // write 还可以配合 mode(SaveMode.模式) 设置模式:覆盖、追加、存在报错、忽略
    // save 默认保存的文件格式就是 parquet  save()中传递的参数是文件存放的路径
    spark.sql(sql).write.mode(SaveMode.Overwrite).save("data/write/parquet")
    // 配合 format 保存 csv
    spark.sql(sql).write.format("csv").mode(SaveMode.Append).save("data/write/csv")
    // 配合 format 保存 json
    spark.sql(sql).write.format("json").mode(SaveMode.ErrorIfExists).save("data/write/json")
    // 配合 format 保存 orc
    spark.sql(sql).write.format("orc").mode(SaveMode.Ignore).save("data/write/orc")
    // 配合 rdd 和 saveAsTextFile 保存 text
    spark.sql(sql).rdd.saveAsTextFile("data/write/text")
    // ==================== 关闭连接 ====================
    spark.stop
 }
}
  • 指定文件格式保存
package com.yjxxt.write
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object WriteFileDemo02 {
  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDTFDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
// ==================== 业务处理 ====================
    val df: DataFrame = spark.read.json("data/user/user.json")
    df.createOrReplaceTempView("t_user")
    val sql = "SELECT id, name, age, gender FROM t_user"
    // write 还可以配合 mode(SaveMode.模式) 设置模式:覆盖、追加、存在报错、忽略
    // 指定保存 parquet
    spark.sql(sql).write.parquet("data/write/parquet")
    // 指定保存 csv
    spark.sql(sql).write.csv("data/write/csv")
    // 指定保存 json
    spark.sql(sql).write.json("data/write/json")
    // 指定保存 orc
    spark.sql(sql).write.orc("data/write/orc")
    // 配合 rdd 和 saveAsTextFile 保存 text
    spark.sql(sql).rdd.saveAsTextFile("data/write/text")
    // ==================== 关闭连接 ====================
    spark.stop
 }
}
  • 读取文件(通用)
package com.yjxxt.read
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrameReader, SparkSession}
object ReadFileDemo01 {
  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDTFDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // ==================== 业务处理 ====================
    val read: DataFrameReader = spark.read
    // load 默认读取的就是 parquet
    read.load("data/write/parquet").show
    // 配合 format 读取 csv
    read.format("csv")
     .option("header", "true").option("sep", ",").option("inferSchema", "true")
     .load("data/write/csv").show
    // 配合 format 读取 json
    read.format("json").load("data/write/json").show
    // 配合 format 读取 orc
    read.format("orc").load("data/write/orc").show
    // 配合 format 读取 text
    read.format("text").load("data/write/text").show
    // ==================== 关闭连接 ====================
    spark.stop
 }
}
  • 读取指定文件格式
package com.yjxxt.read
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrameReader, SparkSession}
object ReadFileDemo02 {
  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDTFDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // ==================== 业务处理 ====================
    val read: DataFrameReader = spark.read
    // 指定读取 parquet
    read.parquet("data/write/parquet").show
    // 指定读取 csv 可以设置属性 header:表头    sep:切割符  inferSchema:类型推断
    read.option("header", "true").option("sep", ",").option("inferSchema", "true")
     .csv("data/write/csv").show
    // 指定读取 json
    read.json("data/write/json").show
    // 指定读取 orc
    read.orc("data/write/orc").show
    // 指定读取 text
    read.text("data/write/text").show
    // ==================== 关闭连接 ====================
    spark.stop
 }
}

13、读写JDBC

  • 导入依赖
       <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
  • 写数据到Mysql数据库中
package com.zwf.jdbc

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

import scala.collection.mutable

/**
 * @author MrZeng
 * @date 2023-11-16 14:06
 * @version 1.0
 */
object WriterJDBC {

   case class Dept(deptno:BigInt,dname:String,loc:String)

   //jdbc 写入到数据库
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("appName")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
     //sc中获取RDD sparkSession中获取sql dataFrame dateSet
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("ERROR")
    val map: Map[String, String] = Map(("header", "true"), ("inferSchema", "true"), ("seq", ","))
    val df: DataFrame = spark.read.options(map).csv("data/csv/dept.csv")
      import spark.implicits._
    val ds: Dataset[Dept] = df.as[Dept]
    ds.createOrReplaceTempView("t_emp")
    val map1 =mutable.HashMap[String, String]()
      map1.put("driver","com.mysql.cj.jdbc.Driver")
      map1.put("url","jdbc:mysql://localhost:3306/spark")
      //没有表会自动创建表
      map1.put("dbtable","t_emp")
      map1.put("user","root")
      map1.put("password","root@123456")
    spark.sql("select * from t_emp;").write.format("jdbc").mode(SaveMode.Overwrite).options(map1).save()

    spark.stop()
  }

}

  • 数据库中读取数据
package com.zwf.jdbc

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable

/**
 * @author MrZeng
 * @date 2023-11-16 14:37
 * @version 1.0
 */
object ReadJDBC {

  def main(args: Array[String]): Unit = {
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("ReadJdbcDemo")
    // 根据配置对象初始化 SparkSession 对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    val maps = new mutable.HashMap[String, String]()
      //jdbc连接信息
    maps.put("driver","com.mysql.cj.jdbc.Driver")
    maps.put("url","jdbc:mysql://localhost:3306/spark")
    maps.put("user","root")
    maps.put("password","root@123456")
    //要读取数据库中的表名
    maps.put("dbtable","t_emp")
    spark.read.format("jdbc").options(maps).load().show()
    spark.stop()
  }
}

如果使用spark-shell窗口访问MySQL,需要将Mysql的连接驱动包,拷贝到spark的jars目录下,然后启动spark-shell命令。

bin/spark-shell \
--jars /usr/local/spark-yarn/spark-3.3.2-hadoop3/jars/mysql-connector-java-8.0.18.jar \
--driver-class-path /usr/local/spark-yarn/spark-3.3.2-hadoop3/jars/mysql-connector-java-8.0.18.jar

14、整合Hive

  • 内部有hive

Spark SQL为了学习使用,默认使用Derby数据库,hive的元数据存在Derby数据库中,默认的仓库地址是spark根路径/spark-warehouse,如果是window上,在项目根路径下有个spark-warehouse文件夹。

package com.zwf.innerToHive

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * @author MrZeng
 * @date 2023-11-16 14:53
 * @version 1.0
 */
object InnerHiveDemo {
  /**
   * 单机整合hive
   * @param args
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("InnerHiveDemo")
    // 根据配置对象初始化 SparkSession 对象
    // enableHiveSupport() 开启 Hive 支持
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    val sc = spark.sparkContext
     sc.setLogLevel("ERROR")
    val hql=
      """
        |create table if not exists emp(
        |deptno int,
        |dname string,
        |loc string
        |)
        |row format delimited fields terminated by ','
        |lines terminated by '\n';
        |""".stripMargin
        //创建表并载入数据
         spark.sql(hql)
         //这里是加载本地 使用 load data local 默认使用Derby数据库存储元数据在spark-warehouse
        spark.sql("load data local inpath 'data/csv/emp.txt' into table emp")
         spark.sql("select * from emp").show()
         spark.stop()
  }

}

image-20231119114652104

  • 外部Hive

将hive中config目录下的hive-site.xml拷贝到spark conf目录下

将mysql驱动包拷贝到Spark的jars目录

将hadoop根目录下中etc目录下的core-site.xml和hdfs-site.xml拷贝到spark的conf目录下(为了连接HDFS)

重启spark-shell

image-20231119115037685

 使用 bin/spark-shell 直接访问 Hive。这种访问方式是最麻烦的,每次写 SQL 都要写 spark.sql() 结构。

scala> spark.sql("USE scott")
scala> spark.sql("SHOW TABLES").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|   scott|     dept|      false|
|   scott|     emp|      false|
|   scott| salgrade|      false|
+--------+---------+-----------+

使用 bin/spark-sql 直接访问 Hive。没有表格格式!

spark-sql (default)> SHOW DATABASES;
databaseName
default
scott
Time taken: 2.516 seconds, Fetched 2 row(s)

使用 bin/beeline 访问 Hive。

${spark_home}/bin/beeline -u jdbc:hive2://node01:10000 -n root

缺点:简单的不走MR 复杂的SQL会走MR

  • 最常用的方式,也是最常用的方式

使用saprk ThrifServer应用程序,通过beeline连接spark ThrifServer应用程序。

spark-sql 是一个Session 级别应用任务,而非是一个服务;

HiveServer2 是一个Java 进程,在服务器本地运行,通过 JDBC 的方式接受客户端的调度,通过语法解析,生成执 行计划,最终发送给 YARN 进行调度执行,自己并不负责具体执行。

Spark ThriftServer 进程本质上是Spark 的一个Application(是在 YARN 上的一个 Application 应用,前提是使用 YARN 做资源管理)。通过 JDBC 的方式接受客户端的调度,通过语法解析,生成执行计划,最后在当前 Application 内去执行,而不是单独去启动一个 YARN 任务执行。这就要求当前的 Application 资源足够丰富且弹性。

  • 启动Spark ThrifServer应用程序(注意:spark thrift server只支持yarn client 模式,不支持cluster模型)
sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10015 \
--master yarn --deploy-mode client \
--driver-cores 1 --driver-memory 512M \
--num-executors 1 --executor-cores 1 --executor-memory 1G
  • 使用Beeline连接
bin/beeline -u jdbc:hive2://node01:10015 -n root

在Idea上开发

首先 Windwos 系统配置好 Hadoop 环境变量,然后将 HDFS 的 core-site.xml 、 hdfs-site.xml 配置文件以及Hive 的 hive-site.xml 配置文件拷贝至 Maven 项目的 resources 目录,最后在项目的 pom 文件中添加以下依赖。
  • 导入依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>
  • 编写代码
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object OuterHiveDemo {
  def main(args: Array[String]): Unit = {
    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName
    val conf = new SparkConf().setMaster("local[*]").setAppName("OuterHiveDemo")
    // 根据配置对象初始化 SparkSession 对象
    // enableHiveSupport() 开启 Hive 支持
      //可以通过 conf.set("spark.sql.warehouse.dir", path) 修改 Hive 仓库的地址。
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // ==================== 业务处理 ====================
    val TabSQL=
      """
        |create table if not exists emp(
        |empno int,
        |ename string,
        |job string,
        |mgr int,
        |hiredate string,
        |sal double,
        |comm double,
        |deptno int)
        |row format delimited fields terminated by ','
        |lines terminated by '\n';
        |""".stripMargin
    spark.sql("USE scott")
    spark.sql(TabSQL)
    spark.sql("load data local inpath 'data/csv/emp.txt' into table emp;")
    spark.sql("SELECT deptno, AVG(sal) AS avg_sal FROM emp GROUP BY deptno").show()
    // ==================== 关闭连接 ====================
    spark.stop
  }
}

15、数据写入Hive

使用Hive SQL语法格式

-- 将查询结果导出到本地
insert overwrite local directory '/root/user' select * from t_user; 
-- 将查询结果输出到 HDFS
insert overwrite local directory '/bd/export/user' select * from t_user; 
-- 通过查询创建表
create if not exists user as select * from t_user;

使用InsertInto

insertInto("表名") :首先要求表必须存在否则插入会报错;其次无论 SaveMode 是 Append 还是 Overwrite,需 要当前 DataFrame/Dataset 的 Schema(数据库) 与目标表的 Schema(数据库) 一致。

saveAsTable

saveAsTable("表名"):将DataFrame/DataSet的数据保存为指定的表
如果表不存在,则创建表插入数据;
如果表存在,且 SaveMode 为 Overwrite 则使用新的 Schema(数据库) 覆盖表;
如果表存在,且 SaveMode 为 Append 则需要当前 DataFrame/Dataset 的 Schema(数据库) 与目标表的 Schema 一致。
  • 案例(使用conf.set("spark.sql.parquet.writeLegacyFormat", "true")防止老版spark无法解析parquet文件)
package com.zwf.hive

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * @author MrZeng
 * @date 2023-11-16 17:58
 * @version 1.0
 */
object DsToTable {

  def main(args: Array[String]): Unit = {

    // ==================== 建立连接 ====================
    // 初始化配置对象并设置运行模式与 AppName 默认以parquet文件格式保存
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteHiveDemo")
    conf.set("spark.sql.parquet.writeLegacyFormat", "true")
//    conf.set("spark.sql.warehouse.dir", "/spark/warehouse/")
    // 根据配置对象初始化 SparkSession 对象
    // enableHiveSupport() 开启 Hive 支持
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // 日志级别
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    // ==================== 业务处理 ====================
    spark.sql("USE SCOTT")
    lazy val sql =
      """
        |SELECT * FROM (
        |SELECT empno, ename, deptno, sal,
        |RANK() OVER(PARTITION BY deptno ORDER BY sal DESC) r
        |FROM emp
        |) t WHERE t.r <= 3
        |""".stripMargin
    // org.apache.spark.sql.AnalysisException: Table not found: emp_topn;
    //首先要emp_topn数据表已经创建好!
    //spark.sql(sql).write.mode(SaveMode.Overwrite).insertInto("emp_topn")
    //把查询的数据转为表格上传到Hdfs上
    // spark.sql(sql).write.mode(SaveMode.Overwrite).save("/spark/warehouse/")
    //以表格上传到scott数据库下的默认路径
    spark.sql(sql).write.mode(SaveMode.Overwrite).saveAsTable("emp_topn")
    spark.sql("select * from emp_topn").show()
    // ==================== 关闭连接 ====================
    spark.stop
  }
}

使用spark submit命令提提交Spark SQL文件,提交任务如下Spark SQL Shell 目前还不支持 Cluster 模式):

$SPARK_HOME/bin/spark-sql \
--master yarn --deploy-mode client \
--driver-cores 1 --driver-memory 512M \
--num-executors 1 --executor-cores 1 --executor-memory 1G \
--queue default \
-f /root/xxx.sql

标签:入门,val,show,SQL,DataFrame,emp,sql,spark,Spark
From: https://www.cnblogs.com/smallzengstudy/p/17843383.html

相关文章

  • GreatSQL社区与Amazon、Facebook、Tencent共同被MySQL致谢
    一、来自MySQL官方的感谢在2023-10-25MySQL官方发布的8.2版本ReleaseNotes中,GreatSQL社区核心开发者RichardDang和HaoLu,分别收到了来自MySQL官方的贡献感谢,与Amazon、Facebook(Meta)、Tencent等一并出现在感谢清单中。详见:MySQL8.2ReleaseNotes/Chang......
  • 神经网络入门篇:神经网络的梯度下降(Gradient descent for neural networks)
    神经网络的梯度下降在这篇博客中,讲的是实现反向传播或者说梯度下降算法的方程组单隐层神经网络会有\(W^{[1]}\),\(b^{[1]}\),\(W^{[2]}\),\(b^{[2]}\)这些参数,还有个\(n_x\)表示输入特征的个数,\(n^{[1]}\)表示隐藏单元个数,\(n^{[2]}\)表示输出单元个数。在这个例子中,只介绍过的......
  • SQL DML语句
    DataManipulationLanguage数据操纵语言,用于数据的增加、删除、更新等操作。包括INSERT(向表中插入新数据)、UPDATE(修改表中的数据)和DELETE(从表中删除数据)。插入数据INSERT插入一行数据插入数据的时候,不指定列名直接使用VALUES指定数据,表示为表中的每一列提供一个值,值的顺......
  • 设置pgsql使用SSL加密(自签名证书)
    1、切换至postgres用户supostgres2、进入到pgsql的安装目录cd/home/data/postgresql-11.63、生成自签名证书opensslreq-new-x509-days365-nodes-outserver.crt-keyoutserver.key依次输入国家:CN哪个州:Asia城市:SHANGHAI公司:SH部门:SH服务器名称:SH邮箱:可不写如图......
  • Istio从入门到精通——Istio 能做什么?
    Istio能做什么? 下面通过一个天气预报应用展示Istio的服务访问形式。其中有两个服务:forecast和recommendation。forecast由Node.js开发而成,recommendation由Java开发而成。这两个服务之间通过最简单的服务名进行调用,在代码中只实现最简单的业务处理,不包含额外的服务......
  • LNMP一键安装包安装的mysql远程连接不上的问题
    正常的做法: 以root用户登录mysql:grant all privileges on*.* to创建的用户名@"%"identified by "密码";flushprivileges;例如:mysql>grant all privileges on*.* tozhangsan@"%"identified by "123456";......
  • oracle日期常用sql
    selectto_date('2023-11-03','YYYY-MM-DD')+(LEVEL-1)*INTERVAL'15'MINUTEfromdualconnectbylevel<=96;----获取间隔15分钟的时间段selectto_char(TRUNC(to_date('2023','YYYY'),'YYYY')+(LEVLE-1),'YY......
  • FPGA入门笔记003——计数器IP核调用与验证
    FPGA设计方式主要有三种:1、原理图(不推荐);2、VerilogHDL设计方式;3、IP核输入方式计数器IP核调用与验证步骤如下:1、添加IP核文件打开QuartusII,新建一个项目,名称为counter_ip。选择Tools->MegaWizardPlug-InManager。选择第一个选项。在搜索栏中输入COUNTER,单击LPM_COU......
  • Log4j入门使用
    ✨前言✨本篇文章主要在于,初步了解log4j,以及对它的简单使用......
  • JUnit单元测试使用教程(新手入门)
    ✨前言✨本篇文章主要在于,单元测试工具jUnit的简单认识及入门使用......