首页 > 其他分享 >1/28 学习进度笔记

1/28 学习进度笔记

时间:2024-01-28 17:44:23浏览次数:30  
标签:笔记 df 28 rank 进度 sql avg id user

SQL风格语法-注册DataFrame成为表

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql()来执行SQL语句查询,结果返回一个DataFrame。

如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:

df.createTempView( ""score")     #注册一个临时视图(表)

df.create0rReplaceTempView("score")     #注册一个临时表,如果存在进行替换.

df.createGlobalTempView( "score")     #注册一个全局表

全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:global_temp.

临时表:只在当前SparkSession中可用

完成了WordCount案例和电影评分数据筛选案例:

电影评分数据:

一共由四列组成,分别是用户ID,电影ID,电影评分,时间

 

电影评分数据筛选:

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
schema = StructType().\
add("user_id", StringType()).\
add("movie_id", StringType()).\
add("rank", IntegerType()).\
add("ts", StringType())
df = spark.read.format("csv"). \
option("header", "False"). \
option("encoding", "utf-8"). \
option("sep","\t"). \
schema(schema=schema). \
load("../data/sql/u.data")
df.createTempView("movies")
# TODO 1:用户平均分
print("TODO 1:用户平均分")
spark.sql("SELECT user_id, ROUND(AVG(rank), 2) AS avg_rank FROM movies GROUP BY user_id ORDER BY avg_rank DESC ").show()
df.groupby("user_id").\
agg(
F.round(F.avg("rank"), 2).alias("avg_rank")
).\
orderBy("avg_rank", ascending=False).\
show()
# TODO 2:电影平均分查询
print("TODO 2:电影平均分查询")
spark.sql("SELECT movie_id, ROUND(AVG(rank), 2) AS avg_rank FROM movies GROUP BY movie_id ORDER BY avg_rank DESC").show()
df.groupby("movie_id").\
avg("rank").\
withColumn("avg(rank)",F.round("avg(rank)",2)).\
orderBy("avg(rank)",ascending=False).\
withColumnRenamed("avg(rank)", "avg_rank").\
show()
# TODO 3:查询电影平均分的数量
print("TODO 3:查询大于电影平均分的数量:", df.where(df['rank'] > df.select(F.avg("rank")).first()['avg(rank)']).\
count())
spark.sql("SELECT count(*) as cnt FROM movies WHERE CAST(rank AS DOUBLE) > ROUND((SELECT AVG(rank) FROM movies), 2)").show()

# TODO 4:查询高分电影(>3)中打分次数最多的用户,此人打分的平均分
user_id = df.where(df['rank'] > 3).\
groupby("user_id").\
count().\
withColumnRenamed("count","cnt").\
orderBy("cnt",ascending=False).\
first()['user_id']

print("TODO 4:查询高分电影(>3)中打分次数最多的用户:",user_id,", 此人打分的平均分:",df.where(df['user_id'] == user_id).select(F.round(F.avg("rank"), 2)).first()['round(avg(rank), 2)'])

# TODO 5:查询每个用户的平均打分,最高打分,最低打分
print("TODO 5:查询每个用户的平均打分,最高打分,最低打分")
spark.sql("SELECT user_id, round(avg(rank), 2) as avg_rank, max(rank) as max_rank, min(rank) as min_rank FROM movies GROUP BY user_id ORDER BY avg(rank) DESC").show()
df.groupby("user_id").\
agg(
F.round(F.avg("rank"), 2).alias("avg_rank"),
F.max("rank").alias("max_rank"),
F.min("rank").alias("min_rank")
).\
orderBy("avg_rank", ascending=False).\
show()

# TODO 6:查询评分超过100次的电影的平均分排名TOP10
print("TODO 6:查询评分超过100次的电影的平均分排名TOP10")
df.groupby("movie_id").agg(
F.count("movie_id").alias("cnt_movie"),
F.round(F.avg("rank"), 2).alias("avg_rank")
).\
where("cnt_movie > 100").\
orderBy("avg_rank",ascending=False).\
show()




"""
1. agg:它是GroupedData对象的API,作用是在里面可以写多个聚合
2. alias:它是Column对象的API,可以针对一个列进行改名
3. withColumnRenamed:它是DataFrame的API,可以对DF中的列进行改名,一次改一个列,改多个列可以链式调用
4. orderBy: DataFrame的API,进行排序,参数1是被排序的列,参数2是升序(True)或降序False
5. first: DataFrame的API,取出DF的第一行数据,返回值结果是RoW对象.
#Row对象就是一个数组,你可以通过row['列名'〕来取出当前行中,某一列的具体数值,返回值不再是DF或者GroupedData或者Co1umn而是具体的值(字符串,数字等)
"""

WordCount:

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()

sc = spark.sparkContext

rdd = sc.textFile("../data/words.txt").\
flatMap(lambda line: line.split(" ")).\
map(lambda word: [word])

print(rdd.collect())

df = rdd.toDF(["word"])
df.createTempView("words")

spark.sql("SELECT word,count(*) AS cnt FROM words group by word order by cnt DESC").show()

schema = StructType().add("words", StringType())
df2 = spark.read.format("text").\
schema(schema=schema).\
load("../data/words.txt")
df3 = df2.withColumn("words" , F.explode(F.split(df2['words'] , " ")))

df3.groupBy("words").\
count().\
withColumnRenamed("words", "word").\
withColumnRenamed("count", "cnt").\
orderBy("cnt", ascending=False).\
show()

标签:笔记,df,28,rank,进度,sql,avg,id,user
From: https://www.cnblogs.com/yuncannotjava/p/17993073

相关文章

  • 1.28
    今天学习一下小程序对应的一些基础知识,其中小程序开发和网页开发还是存在一些区别的,比如运行环境、API、开发模式不同等不同。 接下来我们注册微信小程序开发账号 注册完后登录开发主页面,获取到创建项目时所需要的微信小程序ID 然后我们安装开发小程序工具--微信开发者工......
  • BUIR论文阅读笔记
    这个领域不熟悉,是看的第一篇论文,记录细一点Abstract单类协作过滤(OCCF)的目标是识别出与之呈正相关但尚未交互的用户-项目对,其中只有一小部分积极的用户-项目交互被观察到,对于积极和消极交互的区分建模,以往的工作在一定程度上依赖于负抽样,即将未观察到的用户项目对视为负对,因为实......
  • SelfCF论文阅读笔记
    Abstract讲述现存的挑战,现有的方法通常采用负抽样来区分不同的项目,也就是观察到的用户-项目对被视为正实例,未观察到的对被称为负实例,并且在一个定义的分布下进行采样以进行训练。在大数据集上进行负采样的计算成本高,所以负项目应该在定义的分布下仔细的进行抽样,避免在数据集中观......
  • 1.22-1.28 部分补题
    [蓝桥杯2016省A]密码脱落题意:给定一个回文串,但是有一些字母消失不见了。问:至少补全多少个字母,使得字符串变回回文串最开始想一个一个枚举,但是无论怎么写都是错的。后来被提醒回文串的特性,反转之后还是一样的。所以要求最少的需要补全的字母,直接求一个正着和反着的字符串......
  • panghu week04 笔记
    长度最小的子数组一开始想的是框定一个区间,然后如果大于等于target,从区间头弹出一个元素,从尾部append进入一个元素,发现并不能覆盖所有的区间看了题解以后,可以定尾,然后移动头部进行比较funcminSubArrayLen(targetint,nums[]int)int{slide:=make([]int,0)slid......
  • 《Confusion Graph: Detecting Confusion Communities in Large Scale Image Classifi
    论文标题《ConfusionGraph:DetectingConfusionCommunitiesinLargeScaleImageClassification》混淆图:在大规模图像分类中检测混淆社区作者RuochunJin、YongDou、YueqingWang和XinNiu来自国防科技大学并行和分布式处理国家实验室,和上一篇是姊妹篇。初读摘要......
  • 24.1.28(读后感)
    今天看了构建之法的第一章,有一些心得体会。在这一章中,作者为我们介绍了一些关于软件工程的基本知识。①软件=程序+软件工程:正是因为对软件开发活动(构建管理、源代码管理、软件设计、软件测试、项目管理)相关的内容的完成,才能完成把整个程序转化成为一个可用的软件的过程。扩展的......
  • 洛谷题解-P2888 [USACO07NOV] Cow Hurdles S (Floyd)
    https://www.luogu.com.cn/problem/P2888题目描述FarmerJohnwantsthecowstoprepareforthecountyjumpingcompetition,soBessieandthegangarepracticingjumpingoverhurdles.Theyaregettingtired,though,sotheywanttobeabletouseaslittleene......
  • Docker学习笔记05:私有库
    DockerRegistry基本流程下载DockerRegistry镜像启动Registry容器推动镜像到自建Registry查看从自建Registry拉镜像。启动镜像dockerpullregistry#运行registry映射端口挂载映射容器卷开启特权模式dockerrun-d-p5000:5000-v/opt/registry:/tmp/registry--privilege......
  • 【学习笔记】部分树上算法(概念篇)
    本文包括:轻重链剖分(done)线段树合并(done)tobeupd:长链剖分DSUontree(树上启发式合并)点分治边分治LCT有待更新本文非例题代码大多未经过编译,谨慎使用本文本来只有重剖长剖dsu,但是发现不会写,另外几个甚至更简单就带歪了.jpgpart1轻重链剖分树剖是一类算法的总......