DataFrame
创建DataFrame
1.转换为DataFrame方式1
将RDD[元组或列表] 转换为DataFrame
-
定义RDD,每个元素是Row类型
-
将上面的RDD[Row]转换成DataFrame,df=spark.createDataFrame(row_rdd)
-
代码
# -*- coding:utf-8 -*- # Desc:This is Code Desc import os # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置 from pyspark import Row from pyspark.sql import SparkSession os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON #方式1-通过RDD<Row>,转换成DataFrame if __name__ == '__main__': #1-创建上下文对象 #第二代的上下文对象SparkSession,第一代是SparkContext。 #SparkSession功能更强大,兼容SparkContext, spark=SparkSession\ .builder\ .appName('test')\ .master('local[*]')\ .getOrCreate() #可以从SparkSession获取SparkContext sc=spark.sparkContext #2-加载文本文件形成RDD,每个元素是string rdd1=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt') rdd1.foreach(lambda x:print(x)) #3-将RDD的每个元素转换成Row对象 #比如将 ‘Justin, 19’ 转换成 Row(name='Justin',age=19) rdd2=rdd1.map(lambda x:x.split(',')) rdd2.foreach(lambda x: print(x)) rdd3=rdd2.map(lambda x:Row(name=x[0],age=int(x[1].strip()))) rdd3.foreach(lambda x: print(x)) #4-将RDD转换成DataFrame df=spark.createDataFrame(rdd3) #5-打印DataFrame的schema df.printSchema() #6-打印DataFrame的数据 df.show() #7-用sparkSQL筛选年龄介于13~19岁的人员 #将DataFrame注册成临时表 df.createOrReplaceTempView('people') #查询临时表 df2=spark.sql('select * from people where age>=13 and age<=19') #打印结果 df2.show()
2.转换为DataFrame方式2
RDD[元组或列表]+自定义Schema信息 -->DataFrame
-
核心步骤
- 1、RDD的每个元素转换为【元组或列表】
- 2、依据元组的值自定义schema
- 3、spark.createDataFrame(rdd,schema)
-
代码
直接复制上面的文件稍加修改,不用从头写
# -*- coding:utf-8 -*-
# Desc:This is Code Desc
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
from pyspark import Row
from pyspark.sql import SparkSession
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
#2-RDD<元组或列表> -> 定义schema -> spark.createDataFrame(RDD,schema)
if __name__ == '__main__':
#1-创建上下文对象
spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
sc=spark.sparkContext
#2-加载文本文件形成RDD
#仍然需要通过SparkContext对象来创建RDD
#SparkSession对象无法直接加载数据得到RDD
rdd1=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
#3-将RDD的每个元素转换成元组
rdd2=rdd1.map(lambda x: (x.split(',')[0],int(x.split(',')[1].strip()) ))
#4-为上面的元组,自定义schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
#5-将RDD转换成DataFrame
df=spark.createDataFrame(rdd2 , schema)
#6-打印DataFrame的schema
df.printSchema()
#7-打印DataFrame的数据
df.show()
3.转换为DataFrame方式3
RDD[容器]+toDF(指定列名)
-
步骤:
- RDD的每个元素转换为【元组或列表】。
- 再加上toDF(指定多个列名)
-
代码
直接复制上面的文件稍加修改,不用从头写
# -*- coding:utf-8 -*-
# Desc:This is Code Desc
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
from pyspark import Row
from pyspark.sql import SparkSession
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
#3-RDD<元组或列表> -> RDD.toDF([字段名1,字段名2])
if __name__ == '__main__':
#1-创建上下文对象
spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
sc=spark.sparkContext
#2-加载文本文件形成RDD
#仍然需要通过SparkContext对象来创建RDD
#SparkSession对象无法直接加载数据得到RDD
rdd1=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
#3-将RDD的每个元素转换成元组
rdd2=rdd1.map(lambda x: (x.split(',')[0],int(x.split(',')[1].strip()) ))
#4-将RDD转换成DataFrame
df=rdd2.toDF(['name','age'])
#5-打印DataFrame的schema
df.printSchema()
#6-打印DataFrame的数据
df.show()
4.转换为DataFrame方式4(了解)
【了解】Pandas构建DataFrame
-
语法:Spark的DataFrame=spark.createDataFrame(pandas的DataFrame)
-
注意:虽然都叫Dataframe,但是pandas的Dataframe是单机版的,Spark的Dataframe是多机器分布式的。
-
代码
# -*- coding:utf-8 -*- # Desc:This is Code Desc import os # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置 from pyspark import Row from pyspark.sql import SparkSession os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON #3-RDD<元组或列表> -> RDD.toDF([字段名1,字段名2]) if __name__ == '__main__': #1-创建上下文对象 spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate() #2-创建一个pandas的DataFrame import pandas as pd pdf = pd.DataFrame({ "id": [1, 2, 3], "name": ["张大仙", '王晓晓', '王大锤'], "age": [11, 11, 11] }) print(pdf) #3-将其转换成spark的DataFrame df=spark.createDataFrame(pdf) #4-打印schema df.printSchema() #5-打印数据 df.show()
二.加载外部数据(文件)转化为DataFrame
- 用【spark.read】统一加载各种格式的数据文件
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
#1创建上下文对象
spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
#2-加载txt文件形成DataFrame
sc=spark.sparkContext
#都是加载同样的txt文件,但是sc返回的是RDD,SparkSession返回的是DataFrame
rdd=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
df1=spark.read.format('text').load('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
#简化写法2
df2=spark.read.text('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
# 普通文本文件加载后,只有一个value字段,值就是每行的字符串
df1.printSchema()
df1.show()
#3加载json文件形成DataFrame
df3=spark.read.json('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.json')
df3.printSchema()
df3.show()
#4加载,csv文件形成DataFrame
#sep,指定分隔符,csv文件默认分隔符为,号
#header,是否把第一行识别成字段名
#inferSchema,是否自动推断字段类型,比如将20识别成整数而不是string的'20'
df4=spark.read\
.option('sep',';')\
.option('header','true')\
.option('inferSchema','true')\ # inferSchema=True 表示开启自动推断字段的类型
.csv('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.csv')
df4.printSchema()
df4.show()
#5加载parquet文件形成DataFrame
#parquet是列式存储文件,无法用文本打开
df5=spark.read.parquet('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/users.parquet')
df5.show()
df5.printSchema()
spark.stop()
DataFrame常用操作
DSL风格
-
简单来说DSL风格就是,调用dataframe的api名,他们与sql语句的关键字同名。
-
#比如 df.select() .where() .groupby() .orderby() .limit()
-
其他更复杂的api,可以借助pyspark.sql.functions包下的API
-
SQL 风格
-
2个步骤
-
- 将DataFrame注册成一个【视图(可以认为是表)】
df.createOrReplaceTempView('表名')
-
2.查询上面的视图(表)
df2=spark.sql('sql 语句')
-
案例一,花式查询
-
需求:
-
读取people.txt并分析下面
-
DSL分析 1.查看DataFrame中的内容,通过调用show方法 2.查看DataFrame的Scheme信息 3.第一种方式查看name字段数据 4.第二种方式查看name字段数据 5.第三种方式查看name和age字段数据 6.每个年龄加1 7.过滤出年龄大于21的数据 8.统计每个年龄有多少人 SQL分析 1 查看DataFrame中的内容 2 查看DataFrame的Scheme信息 3 查看name字段数据 3 根据age排序的前两个人员信息 4 查询年龄大于25的人的信息
-
-
代码
import os from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': #1创建上下文对象 spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc=spark.sparkContext #2-加载people.txt形成DataFrame #最好先加载成RDDF,再转化成DataFrame rdd=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt') df=rdd.map(lambda x:x.split(',')).map(lambda l:(l[0],int(l[1].strip()))).toDF(['name','age']) #DSL分析 # 1.查看DataFrame中的内容,通过调用show方法 df.show() # 2.查看DataFrame的Scheme信息 df.printSchema() # 3.第一种方式查看name字段数据 df2=df.select('name','age') df2.show() df2.printSchema() # 4.第二种方式查看name字段数据 df.select(['name','age']).show() # 5.第三种方式查看name和age字段数据 df.select(df['name']).show() # 6.每个年龄加1 #下面是错误写法 #df.select('age+1').show() #正确写法 df.select(df['age']+1).show() df.select((df['age']+1).alias('new_age')).show() # 7.过滤出年龄大于21的数据 df.where('age>21').show() df.filter('age>21').show() # 8.统计每个年龄有多少人 df.groupby('age').count().show() # SQL分析 #将DataFrame映射为一个表名 df.createOrReplaceTempView('people') # 1 查看DataFrame中的内容 df3=spark.sql('select * from people') df3.show() # 2 查看DataFrame的Scheme信息 df3.printSchema() # 3 查看name字段数据 spark.sql('select name from people').show() # 3 根据age排序的前两个人员信息 spark.sql('select * from people order by age desc limit 2').show() # 4 查询年龄大于25的人的信息 spark.sql('select * from people where age>25').show()
案例二,wordcount
-
需求,用SQL和DSL 2种编程方式来做wordcount,并对词频倒序排序
-
DSL风格做wordcount
- 对DataFrame调用API
- 调用的顺序【split】->【explode】->【groupby】+【count】+【orderBy】
-
SQL风格做wordcount
- SQL分析2个步骤
- 注册临时视图
- 在spark.sql()中使用上面的视图名
- 做wordcount的步骤: split->explode->group by+count+order by
- 最好把每一步的中间结果打印出来,做到心中有数。
- SQL分析2个步骤
-
代码
- 最好先写【SQL】风格,可以推断出【DSL】函数的调用顺序。
import os from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': #1创建上下文对象 spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc=spark.sparkContext #加载文件形成DataFrame df=spark.read.text('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/words.txt') df.printSchema() df.show() #用SQL风格做wordcount #映射成临时表名 df.createOrReplaceTempView('words_t') spark.sql(''' select word, count(1) as cnt from (select explode(split(value , ' ')) as word from words_t ) t group by word order by cnt desc ''').show() #用DSL风格做wordcount from pyspark.sql import functions as F df2=df.select(F.split('value',' ').alias('arr')) df3=df2.select(F.explode('arr').alias('word')) #下面调用count()后,会自动拼接一列名叫count df4=df3.groupby('word').count() df5=df4.orderBy('count',ascending=False) df5.show() spark.stop()
【重要】案例三 电影评分数据
-
需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。
-
注意:
-
最好先写【SQL】风格,可以推断出【DSL】函数的调用顺序。
-
对shuffle调优,因为SQL语句中的【group by】、【order by】,【distinct】关键字都会触发shuffle,那么就会触发shuffle的并行度参数【spark.sql.shuffle.partitions】起作用,他默认是【200】,可以适时降低。可以为申请核数的2~3倍。
-
查看官网参数 http://spark.apache.org/docs/latest/configuration.html
-
对shuffle的调优设置,有2种方式
-
既可以在【上下文对象】中配置,
spark=SparkSession.builder\ .appName('test')\ .master('local[*]')\ .config('spark.sql.shuffle.partitions',4)\ .getOrCreate()
-
也可以在【sql会话】中设置。
spark.sql('set spark.sql.shuffle.partitions=4')
-
-
-
代码
# -*- coding:utf-8 -*- # Desc:This is Code Desc import os # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置 import time from pyspark import Row, StorageLevel from pyspark.sql import SparkSession os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': #1-创建上下文对象 spark=SparkSession.builder\ .appName('test')\ .master('local[*]')\ .config('spark.sql.shuffle.partitions','4')\ .getOrCreate() #设置方式2 spark.sql('set spark.sql.shuffle.partitions=4') #2-加载电影文件形成DataFrame df=spark.read.format('csv')\ .option('sep','::') \ .schema('user_id string, movie_id string, rating float, timestamp long') \ .load('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/sql/ratings.dat') df.persist(StorageLevel.MEMORY_AND_DISK_2) df.printSchema() df.show() #获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000) #3-先用SQL风格做一遍 df.createOrReplaceTempView('movie_t') result_df = spark.sql(''' select movie_id, round(avg(rating) , 2) as avg_rate,--平均分 count(1) as cnt from movie_t group by movie_id having cnt>2000 order by avg_rate desc limit 10 ''') #优化:由于result_df被复用了多次,所以可以做持久化 result_df.persist(StorageLevel.MEMORY_AND_DISK_2) #对于DataFrame来说,他的show,write等操作,跟RDD的action操作是一样的都是触发执行。 result_df.show() #4-再用DSL风格做一遍 from pyspark.sql import functions as F df.groupby('movie_id').agg( F.round(F.avg('rating'), 2).alias('avg_rate'), F.count('user_id').alias('cnt') ).where('cnt>2000') \ .orderBy('avg_rate', ascending=False) \ .limit(10)\ .show() #5-将结果输出成csv文件 result_df.write.format('csv') \ .option('header', True) \ .save('file:///export/pyworkspace/pyspark_dev/spark_sql/data/out/movie') #6-将结果输出到MySQL. #TODO 1-需要事先创建数据库名,比如叫bigdata #TODO 2-需要将MySQL的驱动jar包复制到spark/jars目录中 #TODO 3-无需创建表,Spark会自动在MySQL中建表。 result_df.write.format('jdbc')\ .mode('overwrite')\ .option('url','jdbc:mysql://node1:3306/bigdata')\ .option('dbtable','tb_movie_top10')\ .option('user','root')\ .option('password','123456')\ .save() time.sleep(10*60)
-
由webui页面可知,【4040首页jobs标签页,点击对应的jobid,跳转到job详情页,显示DAG和拆分的各stage运行情况。】shuffle调优之后,shuffle的耗时从【9s降到了1s】。shuffle的Task数从【200降到了4】
-
不管是SQL还是DSL 2种风格,他们的【DAG】执行过程都是一样的。因为底层都是【catalyst】引擎。
SparkSQL的外部数据源
-
Spark的统一的读取数据的入口
- spark.read.format(【保存格式】).load(【文件所在的路径】)
- 简写: spark.read.格式的名称('文件的路径')
- 例如: spark.read.option('key','value').json('文件路径')
-
Spark的统一的数据的写出的出口
- DataFrame.write.format(【格式】).mode('保存模式').save(【保存的文件路径】)
- 简写: DataFrame.write.保存的格式(保存到哪个路径);
- 例如: df.write.mode('保存模式').option('key','value').json('保存的路径')
-
Spark的保存模式有4种方式append、【overwrite】、ignore、errorifexists、
-
保存的综合案例。
# -*- coding:utf-8 -*- # Desc:This is Code Desc import os # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置 import time from pyspark import Row, StorageLevel from pyspark.sql import SparkSession os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': #1-创建上下文对象 spark=SparkSession.builder\ .appName('test')\ .master('local[*]')\ .getOrCreate() #2-通过jdbc读取mysql的数据 df=spark.read.format('jdbc')\ .option('url','jdbc:mysql://node1:3306/bigdata') \ .option('dbtable','tb_movie_top10') \ .option('user','root') \ .option('password','123456') \ .load() df.printSchema() df.show() #3-将上面的DataFrame保存为text #注意保存为text纯文本时,要求DataFrame的列只能有1列,不能有多列 #所以折中方式是利用concat_ws函数将多列拼接为1列。 from pyspark.sql import functions as F df.select(F.concat_ws('_','movie_id','avg_rate','cnt'))\ .write\ .format('text')\ .mode('overwrite')\ .save('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/text') #4-将上面的DataFrame保存为csv df.write.format('csv').mode('overwrite')\ .option('header',True) \ .option('sep','\t') \ .save('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/csv') #简写方式 df.write.mode('overwrite')\ .option('header',True) \ .option('sep','\t') \ .csv('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/csv2') #5-将上面的DataFrame保存为json df.write.mode('overwrite').json('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/json') #6-将上面的DataFrame保存为parquet df.write.mode('overwrite').parquet('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/parquet') #spark的默认的读写格式就是parquet,所以此时parquet可以默认不写 df.write.mode('overwrite').save('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/parquet2') #7-将上面的DataFrame保存到mysql #格式1 df.write.mode('overwrite') \ .format('jdbc')\ .option('url', 'jdbc:mysql://node1:3306/test') \ .option('dbtable', 'tb_movie_top10_2') \ .option('user', 'root') \ .option('password', '123456')\ .save() #格式2 properties={'user':'root','password':'123456'} df.write.mode('overwrite')\ .jdbc('jdbc:mysql://node1:3306/bigdata','tb_movie_top10_2','overwrite',properties)