首页 > 数据库 >Spark_04 SparkSQL的介绍及使用

Spark_04 SparkSQL的介绍及使用

时间:2024-07-01 21:29:56浏览次数:18  
标签:04 df where rdd SparkSQL sql Spark 数据 select

说明

这一章主要包括对于sparksql概念的介绍,sparksql的特点,sparksql用到的数据类型,DataFrame的基础方法等。

概念

SparkSQL是Spark用于处理结构化数据的模块。

诞生

由于MapReduce这种计算模型的执行效率较慢,rdd原生代码较为复杂,所以引入了SparkSQL应运而生。它可以将sql转换为RDD,然后提交到集群中去运行,执行效率非常快。

特点

融合性:使用sql配合spark使用,封装了不同语言的dsl方法。

统一数据访问:使用read方法可以读取hdfs数据,mysql数据,不同类型的文件数据

                         使用write方法可以写入hdfs,mysql不同类型的文件

兼容hive:可以使用hivesql方法    可以使用sparksql直接生成hive数据表

标准化数据连接:支持jdbc和odbc连接方式连接SparkSQL

SparkSQL和hive的关系

  • 运行模式是 spark on hive

  • 是spark团建独立开发的工具,2014年发布1.0版本

  • sparkSQL工具对spark的兼容性更好,优化性能得到提升

  • sparkSQL本质也是将sql语句转化为rdd执行,catalyst引擎负责将sql转化为rdd

  • sparkSQL可以连接使用hive的metastore服务,管理表的元数据

数据类型

RDD 基础数据类型

弹性分布式数据集合

#以下是示例代码
person_rdd = [1,zhangsan,20]
person_rdd2 = [2,李四,22]

data = person_rdd.collect() # 将rdd数据获取后转为list
data[0]
data[1]

dataframe 对于RDD再次进行封装

结构化行列,俩部分组成,row类(即行数据),schema(即表结构)

#以下是示例代码

[
    row1 = [1,zhangsan,20],
    row2 = [2,lisi,22]
]

schema
{
    id:int,
    name:string
    age:int
}

dataset 对于dataframe再次进行封装

结构化数据 俩倍分组成,row类(即行数据),schema(即表结构)。从dataset中取出一行数据可以当做dataframe类型操作。

DataFrame的详细学习

创建dataframe数据,需要使用一个sparksession的类创建

SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext

示例代码

基本创建

#导入行

from pyspark.sql import Row,SparkSession

#导入所有数据类型

from pyspark.sql.types import *

#定义每行数据

r1 = Row(id=1,name='张三',gender='female')
r2 = Row(id=2,name='李四',gender='male')

#定义字段信息

schema = StructType().add('id',IntegerType(),False).add('name',StringType(),False).add('male',StringType(),False)

'''
创建dataframe数据
# 创建SparkSession对象 是sparksql的入口类对象,操作sparkSQL必须创建此对象
# SparkSession.builder -> 类名.类属性名 -> 生成Builder类的对象
'''

ss = SparkSession.builder.getOrCreate()

df = ss.createDataFrame([r1,r2],schema=schema)

df.show()
df.printSchema()

rdd和dataframe的相互转化

#rdd转df

ss = SparkSession.builder.getOrCreate()

sc = ss.sparkContext

rdd = sc.parallelize([[1,'张三',20],[1,'李四',30]])
schema = StructType().add('id',IntegerType()).add('name',StringType()).add('age',IntegerType())

df = rdd.toDF(schema=schema)
df.show()

#df转rdd

rdd1 =df.rdd
printI(rdd1.collect())

rdd2 = rdd1.map(lambda:x:x.id)
print(rdd2)

DSL方法 -- DataFrame方法

spark提供DSL方法和sql的关键词一样,使用此方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据

from join 知道数据在哪 df本身就是要处理的数据 df.join(df2)

where 过滤需要处理的数据 df.join(df2).where()

group by 聚合 数据的计算 df.join(df2).where().groupby().sum()

having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()

select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()

order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()

limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()

示例代码

# 使用DF的DSL方法实现对df数据处理
# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致

from pyspark.sql import SparkSession

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.SparkContext

将读取的数据转换为RDD
rdd =sc.textFile('file:///root/data/test.csv')
res = rdd.collect()
print(res)

#将字符串数据切换为二维rdd
rdd_split =rdd.map(lambda x:x.split(','))
res2=rdd_split.collect()
print(res)

# 将rdd转为df数据
df =rdd.toDF(schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
df.show()

#df的查询操作
df_select1 =df.select('name',df['age'],df.gender)
df_select1.show()

#命名别称
df_select2 = df.select(df.name.alias('username'))
#类型转换
df_select3 = df.select(df.name.cast('int'),df.age.astype('int'))


#df的过滤操作
df_where1=df_select3.where('age>=22 and gender="女"')


#df的分组操作
df_groupby1=df_select3.groupby('major', 'gender')

# 数据的分组过滤操作
df_groupby3 = df_select6.groupby('major', 'gender').avg('age').where('avg(age) > 21.5')

# 数据的排序操作
df_orderby = df_select6.orderBy('birthday') # 返回新的df
df_orderby.show()

# 对出生日期按照从大到小排序
# sql:select * from df order by birthday desc
df_orderby2 = df_select6.orderBy('birthday',ascending=False) # 返回新的df
df_orderby2.show()


# 取出指定数量的数据
# 取出前10条数据
# sql:select * from df limit 10
df_limit = df_select6.limit(10) # 返回新的df
df_limit.show()

# 在df中对某列计算后生成新的一列
df_new =df.withColumn('new_age',df.age+10)


SQL语句

使用sparksession提供的sql方法,编写sql语句执行

需要将要操作的dataframe数据映射成一张数据表, 编写sql语句执行

df.createTempView('stu')

sql1='select * from stu'

df_sql=ss.sql(sql1).show

标签:04,df,where,rdd,SparkSQL,sql,Spark,数据,select
From: https://blog.csdn.net/a666b777/article/details/140086993

相关文章

  • 为Ubuntu-24.04-live-server-amd64磁盘扩容
    系列文章目录Ubuntu-24.04-live-server-amd64安装界面中文版文章目录系列文章目录前言一、检查系统本身情况1.用lsblk命令查看自己系统磁盘是什么状态2.用df-h命令查看文件系统的磁盘空间使用情况3.解决Ubuntu-24.04磁盘空间只能用一半的问题3-1扩展逻辑卷:3-2.......
  • 大数据面试题之Spark(6)
              Spark输出文件的个数,如何合并小文件?Spark的driver是怎么驱动作业流程的?SparkSQL的劣势?介绍下SparkStreaming和StructedStreamingSpark为什么比Hadoop速度快?DAG划分Spark源码实现?SparkStreaming的双流join的过程,怎么做的?Spark的Bl......
  • c++使用matplotlibcpp,subplot() 报错问题-ubuntu22.04
    使用matplotlibcpp.h在C++代码中绘制图形plt::subplot();程序抛出运行时错误,terminatecalledafterthrowinganinstanceof'std::runtime_error'what():Calltosubplot()failed.解决方法:在matplotlibcpp.h文件中把PyTuple_SetItem(args,0,PyFloat_FromDouble(......
  • Ubuntu20.04之VNC的安装与使用
    本教程适用于Ubuntu20.04及以下版本,Ubuntu22.04版本或有出入更多更新的文章详见我的个人博客:【前往】文章目录1.安装图形桌面1.1选择安装gnome桌面1.2选择安装xface桌面2.安装VNC-Server3.配置VCN-Server4.连接VNC5.设置VNC-Server为系统服务(可选)1.安装图形桌面如果linux系统已经......
  • 代码随想录算法训练营第四十二天 | 1049最后一块石头的重量II 494.目标和 474.一和零
    1049.最后一块石头的重量题目链接文章讲解视频讲解解题思路:  将石头尽量分为相等的两堆,两堆最差即为所求结果  石头的重量就是石头的价值动规五部曲:dp[j]:表示背包容量为j时可以装的石头的总价值递推公式:dp[j]=max(dp[j],dp[j-stones[i]]+stones[i]初始化:均......
  • 代码随想录算法训练营第十天|232.用栈实现队列、225.用队列实现栈、20.有效的括号、 1
    今天学习了栈与队列这两个数据结构,栈是一个先进后出的结构,在C++中用stack进行表示,有push、pop、top、empty这些属性;队列是一个先进后出的结构,有push、pop、front、back。empty这些属性。在底层实现上,他们都是用deque双向队列进行实现的。232.用栈实现队列题目链接:232.用栈......
  • 乌班图Ubuntu 24.04 SSH Server 修改默认端口重启无效
    试用最新的乌班图版本,常规修改ssh端口,修改完毕后重启sshd提示没有找到service,然后尝试去掉d重启ssh后查看状态,端口仍然是默认的22,各种尝试都试了不行,重启服务器后倒是端口修改成功了,心想着不能每台机器都重启吧。百思不得其解后查看官网相关(机翻)意思就是22.10之后的版本使用方......
  • 乌班图Ubuntu 24.04初始化MySQL报错error while loading shared libraries: libaio.so
    由于乌班图24.04LTS已经发布了,因此准备新业务逐步往这上面迁移,毕竟支持有效期比22.04更长准备在24.04上进行MySQL的初始化,因为习惯自定义安装存储目录,所以使用mysql-8.0.37-linux-glibc2.28-x86_64.tar.xz这个最新的二进制版本。按照22.04版本整理的安装笔记进行操作,第一步安装......
  • FL Studio Portable 21.2.3.4004破解版
    ​FLStudioPortable21.2.3.4004破解版是很多音乐人都在用的全功能的音乐工作站,里面拥有非常先进的制作工具,音符编辑器,音效编辑器,便捷的音源输入,让用户可以自由的在这里自由的改编歌曲,打造出你想要的曲风效果,操作方便,推荐给玩音乐的朋友!FLStudio21.2.3.4004破解版是一款D......
  • 【Redis —— 04 Redis配置文件】
    Redis配置文件(通常为redis.conf)包含多种配置选项,可以调整Redis服务器的行为和性能。以下是Redis配置文件中的常见配置项及其详解:官网:Redisconfiguration|Docs常用配置项1.基本配置bind绑定的IP地址。如果你想让Redis监听多个IP地址,可以用空格分隔多个IP。示例:b......