跳到主要内容
版本:3.12.0

NebulaGraph Spark Connector

NebulaGraph Spark Connector 是一个 Spark 连接器,提供通过 Spark 标准形式读写 悦数图数据库 数据的能力。NebulaGraph Spark Connector 由 Reader 和 Writer 两部分组成。

  • Reader

    提供一个 Spark SQL 接口,用户可以使用该接口编程读取 悦数图数据库 图数据,单次读取一个点或 Edge type 的数据,并将读取的结果组装成 Spark 的 DataFrame。

  • Writer

    提供一个 Spark SQL 接口,用户可以使用该接口编程将 DataFrame 格式的数据逐条或批量写入 悦数图数据库

更多使用说明请参见 NebulaGraph Spark Connector

版本兼容性

NebulaGraph Spark Connector、 悦数图数据库 内核版本和 Spark 版本对应关系如下。

Spark Connector 版本悦数图数据库 版本Spark 版本
nebula-spark-connector_3.0-3.0-SNAPSHOT.jarnightly3.x
nebula-spark-connector_2.2-3.0-SNAPSHOT.jarnightly2.2.x
nebula-spark-connector-3.0-SNAPSHOT.jarnightly2.4.x
nebula-spark-connector_3.0-3.8.0.jar3.x3.x
nebula-spark-connector_2.2-3.8.0.jar3.x2.2.x
nebula-spark-connector-3.8.0.jar3.x2.4.x
nebula-spark-connector_3.0-3.6.0.jar3.x3.x
nebula-spark-connector_2.2-3.6.0.jar3.x2.2.x
nebula-spark-connector-3.6.0.jar3.x2.4.x
nebula-spark-connector_2.2-3.4.0.jar3.x2.2.x
nebula-spark-connector-3.4.0.jar3.x2.4.x
nebula-spark-connector_2.2-3.3.0.jar3.x2.2.x
nebula-spark-connector-3.3.0.jar3.x2.4.x
nebula-spark-connector-3.0.0.jar3.x2.4.x
nebula-spark-connector-2.6.1.jar2.6.0, 2.6.12.4.x
nebula-spark-connector-2.6.0.jar2.6.0, 2.6.12.4.x
nebula-spark-connector-2.5.1.jar2.5.0, 2.5.12.4.x
nebula-spark-connector-2.5.0.jar2.5.0, 2.5.12.4.x
nebula-spark-connector-2.1.0.jar2.0.0, 2.0.12.4.x
nebula-spark-connector-2.0.1.jar2.0.0, 2.0.12.4.x
nebula-spark-connector-2.0.0.jar2.0.0, 2.0.12.4.x

适用场景

NebulaGraph Spark Connector 适用于以下场景:

  • 读取悦数图数据库数据进行分析计算。
  • 分析计算完的数据写入悦数图数据库
  • 迁移悦数图数据库数据。
  • 结合 NebulaGraph Algorithm 进行图计算。

特性

NebulaGraph Spark Connector 3.8.0版本特性如下:

  • 提供多种连接配置项,如超时时间、连接重试次数、执行重试次数等。

  • 提供多种数据配置项,如写入数据时设置对应列为点 ID、起始点 ID、目的点 ID 或属性。

  • Reader 支持无属性读取和全属性读取。

  • Reader 支持将 悦数图数据库 数据读取成 Graphx 的 VertexRDD 和 EdgeRDD,支持非 Long 型点 ID。

  • 统一了 SparkSQL 的扩展数据源,统一采用 DataSourceV2 进行 悦数图数据库 数据扩展。

  • 支持insertupdatedelete三种写入模式。insert模式会插入(覆盖)数据,update模式仅会更新已存在的数据,delete模式只删除数据。

  • 支持与 悦数图数据库 之间的 SSL 加密连接。

更新说明

Release notes

获取 NebulaGraph Spark Connector

编译打包

  1. 克隆仓库nebula-spark-connector
$ git clone -b release-3.8 https://github.com/vesoft-inc/nebula-spark-connector.git
  1. 进入目录nebula-spark-connector

  2. 编译打包。不同版本的 Spark 命令略有不同。

备注

需已安装对应版本 Spark。

  • Spark 2.4

    $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-spark-connector -am -Pscala-2.11 -Pspark-2.4
  • Spark 2.2

    $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-spark-connector_2.2 -am -Pscala-2.11 -Pspark-2.2
  • Spark 3.x

    $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0

编译完成后,在目录的文件夹target下生成类似文件nebula-spark-connector-3.8.0-SHANPSHOT.jar

Maven 远程仓库下载

下载地址

使用方法

使用 NebulaGraph Spark Connector 读写 悦数图数据库 时,只需要编写以下代码即可实现。

# 从 <ProductName /> 读取点边数据。
spark.read.nebula().loadVerticesToDF()
spark.read.nebula().loadEdgesToDF()

# 将 dataframe 数据作为点和边写入 <ProductName /> 中。
dataframe.write.nebula().writeVertices()
dataframe.write.nebula().writeEdges()

nebula()接收两个配置参数,包括连接配置和读写配置。

备注

如果数据的属性值包含中文字符,可能出现乱码。请在提交 Spark 任务时加上以下选项:

--conf spark.driver.extraJavaOptions=-Dfile.encoding=utf-8
--conf spark.executor.extraJavaOptions=-Dfile.encoding=utf-8

悦数图数据库 读取数据

val config = NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withConenctionRetry(2)
.withExecuteRetry(2)
.withTimeout(6000)
.build()

val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withUser("root")
.withPasswd("nebula")
.withSpace("test")
.withLabel("person")
.withNoColumn(false)
.withReturnCols(List("birthday"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()

val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withUser("root")
.withPasswd("nebula")
.withSpace("test")
.withLabel("knows")
.withNoColumn(false)
.withReturnCols(List("degree"))
.withLimit(10)
.withPartitionNum(10)
.build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
  • NebulaConnectionConfig是连接 悦数图数据库 的配置,说明如下。

    参数是否必须说明
    withMetaAddress所有 Meta 服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...。读取数据不需要配置withGraphAddress
    withConnectionRetryNebulaGraph Java Client 连接 悦数图数据库 的重试次数。默认值为1
    withExecuteRetryNebulaGraph Java Client 执行查询语句的重试次数。默认值为1
    withTimeoutNebulaGraph Java Client 请求响应的超时时间。默认值为6000,单位:毫秒(ms)。
  • ReadNebulaConfig是读取 悦数图数据库 数据的配置,说明如下。

    参数是否必须说明
    withUser悦数图数据库用户名。Storage 服务要求身份认证时,需要填写该参数。仅 悦数图数据库 企业版支持该参数。
    withPasswd悦数图数据库用户名对应的密码。Storage 服务要求身份认证时,需要填写该参数。仅 悦数图数据库 企业版支持该参数。
    withSpace悦数图数据库 图空间名称。
    withLabel悦数图数据库 图空间内的 Tag 或 Edge type 名称。
    withNoColumn是否不读取属性。默认值为false,表示读取属性。取值为true时,表示不读取属性,此时withReturnCols配置无效。
    withReturnCols配置要读取的点或边的属性集。格式为List(property1,property2,...),默认值为List(),表示读取全部属性。
    withLimit配置 NebulaGraph Java Storage Client 一次从服务端读取的数据行数。默认值为 1000。
    withPartitionNum配置读取 悦数图数据库 数据时 Spark 的分区数。默认值为 100。该值的配置最好不超过图空间的的分片数量(partition_num)。

悦数图数据库 写入数据

备注
  • DataFrame 中的列会自动作为属性写入 悦数图数据库
  • 请确保 DataFrame 中的列名和悦数图数据库中的属性名一致。若不一致,可通过DataFrame.withColumnRenamed方法修改列名。
val config = NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withGraphAddress("127.0.0.1:9669")
.withConenctionRetry(2)
.build()

val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test")
.withTag("person")
.withVidField("id")
.withVidPolicy("hash")
.withVidAsProp(true)
.withUser("root")
.withPasswd("nebula")
.withBatch(512)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()

val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
.builder()
.withSpace("test")
.withEdge("friend")
.withSrcIdField("src")
.withSrcPolicy(null)
.withDstIdField("dst")
.withDstPolicy(null)
.withRankField("degree")
.withSrcAsProperty(true)
.withDstAsProperty(true)
.withRankAsProperty(true)
.withUser("root")
.withPasswd("nebula")
.withBatch(512)
.build()
df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()

默认写入模式为insert,可以通过withWriteMode配置修改为updatedelete

val config = NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withGraphAddress("127.0.0.1:9669")
.build()
val nebulaWriteVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test")
.withTag("person")
.withVidField("id")
.withVidAsProp(true)
.withBatch(512)
.withWriteMode(WriteMode.UPDATE)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  • NebulaConnectionConfig是连接 悦数图数据库 的配置,说明如下。

    参数是否必须说明
    withMetaAddress所有 Meta 服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...
    withGraphAddressGraph 服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...
    withConnectionRetryNebulaGraph Java Client 连接 悦数图数据库 的重试次数。默认值为1
  • WriteNebulaVertexConfig是写入点的配置,说明如下。

    参数是否必须说明
    withSpace悦数图数据库 图空间名称。
    withTag写入点时需要关联的 Tag 名称。
    withVidFieldDataFrame 中作为点 ID 的列。
    withVidPolicy写入点 ID 时,采用的映射函数,悦数图数据库 仅支持 HASH。默认不做映射。
    withVidAsPropDataFrame 中作为点 ID 的列是否也作为属性写入。默认值为false。如果配置为true,请确保 Tag 中有和VidField相同的属性名。
    withUser悦数图数据库 用户名。若未开启身份验证,无需配置用户名和密码。
    withPasswd悦数图数据库 用户名对应的密码。
    withBatch一次写入的数据行数,默认值为512。当withWriteModeupdate时,该参数的最大值为512
    withWriteMode写入模式。可选值为insertupdatedelete。默认为insert
    withDeleteEdge删除点时是否删除该点关联的边。默认为false。当withWriteModedelete时生效。
  • WriteNebulaEdgeConfig是写入边的配置,说明如下。

    参数是否必须说明
    withSpace悦数图数据库 图空间名称。
    withEdge写入边时需要关联的 Edge type 名称。
    withSrcIdFieldDataFrame 中作为起始点的列。
    withSrcPolicy写入起始点时,采用的映射函数,悦数图数据库 仅支持 HASH。默认不做映射。
    withDstIdFieldDataFrame 中作为目的点的列。
    withDstPolicy写入目的点时,采用的映射函数,悦数图数据库 仅支持 HASH。默认不做映射。
    withRankFieldDataFrame 中作为 rank 的列。默认不写入 rank。
    withSrcAsPropertyDataFrame 中作为起始点的列是否也作为属性写入。默认值为false。如果配置为true,请确保 Edge type 中有和SrcIdField相同的属性名。
    withDstAsPropertyDataFrame 中作为目的点的列是否也作为属性写入。默认值为false。如果配置为true,请确保 Edge type 中有和DstIdField相同的属性名。
    withRankAsPropertyDataFrame 中作为 rank 的列是否也作为属性写入。默认值为false。如果配置为true,请确保 Edge type 中有和RankField相同的属性名。
    withUser悦数图数据库 用户名。若未开启身份验证,无需配置用户名和密码。
    withPasswd悦数图数据库 用户名对应的密码。
    withBatch一次写入的数据行数,默认值为512。当withWriteModeupdate时,该参数的最大值为512
    withWriteMode写入模式。可选值为insertupdatedelete。默认为insert

示例代码

详细的使用方式参见 示例代码