说明
这一章主要包括对于sparksql概念的介绍,sparksql的特点,sparksql用到的数据类型,DataFrame的基础方法等。
概念
SparkSQL是Spark用于处理结构化数据的模块。
诞生
由于MapReduce这种计算模型的执行效率较慢,rdd原生代码较为复杂,所以引入了SparkSQL应运而生。它可以将sql转换为RDD,然后提交到集群中去运行,执行效率非常快。
特点
融合性:使用sql配合spark使用,封装了不同语言的dsl方法。
统一数据访问:使用read方法可以读取hdfs数据,mysql数据,不同类型的文件数据
使用write方法可以写入hdfs,mysql不同类型的文件
兼容hive:可以使用hivesql方法 可以使用sparksql直接生成hive数据表
标准化数据连接:支持jdbc和odbc连接方式连接SparkSQL
SparkSQL和hive的关系
-
运行模式是 spark on hive
-
是spark团建独立开发的工具,2014年发布1.0版本
-
sparkSQL工具对spark的兼容性更好,优化性能得到提升
-
sparkSQL本质也是将sql语句转化为rdd执行,catalyst引擎负责将sql转化为rdd
-
sparkSQL可以连接使用hive的metastore服务,管理表的元数据
数据类型
RDD 基础数据类型
弹性分布式数据集合
#以下是示例代码
person_rdd = [1,zhangsan,20]
person_rdd2 = [2,李四,22]
data = person_rdd.collect() # 将rdd数据获取后转为list
data[0]
data[1]
dataframe 对于RDD再次进行封装
结构化行列,俩部分组成,row类(即行数据),schema(即表结构)
#以下是示例代码
[
row1 = [1,zhangsan,20],
row2 = [2,lisi,22]
]
schema
{
id:int,
name:string
age:int
}
dataset 对于dataframe再次进行封装
结构化数据 俩倍分组成,row类(即行数据),schema(即表结构)。从dataset中取出一行数据可以当做dataframe类型操作。
DataFrame的详细学习
创建dataframe数据,需要使用一个sparksession的类创建
SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext
示例代码
基本创建
#导入行
from pyspark.sql import Row,SparkSession
#导入所有数据类型
from pyspark.sql.types import *
#定义每行数据
r1 = Row(id=1,name='张三',gender='female')
r2 = Row(id=2,name='李四',gender='male')
#定义字段信息
schema = StructType().add('id',IntegerType(),False).add('name',StringType(),False).add('male',StringType(),False)
'''
创建dataframe数据
# 创建SparkSession对象 是sparksql的入口类对象,操作sparkSQL必须创建此对象
# SparkSession.builder -> 类名.类属性名 -> 生成Builder类的对象
'''
ss = SparkSession.builder.getOrCreate()
df = ss.createDataFrame([r1,r2],schema=schema)
df.show()
df.printSchema()
rdd和dataframe的相互转化
#rdd转df
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext
rdd = sc.parallelize([[1,'张三',20],[1,'李四',30]])
schema = StructType().add('id',IntegerType()).add('name',StringType()).add('age',IntegerType())
df = rdd.toDF(schema=schema)
df.show()
#df转rdd
rdd1 =df.rdd
printI(rdd1.collect())
rdd2 = rdd1.map(lambda:x:x.id)
print(rdd2)
DSL方法 -- DataFrame方法
spark提供DSL方法和sql的关键词一样,使用此方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2)
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
示例代码
# 使用DF的DSL方法实现对df数据处理
# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致
from pyspark.sql import SparkSession
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.SparkContext
将读取的数据转换为RDD
rdd =sc.textFile('file:///root/data/test.csv')
res = rdd.collect()
print(res)
#将字符串数据切换为二维rdd
rdd_split =rdd.map(lambda x:x.split(','))
res2=rdd_split.collect()
print(res)
# 将rdd转为df数据
df =rdd.toDF(schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
df.show()
#df的查询操作
df_select1 =df.select('name',df['age'],df.gender)
df_select1.show()
#命名别称
df_select2 = df.select(df.name.alias('username'))
#类型转换
df_select3 = df.select(df.name.cast('int'),df.age.astype('int'))
#df的过滤操作
df_where1=df_select3.where('age>=22 and gender="女"')
#df的分组操作
df_groupby1=df_select3.groupby('major', 'gender')
# 数据的分组过滤操作
df_groupby3 = df_select6.groupby('major', 'gender').avg('age').where('avg(age) > 21.5')
# 数据的排序操作
df_orderby = df_select6.orderBy('birthday') # 返回新的df
df_orderby.show()
# 对出生日期按照从大到小排序
# sql:select * from df order by birthday desc
df_orderby2 = df_select6.orderBy('birthday',ascending=False) # 返回新的df
df_orderby2.show()
# 取出指定数量的数据
# 取出前10条数据
# sql:select * from df limit 10
df_limit = df_select6.limit(10) # 返回新的df
df_limit.show()
# 在df中对某列计算后生成新的一列
df_new =df.withColumn('new_age',df.age+10)
SQL语句
使用sparksession提供的sql方法,编写sql语句执行
需要将要操作的dataframe数据映射成一张数据表, 编写sql语句执行
df.createTempView('stu')
sql1='select * from stu'
df_sql=ss.sql(sql1).show
标签:04,df,where,rdd,SparkSQL,sql,Spark,数据,select
From: https://blog.csdn.net/a666b777/article/details/140086993