This article walks through a code implementation of the graph random-walk algorithm DeepWalk.
1. The DeepWalk sampling algorithm
Starting from a given node, DeepWalk picks the next adjacent node uniformly at random and appends it to the path, until the maximum path length is reached or there is no next node to choose. The Graph class implementation can be found at https://github.com/PaddlePaddle/PGL/blob/main/pgl/graph.py, and the DeepWalk code is in ./deepwalk.py
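As a minimal sketch of this sampling rule (not using PGL), a single walk from one start node could be drawn with NumPy as follows; neighbors() here is a placeholder for whatever adjacency lookup the graph object provides:
import numpy as np

def single_random_walk(start, walk_len, neighbors):
    """Sample one DeepWalk path: pick a uniformly random successor at each step."""
    walk = [start]
    for _ in range(walk_len):
        succ = neighbors(walk[-1])   # successor ids of the current node
        if len(succ) == 0:           # dead end: stop early
            break
        walk.append(int(np.random.choice(succ)))
    return walk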
Install the dependencies
# !pip install paddlepaddle==1.8.5
!pip install pgl
Build a graph containing 10 nodes and 14 edges
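For reference, this toy graph can be built with pgl.graph.Graph as follows; the edge list and random features here mirror the ones constructed later in my_deepwalk.py (see the appendix):
import numpy as np
from pgl.graph import Graph

num_nodes = 10
edge_list = [(2, 0), (2, 1), (3, 1), (4, 0), (5, 0),
             (6, 0), (6, 4), (6, 5), (7, 0), (7, 1),
             (7, 2), (7, 3), (8, 0), (9, 7)]
feature = np.random.randn(num_nodes, 16).astype("float32")           # random node features
edge_feature = np.random.randn(len(edge_list), 1).astype("float32")  # random edge weights

graph = Graph(num_nodes=num_nodes,
              edges=edge_list,
              node_feat={'feature': feature},
              edge_feat={'edge_feature': edge_feature})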
Implement the random_walk function of the Graph class
%%writefile userdef_graph.py
from pgl.graph import Graph
import numpy as np
from pgl.utils.logger import log
class UserDefGraph(Graph):
def random_walk(self, nodes, walk_len):
"""
        Input:  nodes    - list of current node ids, shape (batch_size,)
                walk_len - maximum path length, int
        Output: the walks starting from the given nodes, list (batch_size, walk_len)
        Functions you will need:
        1. self.successor(nodes)
           Description: get the lists of successor (next adjacent) node ids
           Input:  nodes - list (batch_size,)
           Output: succ_nodes - list of lists ((num_successors_i,) for i in range(batch_size))
        2. self.outdegree(nodes)
           Description: get the out-degree of each node
           Input:  nodes - list (batch_size,)
           Output: out_degrees - list (batch_size,)
"""
walks = [[node] for node in nodes]
#log.info(walks )
walks_ids = np.arange(0, len(nodes))
cur_nodes = np.array(nodes)
for l in range(walk_len):
"""选取有下一个节点的路径继续采样,否则结束"""
outdegree = self.outdegree(cur_nodes)
walk_mask = (outdegree != 0)
if not np.any(walk_mask):
break
cur_nodes = cur_nodes[walk_mask]
walks_ids = walks_ids[walk_mask]
outdegree = outdegree[walk_mask]
######################################
            # Fill in code here to sample the next node of each walk
succ_nodes = self.successor(cur_nodes)
sample_index = np.floor(
np.random.rand(outdegree.shape[0]) * outdegree).astype("int64")
next_nodes = []
for s, ind, walk_id in zip(succ_nodes, sample_index, walks_ids):
walks[walk_id].append(s[ind])
next_nodes.append(s[ind])
######################################
cur_nodes = np.array(next_nodes)
log.info(walks )
return walks
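To sanity-check the implementation, the toy graph can be wrapped in UserDefGraph and a batch of walks sampled directly. This is only a usage sketch, reusing num_nodes, edge_list, feature and edge_feature from the construction example above:
from userdef_graph import UserDefGraph

g = UserDefGraph(num_nodes=num_nodes,
                 edges=edge_list,
                 node_feat={'feature': feature},
                 edge_feat={'edge_feature': edge_feature})
walks = g.random_walk(list(range(num_nodes)), walk_len=5)
print(walks)  # e.g. [[0], [1], [2, 1], ...] -- walks end early at nodes with no successors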
The random_walk source code from the official PGL repository:
def random_walk(self, nodes, max_depth):
"""Implement of random walk.
This function get random walks path for given nodes and depth.
Args:
nodes: Walk starting from nodes
max_depth: Max walking depth
Return:
A list of walks.
"""
walk = []
# init
for node in nodes:
walk.append([node])
cur_walk_ids = np.arange(0, len(nodes))
cur_nodes = np.array(nodes)
for l in range(max_depth):
# select the walks not end
outdegree = self.outdegree(cur_nodes)
mask = (outdegree != 0)
if np.any(mask):
cur_walk_ids = cur_walk_ids[mask]
cur_nodes = cur_nodes[mask]
outdegree = outdegree[mask]
else:
# stop when all nodes have no successor
break
succ = self.successor(cur_nodes)
sample_index = np.floor(
np.random.rand(outdegree.shape[0]) * outdegree).astype("int64")
nxt_cur_nodes = []
for s, ind, walk_id in zip(succ, sample_index, cur_walk_ids):
walk[walk_id].append(s[ind])
nxt_cur_nodes.append(s[ind])
cur_nodes = np.array(nxt_cur_nodes)
return walk
Run the script:
!python my_deepwalk.py --use_my_random_walk --epoch 5 # train DeepWalk with your own random walk implementation; the generated node paths can be inspected under ./tmp/deepwalk/walks/
#!python link_predict.py --ckpt_path ./tmp/deepwalk/paddle_model --epoch 100 # test
Output:
[INFO] 2021-02-13 17:27:03,835 [my_deepwalk.py: 302]: Namespace(batch_size=512, epoch=5, hidden_size=128, neg_num=20, processes=2, save_path='./tmp/deepwalk', use_my_random_walk=True, walk_len=5, win_size=5)
[INFO] 2021-02-13 17:27:04,651 [my_deepwalk.py: 219]: Start random walk on disk...
[INFO] 2021-02-13 17:27:04,675 [userdef_graph.py: 51]: [[9, 7, 2, 0], [8, 0], [6, 5, 0], [0], [2, 1], [3, 1], [1], [5, 0], [7, 3, 1], [4, 0]]
[INFO] 2021-02-13 17:27:04,675 [userdef_graph.py: 51]: [[4, 0], [0], [2, 1], [7, 1], [5, 0], [8, 0], [1], [6, 5, 0], [9, 7, 3, 1], [3, 1]]
[INFO] 2021-02-13 17:27:04,677 [userdef_graph.py: 51]: [[6, 0], [7, 3, 1], [5, 0], [4, 0], [3, 1], [8, 0], [2, 0], [1], [0], [9, 7, 0]]
[INFO] 2021-02-13 17:27:04,677 [userdef_graph.py: 51]: [[9, 7, 1], [5, 0], [3, 1], [7, 2, 1], [2, 1], [0], [8, 0], [4, 0], [1], [6, 5, 0]]
[INFO] 2021-02-13 17:27:04,678 [userdef_graph.py: 51]: [[9, 7, 0], [8, 0], [0], [4, 0], [1], [2, 1], [5, 0], [3, 1], [7, 3, 1], [6, 4, 0]]
[INFO] 2021-02-13 17:27:04,679 [my_deepwalk.py: 230]: Random walk on disk Done.
2021-02-13 17:27:04,680-WARNING: paddle.fluid.layers.py_reader() may be deprecated in the near future. Please use paddle.fluid.io.DataLoader.from_generator() instead.
[INFO] 2021-02-13 17:27:04,730 [my_deepwalk.py: 278]: Step 1 DeepWalk Loss: 0.718020 0.020003 s/step.
Appendix: my_deepwalk.py
"""DeepWalk training script."""
# Import the required packages
import argparse
import time
import os
import io
import math
from multiprocessing import Pool
import glob
import numpy as np
from pgl import data_loader
from pgl.utils.logger import log
import paddle.fluid as fluid
import paddle.fluid.layers as l
def deepwalk_model(graph, hidden_size=16, neg_num=5):
"""
    This part is the Skip-Gram model, i.e. Skip-Gram + negative sampling as
    covered in the lecture.
    Arguments:
        graph: the input graph
        hidden_size: node embedding dimension
        neg_num: number of negative samples
"""
    # Create a reader that feeds data from the Python side
pyreader = l.py_reader(
capacity=70,
shapes=[[-1, 1, 1], [-1, 1, 1], [-1, neg_num, 1]],
dtypes=['int64', 'int64', 'int64'],
lod_levels=[0, 0, 0],
name='train',
use_double_buffer=True)
    # Create the parameter initializers
embed_init = fluid.initializer.UniformInitializer(low=-1.0, high=1.0)
weight_init = fluid.initializer.TruncatedNormal(scale=1.0 /
math.sqrt(hidden_size))
    # Read data from the reader: center node ids, positive-sample node ids and negative-sample node ids
src, pos, negs = l.read_file(pyreader)
    # Look up the embedding of each node id given by `input` in the embedding matrix
embed_src = l.embedding(
input=src,
size=[graph.num_nodes, hidden_size],
param_attr=fluid.ParamAttr(
name='content', initializer=embed_init))
weight_pos = l.embedding(
input=pos,
size=[graph.num_nodes, hidden_size],
param_attr=fluid.ParamAttr(
name='weight', initializer=weight_init))
weight_negs = l.embedding(
input=negs,
size=[graph.num_nodes, hidden_size],
param_attr=fluid.ParamAttr(
name='weight', initializer=weight_init))
    ### Negative-sampling computation: multiple sigmoids
    # Compute the logits of the positive and negative samples separately
pos_logits = l.matmul(
embed_src, weight_pos, transpose_y=True) # [batch_size, 1, 1]
neg_logits = l.matmul(
embed_src, weight_negs, transpose_y=True) # [batch_size, 1, neg_num]
    # Build the positive labels (all ones) and compute the positive-sample loss
ones_label = pos_logits * 0. + 1.
ones_label.stop_gradient = True
pos_loss = l.sigmoid_cross_entropy_with_logits(pos_logits, ones_label)
    # Build the negative labels (all zeros) and compute the negative-sample loss
zeros_label = neg_logits * 0.
zeros_label.stop_gradient = True
neg_loss = l.sigmoid_cross_entropy_with_logits(neg_logits, zeros_label)
    # The total loss is the average of the positive-sample and negative-sample losses
loss = (l.reduce_mean(pos_loss) + l.reduce_mean(neg_loss)) / 2
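    # For a center node s with positive context p and negatives n_1..n_k:
    # pos_loss = -log sigmoid(s . p) and neg_loss_i = -log sigmoid(-s . n_i),
    # so `loss` is the standard skip-gram negative-sampling objective,
    # averaged over the batch (and over the neg_num negatives).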
return pyreader, loss
def gen_pair(walks, left_win_size=2, right_win_size=2):
"""
    This function generates the positive (src, pos) training pairs.
    Arguments:
        walks: a batch of node walk sequences
        left_win_size: size of the left window
        right_win_size: size of the right window
"""
src = []
pos = []
for walk in walks:
for left_offset in range(1, left_win_size + 1):
src.extend(walk[left_offset:])
pos.extend(walk[:-left_offset])
for right_offset in range(1, right_win_size + 1):
src.extend(walk[:-right_offset])
pos.extend(walk[right_offset:])
src, pos = np.array(src, dtype=np.int64), np.array(pos, dtype=np.int64)
src, pos = np.expand_dims(src, -1), np.expand_dims(pos, -1)
src, pos = np.expand_dims(src, -1), np.expand_dims(pos, -1)
return src, pos
def deepwalk_generator(graph,
batch_size=512,
walk_len=5,
win_size=2,
neg_num=5,
epoch=200,
filelist=None):
"""
    This function generates the (center node, positive samples, negative samples) data needed for training.
"""
def walks_generator():
        # Read the random walk sequences from disk
if filelist is not None:
bucket = []
for filename in filelist:
with io.open(filename) as inf:
for line in inf:
walk = [int(x) for x in line.strip('\n').split(' ')]
bucket.append(walk)
if len(bucket) == batch_size:
yield bucket
bucket = []
if len(bucket):
yield bucket
else:
            # If no files with stored random walks are given, start the random walks here instead
for _ in range(epoch):
for nodes in graph.node_batch_iter(batch_size):
walks = graph.random_walk(nodes, walk_len)
yield walks
def wrapper():
        # Generate the training samples
for walks in walks_generator():
src, pos = gen_pair(walks, win_size, win_size)
if src.shape[0] == 0:
continue
            # Randomly sample nodes from the graph as negatives for the current src nodes
negs = graph.sample_nodes([len(src), neg_num, 1]).astype(np.int64)
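            # src and pos have shape (N, 1, 1) and negs has shape (N, neg_num, 1),
            # matching the shapes declared for the py_reader in deepwalk_model.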
yield [src, pos, negs]
return wrapper
def process(args):
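    # Pool worker: sample random walks over the whole graph and write them to
    # the file '<save_path>/<idx>', one space-separated walk per line.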
idx, graph, save_path, epoch, batch_size, walk_len, seed = args
with open('%s/%s' % (save_path, idx), 'w') as outf:
for _ in range(epoch):
np.random.seed(seed)
for nodes in graph.node_batch_iter(batch_size):
walks = graph.random_walk(nodes, walk_len)
for walk in walks:
outf.write(' '.join([str(token) for token in walk]) + '\n')
def main(args):
"""
    Main function
"""
hidden_size = args.hidden_size
neg_num = args.neg_num
epoch = args.epoch
save_path = args.save_path
batch_size = args.batch_size
walk_len = args.walk_len
win_size = args.win_size
    # Set up the model save path
if not os.path.isdir(save_path):
os.makedirs(save_path)
    # Load the dataset
dataset = data_loader.ArXivDataset()
#print(type(dataset))
    # Build the toy graph
num_nodes = 10
    # Define the edge set of the graph
edge_list = [(2, 0), (2, 1), (3, 1),(4, 0), (5, 0),
(6, 0), (6, 4), (6, 5), (7, 0), (7, 1),
(7, 2), (7, 3), (8, 0), (9, 7)]
    # Randomly initialize the node features with dimension d
d = 16
feature = np.random.randn(num_nodes, d).astype("float32")
    # Assign a random weight to each edge
edge_feature = np.random.randn(len(edge_list), 1).astype("float32")
    # Use your random_walk -- the DeepWalk homework part
if args.use_my_random_walk:
from userdef_graph import UserDefGraph
'''
pgl_graph = dataset.graph
dataset.graph = UserDefGraph(num_nodes=pgl_graph.num_nodes,
edges=pgl_graph.edges,
node_feat=pgl_graph.node_feat,
edge_feat=pgl_graph.edge_feat)
'''
dataset.graph = UserDefGraph(num_nodes = num_nodes,
edges = edge_list,
node_feat = {'feature':feature},
edge_feat ={'edge_feature': edge_feature})
log.info("Start random walk on disk...")
walk_save_path = os.path.join(save_path, "walks")
if not os.path.isdir(walk_save_path):
os.makedirs(walk_save_path)
    # Run the random walks with multiple processes
pool = Pool(args.processes)
args_list = [(x, dataset.graph, walk_save_path, 1, batch_size,
walk_len, np.random.randint(2**32))
for x in range(epoch)]
pool.map(process, args_list)
filelist = glob.glob(os.path.join(walk_save_path, "*"))
log.info("Random walk on disk Done.")
train_steps = int(dataset.graph.num_nodes / batch_size) * epoch
place = fluid.CPUPlace()
deepwalk_prog = fluid.Program()
startup_prog = fluid.Program()
with fluid.program_guard(deepwalk_prog, startup_prog):
with fluid.unique_name.guard():
deepwalk_pyreader, deepwalk_loss = deepwalk_model(
dataset.graph, hidden_size=hidden_size, neg_num=neg_num)
            #lr = l.polynomial_decay(0.025, train_steps, 0.0001)  # polynomial decay of the initial learning rate
            lr = 0.0001  # fixed learning rate (the polynomial decay above is commented out)
adam = fluid.optimizer.Adam(lr)
adam.minimize(deepwalk_loss)
    # Feed the data produced by deepwalk_generator into deepwalk_pyreader
deepwalk_pyreader.decorate_tensor_provider(
deepwalk_generator(
dataset.graph,
batch_size=batch_size,
walk_len=walk_len,
win_size=win_size,
epoch=epoch,
neg_num=neg_num,
filelist=filelist))
    deepwalk_pyreader.start()  # start feeding data
exe = fluid.Executor(place)
exe.run(startup_prog)
prev_time = time.time()
step = 0
    # Start training
while 1:
try:
deepwalk_loss_val = exe.run(deepwalk_prog,
fetch_list=[deepwalk_loss],
return_numpy=True)[0]
cur_time = time.time()
use_time = cur_time - prev_time
prev_time = cur_time
step += 1
if step == 1 or step % 10 == 0:
log.info("Step %d " % step + "DeepWalk Loss: %f " %
deepwalk_loss_val + " %f s/step." % use_time)
except fluid.core.EOFException:
deepwalk_pyreader.reset()
break
    # Save the trained DeepWalk model
fluid.io.save_persistables(exe,
os.path.join(save_path, "paddle_model"),
deepwalk_prog)
if __name__ == '__main__':
    # Hyper-parameter settings
parser = argparse.ArgumentParser(description='deepwalk')
parser.add_argument("--use_my_random_walk", action='store_true', help="use_my_random_walk")
parser.add_argument("--hidden_size", type=int, default=128)
parser.add_argument("--neg_num", type=int, default=20)
parser.add_argument("--epoch", type=int, default=1)
parser.add_argument("--batch_size", type=int, default=512)
parser.add_argument("--walk_len", type=int, default=5)
parser.add_argument("--win_size", type=int, default=5)
parser.add_argument("--save_path", type=str, default="./tmp/deepwalk")
parser.add_argument("--processes", type=int, default=2)
args = parser.parse_args()
log.info(args)
main(args)