详细步骤:
1、创建Spark sql 环境
2、读取数据
3、数据切分 (分为page列,outLink列)形成表 pageDF
4、新增pr一列 (给定初始值) 形成表 initPrDF
5、新增avgPr一列(根据出链关系,求每个页面所分到的Pr)
6、分组聚合 (将outLink列explode炸开,在按照page分组,然后sum求和,这就是表 newPrDF
7、两表关联(newPrDF.join initPrDF),求pr列的差值平均数,形成列 avgDiffDF
8、if判断,取出avgDiffDF的值,与0.001比较,小于0.001循环结束
9、循环赋值,initPrDF=newPrDF,这样每次循环都能与上一次的进行比较
代码实现:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import java.math.BigDecimal
object PageRank {
def main(args: Array[String]): Unit = {
/**网页关系表示
* A,B|D
* B,C
* C,A|B
* D,B|C
* */
//网页排名,先了解每个网页的入链和出链,然后用数据表示各个网页之间的关系
//再循环迭代,编程计算PageRank公式,得到最终的收敛值,就是排名
// TODO: PR公式:(1-q)/N + q*∑(Pr/L)
// TODO: q为阻尼系数(q=0.85),N为网页数量, L为出链数量,Pr为初始值(给定为1,后面每次循环都用上一次的Pr为初始值)
//创建Spark sql 环境
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("pageRank")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
//读取数据
val pageRankDF: DataFrame = spark.read
.format("csv")
.option("sep", ",")
.schema("page STRING,outLink STRING")
.load("spark/src/main/data/pageRank")
import spark.implicits._
import org.apache.spark.sql.functions._
//进行数据切分
val pageDF: DataFrame = pageRankDF
.select($"page", split($"outLink", "\\|") as "outLink")
//缓存
pageDF.cache()
//给定初始值
var initPrDF: DataFrame = pageDF
.withColumn("pr", expr("1.0"))
var flag=true
val q=0.85
//页面数量
val N: Long = pageDF.count()
while(flag){
val newPrDF: DataFrame = initPrDF
//计算每个页面平均分到的pr值
.withColumn("avgPr", $"pr" / size($"outLink"))
//一行变多行,将外链的数据都炸开,保留outLink和avgPr,因为后面要分组聚合
.select(explode($"outLink") as "page", $"avgPr")
.groupBy($"page")
//增加阻尼系数
.agg(sum($"avgPr")* q + ( 1 - q ) / N as "pr")
//关联原列表,为了求差值
.join(pageDF, "page")
//求该列表与上次一Pr的差值的平均值,最后如果小于0.001就收敛结束
val avgDiffDF: DataFrame = newPrDF
.as("a")
.join(initPrDF.as("b"), "page")
//abs是求绝对值
.withColumn("diffPr", abs($"a.pr" - $"b.pr"))
.agg(avg($"diffPr") as "avgDiff")
//查看aggDiffDF的表结构,便于从列中取值
avgDiffDF.printSchema()
//取出差值平均值
// TODO: 这个有个类型转换的问题,在没有修正的情况下,也就是不加阻尼系数时,
// 会出现java.math.BigDecimal not cast to scala.math.BigDecimal,这里导一下java.BigDecimal的包就行
// val row: Row = avgDiffDF.head()
// val avgDiff: Double = row.getAs[BigDecimal]("avgDiff").doubleValue()
// TODO: 加上阻尼系数后,就是正常的Double类型
val row: Row = avgDiffDF.head()
val avgDiff: Double = row.getAs[Double]("avgDiff").doubleValue()
//收敛条件
if(avgDiff<0.001){
flag=false
}
//每次展示一下新的表
newPrDF.show()
//每次得到的新表作为初始表
initPrDF=newPrDF
}
}
}
标签:pr,val,outLink,avgDiffDF,算法,PageRank,spark,Spark,page
From: https://blog.csdn.net/ABU009/article/details/143168354