
Implementing column-level table lineage with Neo4j

Date: 2023-08-22 14:11:36

Background

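We need a front-end page that shows all upstream and downstream lineage of the current table's columns, as a basis for further data diagnosis and governance. The intended result is roughly as shown in the screenshot below.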

First, what is column-level lineage? Consider this SQL example:

CREATE TABLE IF NOT EXISTS table_b
AS SELECT order_id, order_status FROM table_a;

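In the DDL statement above, the order_id and order_status columns of the newly created table_b come from table_a. This makes table_a the source table of table_b (also called its upstream table) and table_b the downstream table of table_a. Likewise, table_a.order_id is the upstream column of table_b.order_id, so a lineage relationship exists between the two columns.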

INSERT INTO table_c
SELECT a.order_id, b.order_status
FROM table_a a JOIN table_b b ON a.order_id = b.order_id;

In the DML statement above, table_c.order_id comes from table_a while table_c.order_status comes from table_b, so table_c has lineage relationships with both table_a and table_b.
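Once parsed, both statements reduce to a handful of column-to-column edges. The sketch below writes them out as plain data; the `ColumnEdge` record and the `table.column` strings are illustrative assumptions, not the output format of any parser (records need Java 16+).

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical representation: one edge per (upstream column -> downstream column) pair
record ColumnEdge(String upstream, String downstream) {}

final class ExampleLineage {
  // Edges implied by the two example statements above
  static List<ColumnEdge> edges() {
    return List.of(
        // CREATE TABLE table_b AS SELECT order_id, order_status FROM table_a
        new ColumnEdge("table_a.order_id", "table_b.order_id"),
        new ColumnEdge("table_a.order_status", "table_b.order_status"),
        // INSERT INTO table_c SELECT a.order_id, b.order_status FROM table_a a JOIN table_b b ...
        new ColumnEdge("table_a.order_id", "table_c.order_id"),
        new ColumnEdge("table_b.order_status", "table_c.order_status"));
  }

  // Direct (one-hop) upstream columns of the given column
  static List<String> upstreamOf(String column) {
    return edges().stream()
        .filter(e -> e.downstream().equals(column))
        .map(ColumnEdge::upstream)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(upstreamOf("table_c.order_status")); // prints [table_b.order_status]
  }
}
```

Following the edges transitively, table_c.order_status traces back through table_b.order_status to table_a.order_status, which is why the traversal later in this post has to recurse rather than stop after one hop.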

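As these examples show, the SQL has to be parsed before any lineage can be stored. This is done mainly with the parser from the open-source Apache Calcite project, which this post does not cover; the focus here is on how to store the lineage and how to display it.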

Environment setup

See my other post: Configuring embedded Neo4j in Spring Boot.

Node data structure

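Because the goal is to show lineage between table columns, each column is stored directly as a graph node, and lineage between columns is represented by relationships between those nodes. The node's unique key is the fully qualified column name catalogName.databaseName.tableName.columnName: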

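public class ColumnVertex {
  // Unique key: catalogName.databaseName.tableName.columnName
  private String name;

  // No-arg constructor, used when rebuilding a vertex from a query result
  public ColumnVertex() {}

  public ColumnVertex(String catalogName, String databaseName, String tableName, String columnName) {
    this.name = catalogName + "." + databaseName + "." + tableName + "." + columnName;
  }

  public String getName() {
    return name;
  }

  // Returns this so calls can be chained, e.g. new ColumnVertex().setName(n)
  public ColumnVertex setName(String name) {
    this.name = name;
    return this;
  }

  // The getters below split the key on "."; they assume no part contains a dot itself
  public String getCatalogName() {
    return name.split("\\.")[0];
  }

  public String getDatabaseName() {
    return name.split("\\.")[1];
  }

  public String getTableName() {
    return name.split("\\.")[2];
  }

  public String getColumnName() {
    return name.split("\\.")[3];
  }
}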
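Because every getter splits `name` on `.`, the key only round-trips when no part contains a dot. A small hypothetical helper (`ColumnKey`, not part of the original project) that builds the key and rejects such parts up front might look like this:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original project
final class ColumnKey {
  private static final String SEP = ".";

  // Joins the four parts into "catalog.database.table.column", rejecting parts that would break split()
  static String build(String catalog, String database, String table, String column) {
    List<String> parts = List.of(catalog, database, table, column); // List.of also rejects nulls
    for (String part : parts) {
      if (part.isEmpty() || part.contains(SEP)) {
        throw new IllegalArgumentException("Invalid key part: '" + part + "'");
      }
    }
    return String.join(SEP, parts);
  }

  // Splits a key back into its parts; the regex "\\." matches a literal dot
  static List<String> parse(String key) {
    String[] parts = key.split("\\.");
    if (parts.length != 4) {
      throw new IllegalArgumentException("Expected 4 dot-separated parts: " + key);
    }
    return Arrays.asList(parts);
  }
}
```

Failing at write time is cheaper than discovering a mis-split table name when the lineage graph is rendered; quoting or escaping identifiers would be an alternative if dotted names must be supported.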

Common service interface

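import java.util.List;

public interface EmbeddedGraphService {
    // Add a column vertex together with its relationship to an upstream vertex
    void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex);
    // Find the direct (one-hop) upstream vertices of a column
    List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex);
    // Find the direct (one-hop) downstream vertices of a column
    List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex);
}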

Service implementation

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.springframework.stereotype.Service;

@Service
public class EmbeddedGraphServiceImpl implements EmbeddedGraphService {

  @Resource private GraphDatabaseService graphDb;

  @Override
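  public void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex) {
    // MERGE only creates the nodes and the relationship if they do not exist yet,
    // so writing the same lineage twice does not produce duplicates
    try (Transaction tx = graphDb.beginTx()) {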
      tx.execute(
          "MERGE (c:ColumnVertex {name: $currentName}) MERGE (u:ColumnVertex {name: $upstreamName})"
              + " MERGE (u)-[:UPSTREAM]->(c)",
          Map.of("currentName", currentVertex.getName(), "upstreamName", upstreamVertex.getName()));
      tx.commit();
    }
  }

  @Override
  public List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (u:ColumnVertex)-[:UPSTREAM]->(c:ColumnVertex) WHERE c.name = $name RETURN"
                  + " u.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }

  @Override
  public List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (c:ColumnVertex)-[:UPSTREAM]->(d:ColumnVertex) WHERE c.name = $name RETURN"
                  + " d.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }
}
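Unit-testing the traversal does not necessarily require an embedded database. Below is a hypothetical in-memory stand-in (`InMemoryLineageGraph`, an assumption rather than part of the original project) that mirrors the Cypher above: the `Set`s reproduce the idempotence of `MERGE`, and an edge always points from the upstream column to the current one. It works on plain name strings to stay self-contained; adapting it to `ColumnVertex` and `EmbeddedGraphService` is left open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical test double; it only tracks edges, not isolated nodes
final class InMemoryLineageGraph {
  // upstreamOf.get(c) = names u such that (u)-[:UPSTREAM]->(c)
  private final Map<String, Set<String>> upstreamOf = new HashMap<>();
  // downstreamOf.get(u) = names c such that (u)-[:UPSTREAM]->(c)
  private final Map<String, Set<String>> downstreamOf = new HashMap<>();

  // Counterpart of addColumnVertex: adding the same pair twice changes nothing, like MERGE
  void addColumnVertex(String current, String upstream) {
    upstreamOf.computeIfAbsent(current, k -> new LinkedHashSet<>()).add(upstream);
    downstreamOf.computeIfAbsent(upstream, k -> new LinkedHashSet<>()).add(current);
  }

  // Counterpart of findUpstreamColumnVertex (one hop only)
  List<String> findUpstream(String current) {
    return new ArrayList<>(upstreamOf.getOrDefault(current, Set.of()));
  }

  // Counterpart of findDownstreamColumnVertex (one hop only)
  List<String> findDownstream(String current) {
    return new ArrayList<>(downstreamOf.getOrDefault(current, Set.of()));
  }
}
```

Keeping both directions in separate maps makes each lookup a single map access, which matches how the Neo4j queries each follow the relationship in one direction.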

Traversing the graph

Implementation steps:

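  1. RESTful endpoint input: the current table (catalogName, databaseName, tableName)
  2. Define the data structure returned to the front end. It consists of nodes (one per table) and edges (one per column-to-column lineage link); the front end renders the complete lineage graph from these nodes and edges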
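// Assumes Lombok: @Data generates getters/setters, @Builder generates the builders used below
@Data
public class ColumnLineageVO {
  private List<ColumnLineageNode> nodes;
  private List<ColumnLineageEdge> edges;
}

@Data
@Builder
public class ColumnLineageNode {
  private String databaseName;
  private String tableName;
  private String type; // table type, set via .type(...) in the traversal code
  private List<String> columnNames;
}

@Data
@Builder
public class ColumnLineageEdge {
  private ColumnLineageEdgePoint source; // upstream column
  private ColumnLineageEdgePoint target; // downstream column
}

@Data
@Builder
public class ColumnLineageEdgePoint {
  private String databaseName;
  private String tableName;
  private String columnName;
}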
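  3. Look up the current table's columns
  4. For each column, recursively traverse every upstream and downstream vertex connected to it
  5. Wrap the collected nodes and edges into a ColumnLineageVO and return it to the front end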
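`ColumnLineageNode` carries a `columnNames` list that the traversal code never fills. One possible way to populate it, sketched with simplified stand-in records (`LineagePoint`, `LineageEdge` and `NodeGrouping` are hypothetical, not the Lombok classes above), is to group every edge endpoint by `databaseName.tableName`:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified stand-ins for ColumnLineageEdgePoint and ColumnLineageEdge
record LineagePoint(String databaseName, String tableName, String columnName) {}

record LineageEdge(LineagePoint source, LineagePoint target) {}

final class NodeGrouping {
  // Maps "db.table" (in first-seen order) to the sorted set of its columns that appear in any edge
  static Map<String, SortedSet<String>> columnsByTable(List<LineageEdge> edges) {
    Map<String, SortedSet<String>> result = new LinkedHashMap<>();
    for (LineageEdge edge : edges) {
      for (LineagePoint p : List.of(edge.source(), edge.target())) {
        result
            .computeIfAbsent(p.databaseName() + "." + p.tableName(), k -> new TreeSet<>())
            .add(p.columnName());
      }
    }
    return result;
  }
}
```

Each map entry could then become one node whose `columnNames` is the entry's value, so the front end knows which columns to draw inside each table box.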
  public ColumnLineageVO getColumnLineage(Table table) {
    ColumnLineageVO columnLineageVO = new ColumnLineageVO();
    List<ColumnLineageNode> nodes = new ArrayList<>();
    List<ColumnLineageEdge> edges = new ArrayList<>();
    // Deduplication: tables already emitted as nodes, column pairs already emitted as edges
    Set<String> visitedNodes = new HashSet<>();
    Set<String> visitedEdges = new HashSet<>();
    // Per-direction record of vertices already expanded, with their neighbours
    Map<String, List<ColumnVertex>> upstreamCache = new HashMap<>();
    Map<String, List<ColumnVertex>> downstreamCache = new HashMap<>();

    // The current table is always the first node; TableType is a project-specific enum
    ColumnLineageNode currentNode =
        ColumnLineageNode.builder()
            .databaseName(table.getDatabaseName())
            .tableName(table.getTableName())
            .type(TableType.EXTERNAL_TABLE.getDesc())
            .build();
    nodes.add(currentNode);
    visitedNodes.add(currentNode.getDatabaseName() + "." + currentNode.getTableName());

    for (String columnName : table.getColumnNames()) {
      // scriptId is used as the catalog part of the vertex key
      ColumnVertex currentVertex =
          new ColumnVertex(
              String.valueOf(table.getScriptId()),
              table.getDatabaseName(),
              table.getTableName(),
              columnName);
      traverseUpstreamColumnVertex(
          currentVertex, nodes, edges, visitedNodes, visitedEdges, upstreamCache);
      traverseDownstreamColumnVertex(
          currentVertex, nodes, edges, visitedNodes, visitedEdges, downstreamCache);
    }

    columnLineageVO.setNodes(nodes);
    columnLineageVO.setEdges(edges);
    return columnLineageVO;
  }

  private void traverseUpstreamColumnVertex(
      ColumnVertex currentVertex,
      List<ColumnLineageNode> nodes,
      List<ColumnLineageEdge> edges,
      Set<String> visitedNodes,
      Set<String> visitedEdges,
      Map<String, List<ColumnVertex>> cache) {
    // A cached vertex was already expanded (or is being expanded higher up the call stack);
    // returning here avoids repeated work and infinite recursion when the lineage has a cycle
    if (cache.containsKey(currentVertex.getName())) {
      return;
    }
    List<ColumnVertex> upstreamVertices =
        embeddedGraphService.findUpstreamColumnVertex(currentVertex);
    cache.put(currentVertex.getName(), upstreamVertices);

    for (ColumnVertex upstreamVertex : upstreamVertices) {
      // One node per upstream table, however many of its columns are involved
      String nodeKey = upstreamVertex.getDatabaseName() + "." + upstreamVertex.getTableName();
      if (!visitedNodes.contains(nodeKey)) {
        ColumnLineageNode upstreamNode =
            ColumnLineageNode.builder()
                .databaseName(upstreamVertex.getDatabaseName())
                .tableName(upstreamVertex.getTableName())
                .type(TableType.EXTERNAL_TABLE.getDesc())
                .build();
        nodes.add(upstreamNode);
        visitedNodes.add(nodeKey);
      }
      // Separators keep keys such as "ab" + "c" and "a" + "bc" from colliding
      String edgeKey =
          upstreamVertex.getDatabaseName() + "." + upstreamVertex.getTableName() + "."
              + upstreamVertex.getColumnName() + "->"
              + currentVertex.getDatabaseName() + "." + currentVertex.getTableName() + "."
              + currentVertex.getColumnName();
      if (!visitedEdges.contains(edgeKey)) {
        ColumnLineageEdge edge = createEdge(upstreamVertex, currentVertex);
        edges.add(edge);
        visitedEdges.add(edgeKey);
      }
      traverseUpstreamColumnVertex(upstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
    }
  }
  
  private void traverseDownstreamColumnVertex(
      ColumnVertex currentVertex,
      List<ColumnLineageNode> nodes,
      List<ColumnLineageEdge> edges,
      Set<String> visitedNodes,
      Set<String> visitedEdges,
      Map<String, List<ColumnVertex>> cache) {
    // Same early return as the upstream traversal: expand each vertex at most once
    if (cache.containsKey(currentVertex.getName())) {
      return;
    }
    List<ColumnVertex> downstreamVertices =
        embeddedGraphService.findDownstreamColumnVertex(currentVertex);
    cache.put(currentVertex.getName(), downstreamVertices);

    for (ColumnVertex downstreamVertex : downstreamVertices) {
      // One node per downstream table, however many of its columns are involved
      String nodeKey = downstreamVertex.getDatabaseName() + "." + downstreamVertex.getTableName();
      if (!visitedNodes.contains(nodeKey)) {
        ColumnLineageNode downstreamNode =
            ColumnLineageNode.builder()
                .databaseName(downstreamVertex.getDatabaseName())
                .tableName(downstreamVertex.getTableName())
                .type(TableType.EXTERNAL_TABLE.getDesc())
                .build();
        nodes.add(downstreamNode);
        visitedNodes.add(nodeKey);
      }
      // Edge keys are always oriented upstream -> downstream, matching the upstream traversal
      String edgeKey =
          currentVertex.getDatabaseName() + "." + currentVertex.getTableName() + "."
              + currentVertex.getColumnName() + "->"
              + downstreamVertex.getDatabaseName() + "." + downstreamVertex.getTableName() + "."
              + downstreamVertex.getColumnName();
      if (!visitedEdges.contains(edgeKey)) {
        ColumnLineageEdge edge = createEdge(currentVertex, downstreamVertex);
        edges.add(edge);
        visitedEdges.add(edgeKey);
      }
      traverseDownstreamColumnVertex(
          downstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
    }
  }
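The recursion above can also be written iteratively, which avoids deep call stacks on long lineage chains. Below is a minimal breadth-first sketch over a plain adjacency map that stands in for `findUpstreamColumnVertex`/`findDownstreamColumnVertex` (the `LineageBfs` class and its method are hypothetical). Each column is expanded at most once, so cycles, such as a table that reads from and writes back to itself, still terminate.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical iterative alternative to the recursive traversal
final class LineageBfs {
  /**
   * Collects every edge reachable from the start columns by repeatedly following {@code next}.
   * Edges are returned as "from->to" strings, always oriented upstream to downstream.
   */
  static Set<String> collectEdges(
      List<String> startColumns, Map<String, List<String>> next, boolean upstream) {
    Set<String> edges = new LinkedHashSet<>(); // deduplicated, in discovery order
    Set<String> expanded = new HashSet<>(); // columns whose neighbours were already fetched
    Deque<String> queue = new ArrayDeque<>(startColumns);
    while (!queue.isEmpty()) {
      String current = queue.poll();
      if (!expanded.add(current)) {
        continue; // already expanded: skipping it here is what makes cycles terminate
      }
      for (String neighbour : next.getOrDefault(current, List.of())) {
        // When walking upstream the neighbour is the source, otherwise it is the target
        edges.add(upstream ? neighbour + "->" + current : current + "->" + neighbour);
        queue.add(neighbour);
      }
    }
    return edges;
  }
}
```

Breadth-first order also groups edges by distance from the current table, which can help if the front end wants to limit how many hops it renders.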

From: https://www.cnblogs.com/pandacode/p/17648359.html

相关文章

  • Neo4j Neo.TransientError.General.MemoryPoolOutOfMemoryError
    Theallocationofanextra4.0MiBwouldusemorethanthelimit2.0GiB.Currentlyusing2.0GiB.dbms.memory.transaction.total.maxthresholdreached#通过语句查看,发现捞出来的数据太多,导致内存溢出MATCH(cwt:Persion)RETURNcwt......
  • Springboot内嵌neo4j配置
    环境说明MacOSAppleM1|Jdk17|Maven3.8.5|SpringBoot2.6.9|neo4j5.10.0注:neo4j内嵌最大的坑就是版本兼容性,所以引入前一定检查neo4j与springboot版本兼容性,其次neo4j各版本间配置使用上,区别也挺大的,本文只针对特定版本,因此建议更多参考官网文档,有最新的配置......
  • (Java实体类比表字段多处理方案)注解忽略实体类属性
    背景实体类多添加了几个字段用于查询,如果项目中使用了mybatis或mybatisplus会导致找不到表中字段的错误Causedby:java.sql.SQLSyntaxErrorException:Unknowncolumn'create_start_time'in'fieldlist'解决项目中使用mybatisimportorg.springframework.data.annotat......
  • 知识图谱(Knowledge Graph)- Neo4j 5.10.0 使用 - CQL
    删除数据库中以往的图MATCH(n)DETACHDELETEn创建节点CREATE命令语法Neo4jCQL“CREATE”命令用于创建没有属性的节点。它只是创建一个没有任何数据的节点。CREATE(<node-name>:<label-name>{<Property1-name>:<Property1-Value>..............
  • 知识图谱(Knowledge Graph)- Neo4j 5.10.0 Desktop & GraphXR 连接自建数据库
    #输入查看数据库连接neo4j$:serverstatus添加远程连接,输入连接地址GraphApps选择GraphXR打开显示......
  • 知识图谱(Knowledge Graph)- Neo4j 5.10.0 Desktop & GraphXR
    下载地址:https://neo4j.com/download/安装下载时会产生激活码(保存下来)下载完成后安装运行后,输入激活码进入主页面运行自带的电影知识谱图测试是否安装成功安装GraphXRhttps://neo4j.com/blog/graphxr-graph-app-neo4j-desktop/输入:https://graphxr.kineviz.com......
  • 知识图谱(Knowledge Graph)- Neo4j 5.10.0 Docker 安装
    知识图谱(KnowledgeGraph)-Neo4j5.10.0Docker安装知识图谱(KnowledgeGraph)-Neo4j5.10.0CentOS安装https://neo4j.com/docs/operations-manual/5/docker/introduction/拉镜像[root@localhost~]#cat/etc/docker/daemon.json{"registry-mirrors":["http......
  • 知识图谱(Knowledge Graph)- Neo4j 5.10.0 Docker 安装
    拉镜像[root@localhost~]#cat/etc/docker/daemon.json{"registry-mirrors":["https://XXX.mirror.aliyuncs.com"]#阿里镜像源}#拉取镜像[root@localhost~]#dockerpullneo4j:5.10.0运行#创建目录[root@localhost~]#mkdir-p/opt/neo4j#--envNEO......
  • 知识图谱(Knowledge Graph)- Neo4j 5.10.0 CentOS 安装
    系统需求版本JDKCPU内存硬盘Neo4j5.x17Intelx86-x64Corei3minimum,Corei7recommended.AMDx86-x64,MacARM.最低2GB,推荐16GB+10G+Neo4j5.x11Neo4j5.x8JDK17下载:https://www.oracle.com/java/technologies/downloads/#java17基于jar的......
  • 知识图谱(Knowledge Graph)- Neo4j 5.10.0 CentOS 安装
    系统需求版本JDKCPU内存硬盘Neo4j5.x17Intelx86-x64Corei3minimum,Corei7recommended.AMDx86-x64,MacARM.最低2GB,推荐16GB+10G+Neo4j5.x11Neo4j5.x8JDK17下载:https://www.oracle.com/java/technologies/do......