首页 > 编程语言 >GeaFlow图计算快速上手之K-hop算法

GeaFlow图计算快速上手之K-hop算法

时间:2023-08-16 11:32:20浏览次数:38  
标签:算法 GeaFlow dsl geaflow hop 上手 import com


layout: post read_time: true show_date: true show_author: true title: "GeaFlow图计算快速上手之K-hop算法" date: 2023-08-15 tags: [图计算, 图算法, K-hop, GeaFlow, 开源, GitHub] category: opinion author: TuGraph description: "GeaFlow API是对高阶用户提供的开发接口,用户可以直接通过编写java代码来编写计算作业,相比于DSL,API的方式开发更加灵活,也能实现更丰富的功能和更复杂的计算逻辑。"


引言

随着年轻人的社交需求不断增长,各种社交软件应运而生,这些社交软件通常都会有好友推荐功能,根据六度分离理论,理想情况下,每个人通过6个人就可以跟所有人产生关联,因此K-hop算法(K跳算法)被用于实现好友推荐,现在让我们来尝试使用GeaFlow在5分钟内实现K-hop算法吧!

K-hop(K跳)算法介绍

K-hop算法是一种基于图论的算法,用于寻找一个起点通过K次以内跳跃能够到达的节点,也就是从起点出发,找出K层内与之关联的节点。K-hop算法广泛应用于好友推荐、影响力预测和关系发现等场景。 K-hop算法本质上是一种广度优先搜索(BFS)算法,通过从起点开始不断向外扩散的方式来计算每一个节点到起点的跳跃数。算法流程如下:

GeaFlow实现K-hop算法

首先需要通过实现AlgorithmUserFunction接口来编写K-hop算法的UDGA,K-hop算法的参考实现如下:

package com.antfin.rayag.myUDF;

import com.antgroup.geaflow.common.type.primitive.IntegerType;
import com.antgroup.geaflow.common.type.primitive.StringType;
import com.antgroup.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
import com.antgroup.geaflow.dsl.common.algo.AlgorithmUserFunction;
import com.antgroup.geaflow.dsl.common.data.RowEdge;
import com.antgroup.geaflow.dsl.common.data.RowVertex;
import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;
import com.antgroup.geaflow.dsl.common.data.impl.types.IntVertex;
import com.antgroup.geaflow.dsl.common.function.Description;
import com.antgroup.geaflow.dsl.common.types.StructType;
import com.antgroup.geaflow.dsl.common.types.TableField;
import com.antgroup.geaflow.model.graph.edge.EdgeDirection;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

@Description(name = "khop", description = "built-in udga for KHop")
public class KHop implements AlgorithmUserFunction<Object, Integer> {

    private AlgorithmRuntimeContext<Object, Integer> context;
    private int srcId = 1;
    private int k = 1;

    @Override
    public void init(AlgorithmRuntimeContext<Object, Integer> context, Object[] parameters) {
        this.context = context;
        if (parameters.length > 2) {
            throw new IllegalArgumentException(
                    "Only support zero or more arguments, false arguments "
                            + "usage: func([alpha, [convergence, [max_iteration]]])");
        }
        if (parameters.length > 0) {
            srcId = Integer.parseInt(String.valueOf(parameters[0]));
        }
        if (parameters.length > 1) {
            k = Integer.parseInt(String.valueOf(parameters[1]));
        }
    }

    @Override
    public void process(RowVertex vertex, Iterator<Integer> messages) {
        List<RowEdge> outEdges = new ArrayList<>(context.loadEdges(EdgeDirection.OUT));
        //第一轮迭代将所有顶点初始化,目标点的K值初始化为0,并向邻点发送消息,其他点的K值初始化为Integer.MAX_VALUE
        if (context.getCurrentIterationId() == 1L) {
            if(srcId == (int) vertex.getId()) {
                sendMessageToNeighbors(outEdges, 1);
                context.updateVertexValue(ObjectRow.create(0));
                context.take(ObjectRow.create(vertex.getId(), 0));
            }else{
                context.updateVertexValue(ObjectRow.create(Integer.MAX_VALUE));
            }
        } else if (context.getCurrentIterationId() <= k+1) {
            int currentK = (int) vertex.getValue().getField(0, IntegerType.INSTANCE);
            //如果当前顶点收到消息,并且K值为Integer.MAX_VALUE(没有被遍历到),则本轮应该修改K值,并向邻边发消息
            if(messages.hasNext() && currentK == Integer.MAX_VALUE){
                Integer currK = messages.next();
                //将当前顶点写出
                context.take(ObjectRow.create(vertex.getId(), currK));
                //更新当前顶点的K值
                context.updateVertexValue(ObjectRow.create(currK));
                //向邻点发消息
                sendMessageToNeighbors(outEdges, currK+1);
            }
        }
    }

    //设置输出类型
    @Override
    public StructType getOutputType() {
        return new StructType(
                new TableField("id", IntegerType.INSTANCE, false),
                new TableField("k", IntegerType.INSTANCE, false)
        );
    }

    private void sendMessageToNeighbors(List<RowEdge> outEdges, Integer message) {
        for (RowEdge rowEdge : outEdges) {
            context.sendMessage(rowEdge.getTargetId(), message);
        }
    }
}

Geaflow运行K-hop算法实战

将KHop类打包成UDGA

新建一个maven工程,将KHop类放/src/main/java目录下,pom文件中需要添加如下依赖:

<dependency>
            <groupId>com.antgroup.tugraph</groupId>
            <artifactId>geaflow-dsl-common</artifactId>
            <version>0.1</version>
</dependency>

参考https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/dsl/overview.md

将UDGA上传至console平台

注册k-hop函数,并在DSL中使用

set geaflow.dsl.window.size = -1;
set geaflow.dsl.ignore.exception = true;

CREATE GRAPH IF NOT EXISTS g (
  Vertex v (
    vid int ID,
    vvalue int
  ),
  Edge e (
    srcId int SOURCE ID,
    targetId int DESTINATION ID
  )
) WITH (
  storeType='rocksdb',
  shardCount = 1
);

CREATE TABLE IF NOT EXISTS v_source (
    v_id int,
    v_value int
) WITH (
  type='file',
  //vertex文件中保存了点的信息,文件放在与KHop类目录下的resources目录下,此处可以换成其他数据源
  geaflow.dsl.file.path = 'resource:///input/vertex'
);


CREATE TABLE IF NOT EXISTS e_source (
    src_id int,
    dst_id int
) WITH (
  type='file',
    //edge文件中保存了边的信息,文件放在与KHop类目录下的resources目录下,此处可以换成其他数据源
  geaflow.dsl.file.path = 'resource:///input/edge'
);

//定义结果表
CREATE TABLE IF NOT EXISTS tbl_result (
  v_id int,
  k_value int
) WITH (
  type='file',
   geaflow.dsl.file.path = '/tmp/result'
);

USE GRAPH g;

INSERT INTO g.v(vid, vvalue)
SELECT
v_id, v_value
FROM v_source;

INSERT INTO g.e(srcId, targetId)
SELECT
 src_id, dst_id
FROM e_source;

//注册khop函数
CREATE Function khop AS 'com.antfin.rayag.myUDF.KHop';

INSERT INTO tbl_result(v_id, k_value)
//调用khop函数,并返回结果
CALL khop(1,2) YIELD (vid, kValue)
RETURN vid, kValue
;

运行结果

输入数据如下:

//vertex文件内容:
1,1
2,1
3,1
4,1
5,1
6,1

//edge文件内容:
1,3
1,5
1,6
2,3
3,4
4,1
4,6
5,4
5,6

在container的/tmp/result文件中可以得到结果如下:

1,0
3,1
5,1
6,1
4,2

至此,我们就成功使用Geaflow实现并运行了K-hop算法了!是不是超简单!快来试一试吧!

GeaFlow(品牌名TuGraph-Analytics) 已正式开源,欢迎大家关注!!!

欢迎给我们 Star 哦!

Welcome to give us a Star!

GitHub

标签:算法,GeaFlow,dsl,geaflow,hop,上手,import,com
From: https://blog.51cto.com/u_16180133/7100531

相关文章

  • 图加速数据湖分析-GeaFlow和Apache Hudi集成
    表模型现状与问题关系模型自1970年由埃德加·科德提出来以后被广泛应用于数据库和数仓等数据处理系统的数据建模。关系模型以表作为基本的数据结构来定义数据模型,表为二维数据结构,本身缺乏关系的表达能力,关系的运算通过Join关联运算来处理。表模型简单且易于理解,在关系模型中被广......
  • Mac系统Photoshop AI版本下载
    大梦想        来到创新的前沿玩耍吧。召唤概念,将视野转化为远景,建造天空中的城堡。有了简单的文本提示,生成填充(测试版)和生成扩展(测试版),谁都无法告诉你,你的想法将把你带到何方。生成式人工智能现已应用于Photoshop(测试版)从你最狂野的梦想到惊人的图像,只需几秒钟的时间。使......
  • 在线photoshop网页版工具开发
    基于javascript开发的在线ps工具,打包方式webpack在线预览在线ps网页版源码地址https://github.com/geeeeeeeek功能介绍在线图像编辑器允许您使用HTML5技术创建、编辑图像。无需购买、下载、安装或拥有过时的闪存。没有广告。主要功能:图层,过滤器,HTML5,开源,Photoshop替代......
  • Shopify 真实案例技术赚钱营销课视频教程
    Shopify真实案例技术赚钱营销课视频教程本视频课程是为那些希望学习如何利用Shopify平台以及有效的市场营销技巧来赚钱的人群而设计的。无论你是一名初学者还是有一定经验的网店经营者,本课程都将为你提供实际案例和技术指导,从而帮助你成功创建和运营一个盈利的电子商务企业。课......
  • 探索未知,即刻搭建AI原生应用!WAVE SUMMIT Workshop等你来参加
    你是否希望掌握大模型开发的秘诀?你是否渴望得到实践操作的机会?如果你的心中充满热情和期待,那么,WAVESUMMIT2023特别设置的Workshop将会是你的知识启航站!本次Workshop专注于AI开发与大模型应用,邀请一线优秀的AI软件工程师和产品专家,分享他们的技术心得,带领每一位参与者深入了解AI的......
  • apache/hop-web 2.5安装和简单入门
    一、使用Docker安装部署1、拉取镜像推荐使用下面的web版本dockerpullapache/hop:latestdockerpullapache/hop-web:latest2、部署a、简单部署(不使用用户名密码)dockerrun-p8080:8080apache/hop-web:latestb、使用用户名密码和相关数据库配置的部署docker文件......
  • 让Photoshop支持.ICO文件格式
    需要安装一个文件插件ICOFormat.8bi。官方下载地址:http://www.telegraphics.net/sw/下载以后的存放路径:...\Required\Plug-ins\FileFormats参考网址:https://blog.csdn.net/weixin_44222492/article/details/101596183......
  • FX110: 简单易上手!移动平均线的五大特性及应用
    移动平均线是看盘过程中,很常用的、很有重要的技术指标。我们常规采用的移动平均线周期,短期是5日、10日。中期是30日、60日。长期是120日、250日,也就是常说的半年线和年线。 移动平均线的特性:1、趋势性。移动平均线会保持与趋势一致的方向,能消除股价在运行过程中出现大的起伏。2、......
  • 《从0到1:JavaScript快速上手》笔记(一)
    一、两个十分有用的方法document.write():表示在页面输出一个内容alert():表示弹出一个对话框二、变量与常量在JavaScript中,变量指的是一个可以改变的量,也就是说,变量的值在程序运行过程中是可以改变的。(1)在JavaScript中,给一个变量命名,我们需要遵循以下2个方面的原则。变量有字母、......
  • Flink Unaligned Checkpoint 在 Shopee 的优化和实践
    一、Checkpoint存在的问题1.1数据库的Snapshot和恢复机制首先看一下数据库的Snapshot和恢复机制。数据库周期性地快照,每一次Snapshot是一个全量快照,同时要持续地写ChangeLog或WAL。当数据库crash后,新的数据库实例会从Snapshot3以及最新的ChangeLog恢复。1.2Fli......