图解JanusGraph系列-生成Hbase file离线批量导入方案

源码分析github地址,包含图数据库序列化逻辑分析,下述介绍结合源码分析,应该可以减少大家对这种导入方式花费的时间;

大家好,我是洋仔,JanusGraph图解系列文章,实时更新~

图数据库文章总目录:

**源码分析相关可查看github(码文不易,求个star~)**: https://github.com/YYDreamer/janusgraph

版本:JanusGraph-0.5.2

转载文章请保留以下声明:

作者:洋仔聊编程

微信公众号:匠心Java

原文地址:https://liyangyang.blog.csdn.net/

正文

源码分析github地址,包含图数据库序列化逻辑分析,下述介绍结合源码分析,应该可以减少大家对这种导入方式花费的时间,github地址见上述;

本文涉及以下部分:

  1. 离线导入的理论前提
  2. 离线导入的流程
  3. 离线导入数据的验证
  4. 离线导入的现状

一:离线导入的前提

1、数据相关

  • 大规模导入的情况下使用,过亿级别
  • 针对要离线导入的数据要做充分数据探查
  • 导入时,尽量导入相关联的数据;例如:user节点 + phone节点 + user_login_phone_number边 在同一批次导入;从而尽量保证相关联数据在同一分区

2、技术点

二:离线导入流程

  • 项目初始化阶段
  • 数据准备阶段
  • 序列化阶段
  • 验证阶段

项目初始化阶段

  1. 调用API接口获取JanusGraph的图实例和当前图实例的Transaction

  2. 调用janusgraph对应的API获取图库中所有的label节点的id,包括 节点label、边label、属性label、索引label; 并保存到内存中,等待使用

    1
    QueryUtil.getVertices(consistentTx, BaseKey.SchemaName, typeName)
  3. 保存属性和对应的索引之间的关系

  4. 占用id block;需要多少,占用多少,block size可以自己配置(数据库+号段模式)

    1
    2
    3
    4
    // partition 分区
    // idNamespace id生成器的命名空间,包含多种,此处默认是NORMAL_VERTEX
    // 超时时间
    public IDBlock getIDBlock(int partition, int idNamespace, Duration timeout);

上述,我们拿到了:

  • schema所有label的节点值

  • 属性和索引的对应关系

  • 占用了一批id生成范围(避免离线导入和线上导入自动生成的id重复)

  1. 获取该批次要导入Vertex的总数量vertex_num; 将相应id block缓存到本地vertexBlocks中;
  2. 获取该批次要导入Vertex中的property总量 ( 所有属性的的总数量 + vertex节点总数量*2) = property_num; 获取满足数量的统一partition中的连续的block;
  3. 获取该批次要导入Edge的总数量num; 将相应block缓存到本地edgeBlocks中;
  4. 打包成jar包上传到指定服务器,作为id生成器 和 序列化工具包
  5. 生成需要的vertex id数量,备用
  6. 生成需要的edge id数量,备用

数据准备阶段

  1. 将节点数据和唯一vertex id整合;

  2. 对于节点,需要添加index值,index为从0开始;

    1. 获取导入批次数据的最大属性个数num+2,作为step
    2. 将vertex id和 index+=step值和原始数据整合(注意: 此处的index需要对一批次数据所有节点统一赋值! 统一赋值index后可分批序列化;)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    {
    "index":16878090,
    "label":"user",
    "propertyMap":{
    "create_time":"2016-12-09 02:29:26",
    "productid":"2",
    "real_name":"张三",
    "user_id":"4882374234234"
    },
    "vertexId":197596753968
    }
  3. 边数据和唯一edge id整合;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    {
    "edgeId":17514510,
    "label":"user_login_phone_number",
    "propertyMap":{
    "productid":"2"
    },
    "sourceId":197596753968,
    "targetId":40964208
    }

序列化阶段

主要是抽取出源码中的序列化逻辑; 在序列化逻辑中使用到了上述的:schema所有label的节点值、属性和索引的对应关系、唯一id等

  1. 序列化节点
1
2
3
4
5
6
7
8
9
10
jsonResult = {ArrayList@1231}  size = 8
0 = "{"family":"101","qualifier":"2","rowKey":"48 0 0 1 112 13 121 -128","timestamp":"1595490235427","value":"0 1 9 24 77 0 44 -122"}"
1 = "{"family":"101","qualifier":"36","rowKey":"48 0 0 1 112 13 121 -128","timestamp":"1595490235427","value":"60 -115 9 24 77 0 48 -122 -1"}"
2 = "{"family":"101","qualifier":"80 -64","rowKey":"48 0 0 1 112 13 121 -128","timestamp":"1595490235427","value":"-96 50 48 49 54 45 49 50 45 48 57 32 48 50 58 50 57 58 50 -74 9 24 77 0 52 -122"}"
3 = "{"family":"101","qualifier":"80 -96","rowKey":"48 0 0 1 112 13 121 -128","timestamp":"1595490235427","value":"-96 -78 9 24 77 0 56 -122"}"
4 = "{"family":"101","qualifier":"81 -128","rowKey":"48 0 0 1 112 13 121 -128","timestamp":"1595490235427","value":"-96 85 95 51 53 50 49 57 48 -71 9 24 77 0 60 -122"}"
5 = "{"family":"101","qualifier":"83 -64","rowKey":"48 0 0 1 112 13 121 -128","timestamp":"1595490235427","value":"-88 -27 -68 -96 -23 -94 -106 9 24 77 0 64 -122"}"

6 = "{"family":"103","qualifier":"0","rowKey":"111 -103 32 103 48 -119 -96 85 95 51 53 50 49 57 48 -71","timestamp":"1595490235427","value":"5 96 13 60 96 -80"}"
7 = "{"family":"103","qualifier":"0 5 96 13 60 96 -80","rowKey":"-73 -16 125 -9 68 -119 -88 -27 -68 -96 -23 -94 -106","timestamp":"1595490235427","value":"5 96 13 60 96 -80"}"

101(边类型):每个自定义属性、节点存在属性、节点和label的边

103(索引类型):属性对应的索引

  1. 序列化边

image-20200723155133550

1
2
3
jsonResult = {ArrayList@1237}  size = 2
0 = "{"family":"101","qualifier":"121 -64 -96 19 68 32 112","rowKey":"112 0 0 0 0 0 0 -128","timestamp":"1595490669289","value":"8 45 0 -114 -64 -96 -78"}"
1 = "{"family":"101","qualifier":"121 -63 -128 32 112","rowKey":"112 0 0 0 0 19 -120 -128","timestamp":"1595490669289","value":"8 45 0 -114 -64 -96 -78"}"

101(边类型):出边、入边

103(索引类型):属性对应的索引

  1. 将序列化后的数据根据rowkey,family,column 进行三级排序

为什么要进行排序?

image-20200724144513449

sorted by id: rowkey

sorted by type: family(101 103)

sorted by sort key:

image-20200724144743522

  1. 将序列化排序后的数据生成hfile

  2. 将生成的hfile导入到hbse中

离线导入验证阶段

  1. 节点、边数量验证

    1. 保留导入前的数量:节点、边的总数量;和导入数据同类型的边、节点总数量;
    2. 导入数据后查询:节点、边的总量;和导入数据同类型的边、节点总数量;
    3. 前后两次查询的相关数量和导入数据的数量,前后进行比对
  2. 节点、边内容抽样抽样验证

    1. 导入数据后,从导入的源数据中抽取部分数据,使用gremlin语句进行查询相对应的节点,并比对图中节点、边内容和源数据内容是否相同
  3. 节点和边的对应关系验证

    1. 抽样查询相应节点,并通过gremlin获取节点对应的边数据,和源数据进行比较
    2. 抽样查询相应的边,并通过gremlin获取边对应的节点数据,和源数据进行比较
  4. 节点、边索引抽样验证

    1. 使用节点对应的唯一索引查询数据,确保可以查询出对应数据并且数据内容和源数据相同
    2. 使用节点对应的唯一索引查询对应数量和内容,确保可以查询出对应数据并且数据内容和源数据相同
    3. 使用边对应的索引查询数据,确保可以查询出对应数据并且数据内容和源数据相同

三:离线导入的验证

前提:

  1. 两个schema一致的图
  2. 图中通过janusgraph api 或者 离线导入相同的数据
  1. 离线导入数据 对 线上图中已有数据是否有影响验证
  2. 通过janusgraph api导入数据 和 离线导入数据一致性验证
  3. 离线导入的节点的数量、内容、索引验证
  4. 离线导入的边的数量、内容、索引验证
  5. 离线导入的数据的 修改、删除验证
  6. 是否影响schema的相关操作验证

四:现状

  • 只支持数据插入,不支持数据删除和修改

  • 需要在图T+1更新时间段之外或者停止图线上的T+1数据更新之后,再离线的批量数据导入

  • 当前不支持外部索引的添加

可以通过导入数据后,调用janusgraph的索引重建语句进行重建

获取es的索引的相关逻辑,直接调用es rest api插入

作者

洋仔

发布于

2021-03-03

更新于

2021-03-03

许可协议


评论