我有如下执行成功的Neo4j Cypher语句,筛选两个节点并为其连接一条新的边:
match (n:entity) where id(n)=188 with n
match (m:entity) where id(m)=189 with m, n
create (n)-[r:relation {type:'test'}]->(m)
现在需要使用Neo4j Spark Connector来将以上命令通过spark执行,使用pyspark方式如下(采用query来读取点):
from pyspark.sql import *
spark = SparkSession.builder.getOrCreate()
result = spark.read \
.format("org.neo4j.spark.DataSource") \
.option("url", "bolt://localhost:7687") \
.option("query", "match (n:entity) where id(n)=188 with n match (m:entity) where id(m)=189 return m, n") \
.load().toDF('m', 'n')
得到如下节点m与n
+---------------------+---------------------+
| m| n|
+---------------------+---------------------+
|{189, [entity], 好...|{188, [entity], 好...|
+---------------------+---------------------+
接下来对读取到的m和n,通过query写入新的关系:
result.write\
.format("org.neo4j.spark.DataSource")\
.mode("Overwrite")\
.option("url", "bolt://localhost:7687") \
.option("query", "create (n)-[r:relation {type:'test'}]->(m)")\
.save()
执行后发现此处使用的m和n并不是上一步得到的m和n,导致创建了两个全新的节点并为之建立了新边。
请问应如何将读取到的m和n传参到写入的query里?
val parameters = Map("id" -> 1)
graph.cypher("MERGE (n:User {id: {id}})").on("id" -> 1).run
val parametersSeq = Seq("id" -> 1)
graph.cypher("MERGE (n:User {id: {id}})").on(parametersSeq:_*).run
val parametersMap = Map("id" -> 1)
graph.cypher("MERGE (n:User {id: {id}})").on(parametersMap).run
val parametersIterable = Iterable("id" -> 1)
graph.cypher("MERGE (n:User {id: {id}})").on(parametersIterable).run
要将读取到的 m 和 n 传参到写入的 query,可以使用 PySpark 中的 udf 功能(用户自定义函数)。具体如下:
1.定义一个用户自定义函数,可以使用读取到的 m 和 n 参数并返回需要执行的 Cypher 语句。
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def create_query(n, m):
return "create (n)-[r:relation {type:'test'}]->(m)"
将函数注册为 UDF
udf_create_query = udf(create_query, StringType())
2.在写入操作中使用该 UDF。
应用 UDF 并写入数据
result.withColumn("query", udf_create_query(result["n"], result["m"]))
.write
.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("url", "bolt://localhost:7687")
.option("query", "query")
.save()
使用上面的代码,可以将读取到的 m 和 n 作为参数传递到写入的 Cypher 语句中。
您可以使用Neo4j Spark Connector的write()方法,并使用Cypher参数传递参数,以便在查询中使用参数值。您可以使用以下代码将参数传递给查询:
result.write \
.format("org.neo4j.spark.DataSource") \
.option("url", "bolt://localhost:7687") \
.option("query", "match (n:entity) where id(n)= {node1Id} with n match (m:entity) where id(m)= {node2Id} create (n)-[r:relation {type:'test'}]->(m)") \
.option("params", { "node1Id": 188, "node2Id": 189 }) \
.save()