首页 > 其他分享 >在 Spark 上实现 Graph Embedding

在 Spark 上实现 Graph Embedding

时间:2024-12-18 11:57:23浏览次数:6  
标签:嵌入 Graph walk Embedding Spark 节点

        在 Spark 上实现 Graph Embedding 主要涉及利用大规模图数据来训练模型,以学习节点的低维表示(嵌入)。这些嵌入能够捕捉和反映图中的节点间关系,如社交网络的朋友关系或者物品之间的相似性。在 Spark 上进行这一任务,可以使用 Spark 的图计算库 GraphX 或者利用外部库如 GraphFrames。

        下面,我将介绍如何在 Spark 环境中实现基本的 Graph Embedding,我们将使用 GraphFrames,因为它提供了对 DataFrame 的支持,更为易用。

环境准备

  1. 安装 Spark:确保你的环境中已经安装了 Spark。
  2. 安装 GraphFrames:GraphFrames 是在 Spark DataFrames 上操作图的库。安装方法通常是将 GraphFrames 的依赖项添加到你的 Spark 作业中。

Graph Embedding 实现步骤

Step 1: 创建 Spark Session

        首先,你需要创建一个 Spark 会话,这是使用 Spark 的入口。

from pyspark.sql import SparkSession

# 创建 Spark 会话
spark = SparkSession.builder \
    .appName("Graph Embedding Example") \
    .getOrCreate()
Step 2: 构建图

使用 GraphFrames 构建图,你需要两个主要的 DataFrame:顶点 DataFrame 和边 DataFrame。

from graphframes import *

# 创建顶点 DataFrame
vertices = spark.createDataFrame([
    ("1", "Alice"),
    ("2", "Bob"),
    ("3", "Charlie"),
], ["id", "name"])

# 创建边 DataFrame
edges = spark.createDataFrame([
    ("1", "2", "friend"),
    ("2", "3", "follow"),
    ("3", "1", "follow"),
], ["src", "dst", "relationship"])

# 创建图
graph = GraphFrame(vertices, edges)
Step 3: 使用 GraphFrames 进行图计算

        我们将使用随机游走算法作为生成节点嵌入的基础。此处简化处理,考虑基于 PageRank 的方法来初始化我们的 Graph Embedding。

# 计算 PageRank
results = graph.pageRank(resetProbability=0.15, tol=0.01)
results.vertices.select("id", "pagerank").show()
Step 4: 进一步的嵌入处理

        实际的 Graph Embedding 通常需要更复杂的处理,如 DeepWalk, Node2Vec 等。这些算法涉及随机游走以及后续使用 Word2Vec 算法来生成嵌入。这些步骤在 Spark 上实现需要额外的处理,可能涉及到自定义 PySpark 代码或者使用额外的库。

        在现实世界的应用中,单靠 PageRank 并不足以捕获复杂的节点相互关系。更高级的方法如 Node2Vec,可以更有效地学习节点的低维表示。这里,我们将简化 Node2Vec 的实现思想,使用 PySpark 自定义实现随机游走和使用 Spark MLlib 的 Word2Vec 来生成嵌入。

随机游走算法

        随机游走是 Graph Embedding 中一个重要的步骤,用于生成节点序列。这里我们简单实现随机选择下一个节点的逻辑。

from pyspark.sql.functions import explode, col

def random_walk(graph, num_walks, walk_length):
    walks = []
    for _ in range(num_walks):
        # 随机选择初始节点
        vertices = graph.vertices.rdd.map(lambda vertex: vertex.id).collect()
        for vertex in vertices:
            walk = [vertex]
            for _ in range(walk_length - 1):
                current_vertex = walk[-1]
                # 获取与当前节点相连的节点
                neighbors = graph.edges.filter(col("src") == current_vertex).select("dst").rdd.flatMap(lambda x: x).collect()
                if neighbors:
                    # 随机选择下一个节点
                    next_vertex = random.choice(neighbors)
                    walk.append(next_vertex)
            walks.append(walk)
    return walks

# 使用自定义的随机游走函数
walks = random_walk(graph, num_walks=10, walk_length=10)
使用 Word2Vec 生成嵌入

        接下来,我们将使用 Spark MLlib 中的 Word2Vec 来从随机游走生成的序列中学习嵌入。

from pyspark.ml.feature import Word2Vec

# 将随机游走的结果转化为 DataFrame
walks_df = spark.createDataFrame(walks, ["walk"])

# 设置 Word2Vec 模型
word2Vec = Word2Vec(vectorSize=100, inputCol="walk", outputCol="result", minCount=0)
model = word2Vec.fit(walks_df)

# 获取节点的嵌入
node_embeddings = model.getVectors()
node_embeddings.show()
Step 5: 评估和使用嵌入

        生成的节点嵌入可以用于多种下游任务,得到节点嵌入后,可以将其用于各种图分析任务,比如节点分类、图聚类等、链接预测等。评估嵌入通常需要具体任务相关的指标。评估嵌入的效果通常依赖于这些任务的性能。

节点分类示例

        如果有节点的标签数据,可以使用这些嵌入来训练一个分类器,并评估其性能。

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 假设有一个包含节点标签的 DataFrame
labels = spark.createDataFrame([
    ("1", "Class1"),
    ("2", "Class2"),
    ("3", "Class3"),
], ["id", "label"])

# 将标签与嵌入进行合并
data = labels.join(node_embeddings, labels.id == node_embeddings.word, how='inner')

# 准备数据集
data = data.select("result", "label")
(trainingData, testData) = data.randomSplit([0.8, 0.2])

# 训练逻辑回归模型
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, featuresCol="result", labelCol="label")
lrModel = lr.fit(trainingData)

# 评估模型
predictions = lrModel.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

        这个简单的流程展示了如何使用 Spark 和 GraphFrames 进行更高级的 Graph Embedding,并利用嵌入来执行图分析任务。实际应用中,你可能需要进一步调整模型的参数,或者对特定任务做优化。

Step 6: 部署到生产环境

        将模型部署到生产环境通常涉及将模型保存并在生产环境中加载它,使用如下:

# 保存模型
model_path = "/path/to/save/model"
graph_embedding_model.save(model_path)

# 在生产环境中加载模型
loaded_model = GraphEmbeddingModel.load(model_path)

总结

        这个示例提供了在 Spark 上进行基本图嵌入的框架,但请注意,真正的 Graph Embedding 如 DeepWalk 或 Node2Vec 需要更复杂的实现。如果你的需求超出了 PageRank 等简单算法的范围,可能需要查阅更多资源或使用专门的图分析工具来实现。这个示例提供了一个简单的示范引导,以便理解图嵌入的基本概念,并在 Spark 环境中实现它们。

标签:嵌入,Graph,walk,Embedding,Spark,节点
From: https://blog.csdn.net/goTsHgo/article/details/144556658

相关文章

  • C# Graphics 中如何快速反转Y轴坐标系,方便后续绘图
    这段时间搞东西发现C#Windows库里没有了以前C++中修改坐标系的一些API,比如  SetViewportExtEx, SetWindowExtEx这些东西。众所周知,在Windows系统中,坐标系的原点是从左上角开始为(0,0)的,Y轴越下值越大。这与现实生活中场景,低的Y才是0,向上Y值会越来越大,正好相反。 如......
  • Graph - Study Notes 4
          communitydetection   centrality   PageRank   APOC ......
  • Spark优化----Spark 性能调优
    目录常规性能调优常规性能调优一:最优资源配置常规性能调优二:RDD 优化RDD 复用RDD 持久化RDD 尽可能早的filter 操作常规性能调优三:并行度调节常规性能调优四:广播大变量常规性能调优五:Kryo 序列化常规性能调优六:调节本地化等待时长算子调优算子调优一:mapPar......
  • QGraphicsScene保存图片
    QGraphicsScene保存图片1importsys2importtime3fromPySide6.QtCoreimport*4fromPySide6.QtGuiimport*5fromPySide6.QtWidgetsimport*67classMyQWidget(QWidget):8def__init__(self,parent=...,f=...):9super().__init......
  • 在不同环境下安装Spark的详细步骤
    一、前提条件Java安装Spark是基于Java开发的,所以需要先安装Java。确保Java8或更高版本已经安装在你的系统中。你可以通过在命令行中输入java-version来检查Java是否安装以及其版本。例如,在Ubuntu系统中,可以使用以下命令安装Java:sudoapt-getupdatesudoapt-getin......
  • 分布式内存计算引擎Spark
    一、Spark概述定义与背景Spark是一个快速、通用的分布式计算引擎,最初是在加州大学伯克利分校的AMPLab开发的。它旨在处理大规模数据处理任务,如数据分析、机器学习和图计算等。与传统的HadoopMapReduce相比,Spark在性能上有显著的提升,尤其是在迭代计算和交互式查询方面。例如......
  • 12.11 每日总结(Spark 去重)
    今天学习Spark去重。学习时长2小时 importorg.apache.spark.sql.{SparkSession}objectMergeAndDeduplicate{defmain(args:Array[String]):Unit={//创建SparkSessionvalspark=SparkSession.builder().appName("MergeandDeduplicate").mas......
  • Graph - Study Notes 3
                ......
  • 在华为开发者空间,基于FunctionGraph快速部署在线画图工具
    本文分享自华为云社区《【开发者空间实践指导】使用函数工作流部署画图工具Excalidraw》,作者:开发者空间小蜜蜂。1.1案例介绍函数工作流FunctionGraph是一项基于事件驱动的函数托管计算服务,只需要编写业务函数代码并设置运行的条件,无需配置和管理服务器等基础设施,函数以弹性、免......
  • Spark向量化计算在美团生产环境的实践15
     1什么是向量化计算1.1并行数据处理:SIMD指令让我们从一个简单问题开始:假设要实现“数组a+b存入c”,设三个整型数组的长度都是100,那么只需将“c[i]=a[i]+b[i]”置于一个100次的循环内,代码如下:voidaddArrays(constint*a,constint*b,int*c,intnum){for(in......