首页 > 其他分享 >TuGraph任务能力增强:通过API定制流图计算逻辑

TuGraph任务能力增强:通过API定制流图计算逻辑

时间:2023-08-15 13:07:14浏览次数:46  
标签:API fields value GeaFlow PageRank TuGraph result 流图


layout: post read_time: true show_date: true show_author: true title: "GeaFlow任务能力增强:通过API定制流图计算逻辑" date: 2023-08-15 tags: [图计算, 高阶API, TuGraph, GeaFlow, Java] category: opinion author: TuGraph description: "GeaFlow API是对高阶用户提供的开发接口,用户可以直接通过编写java代码来编写计算作业,相比于DSL,API的方式开发更加灵活,也能实现更丰富的功能和更复杂的计算逻辑。"


GeaFlow API介绍

GeaFlow API是对高阶用户提供的开发接口,用户可以直接通过编写java代码来编写计算作业,相比于DSL,API的方式开发更加灵活,也能实现更丰富的功能和更复杂的计算逻辑。 在GeaFlow中,API支持Graph API和Stream API两种类型:

  • Graph API:Graph是GeaFlow框架的一等公民,当前GeaFlow框架提供了一套基于GraphView的图计算编程接口,包含图构建、图计算及遍历。在GeaFlow中支持Static Graph和Dynamic Graph两种类型。
  • Static Graph API:静态图计算API,基于该类API可以进行全量的图计算或图遍历。
  • Dynamic Graph API:动态图计算API,GeaFlow中GraphView是动态图的数据抽象,基于GraphView 之上,可以进行动态图计算或图遍历。同时支持对Graphview生成Snapshot快照,基于Snapshot可以提供和Static Graph API一样的接口能力。
  • Stream API:GeaFlow提供了一套通用计算的编程接口,包括source构建、流批计算及sink输出。在GeaFlow中支持Batch和Stream两种类型。
  • Batch API:批计算API,基于该类API可以进行批量计算。
  • Stream API:流计算API,GeaFlow中StreamView是动态流的数据抽象,基于StreamView之上,可以进行流计算。

更多API的介绍可参考 https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/api/overview.md

PageRank算法示例

本例子是从文件中读取点边进行构图,执行pageRank算法后,将每个点的pageRank值进行打印。 其中,用户需要实现AbstractVcFunc,在compute方法中进行每一轮迭代的计算逻辑。 在本例子中,只计算了两轮迭代的结果。在第一轮中,每个点都会向邻居点发送当前点的value值,而在第二轮中,每个点收到邻居点发送的消息,将其value值进行累加,并更新为自己的value值,即为最后的PageRank值。

public class PageRank {

    private static final Logger LOGGER = LoggerFactory.getLogger(PageRank.class);

    public static final String RESULT_FILE_PATH = "./target/tmp/data/result/pagerank";

    private static final double alpha = 0.85;

    public static void main(String[] args) {
        Environment environment = EnvironmentUtil.loadEnvironment(args);
        IPipelineResult result = PageRank.submit(environment);
        PipelineResultCollect.get(result);
        environment.shutdown();
    }

    public static IPipelineResult submit(Environment environment) {
        Pipeline pipeline = PipelineFactory.buildPipeline(environment);
        Configuration envConfig = environment.getEnvironmentContext().getConfig();
        envConfig.put(FileSink.OUTPUT_DIR, RESULT_FILE_PATH);
        ResultValidator.cleanResult(RESULT_FILE_PATH);

        pipeline.submit((PipelineTask) pipelineTaskCxt -> {
            Configuration conf = pipelineTaskCxt.getConfig();
            PWindowSource<IVertex<Integer, Double>> prVertices =
                pipelineTaskCxt.buildSource(new FileSource<>("email_vertex",
                        line -> {
                            String[] fields = line.split(",");
                            IVertex<Integer, Double> vertex = new ValueVertex<>(
                                Integer.valueOf(fields[0]), Double.valueOf(fields[1]));
                            return Collections.singletonList(vertex);
                        }), AllWindow.getInstance())
                    .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

            PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge",
                    line -> {
                        String[] fields = line.split(",");
                        IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1);
                        return Collections.singletonList(edge);
                    }), AllWindow.getInstance())
                .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

            int iterationParallelism = conf.getInteger(ExampleConfigKeys.ITERATOR_PARALLELISM);
            GraphViewDesc graphViewDesc = GraphViewBuilder
                .createGraphView(GraphViewBuilder.DEFAULT_GRAPH)
                .withShardNum(2)
                .withBackend(BackendType.Memory)
                .build();
            PGraphWindow<Integer, Double, Integer> graphWindow =
                pipelineTaskCxt.buildWindowStreamGraph(prVertices, prEdges, graphViewDesc);

            SinkFunction<IVertex<Integer, Double>> sink = ExampleSinkFunctionFactory.getSinkFunction(conf);
            graphWindow.compute(new PRAlgorithms(10))
                .compute(iterationParallelism)
                .getVertices()
                .sink(v -> {
                    LOGGER.info("result {}", v);
                })
                .withParallelism(conf.getInteger(ExampleConfigKeys.SINK_PARALLELISM));
        });

        return pipeline.execute();
    }

    public static class PRAlgorithms extends VertexCentricCompute<Integer, Double, Integer, Double> {

        public PRAlgorithms(long iterations) {
            super(iterations);
        }

        @Override
        public VertexCentricComputeFunction<Integer, Double, Integer, Double> getComputeFunction() {
            return new PRVertexCentricComputeFunction();
        }

        @Override
        public VertexCentricCombineFunction<Double> getCombineFunction() {
            return null;
        }

    }

    public static class PRVertexCentricComputeFunction extends AbstractVcFunc<Integer, Double, Integer, Double> {

        @Override
        public void compute(Integer vertexId,
                            Iterator<Double> messageIterator) {
            IVertex<Integer, Double> vertex = this.context.vertex().get();
            List<IEdge<Integer, Integer>> outEdges = context.edges().getOutEdges();
            if (this.context.getIterationId() == 1) {
                if (!outEdges.isEmpty()) {
                    this.context.sendMessageToNeighbors(vertex.getValue() / outEdges.size());
                }

            } else {
                double sum = 0;
                while (messageIterator.hasNext()) {
                    double value = messageIterator.next();
                    sum += value;
                }
                double pr = sum * alpha + (1 - alpha);
                this.context.setNewVertexValue(pr);

                if (!outEdges.isEmpty()) {
                    this.context.sendMessageToNeighbors(pr / outEdges.size());
                }
            }
        }

    }
}

提交API作业

(以容器模式,PageRank算法示例)

算法打包

在新的项目中新建一个PageRank的demo,pom中引入geaflow依赖

<dependency>
	<groupId>com.antgroup.tugraph</groupId>
	<artifactId>geaflow-assembly</artifactId>
	<version>0.2-SNAPSHOT</version>
</dependency>

新建PageRank类,编写上述相关代码。 在项目resources路径下,创建测试数据文件email_vertex和email_edge,代码中会从resources://资源路径读取数据进行构图。

PWindowSource<IVertex<Integer, Double>> prVertices =
                pipelineTaskCxt.buildSource(new FileSource<>("email_vertex",
                        line -> {
                            String[] fields = line.split(",");
                            IVertex<Integer, Double> vertex = new ValueVertex<>(
                                Integer.valueOf(fields[0]), Double.valueOf(fields[1]));
                            return Collections.singletonList(vertex);
                        }), AllWindow.getInstance())
                    .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

   PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge",
                    line -> {
                        String[] fields = line.split(",");
                        IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1);
                        return Collections.singletonList(edge);
                    }), AllWindow.getInstance())
                .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

email_vertex

0,1
1,1
2,1
3,1
4,1
5,1
6,1
7,1
8,1
9,1

email_edge

4,3
0,1
2,3
4,6
2,4
6,8
0,2
4,8
0,5
0,7
0,8
9,0
7,0
7,1
7,2
9,5
3,0
7,4
5,3
7,5
1,0
5,4
9,8
3,4
7,9
3,7
3,8
1,6
8,0
6,0
6,2
8,5
4,2

maven打包,在target目录获取算法的jar包

mvn clean install

新增HLA图任务

在GeaFlow Console中新增图任务,任务类型选择“HLA”, 并上传jar包(或者选择已存在的jar包),其中entryClass为算法主函数所在的类。 点击“提交”,创建任务。

提交作业

点击"发布",可进入作业详情界面,点击“提交”即可提交作业。

查看运行结果

进入容器 /tmp/logs/task/ 目录下,查看对应作业的日志,可看到日志中打印了最终计算得到的每个点的pageRank值。

2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:0, value:1.5718675107490019)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:1, value:0.5176947080197076)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:2, value:1.0201253300467092)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:3, value:1.3753756869824914)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:4, value:1.4583114077692536)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:5, value:1.1341668910561529)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:6, value:0.6798184364673463)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:7, value:0.70935427506243)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:8, value:1.2827529511906106)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:9, value:0.2505328026562969)

可在作业详情中查看运行详情,

至此,我们就成功使用Geaflow实现并运行API任务了!是不是超简单!快来试一试吧!

GeaFlow(品牌名TuGraph-Analytics) 已正式开源,欢迎大家关注!!!

欢迎给我们 Star 哦!

Welcome to give us a Star!

GitHub

标签:API,fields,value,GeaFlow,PageRank,TuGraph,result,流图
From: https://blog.51cto.com/u_16180133/7086790

相关文章

  • 为什么 API 治理需要内部倡导
    API治理旨在帮助人们通过API实现最大价值。但是,只有了解API是什么以及API的重要性,并且认识到API治理是在帮助他们而不是监管他们,才能实现这一目标。这就是为什么在任何API治理举措中都必须包括内部API倡导的关键原因。一个API治理计划必须包括API倡导的三个方面......
  • 华为云API Explorer:自动化运维的得力助手
    华为云APIExplorer为开发者提供一站式API解决方案统一平台,集成华为云服务所有开放API,支持全量快速检索、可视化调试、帮助文档、代码示例等能力,帮助开发者快速学习API,使用API开发代码实现自动化运维。本文分享自华为云社区《深度开发者故事|华为云APIExplorer自动化运维的......
  • .net6webapi中配置Jwt实现鉴权验证
    JWT(JsonWebToken)jwt是一种用于身份验证的开放标准,他可以在网络之间传递信息,jwt由三部分组成:头部,载荷,签名。头部包含了令牌的类型和加密算法,载荷包含了用户的信息,签名则是对头部和载荷的加密结果。jwt鉴权验证是指在用户登录成功后,服务器生成一个jwt令牌并返回给客户端,客户端在......
  • C# 获取网络API接口中的数据(1)
    控制台案例:usingSystem;usingSystem.Net.Http;usingSystem.Threading.Tasks;usingNewtonsoft.Json;usingNewtonsoft.Json.Linq;usingSystem.Data;usingSystem.Xml.Linq;usingSystem.Net;namespaceConsoleApp{classProgram{staticvoidM......
  • Apipost接口自动化控制器使用详解
    测试人员在编写测试用例以及实际测试过程中,经常会遇到两个棘手的问题:稍微复杂一些的自动化测试逻辑,往往需要手动写代码才能实现,难以实现和维护测试用例编写完成后,需要手动执行,难以接入自动化体系这里,小编在Apipost自动化测试中还原了用户在电商平台购物实例,并利用这个实例来给大家......
  • Apipost接口自动化控制器使用详解
    测试人员在编写测试用例以及实际测试过程中,经常会遇到两个棘手的问题:•稍微复杂一些的自动化测试逻辑,往往需要手动写代码才能实现,难以实现和维护•测试用例编写完成后,需要手动执行,难以接入自动化体系这里,小编在Apipost自动化测试中还原了用户在电商平台购物实例,并利用这个实例来......
  • 图解 history api 和 React Router 实现原理
    Router是开发React应用的必备功能,那ReactRouter是怎么实现的呢?今天我们就来读一下ReactRouter的源码吧!首先,我们来学一下HistoryAPI,这是基础。什么是history呢?就是这个东西:我打开了一个新的标签页、然后访问baidu.com、sougou.com、taobao.com。长按后退按钮,就会列出......
  • 一文详解Apipost数据模型功能
    在Apipost数据模型中用户可以预先创建多个数据模型,并在API设计过程中重复利用这些模型来构建API创建数据模型在左侧导航点击「数据模型」-「新建数据模型」在右侧工作台配置数据模型参数 引入数据模型在API设计预定义响应期望下点击引用数据模型,并选择需要导入的数据模型......
  • API接口对接电商平台高效获取各大电商平台数据,商品详情数据代码示例
     电商可以通过使用API接口获取商品信息数据。API是应用程序编程接口的缩写,它允许程序之间进行通信和数据传输。为了获取商品信息数据,电商可以利用API接口向商品供应商的数据库发送请求,然后接收并解析返回的数据。具体来说,电商可以按照以下步骤利用API接口获取商品信息数据:1.找......
  • NET web api 利用NPOI 读取excel
    安装NPOI`[HttpPost("users/upload")]publicasyncTaskUpload(IFormFilefile){if(file==null||file.Length==0)returnthis.BadRequest("文件未来上传");varapi_result=newList<string>();//文件......