首页 > 编程语言 >Spark实现PageRank算法

Spark实现PageRank算法

时间:2024-10-22 21:21:08浏览次数:15  
标签:pr val outLink avgDiffDF 算法 PageRank spark Spark page

详细步骤:

1、创建Spark sql 环境

2、读取数据

3、数据切分 (分为page列,outLink列)形成表 pageDF

4、新增pr一列  (给定初始值)    形成表 initPrDF

5、新增avgPr一列(根据出链关系,求每个页面所分到的Pr)

6、分组聚合 (将outLink列explode炸开,在按照page分组,然后sum求和,这就是表 newPrDF

7、两表关联(newPrDF.join initPrDF),求pr列的差值平均数,形成列 avgDiffDF

8、if判断,取出avgDiffDF的值,与0.001比较,小于0.001循环结束

9、循环赋值,initPrDF=newPrDF,这样每次循环都能与上一次的进行比较

代码实现:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import java.math.BigDecimal
object PageRank {
  def main(args: Array[String]): Unit = {
    /**网页关系表示
     * A,B|D
     * B,C
     * C,A|B
     * D,B|C
     * */
    //网页排名,先了解每个网页的入链和出链,然后用数据表示各个网页之间的关系
    //再循环迭代,编程计算PageRank公式,得到最终的收敛值,就是排名
    // TODO: PR公式:(1-q)/N + q*∑(Pr/L)
    // TODO: q为阻尼系数(q=0.85),N为网页数量, L为出链数量,Pr为初始值(给定为1,后面每次循环都用上一次的Pr为初始值)

    //创建Spark sql 环境
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("pageRank")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    //读取数据
    val pageRankDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("page STRING,outLink STRING")
      .load("spark/src/main/data/pageRank")

    import spark.implicits._
    import org.apache.spark.sql.functions._
    //进行数据切分
    val pageDF: DataFrame = pageRankDF
      .select($"page", split($"outLink", "\\|") as "outLink")

    //缓存
    pageDF.cache()

    //给定初始值
    var initPrDF: DataFrame = pageDF
      .withColumn("pr", expr("1.0"))
    var flag=true
    val q=0.85
    //页面数量
    val N: Long = pageDF.count()

  while(flag){
    val newPrDF: DataFrame = initPrDF
      //计算每个页面平均分到的pr值
      .withColumn("avgPr", $"pr" / size($"outLink"))
      //一行变多行,将外链的数据都炸开,保留outLink和avgPr,因为后面要分组聚合
      .select(explode($"outLink") as "page", $"avgPr")
      .groupBy($"page")
      //增加阻尼系数
      .agg(sum($"avgPr")* q + ( 1 - q ) / N as "pr")
      //关联原列表,为了求差值
      .join(pageDF, "page")

    //求该列表与上次一Pr的差值的平均值,最后如果小于0.001就收敛结束
    val avgDiffDF: DataFrame = newPrDF
      .as("a")
      .join(initPrDF.as("b"), "page")
      //abs是求绝对值
      .withColumn("diffPr", abs($"a.pr" - $"b.pr"))
      .agg(avg($"diffPr") as "avgDiff")

    //查看aggDiffDF的表结构,便于从列中取值
    avgDiffDF.printSchema()
    //取出差值平均值
    // TODO: 这个有个类型转换的问题,在没有修正的情况下,也就是不加阻尼系数时,
    //  会出现java.math.BigDecimal not cast to scala.math.BigDecimal,这里导一下java.BigDecimal的包就行
//    val row: Row = avgDiffDF.head()
//    val avgDiff: Double = row.getAs[BigDecimal]("avgDiff").doubleValue()

    // TODO: 加上阻尼系数后,就是正常的Double类型
    val row: Row = avgDiffDF.head()
    val avgDiff: Double = row.getAs[Double]("avgDiff").doubleValue()
    //收敛条件
    if(avgDiff<0.001){
      flag=false
    }
    //每次展示一下新的表
    newPrDF.show()
    //每次得到的新表作为初始表
    initPrDF=newPrDF

  }

  }

}

标签:pr,val,outLink,avgDiffDF,算法,PageRank,spark,Spark,page
From: https://blog.csdn.net/ABU009/article/details/143168354

相关文章

  • 【从零开始的LeetCode-算法】884. 两句话中的不常见单词
    句子 是一串由空格分隔的单词。每个 单词 仅由小写字母组成。如果某个单词在其中一个句子中恰好出现一次,在另一个句子中却 没有出现 ,那么这个单词就是 不常见的 。给你两个 句子 s1 和 s2 ,返回所有 不常用单词 的列表。返回列表中单词可以按 任意顺序 组织。......