首页 > 数据库 >Spark(九)SparkSQL DataFrame

Spark(九)SparkSQL DataFrame

时间:2024-09-24 15:51:08浏览次数:1  
标签:20 name scala age DataFrame SparkSQL Spark spark

DataFrame

  • Spark SQL的DataFrame API允许我们使用DataFrame而不用必须去注册临时表或者生成SQL表达式,DataFrame API既有transformation操作也有action操作

1. 创建DataFrame

从Spark数据源进行创建

  • 启动Spark Shell
[user@hadoop102 spark-yarn]$ bin/spark-shell
  • 查看Spark支持创建文件的数据源格式,按tab键
scala> spark.read.
csv      jdbc   load     options   parquet   table   textFile      
format   json   option   orc       schema    text
  • 在本地创建user.json文件,并将文件上传到HDFS文件系统
[user@hadoop102 hadoop-3.1.3]$ vim user.json
[user@hadoop102 hadoop-3.1.3]$ hadoop fs -moveFromLocal ./user.json /datas
{"username":"zhangsan","age":20}
  • 读取json文件创建DataFrame
scala> val df = spark.read.json("hdfs://hadoop102:8020/datas/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string] 
  • 展示结果
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- username: string (nullable = true)

scala> df.show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+

从RDD进行转换

  • 在本地创建一个文件,有id、name、age三列,用空格分隔,上传到HDFS
[user@hadoop102 hadoop-3.1.3]$ vim root.txt
[user@hadoop102 hadoop-3.1.3]$ hadoop fs -moveFromLocal ./root.txt /datas
1 abao 20
2 lili 20
3 zhangsan 24
4 wangwu 30
  • 打开spark-shell,创建RDD
scala> val lineRDD = sc.textFile("hdfs://hadoop102:8020/datas/root.txt").map(_.split(" "))
lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:24
  • 定义case class,相当于表的schema
scala> case class person(id:Int,name:String,age:Int)
defined class person
  • 将RDD和case class关联
scala> val personRDD = lineRDD.map(x => person(x(0).toInt,x(1),x(2).toInt))
personRDD: org.apache.spark.rdd.RDD[person] = MapPartitionsRDD[11] at map at <console>:27
  • 将RDD转换成DataFrame
scala> val personDF = personRDD.toDF
personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
  • 查看数据
scala> personDF.show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|    abao| 20|
|  2|    lili| 20|
|  3|zhangsan| 24|
|  4|  wangwu| 30|
+---+--------+---+

2. SQL语法

(1)对DataFrame创建一个临时表

scala> personDF.createOrReplaceTempView("people")

(2)通过SQL语句实现查询全表

scala> val sqlDF = spark.sql("select * from people")
sqlDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

(3)结果展示

scala> sqlDF.show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|    abao| 20|
|  2|    lili| 20|
|  3|zhangsan| 24|
|  4|  wangwu| 30|
+---+--------+---+
  • 普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表,使用全局临时表时需要全路径访问,如global_temp.people

(4)对于DataFrame创建一个全局表

scala> df.createGlobalTempView("people")

(5)通过SQL语句实现查询全表

scala> spark.sql("SELECT * FROM global_temp.people").show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|    abao| 20|
|  2|    lili| 20|
|  3|zhangsan| 24|
|  4|  wangwu| 30|
+---+--------+---+
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|    abao| 20|
|  2|    lili| 20|
|  3|zhangsan| 24|
|  4|  wangwu| 30|
+---+--------+---+

3. DSL语法

  • DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据,可以在Scala, Java, Python和R中使用DSL,使用DSL语法风格不必去创建临时视图了

(1)只查看"name"列数据

scala> personDF.select("name").show()
+--------+
|    name|
+--------+
|    abao|
|    lili|
|zhangsan|
|  wangwu|
+--------+

(2)查看"name"列数据以及"age+1"数据

  • 涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
scala> personDF.select($"name",$"age"+1).show()
+--------+---------+
|    name|(age + 1)|
+--------+---------+
|    abao|       21|
|    lili|       21|
|zhangsan|       25|
|  wangwu|       31|
+--------+---------+

scala> personDF.select('name,'age+1 as "newage").show()
+--------+------+
|    name|newage|
+--------+------+
|    abao|    21|
|    lili|    21|
|zhangsan|    25|
|  wangwu|    31|
+--------+------+

(3)查看"age"大于"20"的数据

scala> personDF.filter($"age">20).show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  3|zhangsan| 24|
|  4|  wangwu| 30|
+---+--------+---+

(4)按照"age"分组,查看数据条数

scala> personDF.groupBy("age").count.show()
+---+-----+                                                                     
|age|count|
+---+-----+
| 20|    2|
| 24|    1|
| 30|    1|
+---+-----+

4. DataFrame转换为RDD

scala> val personRDD = personDF.rdd
personRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[40] at rdd at <console>:25

scala> val array = personRDD.collect
array: Array[org.apache.spark.sql.Row] = Array([1,abao,20], [2,lili,20], [3,zhangsan,24], [4,wangwu,30])

scala> array(0)
res17: org.apache.spark.sql.Row = [1,abao,20]

scala> array(0)(1)
res18: Any = abao

标签:20,name,scala,age,DataFrame,SparkSQL,Spark,spark
From: https://www.cnblogs.com/shihongpin/p/18428374

相关文章

  • 获取两个 DataFrame 中某两列相同的项
    要获取两个DataFrame中某两列相同的项,可以使用pandas的merge方法或isin方法。以下是两种方法的示例。方法1:使用mergemerge方法可以用来根据多个列将两个DataFrame合并。通过设置how='inner',可以得到两个DataFrame中在指定列上相同的项。importpandasaspd......
  • 大数据毕业设计选题推荐-安顺旅游景点数据分析系统-Hive-Hadoop-Spark
    ✨作者主页:IT研究室✨个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。☑文末获取源码☑精彩专栏推荐⬇⬇⬇Java项目Python项目安卓项目微信小程序项目......
  • Spark学习(一):概述
    Spark学习(一):概述上周六面试腾讯时被问到是否了解Spark,彼时对Spark毫无接触故答不了解,面试结束后了解到Spark与MapReduce渊源颇深,去年夏天学习MIT6.824分布式系统设计时曾深入学习过MapReduce(分布式学习:MapReduce-pinoky-博客园(cnblogs.com))故对Spark产生兴趣,由此开始学习......
  • Spark(六)运行模式(二)
    Yarn模式1、解压缩文件[user@hadoop102software]$tar-zxvfspark-3.0.0-bin-hadoop3.2.tgz-C/opt/module[user@hadoop102software]$cd/opt/module[user@hadoop102module]$mvspark-3.0.0-bin-hadoop3.2spark-yarn2、修改配置文件(1)修改hadoop配置文件/opt/module......
  • Spark(五)运行环境(一)
    Local模式不需要其他任何节点资源就可以在本地执行Spark代码的环境,一般用于教学,调试,演示等在IDEA中运行代码的环境称之为开发环境1、解压缩文件将spark-3.0.0-bin-hadoop3.2.tgz文件上传到Linux并解压缩,放置在指定位置,路径中不要包含中文或空格压缩文件放在'/opt/software......
  • 【python】Panda 之Dataframe 基础api讲解代码 建议在Jupyter Notebook 中运行
    建议在JupyterNotebook中运行jupyternotebook环境搭建文章目录1.dataframe常用属性2.dataframe的常用方法3.dataframe对象的布尔值操作4.datafrane对象的计算5.更改series和dataframe对象5.2修改行名和列名5.3添加删除插入列6.导入和导出数据6.1导......
  • 基于Spark的温布尔登特色赛赛事数据分析预测及算法实现_718p9405
    目录技术栈和环境说明python语言解决的思路具体实现截图框架介绍技术路线操作可行性性能/安全/负载方面python-flask核心代码部分展示python-django核心代码部分展示详细视频演示源码获取技术栈和环境说明结合用户的使用需求,本系统采用运用较为广泛的Python语言,DJAN......
  • Spark(三)Spark Core(二)
    RDD详解RDD持久化/缓存某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存valrdd1=sc.textFile("hdfs://node01:8020/words.txt")valrdd2=rdd1.flatMap(x=>x.split("")).map((_,1)).reduceByKey(_+_)rdd2.ca......
  • Spark(二)Spark Core(一)
    RDD详解前提:MapReduce框架采用非循环式的数据流模型,把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销,且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象,因此出现了RDD这个概念概念RDD(ResilientDistributedDataset)叫做弹性......
  • Spark(一)概述
    基本概念Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎SparkvsHadoopSpark和Hadoop的根本差异是多个作业之间的数据通信问题:Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘HadoopSpark类型分布式基础平台,包含计算,存储,调度分......