SQL-style syntax: registering a DataFrame as a table
One of the strengths of a DataFrame is that it can be treated as a relational table: you can run SQL queries against it via spark.sql(), and the result comes back as a DataFrame.
To use the SQL-style syntax, the DataFrame must first be registered as a table, in one of the following ways:
df.createTempView("score")           # register a temporary view (table)
df.createOrReplaceTempView("score")  # register a temporary view, replacing it if it already exists
df.createGlobalTempView("score")     # register a global view
Global view: usable across SparkSession objects, i.e. callable from any SparkSession within the same application; queries must prefix the table name with global_temp.
Temporary view: only visible in the SparkSession that created it.
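The difference shows up once a second SparkSession exists in the same application. A minimal sketch (assuming a running SparkSession named spark and a DataFrame df, as in the examples below):

df.createGlobalTempView("score")
spark.sql("SELECT * FROM global_temp.score").show()               # current session, note the global_temp. prefix
spark.newSession().sql("SELECT * FROM global_temp.score").show()  # another session in the same application

df.createTempView("score2")
# spark.newSession().sql("SELECT * FROM score2")  # would fail: temporary views are session-scoped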
The WordCount example and the movie-rating filtering example are completed below.
Movie rating data:
The file has four columns: user ID, movie ID, rating, and timestamp.
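Each record is one tab-separated line; a hypothetical record might look like this (the values below are illustrative, not taken from the actual file):

196	242	3	881250949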
Movie rating data filtering:
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # read the tab-separated rating file with an explicit schema
    schema = StructType().\
        add("user_id", StringType()).\
        add("movie_id", StringType()).\
        add("rank", IntegerType()).\
        add("ts", StringType())
    df = spark.read.format("csv"). \
        option("header", "False"). \
        option("encoding", "utf-8"). \
        option("sep", "\t"). \
        schema(schema=schema). \
        load("../data/sql/u.data")
    df.createTempView("movies")
    # TODO 1: average rating per user
    print("TODO 1: average rating per user")
    spark.sql("SELECT user_id, ROUND(AVG(rank), 2) AS avg_rank FROM movies GROUP BY user_id ORDER BY avg_rank DESC").show()
    df.groupBy("user_id").\
        agg(
            F.round(F.avg("rank"), 2).alias("avg_rank")
        ).\
        orderBy("avg_rank", ascending=False).\
        show()
    # TODO 2: average rating per movie
    print("TODO 2: average rating per movie")
    spark.sql("SELECT movie_id, ROUND(AVG(rank), 2) AS avg_rank FROM movies GROUP BY movie_id ORDER BY avg_rank DESC").show()
    df.groupBy("movie_id").\
        avg("rank").\
        withColumn("avg(rank)", F.round("avg(rank)", 2)).\
        orderBy("avg(rank)", ascending=False).\
        withColumnRenamed("avg(rank)", "avg_rank").\
        show()
    # TODO 3: how many ratings are above the overall average rating
    print("TODO 3: number of ratings above the overall average:",
          df.where(df['rank'] > df.select(F.avg("rank")).first()['avg(rank)']).count())
    spark.sql("SELECT COUNT(*) AS cnt FROM movies WHERE rank > (SELECT AVG(rank) FROM movies)").show()
    # TODO 4: among high ratings (> 3), find the user who rated most often, and that user's average rating
    user_id = df.where(df['rank'] > 3).\
        groupBy("user_id").\
        count().\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        first()['user_id']
    print("TODO 4: user with the most high ratings (> 3):", user_id,
          ", this user's average rating:",
          df.where(df['user_id'] == user_id).select(F.round(F.avg("rank"), 2)).first()['round(avg(rank), 2)'])
    # TODO 5: each user's average, highest, and lowest rating
    print("TODO 5: each user's average, highest, and lowest rating")
    spark.sql("SELECT user_id, ROUND(AVG(rank), 2) AS avg_rank, MAX(rank) AS max_rank, MIN(rank) AS min_rank FROM movies GROUP BY user_id ORDER BY avg_rank DESC").show()
    df.groupBy("user_id").\
        agg(
            F.round(F.avg("rank"), 2).alias("avg_rank"),
            F.max("rank").alias("max_rank"),
            F.min("rank").alias("min_rank")
        ).\
        orderBy("avg_rank", ascending=False).\
        show()
    # TODO 6: top 10 movies by average rating among movies rated more than 100 times
    print("TODO 6: top 10 movies by average rating among movies rated more than 100 times")
    df.groupBy("movie_id").agg(
            F.count("movie_id").alias("cnt_movie"),
            F.round(F.avg("rank"), 2).alias("avg_rank")
        ).\
        where("cnt_movie > 100").\
        orderBy("avg_rank", ascending=False).\
        show(10)  # the task asks for the top 10, so show only 10 rows
"""
1. agg:它是GroupedData对象的API,作用是在里面可以写多个聚合
2. alias:它是Column对象的API,可以针对一个列进行改名
3. withColumnRenamed:它是DataFrame的API,可以对DF中的列进行改名,一次改一个列,改多个列可以链式调用
4. orderBy: DataFrame的API,进行排序,参数1是被排序的列,参数2是升序(True)或降序False
5. first: DataFrame的API,取出DF的第一行数据,返回值结果是RoW对象.
#Row对象就是一个数组,你可以通过row['列名'〕来取出当前行中,某一列的具体数值,返回值不再是DF或者GroupedData或者Co1umn而是具体的值(字符串,数字等)
"""
WordCount:
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # Approach 1: build a DataFrame from an RDD, then count words with SQL
    rdd = sc.textFile("../data/words.txt").\
        flatMap(lambda line: line.split(" ")).\
        map(lambda word: [word])
    print(rdd.collect())
    df = rdd.toDF(["word"])
    df.createTempView("words")
    spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()

    # Approach 2: read the text file directly, then split and explode each line into words
    schema = StructType().add("words", StringType())
    df2 = spark.read.format("text").\
        schema(schema=schema).\
        load("../data/words.txt")
    df3 = df2.withColumn("words", F.explode(F.split(df2['words'], " ")))
    df3.groupBy("words").\
        count().\
        withColumnRenamed("words", "word").\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        show()
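The key step in the second approach is F.explode(F.split(...)): split turns each line into an array of words, and explode then emits one row per array element. A minimal sketch of just that step, with hypothetical input data:

demo = spark.createDataFrame([("hello spark",), ("hello hadoop",)], ["words"])
demo.withColumn("words", F.explode(F.split(demo['words'], " "))).show()
# yields one row per word: hello, spark, hello, hadoop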