neo4j spark connector使用query模式写数据的传参问题

我有如下执行成功的Neo4j Cypher语句,筛选两个节点并为其连接一条新的边:

match (n:entity) where id(n)=188 with n
match (m:entity) where id(m)=189 with m, n
create (n)-[r:relation {type:'test'}]->(m)

现在需要使用Neo4j Spark Connector来将以上命令通过spark执行,使用pyspark方式如下(采用query来读取点):

from pyspark.sql import *

spark = SparkSession.builder.getOrCreate()

result = spark.read \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity) where id(n)=188 with n match (m:entity) where id(m)=189 return m, n") \
    .load().toDF('m', 'n')

得到如下节点m与n

+---------------------+---------------------+
|                    m|                    n|
+---------------------+---------------------+
|{189, [entity], 好...|{188, [entity], 好...|
+---------------------+---------------------+

接下来对读取到的m和n,通过query写入新的关系:

result.write\
    .format("org.neo4j.spark.DataSource")\
    .mode("Overwrite")\
    .option("url", "bolt://localhost:7687") \
    .option("query", "create (n)-[r:relation {type:'test'}]->(m)")\
    .save()

执行后发现此处使用的m和n并不是上一步得到的m和n,导致创建了两个全新的节点并为之建立了新边。

请问应如何将读取到的m和n传参到写入的query里?

  1. 使用parameters参数:在query中使用参数时,可以使用parameters参数传递参数,如:
val parameters = Map("id" -> 1)
graph.cypher("MERGE (n:User {id: {id}})").on("id" -> 1).run
  1. 使用parametersSeq参数:可以使用parametersSeq参数传递参数,如:
val parametersSeq = Seq("id" -> 1)
graph.cypher("MERGE (n:User {id: {id}})").on(parametersSeq:_*).run
  1. 使用parametersMap参数:可以使用parametersMap参数传递参数,如:
val parametersMap = Map("id" -> 1)
graph.cypher("MERGE (n:User {id: {id}})").on(parametersMap).run
  1. 使用parametersIterable参数:可以使用parametersIterable参数传递参数,如:
val parametersIterable = Iterable("id" -> 1)
graph.cypher("MERGE (n:User {id: {id}})").on(parametersIterable).run

要将读取到的 m 和 n 传参到写入的 query,可以使用 PySpark 中的 udf 功能(用户自定义函数)。具体如下:

1.定义一个用户自定义函数,可以使用读取到的 m 和 n 参数并返回需要执行的 Cypher 语句。

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def create_query(n, m):
return "create (n)-[r:relation {type:'test'}]->(m)"

将函数注册为 UDF


udf_create_query = udf(create_query, StringType())

2.在写入操作中使用该 UDF。

应用 UDF 并写入数据

result.withColumn("query", udf_create_query(result["n"], result["m"]))
.write
.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("url", "bolt://localhost:7687")
.option("query", "query")
.save()

使用上面的代码,可以将读取到的 m 和 n 作为参数传递到写入的 Cypher 语句中。


您可以使用Neo4j Spark Connector的write()方法,并使用Cypher参数传递参数,以便在查询中使用参数值。您可以使用以下代码将参数传递给查询:
result.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity) where id(n)= {node1Id} with n match (m:entity) where id(m)= {node2Id} create (n)-[r:relation {type:'test'}]->(m)") \
    .option("params", { "node1Id": 188, "node2Id": 189 }) \
    .save()