首页 > 编程语言 >Spark实现PageRank算法

Spark实现PageRank算法

时间:2024-10-22 21:21:08浏览次数:3  
标签:pr val outLink avgDiffDF 算法 PageRank spark Spark page

详细步骤:

1、创建Spark sql 环境

2、读取数据

3、数据切分 (分为page列,outLink列)形成表 pageDF

4、新增pr一列  (给定初始值)    形成表 initPrDF

5、新增avgPr一列(根据出链关系,求每个页面所分到的Pr)

6、分组聚合 (将outLink列explode炸开,在按照page分组,然后sum求和,这就是表 newPrDF

7、两表关联(newPrDF.join initPrDF),求pr列的差值平均数,形成列 avgDiffDF

8、if判断,取出avgDiffDF的值,与0.001比较,小于0.001循环结束

9、循环赋值,initPrDF=newPrDF,这样每次循环都能与上一次的进行比较

代码实现:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import java.math.BigDecimal
object PageRank {
  def main(args: Array[String]): Unit = {
    /**网页关系表示
     * A,B|D
     * B,C
     * C,A|B
     * D,B|C
     * */
    //网页排名,先了解每个网页的入链和出链,然后用数据表示各个网页之间的关系
    //再循环迭代,编程计算PageRank公式,得到最终的收敛值,就是排名
    // TODO: PR公式:(1-q)/N + q*∑(Pr/L)
    // TODO: q为阻尼系数(q=0.85),N为网页数量, L为出链数量,Pr为初始值(给定为1,后面每次循环都用上一次的Pr为初始值)

    //创建Spark sql 环境
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("pageRank")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    //读取数据
    val pageRankDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("page STRING,outLink STRING")
      .load("spark/src/main/data/pageRank")

    import spark.implicits._
    import org.apache.spark.sql.functions._
    //进行数据切分
    val pageDF: DataFrame = pageRankDF
      .select($"page", split($"outLink", "\\|") as "outLink")

    //缓存
    pageDF.cache()

    //给定初始值
    var initPrDF: DataFrame = pageDF
      .withColumn("pr", expr("1.0"))
    var flag=true
    val q=0.85
    //页面数量
    val N: Long = pageDF.count()

  while(flag){
    val newPrDF: DataFrame = initPrDF
      //计算每个页面平均分到的pr值
      .withColumn("avgPr", $"pr" / size($"outLink"))
      //一行变多行,将外链的数据都炸开,保留outLink和avgPr,因为后面要分组聚合
      .select(explode($"outLink") as "page", $"avgPr")
      .groupBy($"page")
      //增加阻尼系数
      .agg(sum($"avgPr")* q + ( 1 - q ) / N as "pr")
      //关联原列表,为了求差值
      .join(pageDF, "page")

    //求该列表与上次一Pr的差值的平均值,最后如果小于0.001就收敛结束
    val avgDiffDF: DataFrame = newPrDF
      .as("a")
      .join(initPrDF.as("b"), "page")
      //abs是求绝对值
      .withColumn("diffPr", abs($"a.pr" - $"b.pr"))
      .agg(avg($"diffPr") as "avgDiff")

    //查看aggDiffDF的表结构,便于从列中取值
    avgDiffDF.printSchema()
    //取出差值平均值
    // TODO: 这个有个类型转换的问题,在没有修正的情况下,也就是不加阻尼系数时,
    //  会出现java.math.BigDecimal not cast to scala.math.BigDecimal,这里导一下java.BigDecimal的包就行
//    val row: Row = avgDiffDF.head()
//    val avgDiff: Double = row.getAs[BigDecimal]("avgDiff").doubleValue()

    // TODO: 加上阻尼系数后,就是正常的Double类型
    val row: Row = avgDiffDF.head()
    val avgDiff: Double = row.getAs[Double]("avgDiff").doubleValue()
    //收敛条件
    if(avgDiff<0.001){
      flag=false
    }
    //每次展示一下新的表
    newPrDF.show()
    //每次得到的新表作为初始表
    initPrDF=newPrDF

  }

  }

}

标签:pr,val,outLink,avgDiffDF,算法,PageRank,spark,Spark,page
From: https://blog.csdn.net/ABU009/article/details/143168354

相关文章

  • 排序算法 —— 快速排序(理论+代码)
    目录1.快速排序的思想2.快速排序的实现hoare版挖坑法前后指针法快排代码汇总3.快速排序的优化三数取中小区间优化三路划分4.快速排序的非递归版本5.快速排序总结1.快速排序的思想快速排序是一种类似于二叉树结构的排序方法。其基本思想为从待排序序列中任取一个......
  • 计算法统计二叉树中度为1的节点个数
    最近学习有关于二叉树类的知识,学习时使用的是C语言。代码如下:include<stdio.h>include<stdlib.h>//添加这一行,包含malloc的声明typedefstructTreeNode{intval;structTreeNode*left;structTreeNode*right;}TreeNode;//创建树节点TreeNode*createNode(in......
  • 代码随想录算法训练营 | 图论理论基础,98. 所有可达路径
    图论理论基础1.图的种类:有向图,无向图,加权有向图,加权无向图;2.度:无向图中有几条边连接该节点,该节点就有几度,在有向图中,每个节点有出度和入度;出度:从该节点出发的边的个数;入度:指向该节点边的个数;3.连通图:在无向图中,任何两个节点都是可以到达的;强连通图:在有向图中,任何两个节点是可以......
  • 【Python-AI篇】数据结构和算法
    1.算法概念1.1什么是数据结构存储,组织数据的方式1.2什么是算法实现业务目的的各种方法和思路算法是独立的存在,只是思想,不依附于代码和程序,可以使用不同语言实现(java,python,c)1.2.1算法的五大特性输入:算法具有0个或多个输入输出:算法至少有1个或多个输出有穷性:算法......
  • KNN算法
    这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内容这是内容###这是内......
  • 【从零开始的LeetCode-算法】884. 两句话中的不常见单词
    句子 是一串由空格分隔的单词。每个 单词 仅由小写字母组成。如果某个单词在其中一个句子中恰好出现一次,在另一个句子中却 没有出现 ,那么这个单词就是 不常见的 。给你两个 句子 s1 和 s2 ,返回所有 不常用单词 的列表。返回列表中单词可以按 任意顺序 组织。......
  • 【从零开始的LeetCode-算法】3184. 构成整天的下标对数目 I
    给你一个整数数组 hours,表示以 小时 为单位的时间,返回一个整数,表示满足 i<j 且 hours[i]+hours[j] 构成 整天 的下标对 i, j 的数目。整天 定义为时间持续时间是24小时的 整数倍 。例如,1天是24小时,2天是48小时,3天是72小时,以此类推。示例1:......
  • c++ STL标准模板库-算法
    C++StandardTemplateLibrary(STL)算法是一组泛型算法,它们可以在各种容器上操作。这些算法被设计为与容器无关,因此可以在任何提供必要迭代器接口的容器上使用。STL算法分为以下几个主要类别:非修改算法Non-modifyingsequenceoperations:不改变容器内容,主要用于搜索和排序。......
  • Boruvka求最小生成树(菠萝算法)
    更新日志前言据说Boruvka算法是最古早的最短路算法,多半是真的。为什么叫菠萝算法?不知道。多半是音译吧。思路这个算法需要执行多轮,直到生成最小生成树。在每一轮中,对于每一个点(或者说,连通块),都找出以它为一个端点(或者说,一个端点在这个连通块中)的最短的边,并且将这条边加入......
  • 基于模仿学习的自动泊车运动规划算法 ResNet+BERT分类模型
    本文使用ResNet+BERT分类模型来实现APA自动泊车算法首先定义模型的输出动作类别类别名说明S0停车S+直行前进单位距离S-直行后退单位距离L+左转前进单位角度L-左转后退单位角度R+右转前进单位角度R-右转后退单位角度设单位距离为0.05米,单位......