首页 > 数据库 >SparkSQL编程-DataFrame

SparkSQL编程-DataFrame

时间:2024-05-30 11:03:28浏览次数:29  
标签:scala 编程 Hive DataFrame SparkSQL sql spark Spark

SparkSession

在老的版本中,SparkSQL 提供两种 SQL 查询起始点:
一个叫SQLContext,用于Spark 自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。

从2.0开始, SparkSession作为 Spark 最新的 SQL 查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的 API 在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

当使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext

在这里插入图片描述

DataFrame编程

Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式.
DataFrame API 既有 transformation操作也有action操作. DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API

1 DataFrame创建

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
有了 SparkSession 之后, 通过 SparkSession有 3 种方式来创建DataFrame:
1 通过 Spark 的数据源创建
2 通过已知的 RDD 来创建
3 通过查询一个 Hive 表来创建.

1.1 通过 Spark 的数据源创建

Spark支持的数据源:

在这里插入图片描述

// 读取 json 文件
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]

// 展示结果
scala> df.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

1.2 通过已知的 RDD 来创建

注意:
涉及到RDD, DataFrame, DataSet之间的操作时, 需要导入:import spark.implicits._ 
这里的spark不是包名, 而是表示SparkSession 的那个对象. 所以必须先创建SparkSession对象再导入. 
implicits是一个内部object
// 首先创建一个RDD
scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24
1.2.1 手动转换
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26

// 转换为 DataFrame 的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
1.2.2 通过样例类反射转换
// 1.创建样例类
scala> case class People(name :String, age: Int)
defined class People
// 2.使用样例把 RDD 转换成DataFrame
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28

scala> rdd2.toDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
1.2.3 通过API方式进行转换
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DataFrameDemo2 {
    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession.builder()
            .master("local[*]")
            .appName("Word Count")
            .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
        // 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]
        val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
        // 创建 StructType 类型
        val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
        val df: DataFrame = spark.createDataFrame(rowRdd, types)
        df.show

    }
}

1.3 通过查询一个 Hive 表来创建

Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQ L编译时可以包含 Hive 支持,也可以不包含。

包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。
需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。
一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。
如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。

若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。
即使没有部署好 Hive,Spark SQL 也可以运行。 
需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。
此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,
这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中
(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。
1.3.1 使用内嵌Hive
// 如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.
// Hive 的元数据存储在 derby 中, 仓库地址:$SPARK_HOME/spark-warehouse
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> spark.sql("create table aa(id int)")
19/02/09 18:36:10 WARN HiveMetaStore: Location: file:/opt/module/spark-local/spark-warehouse/aa specified for non-external table:aa
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|       aa|      false|
+--------+---------+-----------+

// 向表中加载本地数据数据
scala> spark.sql("load data local inpath './ids.txt' into table aa")
res8: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select * from aa").show
+---+
| id|
+---+
|100|
|101|
|102|
|103|
|104|
|105|
|106|
+---+
// 然而在实际使用中, 几乎没有任何人会使用内置的 Hive
1.3.2 使用外置Hive
1.3.2.1 配置准备
1.Spark 要接管 Hive 需要把 hive-site.xml copy 到conf/目录下.
2.把 Mysql 的驱动 copy 到 jars/目录下.
3.如果访问不到hdfs, 则需要把core-site.xml和hdfs-site.xml 拷贝到conf/目录下.
1.3.2.2 启动 spark-shell
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|      emp|      false|
+--------+---------+-----------+
scala> spark.sql("select * from emp").show
19/02/09 19:40:28 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
+-----+-------+---------+----+----------+------+------+------+
|empno|  ename|      job| mgr|  hiredate|   sal|  comm|deptno|
+-----+-------+---------+----+----------+------+------+------+
| 7369|  SMITH|    CLERK|7902|1980-12-17| 800.0|  null|    20|
| 7499|  ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|    30|
| 7521|   WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|    30|
| 7566|  JONES|  MANAGER|7839|  1981-4-2|2975.0|  null|    20|
| 7654| MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|    30|
| 7698|  BLAKE|  MANAGER|7839|  1981-5-1|2850.0|  null|    30|
| 7782|  CLARK|  MANAGER|7839|  1981-6-9|2450.0|  null|    10|
| 7788|  SCOTT|  ANALYST|7566| 1987-4-19|3000.0|  null|    20|
| 7839|   KING|PRESIDENT|null|1981-11-17|5000.0|  null|    10|
| 7844| TURNER| SALESMAN|7698|  1981-9-8|1500.0|   0.0|    30|
| 7876|  ADAMS|    CLERK|7788| 1987-5-23|1100.0|  null|    20|
| 7900|  JAMES|    CLERK|7698| 1981-12-3| 950.0|  null|    30|
| 7902|   FORD|  ANALYST|7566| 1981-12-3|3000.0|  null|    20|
| 7934| MILLER|    CLERK|7782| 1982-1-23|1300.0|  null|    10|
| 7944|zhiling|    CLERK|7782| 1982-1-23|1300.0|  null|    50|
+-----+-------+---------+----+----------+------+------+------+
1.3.2.3 启动 spark-sql cli
在spark-shell执行 hive 方面的查询比较麻烦.spark.sql("").show
Spark 专门给我们提供了书写 HiveQL 的工具: spark-sql cli
1.3.2.4 使用hiveserver2 + beeline
# spark-sql 得到的结果不够友好, 所以可以使用hiveserver2 + beeline

1.启动 thrift服务器
sbin/start-thriftserver.sh \
--master yarn \
--hiveconf hive.server2.thrift.bind.host=hadoop201 \
-–hiveconf hive.server2.thrift.port=10000 \

2.启动beeline客户端
bin/beeline

# 然后输入
!connect jdbc:hive2://hadoop201:10000
# 然后按照提示输入用户名和密码

在这里插入图片描述

2 DataFrame语法风格

2.1 SQL语法风格

SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询.
这种风格的查询必须要有临时视图或者全局视图来辅助
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
注意:
1.临时视图只能在当前 Session 有效, 在新的 Session 中无效.
2.可以创建全局视图. 访问全局视图需要全路径:如global_temp.xxx
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.createGlobalTempView("people")

scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> res31.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> spark.newSession.sql("select * from global_temp.people")
res33: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> res33.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.2 DSL语法风格

DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据. 
可以在 Scala, Java, Python 和 R 中使用 DSL;
使用 DSL 语法风格不必去创建临时视图了.
scala> df.select($"name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

标签:scala,编程,Hive,DataFrame,SparkSQL,sql,spark,Spark
From: https://blog.csdn.net/weixin_44872470/article/details/139316379

相关文章

  • SparkSQL概述
    为了给熟悉RDBMS(关系数据库管理系统)但又不理解MapReduce的技术人员提供快速上手的工具,hive应运而生,它是运行在Hadoop上的SQL-on-hadoop工具;但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,运行效率低;sparksql则是采用内存存储可以减少大量的中间......
  • 给师妹写的《Java并发编程之线程池十八问》被表扬啦!
    写在开头  之前给一个大四正在找工作的学妹发了自己总结的关于Java并发中线程池的面试题集,总共18题,将之取名为《Java并发编程之线程池十八问》,今天聊天时受了学妹的夸赞,心里很开心,毕竟自己整理的东西对别人起到了一点帮助,记录一下!Java并发编程之线程池十八问  经过之前......
  • 《python编程从入门到实践》day42
    #昨日知识点回顾        使用Bootstrap设置项目“学习笔记”的样式#今日知识点学习    20.1.3修改base.html        1.定义HTML头部#base.html{%loadbootstrap4%}<!doctypehtml><htmllang="en"><head> <metacharset="utf......
  • 网络编程
    复习目录复习PymysqlFTPTelnetPop3SmtpSocketServer(服务器端)Client(客户端)记得点赞......
  • 618精选编程书单:学好代码是用好大模型的基础
      大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学......
  • 《用ChatGPT轻松搞定Java编程难题:从基础到复杂案例的全面解析》
    ChatGPT国内使用体验点击(文件中并非网站跳转而是详细教程):Docshttps://uajqbcov4oa.feishu.cn/docx/GmeGdznJkoc3nzxHECQcojZ9nXg?from=from_copylink随着人工智能技术的快速发展,越来越多的开发者开始使用ChatGPT来辅助解决编程中的问题。ChatGPT不仅可以快速生成代码,还能进行......
  • 理解结对编程
    基本概念:结对编程强调双人合作,其中一人负责编写代码(通常被称为“驾驶员”),而另一人则负责即时审查代码并提供反馈(通常被称为“观察员”)。这两个角色会定期交换,以保持双方都能活跃地参与编程过程,并共享知识。核心优势:提高代码质量:通过两人共同审查代码,可以显著减少错误,提高代码......
  • 易基因:Adv Sci:NSUN2介导m5C修饰代谢重编程促进肿瘤进展 揭示治疗新选择|项目文章
    大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。喜讯!易基因表观转录组学RNA-BS技术服务见刊《ADVANCEDSCIENCE》表观遗传修饰包括有丝分裂遗传和稳定的修饰,这些修饰在不改变基础DNA序列的情况下调控基因表达。通常癌症中的表观遗传失调表现为突变、表观遗传修饰酶......
  • 说说你了解过、使用过什么编程工具?
    1、VisualStudio:这是微软开发的一款功能强大的集成开发环境(IDE),广泛用于Windows平台的应用程序开发。它支持多种编程语言,包括C#、C++、F#等,并且可以用于开发桌面、移动、Web和游戏应用。2、VisualStudioCode:简称VSCode,是微软推出的一款轻量级但功能全面的源代码编辑器。它支持......
  • 详解 Java AOP:面向方面编程的核心概念与 Spring 实现
    ......