风控图算法Graph Embedding(DeepWalk&Node2Vec)代码实现
在上一篇中我们简单介绍了常用的Graph Embedding算法,今天来对其中较为常用的两种算法——DeepWalk和Node2Vec进行python代码实现。
文章目录
一、Karate Club算法包
Karate Club 是一个 Python 算法包,专门用于图形分析和图形挖掘。它提供了一系列经典和先进的图形聚类和图形嵌入算法,旨在帮助研究人员和数据科学家处理和分析各种类型的图形数据。该算法包的名字取自 Zachary’s Karate Club,这是一个关于社交网络的著名案例,用于研究社区结构和社交网络动态性。Karate Club 旨在为用户提供一种简单而灵活的方式来探索和分析图形数据,同时提供了一些高效的算法和工具,以解决图形分析中的常见问题。
import networkx as nx
# 创建或加载一个图
G = nx.karate_club_graph()
二、DeepWalk算法实现
- 原始版本的DeepWalk针对无向无权图设计,即不考虑边的方向也不考虑边的权重。
- 应用DeepWalk于有向图时,随机游走会遵循边的方向,这可能会影响游走的覆盖性和最终的嵌入结果。
- 有些变体或类似的方法可能会扩展DeepWalk以考虑边权重,通过调整随机游走的过程来偏好权重较高的边。
DeepWalk参数
- walk_number:游走次数
1.游走次数指的是对每个节点执行随机游走的次数。增加游走次数可以提高模型对节点之间关系的把握程度,但也会增加计算成本。
2.通常情况下,较多的游走次数可以帮助算法更好地探索图形的结构,尤其是对于大型图形数据集。
3.可以选择 10 到 80之间的游走次数,但具体的选择取决于数据集的规模和复杂度。 - walk_length:游走长度
1.游走长度指的是在每次随机游走中经过的节点数量。较长的游走长度能够更好地捕捉到节点之间的全局结构信息,但也会增加计算成本。
2.对于大型图形数据集或者密集连接的图形,通常选择较长的游走长度,以便更全面地探索图形的结构。
3.对于稀疏连接的图形或者想要更注重局部结构信息的情况,可以选择较短的游走长度。
4.常见的游走长度通常在 10 到 80 之间,具体选择取决于数据集的特点和算法的目标。 - dimensions:嵌入维度
1.嵌入维度指的是学习到的节点表示的维度大小。较高的嵌入维度可以提供更丰富的节点表示信息,但也可能增加计算和存储成本。
2.对于大型图形数据集或者需要较高精度的任务,可以选择较高的嵌入维度。
3.对于资源受限或者对计算效率要求较高的情况,可以选择较低的嵌入维度
Karate Club实现
from karateclub import DeepWalk
from karateclub import Estimator
# 初始化DeepWalk模型
deepwalk = DeepWalk(walk_number=10, walk_length=80, dimensions=64, epochs=10)
# 训练模型
deepwalk.fit(G)
# 获取嵌入
embedding = deepwalk.get_embedding()
模型参数
- walk_number(int):随机行走的次数。默认值为10。
- walk_length(int):随机行走的长度。默认值为80。
- dimensions(int):嵌入的维数。默认值为128。
- workers(int):核心数。默认值为4。
- window_size(int):矩阵幂序。默认值为5。
- epochs(int):迭代次数。默认值为1。
- use_hierarchical_softmax(bool):是使用分层softmax还是负采样来训练模型。默认值为True。
- number_of_onegative_samples(int):要采样的负节点数(通常在5-20之间)。如果设置为0,则不使用负采样。默认值为5。
- learning_rate(float):学习率。默认值为0.05。
- min_count(int):节点出现次数的最小值。默认值为1。
- seed(int):随机种子值。默认值为42
DeepWalk手动代码实现
from gensim.models.word2vec import Word2Vec
import random
from functools import partial
from typing import List, Callable
import numpy as np
# 随机游走
class RandomWalker:
"""
Class to do fast first-order random walks.
Args:
walk_length (int): Number of random walks.
walk_number (int): Number of nodes in truncated walk.
"""
def __init__(self, walk_length: int, walk_number: int):
self.walk_length = walk_length
self.walk_number = walk_number
def do_walk(self, node: int) -> List[str]:
"""
Doing a single truncated random walk from a source node.
Arg types:
* **node** *(int)* - The source node of the random walk.
Return types:
* **walk** *(list of strings)* - A single truncated random walk.
"""
walk: List[int] = [node]
for _ in range(self.walk_length - 1):
nebs = [node for node in self.graph.neighbors(walk[-1])]
if len(nebs) > 0:
walk = walk + random.sample(nebs, 1)
walk = [str(w) for w in walk]
return walk
def do_walks(self, graph):
"""
Doing a fixed number of truncated random walk from every node in the graph.
Arg types:
* **graph** *(NetworkX graph)* - The graph to run the random walks on.
"""
self.walks: List[List[str]] = []
self.graph = graph
for node in self.graph.nodes():
for _ in range(self.walk_number):
walk_from_node = self.do_walk(node)
self.walks.append(walk_from_node)
# DeepWalk
class CustomDeepWalk(Estimator):
def __init__(
self,
walk_number: int = 10,
walk_length: int = 80,
dimensions: int = 128,
workers: int = 4,
window_size: int = 5,
epochs: int = 1,
use_hierarchical_softmax: bool = True,
number_of_negative_samples: int = 5,
learning_rate: float = 0.05,
min_count: int = 1,
seed: int = 42,
):
self.walk_number = walk_number
self.walk_length = walk_length
self.dimensions = dimensions
self.workers = workers
self.window_size = window_size
self.epochs = epochs
self.use_hierarchical_softmax = use_hierarchical_softmax
self.number_of_negative_samples = number_of_negative_samples
self.learning_rate = learning_rate
self.min_count = min_count
self.seed = seed
def fit(self, graph: nx.classes.graph.Graph):
"""
Fitting a DeepWalk model.
Arg types:
* **graph** *(NetworkX graph)* - The graph to be embedded.
"""
self._set_seed()
graph = self._check_graph(graph)
# 随机游走生成序列
walker = RandomWalker(self.walk_length, self.walk_number)
walker.do_walks(graph)
# Word2Vec
model = Word2Vec(
walker.walks,
hs=1 if self.use_hierarchical_softmax else 0,
negative=self.number_of_negative_samples,
alpha=self.learning_rate,
epochs=self.epochs,
vector_size=self.dimensions,
window=self.window_size,
min_count=self.min_count,
workers=self.workers,
seed=self.seed,
)
num_of_nodes = graph.number_of_nodes()
self._embedding = [model.wv[str(n)] for n in range(num_of_nodes)]
def get_embedding(self) -> np.array:
r"""Getting the node embedding.
Return types:
* **embedding** *(Numpy array)* - The embedding of nodes.
"""
return np.array(self._embedding)
# 初始化自定义DeepWalk模型
customdeepwalk = CustomDeepWalk(walk_number=10, walk_length=80, dimensions=64, epochs=10)
# 训练模型
customdeepwalk.fit(G)
# 获取嵌入
embedding = customdeepwalk.get_embedding()
三、Node2Vec算法实现
- 原始版本的DeepWalk针对无向无权图设计,即不考虑边的方向也不考虑边的权重。
- 相比DeepWalk中的随机游走过程,Node2Vec通过引入两个参数 p 和 q来扩展随机游走过程,从而在游走过程中平衡探索(breadth-first)和开发(depth-first)策略。参数 p控制了回到先前节点的可能性,而参数 q 控制了跳转到邻居节点的可能性。这样,Node2Vec 可以在随机游走中更加灵活地探索图的局部结构。
- Node2Vec 算法相对于 DeepWalk 更复杂一些,因为它引入了额外的参数 p 和 q,以及更复杂的随机游走策略。这可能使得Node2Vec 在一些情况下更耗时,尤其是在参数调优和图规模较大时。
Karate Club实现
from karateclub import Node2Vec
# 初始化 Node2Vec 模型
model = Node2Vec()
# 训练 Node2Vec 模型
model.fit(G)
# 获取节点的嵌入向量
embeddings = model.get_embedding()
模型参数
- walk_number(int):随机行走的次数。默认值为10。
- walk_length(int):随机行走的长度。默认值为80。
- p(float):返回上一个节点的概率(1/p转换概率)。
- q(float):输入-输出参数(1/q转换概率),用于远离前一个节点。
- dimensions(int):嵌入的维数。默认值为128。
- workers(int):核心数。默认值为4。
- window_size(int):矩阵幂序。默认值为5。
- epochs(int):迭代次数。默认值为1。
- use_hierarchical_softmax(bool):是使用分层softmax还是负采样来训练模型。默认值为True。
- number_of_onegative_samples(int):要采样的负节点数(通常在5-20之间)。如果设置为0,则不使用负采样。默认值为5。
- learning_rate(float):学习率。默认值为0.05。
- min_count(int):节点出现次数的最小值。
Node2Vec手动代码实现
def _undirected(node, graph) -> List[tuple]:
# 对于无向图,返回所有的边
edges = graph.edges(node)
return edges
def _directed(node, graph) -> List[tuple]:
# 对于有向图,只返回出边
edges = graph.out_edges(node, data=True)
return edges
def _get_edge_fn(graph) -> Callable:
# 判断图的类型,采取不同的获取边的方案
fn = _directed if nx.classes.function.is_directed(graph) else _undirected
fn = partial(fn, graph=graph)
return fn
def _unweighted(edges: List[tuple]) -> np.ndarray:
# 没有权重都设置为1
return np.ones(len(edges))
def _weighted(edges: List[tuple]) -> np.ndarray:
# 有权重用自身权重
weights = map(lambda edge: edge[-1]["weight"], edges)
return np.array([*weights])
def _get_weight_fn(graph) -> Callable:
# 判断图形是否存在权重
fn = _weighted if nx.classes.function.is_weighted(graph) else _unweighted
return fn
# 扩展的随机游走过程(引入p、q)
class BiasedRandomWalker:
"""
Args:
walk_length (int): Number of random walks.
walk_number (int): Number of nodes in truncated walk.
p (float): Return parameter (1/p transition probability) to move towards previous node.
q (float): In-out parameter (1/q transition probability) to move away from previous node.
"""
walks: list
graph: nx.classes.graph.Graph
# 表示可调用对象
edge_fn: Callable
weight_fn: Callable
def __init__(self, walk_length: int, walk_number: int, p: float, q: float):
self.walk_length = walk_length
self.walk_number = walk_number
self.p = p
self.q = q
def do_walk(self, node: int) -> List[str]:
"""
从源节点执行单个截断的二阶随机遍历
Arg types:
* **node** *(int)* - The source node of the random walk.
Return types:
* **walk** *(list of strings)* - A single truncated random walk.
"""
walk = [node]
previous_node = None
previous_node_neighbors = []
for _ in range(self.walk_length - 1):
current_node = walk[-1]
edges = self.edge_fn(current_node)
# 获取当前节点的邻居节点
current_node_neighbors = np.array([edge[1] for edge in edges])
# 获取到邻居节点的边的权重
weights = self.weight_fn(edges)
# 根据权重生成概率
probability = np.piecewise(
weights,
[
current_node_neighbors == previous_node,
# 判断当前节点的邻居节点是否在前一个节点的邻居节点中
np.isin(current_node_neighbors, previous_node_neighbors),
],
# 返回前一个节点的概率
[lambda w: w / self.p, lambda w: w / 1, lambda w: w / self.q],
)
# 标准化概率值
norm_probability = probability / sum(probability)
# 从 current_node_neighbors中以norm_probability概率抽取一个值
selected = np.random.choice(current_node_neighbors, 1, p=norm_probability)[
0
]
walk.append(selected)
previous_node_neighbors = current_node_neighbors
previous_node = current_node
walk = [str(w) for w in walk]
return walk
def do_walks(self, graph) -> None:
"""
Doing a fixed number of truncated random walk from every node in the graph.
Arg types:
* **graph** *(NetworkX graph)* - The graph to run the random walks on.
"""
self.walks = []
self.graph = graph
self.edge_fn = _get_edge_fn(graph)
self.weight_fn = _get_weight_fn(graph)
for node in self.graph.nodes():
for _ in range(self.walk_number):
walk_from_node = self.do_walk(node)
self.walks.append(walk_from_node)
# 扩展的随机游走过程(引入p、q)
class BiasedRandomWalker:
"""
Args:
walk_length (int): Number of random walks.
walk_number (int): Number of nodes in truncated walk.
p (float): Return parameter (1/p transition probability) to move towards previous node.
q (float): In-out parameter (1/q transition probability) to move away from previous node.
"""
walks: list
graph: nx.classes.graph.Graph
# 表示可调用对象
edge_fn: Callable
weight_fn: Callable
def __init__(self, walk_length: int, walk_number: int, p: float, q: float):
self.walk_length = walk_length
self.walk_number = walk_number
self.p = p
self.q = q
def do_walk(self, node: int) -> List[str]:
"""
从源节点执行单个截断的二阶随机遍历
Arg types:
* **node** *(int)* - The source node of the random walk.
Return types:
* **walk** *(list of strings)* - A single truncated random walk.
"""
walk = [node]
previous_node = None
previous_node_neighbors = []
for _ in range(self.walk_length - 1):
current_node = walk[-1]
edges = self.edge_fn(current_node)
# 获取当前节点的邻居节点
current_node_neighbors = np.array([edge[1] for edge in edges])
# 获取到邻居节点的边的权重
weights = self.weight_fn(edges)
# 根据权重生成概率
probability = np.piecewise(
weights,
[
current_node_neighbors == previous_node,
# 判断当前节点的邻居节点是否在前一个节点的邻居节点中
np.isin(current_node_neighbors, previous_node_neighbors),
],
# 返回前一个节点的概率
[lambda w: w / self.p, lambda w: w / 1, lambda w: w / self.q],
)
# 标准化概率值
norm_probability = probability / sum(probability)
# 从 current_node_neighbors中以norm_probability概率抽取一个值
selected = np.random.choice(current_node_neighbors, 1, p=norm_probability)[
0
]
walk.append(selected)
previous_node_neighbors = current_node_neighbors
previous_node = current_node
walk = [str(w) for w in walk]
return walk
def do_walks(self, graph) -> None:
"""
Doing a fixed number of truncated random walk from every node in the graph.
Arg types:
* **graph** *(NetworkX graph)* - The graph to run the random walks on.
"""
self.walks = []
self.graph = graph
self.edge_fn = _get_edge_fn(graph)
self.weight_fn = _get_weight_fn(graph)
for node in self.graph.nodes():
for _ in range(self.walk_number):
walk_from_node = self.do_walk(node)
self.walks.append(walk_from_node)
# 初始化自定义Node2Vec模型
customnode2vec = CustomNode2Vec(walk_number=10, walk_length=80, dimensions=128, epochs=10)
# 训练模型
customnode2vec.fit(G)
# 获取嵌入
embedding = customnode2vec.get_embedding()