首页 > 其他分享 >学习记录14

学习记录14

时间:2024-01-26 18:56:28浏览次数:20  
标签:14 scala age 记录 学习 apache org spark name

  本次学习学习了Dataframe方面的知识

DataFrame

DataFrame概念

Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-2003规范

 

DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询

 

RDD是分布式的 Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的

DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息,可以快速去找相关信息

DataFrame创建

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1cc00f56
// 支持RDDs转换为DataFrames以及后面的操作
scala> import spark.implicits._
import spark.implicits._

scala> val df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

// 展示
scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

//查看结构
scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 
// 查询
scala> df.select(df("name"),df("age") + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

// 过滤
scala> df.filter(df("age") > 20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

// 分组
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|  30|    1|
|null|    1|
+----+-----+

// 排序
scala> df.sort(df("age").desc).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+

// 进行多列排序 
scala> df.sort(df("age").desc,df("name").asc).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+

// 进行重命名
scala> df.select(df("name").as("username"),df("age")).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
|    Andy|  30|
|  Justin|  19|
+--------+----+

利用反射机制推断RDD模式

在利用反射机制推断RDD模式时,需要首先定义一个case class,因为,只有case class才能被Spark隐式地转换为DataFrame

scala> case class Person(name: String, age: Long)  //定义一个case class
defined class Person
scala> val peopleDF = spark.sparkContext.
| textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").
| map(_.split(",")).
| map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint] 
scala> peopleDF.createOrReplaceTempView("people") //必须注册为临时表才能供下面的查询使用
scala> val personsRDD = spark.sql("select name,age from people where age > 20")
//最终生成一个DataFrame,下面是系统执行返回的信息
personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show()  //DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
//下面是系统执行返回的信息
+------------------+ 
| value|
+------------------+
|Name:Michael,Age:29|
| Name:Andy,Age:30|
+------------------+

使用编程方式定义RDD模式

当无法提前定义case class时,就需要采用编程方式定义RDD模式。

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

//下面加载文件生成RDD
scala> val peopleRDD = spark.sparkContext.
| textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:26 

//定义一个模式字符串
scala> val schemaString = "name age"
schemaString: String = name age

// 进行
scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,StringType,true))

//进行关联,schema描述了模式信息,模式中包含name和age两个字段
//shcema就是“表头”
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))

//下面加载文件生成RDD
scala> val peopleRDD = spark.sparkContext.
| textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:26 

//对peopleRDD 这个RDD中的每一行元素都进行解析
scala> val rowRDD = peopleRDD.map(_.split(",")).
|  map(attributes => Row(attributes(0), attributes(1).trim.toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:29
//上面得到的rowRDD就是“表中的记录”

//下面把“表头”和“表中的记录”拼装起来
 scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]

 //必须注册为临时表才能供下面查询使用
scala> peopleDF.createOrReplaceTempView("people")
scala> val results = spark.sql("SELECT name,age FROM people")
results: org.apache.spark.sql.DataFrame = [name: string, age: int] 
scala> results.
|  map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).
|  show()
+--------------------+
| value|
+--------------------+
|name: Michael,age:29|
| name: Andy,age:30|
| name: Justin,age:19|

把RDD保存为文件

scala> val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.select("name","age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")

write.format()可以指定输出json,parquet,jdbc,orc,csv,text等类型的文件

scala> df.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")

生成的均为一个目录

[atguigu@hadoop102 mycode]$ ll
总用量 0
drwxrwxr-x. 2 atguigu atguigu   6 1月  22 17:56 examples
drwxrwxr-x. 5 atguigu atguigu  64 1月  22 21:58 filesort
drwxrwxr-x. 3 atguigu atguigu  23 1月  22 21:27 MaxAndMin
drwxr-xr-x. 2 atguigu atguigu 176 1月  23 11:48 newpeople.csv
drwxr-xr-x. 2 atguigu atguigu  84 1月  23 11:53 newpeople.txt
drwxrwxr-x. 3 atguigu atguigu  52 1月  22 21:53 rdd
drwxrwxr-x. 2 atguigu atguigu  40 1月  22 21:22 rdd2
drwxrwxr-x. 5 atguigu atguigu  64 1月  22 18:31 topscala
drwxrwxr-x. 6 atguigu atguigu  97 1月  21 21:09 wordcount

标签:14,scala,age,记录,学习,apache,org,spark,name
From: https://www.cnblogs.com/JIANGzihao0222/p/17990478

相关文章

  • 学习记录15
    本次学习学习了将dataframe里吗有结构的数据加载到mysql以及进行读这里采用独立应用程序的方式读取MySQL数据库内容。创建一个代码文件SparkReadMySQL.scala,其内容如下:importorg.apache.log4j.{Level,Logger}importorg.apache.spark.sql.SparkSessionobjectSparkRea......
  • 2024年1月Java项目开发指南14:关于post中的body和param以及java中的@RequestBody和@Req
    在HTTP请求中,POST方法通常用于向服务器发送数据,这些数据可以在请求的body中,也可以在URL的param中。不过,这两者的使用方式和适用场景是不同的。Body:在POST请求中,body主要用于包含要发送到服务器的数据。这些数据通常是表单数据、JSON数据或其他类型的数据。当你需要在请求体中发送......
  • openGauss学习笔记-208 openGauss 数据库运维-常见故障定位案例-TPCC高并发长稳运行因
    openGauss学习笔记-208openGauss数据库运维-常见故障定位案例-TPCC高并发长稳运行因脏页刷盘效率导致性能下降208.1TPCC高并发长稳运行因脏页刷盘效率导致性能下降208.1.1问题现象TPCC高并发长稳运行因脏页刷盘效率导致性能下降,具体表现为:初始性能较高,随着运行时间增加,数据......
  • 自动化测试平台搭建背景及记录
    在目前产品的迭代过程中,公司现有的自动化测试体系存在很多问题,大多数情况是人工进行用例回归测试,低效且易出错,导致测试流程在效率和品质方面均未达到理想状态。同时,业务上线周期的日益缩短也导致产品质量的不稳定性也愈发突出,出现版本质量不统一的问题。流程下也伴随着以下痛点:......
  • 记录--h5端调用手机摄像头实现扫一扫功能
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助一、前言最近有遇到一个需求,在h5浏览器中实现扫码功能,其本质便是打开手机摄像头定时拍照,特此做一个记录。主要技术栈采用的是vue2,使用的开发工具是hbuilderX。经过测试发现部分浏览器并不支持打开摄像头,测试了......
  • Excel表格转GDScript插件的使用 学习笔记
    【【蘩】[Godot教程]Excel表格转GDScript插件的使用】ConfigHelper导出生成在excels文件夹下。添加新的字符串。导出表格,会被忽略掉的(如注释内容)"ignore":true......
  • Springcloud学习笔记61---Spring MVC的拦截器HandlerInterceptor
    1. HandlerMethod介绍HandlerMethod它作为SpringMVC的非公开API,可能绝大多数小伙伴都对它比较陌生,但我相信你对它又不是那么的生疏,因为你可能没用过但肯定见过。比如SpringMVC的拦截器HandlerInterceptor的拦截方法的第三个入参Objecthandler,虽然它是Object类型,但其实绝大部......
  • 【学习笔记】链式前向星
    链式前向星,是一种邻接表存图的方式。本质上是用数组模拟一个链表。适合存各种类型的图,但是查询两节点间的边是否存在很不方便,对出边进行排序也很麻烦。#include<iostream>#include<algorithm>#include<cstring>#include<queue>usingnamespacestd;constintN=1e5+5;type......
  • 读论文-基于自监督学习的序列推荐算法
    前言今天读的文章为一篇名叫《基于自监督学习的序列推荐算法》的期刊论文,文章于2023年8月15日发表在自然科学报上,这篇论文的引用为:[1]闫猛猛,汪海涛,贺建峰等.基于自监督学习的序列推荐算法[J].重庆邮电大学学报(自然科学版),2023,35(04):722-731.摘要原文如下:针对现有序列......
  • D-Bus学习
    D-Bus学习https://blog.csdn.net/f110300641/article/details/106823611概念D-Bus是在Linus上桌面系统中各应用程序之间通信(IPC)和远程过程调用(RPC)的机制,实现了多个程序在计算机上同时通信。D-Bus将原本一对一的通信过程出抽象出一个软件总线,应用程序链接到这个总线,不关......