You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
898 lines
37 KiB
898 lines
37 KiB
import json
|
|
import traceback
|
|
import datetime
|
|
import random
|
|
import time
|
|
from urllib.parse import unquote
|
|
from util.neo4j_utils import neo4j_client
|
|
|
|
|
|
class OperationService:
|
|
def __init__(self):
    """Attach the module-level Neo4j client to this service instance."""
    # All queries issued by the service methods go through this handle.
    self.db = neo4j_client
|
|
|
|
# --- 0. 内部辅助工具:格式标准化 ---
|
|
def _format_node_data(self, node):
    """Convert a flat or nested node payload into one canonical shape.

    Args:
        node: Mapping describing a node. Labels are read from ``labels``
            (falling back to ``label``); properties come from a nested
            ``properties`` dict when present, otherwise from the remaining
            top-level keys.

    Returns:
        ``{"labels": [...], "properties": {...}}``.
    """
    # Keys that describe the node itself rather than one of its properties.
    reserved_keys = ("label", "labels", "identity", "elementId")

    # 1. Labels: accept a single string or a list; anything else means none.
    label_source = node.get("labels") or node.get("label")
    if isinstance(label_source, list):
        normalized_labels = label_source
    elif isinstance(label_source, str):
        normalized_labels = [label_source]
    else:
        normalized_labels = []

    # 2. Properties: a nested dict wins; otherwise pack the flat keys.
    nested = node.get("properties")
    if isinstance(nested, dict):
        properties = nested
    else:
        properties = {}
        for key, value in node.items():
            if key not in reserved_keys:
                properties[key] = value

    return {"labels": normalized_labels, "properties": properties}
|
|
|
|
def _normalize_rel_data(self, item):
    """Normalize one relationship record from any supported import shape.

    Two shapes are recognised:

    * Nested: ``start`` and ``end`` are node objects and the relationship
      sits under ``segments[0]["relationship"]`` or ``relationship``.
    * Flat: names, type and label are top-level keys, looked up through a
      list of accepted aliases.

    Args:
        item: Mapping describing a single relationship.

    Returns:
        Dict with ``source_name``, ``source_label``, ``target_name``,
        ``target_label``, ``rel_type`` and ``rel_label``. Names and type
        are stripped strings or ``None``; labels and ``rel_label`` fall
        back to ``""``.
    """
    def clean_str(val):
        return str(val).strip() if val is not None else None

    def props_of(obj):
        # A nested "properties" dict takes priority over the object itself.
        inner = obj.get("properties")
        return inner if isinstance(inner, dict) else obj

    def first_label(node):
        # Labels may arrive as a list or as one plain string; indexing a
        # string with [0] would yield only its first character.
        raw = node.get("labels") or node.get("label")
        if isinstance(raw, str):
            return raw.strip()
        if isinstance(raw, (list, tuple)) and raw:
            return clean_str(raw[0]) or ""
        return ""

    # 1. Standard nested structure (start / end node objects).
    s_node, e_node = item.get("start"), item.get("end")
    if isinstance(s_node, dict) and isinstance(e_node, dict):
        s_props = props_of(s_node)
        e_props = props_of(e_node)

        segments = item.get("segments")
        if segments and len(segments) > 0:
            rel_obj = segments[0].get("relationship", {})
        else:
            rel_obj = item.get("relationship", {})
        # A non-dict relationship payload carries no usable fields; the
        # missing type is then reported by the callers' format checks.
        if not isinstance(rel_obj, dict):
            rel_obj = {}
        r_props = props_of(rel_obj)

        return {
            "source_name": clean_str(s_props.get("name")),
            "source_label": first_label(s_node),
            "target_name": clean_str(e_props.get("name")),
            "target_label": first_label(e_node),
            "rel_type": clean_str(rel_obj.get("type")),
            "rel_label": clean_str(r_props.get("label") or r_props.get("name") or ""),
        }

    # 2. Flat structure: resolve each field through its accepted aliases.
    alias_map = {
        "source": ["source_name", "source", "start_name", "起点"],
        "target": ["target_name", "target", "end_name", "终点"],
        "type": ["rel_type", "type", "relationship"],
        "label": ["rel_label", "label", "关系标签"],
    }

    def find_value(keys):
        # First alias holding a truthy, non-dict value wins.
        for k in keys:
            val = item.get(k)
            if val and not isinstance(val, dict):
                return val
        return None

    return {
        "source_name": clean_str(find_value(alias_map["source"])),
        "source_label": clean_str(item.get("source_label") or ""),
        "target_name": clean_str(find_value(alias_map["target"])),
        "target_label": clean_str(item.get("target_label") or ""),
        "rel_type": clean_str(find_value(alias_map["type"])),
        "rel_label": clean_str(find_value(alias_map["label"])) or "",
    }
|
|
|
|
# --- 0. 数据修复工具 ---
|
|
def fix_all_missing_node_ids(self):
    """Assign a random six-digit ``nodeId`` to every node lacking one.

    A node counts as lacking an id when ``nodeId`` is null, ``0`` or the
    string ``'0'``.

    Returns:
        ``{"success": bool, "msg": str}``; any exception is reported in
        ``msg`` rather than raised.
    """
    # Shared predicate so the count and the update target the same nodes.
    missing_filter = "n.nodeId IS NULL OR n.nodeId = 0 OR n.nodeId = '0'"
    try:
        pending = self.db.execute_read(
            f"MATCH (n) WHERE {missing_filter} RETURN count(n) as cnt"
        )
        if not pending or pending[0]['cnt'] == 0:
            return {"success": True, "msg": "没有需要修复的节点"}

        # NOTE(review): ids are drawn independently with rand(), so the
        # query itself does not guarantee uniqueness - confirm acceptable.
        repair_query = f"""
            MATCH (n)
            WHERE {missing_filter}
            WITH n, toInteger(100000 + rand() * 899999) as newId
            SET n.nodeId = newId
            RETURN count(n) as fixedCount
        """
        outcome = self.db.execute_write_and_return(repair_query)
        fixed_total = outcome[0]['fixedCount']
        return {"success": True, "msg": f"修复完成,共处理 {fixed_total} 个节点"}
    except Exception as exc:
        return {"success": False, "msg": f"修复失败: {str(exc)}"}
|
|
|
|
# --- 1. 全局统计接口 ---
|
|
def get_kg_stats(self):
    """Return global graph counters: nodes, relationships, nodes created today.

    "Today" is matched by prefix: ``createTime`` must start with the
    current local date formatted as ``YYYY-MM-DD``.

    Returns:
        On success ``{"success": True, "data": {...}}``. When the query
        yields no row, a failure dict with ``msg``. On an exception the
        error is printed and a failure dict with zeroed ``data`` is returned.
    """
    stat_keys = ("totalNodes", "totalRels", "todayNodes")
    try:
        today_prefix = datetime.datetime.now().strftime('%Y-%m-%d')
        # Three independent subqueries, each contributing one counter.
        stats_query = """
            CALL () {
                MATCH (n) RETURN count(n) AS totalNodes
            }
            CALL () {
                MATCH ()-[r]->() RETURN count(r) AS totalRels
            }
            CALL () {
                MATCH (n) WHERE n.createTime STARTS WITH $today RETURN count(n) AS todayNodes
            }
            RETURN totalNodes, totalRels, todayNodes
        """
        rows = self.db.execute_read(stats_query, {"today": today_prefix})
        if not rows:
            return {"success": False, "msg": "未能获取统计数据"}
        first_row = rows[0]
        return {
            "success": True,
            "data": {key: first_row[key] for key in stat_keys},
        }
    except Exception as exc:
        print(f"Stats Error: {exc}")
        return {"success": False, "data": dict.fromkeys(stat_keys, 0)}
|
|
|
|
# --- 2. 节点查询 ---
|
|
def get_nodes_subset(self, page=1, page_size=20, name=None, label=None):
    """Return one page of nodes, optionally filtered by name and label.

    Args:
        page: 1-based page number; values below 1 select the first page.
        page_size: Maximum rows per page; negative values are treated as 0.
        name: Optional URL-encoded substring matched against ``n.name``.
            A value that is blank after decoding disables the filter.
        label: Optional label; blank or "全部" disables the filter.

    Returns:
        ``{"items": [...], "total": int}``. Each item has ``id`` (element
        id), ``nodeId``, ``labels`` and ``name``. ``total`` is read from
        the first returned row, so an empty page reports 0. Any exception
        is printed and reported as an empty page.
    """
    try:
        # SKIP / LIMIT must not be negative, so clamp instead of failing.
        limit_val = max(int(page_size), 0)
        skip_val = max((int(page) - 1) * limit_val, 0)
        params = {"skip": skip_val, "limit": limit_val}
        conditions = []

        decoded_name = unquote(str(name)).strip() if name else ""
        if decoded_name:
            conditions.append("n.name CONTAINS $name")
            params["name"] = decoded_name

        # Strip before comparing so a padded "全部" still means "no filter".
        label_filter = str(label).strip() if label else ""
        if label_filter and label_filter != "全部":
            params["label"] = label_filter
            conditions.append("$label IN labels(n)")

        where_clause = ("WHERE " + " AND ".join(conditions)) if conditions else ""

        # First pass counts all matches; second pass fetches the page.
        cypher = f"""
            MATCH (n)
            {where_clause}
            WITH count(n) AS total_count
            MATCH (n)
            {where_clause}
            RETURN elementId(n) AS id, labels(n) AS labels, n.name AS name, n.nodeId AS nodeId, total_count AS total
            ORDER BY coalesce(n.createTime, '0000-00-00') DESC, toInteger(coalesce(n.nodeId, 0)) DESC
            SKIP $skip LIMIT $limit
        """

        rows = self.db.execute_read(cypher, params)
        if not rows:
            return {"items": [], "total": 0}

        items = []
        for row in rows:
            db_node_id = row.get("nodeId")
            # A missing, 0 or '0' business id falls back to the element id.
            has_business_id = bool(db_node_id) and db_node_id != '0'
            items.append({
                "id": row["id"],
                "nodeId": db_node_id if has_business_id else row["id"],
                "labels": row["labels"],
                "name": row.get("name") or "N/A",
            })
        return {"items": items, "total": rows[0]['total']}
    except Exception:
        traceback.print_exc()
        return {"items": [], "total": 0}
|
|
|
|
# --- 3. 关系查询 ---
|
|
def get_relationships_subset(self, page=1, page_size=20, source=None, target=None, rel_type=None):
    """Return one page of relationships with optional endpoint/type filters.

    Args:
        page: 1-based page number; values below 1 select the first page.
        page_size: Maximum rows per page; negative values are treated as 0.
        source: Optional URL-encoded substring of the start node's name.
        target: Optional URL-encoded substring of the end node's name.
        rel_type: Optional exact relationship type; blank or "全部"
            disables the filter.

    Returns:
        ``{"items": [...], "total": int}``. Each item has ``id``, ``type``,
        ``label``, ``source`` and ``target``. ``total`` is read from the
        first returned row, so an empty page reports 0. Any exception is
        printed and reported as an empty page.
    """
    try:
        # SKIP / LIMIT must not be negative, so clamp instead of failing.
        limit_val = max(int(page_size), 0)
        skip_val = max((int(page) - 1) * limit_val, 0)
        params = {"skip": skip_val, "limit": limit_val}
        conditions = []

        # A filter that is blank after decoding is ignored.
        source_filter = unquote(str(source)).strip() if source else ""
        if source_filter:
            params["source"] = source_filter
            conditions.append("a.name CONTAINS $source")

        target_filter = unquote(str(target)).strip() if target else ""
        if target_filter:
            params["target"] = target_filter
            conditions.append("b.name CONTAINS $target")

        # Strip before comparing so a padded "全部" still means "no filter".
        type_filter = str(rel_type).strip() if rel_type else ""
        if type_filter and type_filter != "全部":
            conditions.append("type(r) = $rel_type")
            params["rel_type"] = type_filter

        where_clause = ("WHERE " + " AND ".join(conditions)) if conditions else ""

        # First pass counts all matches; second pass fetches the page.
        cypher = f"""
            MATCH (a)-[r]->(b)
            {where_clause}
            WITH count(r) AS total_count
            MATCH (a)-[r]->(b)
            {where_clause}
            RETURN elementId(r) as id,
                   type(r) as type,
                   r.label as label,
                   a.name as source,
                   b.name as target,
                   total_count
            ORDER BY coalesce(r.createTime, '0000-00-00') DESC
            SKIP $skip LIMIT $limit
        """

        rows = self.db.execute_read(cypher, params)
        if not rows:
            return {"items": [], "total": 0}

        items = [
            {
                "id": row["id"],
                "type": row["type"],
                "label": row["label"] if row.get("label") is not None else "",
                "source": row["source"],
                "target": row["target"],
            }
            for row in rows
        ]
        return {"items": items, "total": rows[0]['total_count']}
    except Exception:
        traceback.print_exc()
        return {"items": [], "total": 0}
|
|
|
|
# --- 4. 联想建议 ---
|
|
def suggest_nodes(self, keyword: str, label: str = None):
    """Suggest up to 15 distinct node names for an autocomplete box.

    Args:
        keyword: URL-encoded substring to look for in ``n.name``; may be
            empty when a label is supplied.
        label: Optional label restricting the search. "全部", "null",
            "undefined" and blank values mean "no label".

    Returns:
        List of names. With neither keyword nor label the result is
        empty. When a keyword+label search finds nothing, a label-free
        search limited to 5 names is used as a fallback. Any exception is
        printed and yields an empty list.
    """
    try:
        kw = unquote(str(keyword or "")).strip()
        # The label is spliced into a backtick-quoted identifier, so
        # backticks are removed to keep the value inside the quotes.
        lb = str(label).strip().replace("`", "") if label else ""
        if lb in ("全部", "null", "undefined"):
            lb = ""

        # Nothing to search by.
        if not kw and not lb:
            return []

        params = {}
        match_clause = f"MATCH (n:`{lb}`)" if lb else "MATCH (n)"

        # Placeholder-named nodes are never suggested.
        conditions = ["n.name <> '未命名'"]
        if kw:
            conditions.append("n.name CONTAINS $kw")
            params["kw"] = kw
        where_clause = "WHERE " + " AND ".join(conditions)

        cypher = f"{match_clause} {where_clause} RETURN DISTINCT n.name as name LIMIT 15"
        results = self.db.execute_read(cypher, params)
        db_suggestions = [row["name"] for row in results if row.get("name")]

        # Fallback: retry across all labels when the label-scoped search
        # returned nothing.
        if not db_suggestions and kw and lb:
            fallback_cypher = "MATCH (n) WHERE n.name CONTAINS $kw AND n.name <> '未命名' RETURN DISTINCT n.name as name LIMIT 5"
            fallback_res = self.db.execute_read(fallback_cypher, {"kw": kw})
            db_suggestions = [row["name"] for row in fallback_res if row.get("name")]

        return db_suggestions
    except Exception:
        print(f"Suggest Error Trace: {traceback.format_exc()}")
        return []
|
|
|
|
# --- 5. 节点管理 ---
|
|
def add_node(self, label: str, name: str):
    """Create a node with the given label and a name unique in the graph.

    Args:
        label: Label for the new node. Backticks are removed because the
            label is spliced into a backtick-quoted identifier; a label
            that ends up empty is rejected.
        name: URL-encoded node name; must be non-blank after decoding.

    Returns:
        ``{"success": True, "msg": ..., "nodeId": int}`` on success,
        otherwise ``{"success": False, "msg": ...}``. Exceptions are
        reported in ``msg`` rather than raised.
    """
    try:
        nm = unquote(str(name or "")).strip()
        if not nm:
            return {"success": False, "msg": "名称不能为空"}

        # Labels cannot be passed as query parameters, so sanitise the
        # value before interpolating it into the statement.
        safe_label = str(label or "").strip().replace("`", "")
        if not safe_label:
            return {"success": False, "msg": "标签不能为空"}

        create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Names are treated as unique across all labels.
        check_cypher = "MATCH (n) WHERE n.name = $name RETURN n LIMIT 1"
        existing = self.db.execute_read(check_cypher, {"name": nm})
        if existing:
            return {"success": False, "msg": f"添加失败:已存在名为 '{nm}' 的节点"}

        # Millisecond timestamp doubles as the business id.
        new_node_id = int(time.time() * 1000)

        create_cypher = f"""
            CREATE (n:`{safe_label}` {{
                name: $name,
                nodeId: $nodeId,
                createTime: $createTime
            }})
            RETURN n
        """
        result = self.db.execute_write_and_return(create_cypher, {
            "name": nm,
            "nodeId": new_node_id,
            "createTime": create_time,
        })
        if result:
            return {"success": True, "msg": "添加成功", "nodeId": new_node_id}
        return {"success": False, "msg": "节点创建失败"}
    except Exception as e:
        return {"success": False, "msg": f"写入失败: {str(e)}"}
|
|
|
|
def update_node(self, node_id: str, name: str, label: str):
    """Rename a node and replace its label.

    Args:
        node_id: Element id of the node to update.
        name: New URL-encoded name; must be non-blank after decoding and
            not used by any other node.
        label: New label. Backticks are removed because the label is
            spliced into a backtick-quoted identifier; a label that ends
            up empty is rejected.

    Returns:
        ``{"success": bool, "msg": str}``. Exceptions are reported in
        ``msg`` rather than raised.
    """
    try:
        nm = unquote(str(name or "")).strip()
        if not nm:
            return {"success": False, "msg": "名称不能为空"}

        # Labels cannot be passed as query parameters, so sanitise the
        # value before interpolating it into the statement.
        safe_label = str(label or "").strip().replace("`", "")
        if not safe_label:
            return {"success": False, "msg": "标签不能为空"}

        check_name = "MATCH (n) WHERE n.name = $name AND elementId(n) <> $id RETURN n LIMIT 1"
        existing = self.db.execute_read(check_name, {"name": nm, "id": node_id})
        if existing:
            return {"success": False, "msg": f"修改失败:库中已有其他名为 '{nm}' 的节点"}

        # Only the labels listed in REMOVE are cleared before the new
        # label is applied; any other label on the node is kept.
        cypher = f"""
            MATCH (n) WHERE elementId(n) = $id
            SET n.name = $name
            WITH n
            REMOVE n:Drug:Disease:Symptom:Entity:Medicine:Check:Food:Operation:CheckSubject:Complication:Diagnosis:Treatment:AdjuvantTherapy:adverseReactions:Department:DiseaseSite:RelatedDisease:RelatedSymptom:SpreadWay:Stage:Subject:SymptomAndSign:TreatmentPrograms:Type:Cause:Attribute:Indications:Ingredients:Pathogenesis:PathologicalType:Pathophysiology:Precautions:Prognosis:PrognosticSurvivalTime:DiseaseRatio:DrugTherapy:Infectious:MultipleGroups:DiseaseRate
            WITH n
            SET n:`{safe_label}`
            RETURN n
        """
        result = self.db.execute_write_and_return(cypher, {"id": node_id, "name": nm})
        if result:
            return {"success": True, "msg": "节点修改成功"}
        return {"success": False, "msg": "找不到该节点或更新失败"}
    except Exception as e:
        return {"success": False, "msg": str(e)}
|
|
|
|
def delete_node(self, node_id: str):
    """Delete a node, together with its relationships, by element id.

    Args:
        node_id: Element id of the node to remove.

    Returns:
        ``{"success": bool, "msg": str}``; an empty write result is
        reported as "node missing or already deleted", and exceptions are
        reported in ``msg`` rather than raised.
    """
    delete_query = "MATCH (n) WHERE elementId(n) = $id DETACH DELETE n RETURN 1 as deleted"
    try:
        removed = self.db.execute_write_and_return(delete_query, {"id": node_id})
    except Exception as exc:
        return {"success": False, "msg": str(exc)}
    else:
        if not removed:
            return {"success": False, "msg": "节点不存在或已被删除"}
        return {"success": True, "msg": "删除成功"}
|
|
|
|
def get_all_labels(self):
    """List every node label known to the database.

    Returns:
        The first column of each ``CALL db.labels()`` row. When the call
        fails or returns nothing, the default list
        ``["Drug", "Disease", "Symptom"]`` is returned instead.
    """
    cypher = "CALL db.labels()"
    try:
        results = self.db.execute_read(cypher)
        labels = [list(row.values())[0] for row in results]
        return labels if labels else ["Drug", "Disease", "Symptom"]
    except Exception:
        # Deliberate best-effort fallback. Catching Exception rather than
        # using a bare ``except`` lets KeyboardInterrupt / SystemExit
        # propagate.
        return ["Drug", "Disease", "Symptom"]
|
|
|
|
# --- 6. 关系管理 ---
|
|
def get_all_relationship_types(self):
    """List each relationship type once, paired with a display label.

    The label is the ``label`` property from the first row seen for that
    type, or the type name itself when that row has no label.

    Returns:
        List of ``{"type": ..., "label": ...}`` dicts in first-seen
        order; an empty list when nothing is found or the query fails
        (the error is printed).
    """
    type_query = """
        MATCH ()-[r]->()
        RETURN DISTINCT type(r) AS type, r.label AS label
    """
    try:
        rows = self.db.execute_read(type_query)
        # dict preserves insertion order, giving first-seen-wins dedup.
        first_seen = {}
        for row in rows:
            rel_type = row["type"]
            if rel_type in first_seen:
                continue
            first_seen[rel_type] = row["label"] if row.get("label") else rel_type
        return [{"type": rel_type, "label": text} for rel_type, text in first_seen.items()]
    except Exception as exc:
        print(f"Fetch RelTypes Error: {exc}")
        return []
|
|
|
|
def add_relationship(self, source_name: str, target_name: str, rel_type: str, rel_label: str):
    """Create a typed relationship between two existing nodes found by name.

    Args:
        source_name: URL-encoded name of the start node.
        target_name: URL-encoded name of the end node.
        rel_type: Relationship type. Backticks are removed because the
            type is spliced into a backtick-quoted identifier; a type
            that ends up empty is rejected.
        rel_label: Display label stored on the relationship.

    Returns:
        ``{"success": bool, "msg": str}``. Fails when either node is
        missing or the same typed relationship already exists. Exceptions
        are printed and reported in ``msg``.
    """
    try:
        s = unquote(str(source_name)).strip()
        t = unquote(str(target_name)).strip()
        l = str(rel_label).strip()
        clean_rel_type = str(rel_type or "").strip().replace("`", "")
        # An empty type would produce an invalid identifier in the query.
        if not clean_rel_type:
            return {"success": False, "msg": "添加失败:关系类型不能为空"}
        create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        check_nodes = """
            OPTIONAL MATCH (a) WHERE a.name = $s
            WITH a LIMIT 1
            OPTIONAL MATCH (b) WHERE b.name = $t
            WITH a, b LIMIT 1
            RETURN a IS NOT NULL as hasA, b IS NOT NULL as hasB
        """
        nodes_res = self.db.execute_read(check_nodes, {"s": s, "t": t})

        # Treat an empty result as "neither node found" instead of
        # indexing into it.
        found = nodes_res[0] if nodes_res else {"hasA": False, "hasB": False}
        if not found['hasA'] or not found['hasB']:
            err_msg = "添加失败: "
            if not found['hasA']:
                err_msg += f"起始节点'{s}'不存在; "
            if not found['hasB']:
                err_msg += f"结束节点'{t}'不存在"
            return {"success": False, "msg": err_msg}

        check_rel = f"MATCH (a {{name: $s}})-[r:`{clean_rel_type}`]->(b {{name: $t}}) RETURN r LIMIT 1"
        existing_rel = self.db.execute_read(check_rel, {"s": s, "t": t})
        if existing_rel:
            return {"success": False, "msg": f"添加失败:已存在该关系"}

        # LIMIT 1 keeps a single pair when several nodes share a name.
        create_cypher = f"""
            MATCH (a {{name: $s}}), (b {{name: $t}})
            WITH a, b LIMIT 1
            CREATE (a)-[r:`{clean_rel_type}` {{
                label: $l,
                createTime: $create_time
            }}]->(b)
            RETURN r
        """
        result = self.db.execute_write_and_return(create_cypher, {
            "s": s, "t": t, "l": l, "create_time": create_time
        })

        if result:
            return {"success": True, "msg": "添加成功"}
        return {"success": False, "msg": "关系创建失败"}
    except Exception as e:
        traceback.print_exc()
        return {"success": False, "msg": f"数据库写入异常: {str(e)}"}
|
|
|
|
def update_relationship(self, rel_id: str, source_name: str, target_name: str, rel_type: str, rel_label: str):
    """Update a relationship's label, or re-point / re-type it.

    When the endpoints and type are unchanged only ``r.label`` is
    rewritten. Otherwise the replacement is created through
    ``add_relationship`` first and the old relationship is deleted only
    after that succeeded, so a rejected replacement leaves the original
    in place.

    Args:
        rel_id: Element id of the existing relationship.
        source_name: URL-encoded name of the (new) start node.
        target_name: URL-encoded name of the (new) end node.
        rel_type: (New) relationship type; normalised like
            ``add_relationship`` does (stripped, backticks removed)
            before it is compared with the stored type.
        rel_label: New display label.

    Returns:
        ``{"success": bool, "msg": str}``. Exceptions are printed and
        reported in ``msg``.
    """
    try:
        s = unquote(str(source_name)).strip()
        t = unquote(str(target_name)).strip()
        l = str(rel_label).strip()
        new_type = str(rel_type or "").strip().replace("`", "")

        find_old = "MATCH (a)-[r]->(b) WHERE elementId(r) = $id RETURN type(r) as type, a.name as s, b.name as t"
        old = self.db.execute_read(find_old, {"id": rel_id})
        if not old:
            return {"success": False, "msg": "修改失败:原关系不存在"}

        current = old[0]
        if current['s'] == s and current['t'] == t and current['type'] == new_type:
            # Same endpoints and type: an in-place label change suffices.
            update_cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id SET r.label = $l RETURN r"
            self.db.execute_write_and_return(update_cypher, {"id": rel_id, "l": l})
            return {"success": True, "msg": "修改成功"}

        # Create first: if the new relationship is rejected (missing node,
        # duplicate, ...) the old one must survive.
        created = self.add_relationship(s, t, new_type, l)
        if not created.get("success"):
            return created

        removed = self.delete_relationship(rel_id)
        if not removed.get("success"):
            return {"success": False, "msg": f"修改异常: 新关系已创建,但旧关系删除失败: {removed.get('msg')}"}
        return created
    except Exception as e:
        traceback.print_exc()
        return {"success": False, "msg": f"修改异常: {str(e)}"}
|
|
|
|
def delete_relationship(self, rel_id: str):
    """Delete a single relationship identified by its element id.

    Args:
        rel_id: Element id of the relationship to remove.

    Returns:
        ``{"success": bool, "msg": str}``; an empty write result is
        reported as "relationship does not exist", and exceptions are
        reported in ``msg`` rather than raised.
    """
    try:
        outcome = self.db.execute_write_and_return(
            "MATCH ()-[r]->() WHERE elementId(r) = $id DELETE r RETURN 1",
            {"id": rel_id},
        )
        if outcome:
            return {"success": True, "msg": "删除成功"}
        return {"success": False, "msg": "关系不存在"}
    except Exception as exc:
        return {"success": False, "msg": f"删除失败: {str(exc)}"}
|
|
|
|
# --- 7. 导出功能 ---
|
|
def export_nodes_to_json(self, label=None, name=None):
    """Export every node matching the optional filters, without a limit.

    Args:
        label: Optional label. Blank, "全部", "null" and "undefined" mean
            "no label filter". The value is stripped and has backticks
            removed before being spliced into the backtick-quoted pattern.
        name: Optional URL-encoded substring matched against ``n.name``;
            blank (after decoding), "null" and "undefined" disable it.

    Returns:
        ``{"success": True, "data": [...], "count": int}`` where each
        entry has ``identity`` (its position in the result), ``elementId``,
        ``labels`` and ``properties``; on failure the error is printed
        and ``{"success": False, "msg": ...}`` is returned.
    """
    try:
        conditions = []
        params = {}

        name_filter = unquote(str(name)).strip() if name else ""
        if name_filter and name_filter not in ("null", "undefined"):
            params["name"] = name_filter
            conditions.append("n.name CONTAINS $name")

        # Use the cleaned label in the pattern: the raw argument may carry
        # surrounding whitespace or backticks.
        label_filter = str(label).strip().replace("`", "") if label else ""
        if label_filter in ("全部", "null", "undefined"):
            label_filter = ""
        label_cypher = f":`{label_filter}`" if label_filter else ""

        where_clause = ("WHERE " + " AND ".join(conditions)) if conditions else ""

        cypher = f"""
            MATCH (n{label_cypher})
            {where_clause}
            RETURN elementId(n) AS elementId,
                   labels(n) AS labels,
                   properties(n) AS properties
        """

        export_items = []
        with self.db.driver.session() as session:
            result = session.run(cypher, params)
            for index, row in enumerate(result):
                export_items.append({
                    "identity": index,
                    "elementId": row.get("elementId"),
                    "labels": row.get("labels"),
                    "properties": row.get("properties"),
                })

        return {"success": True, "data": export_items, "count": len(export_items)}
    except Exception as e:
        traceback.print_exc()
        return {"success": False, "msg": f"导出节点失败: {str(e)}"}
|
|
|
|
def export_relationships_to_json(self, source=None, target=None, rel_type=None):
    """Export every relationship matching the filters as path-like records.

    Args:
        source: Optional URL-encoded substring of the start node's name.
        target: Optional URL-encoded substring of the end node's name.
        rel_type: Optional exact type; blank, "全部", "null" and
            "undefined" disable the filter.

    Returns:
        ``{"success": True, "data": [...], "count": int}``. Each entry
        has ``start``, ``end``, a one-element ``segments`` list and
        ``length`` 1.0. ``identity`` values are synthetic: relationship
        *i* gets identity *i* and its nodes get ``2*i`` and ``2*i + 1``.
        On failure the error is printed and a failure dict is returned.
    """
    try:
        params = {}
        filters = []

        # Both endpoint filters share the same decode-and-strip handling.
        endpoint_filters = (
            ("source", source, "a.name CONTAINS $source"),
            ("target", target, "b.name CONTAINS $target"),
        )
        for key, raw_value, clause in endpoint_filters:
            if raw_value:
                params[key] = unquote(str(raw_value)).strip()
                filters.append(clause)

        type_text = str(rel_type).strip() if rel_type else ""
        if type_text and rel_type not in ("全部", "", "null", "undefined"):
            params["rel_type"] = type_text
            filters.append("type(r) = $rel_type")

        where_clause = ""
        if filters:
            where_clause = "WHERE " + " AND ".join(filters)

        # No LIMIT: the export is intentionally complete.
        cypher = f"""
            MATCH (a)-[r]->(b)
            {where_clause}
            RETURN
            {{
                elementId: elementId(a),
                labels: labels(a),
                properties: properties(a)
            }} AS start_node,
            {{
                elementId: elementId(b),
                labels: labels(b),
                properties: properties(b)
            }} AS end_node,
            {{
                type: type(r),
                properties: properties(r),
                elementId: elementId(r),
                startNodeElementId: elementId(a),
                endNodeElementId: elementId(b)
            }} AS rel_info
        """

        exported = []
        with self.db.driver.session() as session:
            for position, record in enumerate(session.run(cypher, params)):
                start_node, end_node, rel = (
                    record["start_node"], record["end_node"], record["rel_info"]
                )

                # Synthetic identities derived from the row position.
                start_node["identity"] = position * 2
                end_node["identity"] = position * 2 + 1
                rel["identity"] = position
                rel["start"] = start_node["identity"]
                rel["end"] = end_node["identity"]

                exported.append({
                    "start": start_node,
                    "end": end_node,
                    "segments": [{"start": start_node, "relationship": rel, "end": end_node}],
                    "length": 1.0,
                })

        return {"success": True, "data": exported, "count": len(exported)}
    except Exception as exc:
        traceback.print_exc()
        return {"success": False, "msg": f"导出关系失败: {str(exc)}"}
|
|
|
|
# --- 8. 节点导入核心功能 ---
|
|
def precheck_nodes_batch(self, nodes_batch):
    """Validate a batch of nodes before import and report conflicts.

    Each raw node is normalised with ``_format_node_data`` and must carry
    a ``name``, at least one label and a ``nodeId``. Valid nodes are then
    compared with the database: first on ``nodeId``, then on the
    (name, label) pair.

    Args:
        nodes_batch: List of raw node payloads (flat or nested).

    Returns:
        Dict with ``success``, ``conflicts``, ``invalid`` and ``summary``
        (``total``, ``valid``, ``conflict``). ``summary`` is present even
        when no node passed validation, so callers always get one shape.
    """
    conflicts = []
    invalid_data = []
    valid_nodes = []

    # 1. In-memory validation: name, labels and nodeId are all mandatory.
    for index, raw_node in enumerate(nodes_batch):
        node = self._format_node_data(raw_node)
        props = node["properties"]
        name = props.get("name")

        reasons = []
        if not name:
            reasons.append("缺少 name")
        if not node["labels"]:
            reasons.append("缺少 label")
        if props.get("nodeId") is None:
            reasons.append("缺少 nodeId")

        if reasons:
            invalid_data.append({
                "index": index,
                "name": name or "未知",
                "reason": " | ".join(reasons),
            })
            continue
        valid_nodes.append(node)

    def build_report():
        return {
            "success": True,
            "conflicts": conflicts,
            "invalid": invalid_data,
            "summary": {
                "total": len(nodes_batch),
                "valid": len(valid_nodes),
                "conflict": len(conflicts),
            },
        }

    if not valid_nodes:
        return build_report()

    # 2. Bulk database lookups for potential conflicts.
    all_node_ids = [n["properties"]["nodeId"] for n in valid_nodes]
    all_names = [n["properties"]["name"] for n in valid_nodes]

    id_results = self.db.execute_read(
        "MATCH (n) WHERE n.nodeId IN $ids RETURN n.nodeId as nodeId, n.name as name",
        {"ids": all_node_ids},
    )
    existing_ids = {row["nodeId"] for row in id_results}

    name_results = self.db.execute_read(
        "MATCH (n) WHERE n.name IN $names RETURN n.name as name, labels(n) as labels",
        {"names": all_names},
    )
    # Tuples keep name and label apart; a joined string such as
    # "a_b" + "_" + "c" would collide with "a" + "_" + "b_c".
    existing_pairs = {
        (row["name"], lbl) for row in name_results for lbl in row["labels"]
    }

    # 3. Build the conflict report; a nodeId clash takes priority.
    for node in valid_nodes:
        p = node["properties"]
        n_id, name, labels = p["nodeId"], p["name"], node["labels"]

        if n_id in existing_ids:
            conflicts.append({
                "name": name, "label": labels[0], "nodeId": n_id,
                "reason": f"业务ID冲突: 已存在 nodeId={n_id}",
                "type": "nodeId_duplicate",
            })
            continue

        for lbl in labels:
            if (name, lbl) in existing_pairs:
                conflicts.append({
                    "name": name, "label": lbl, "nodeId": n_id,
                    "reason": f"逻辑主键冲突: {lbl} 下已存在名称 '{name}'",
                    "type": "logic_key_duplicate",
                })
                break  # one conflict entry per node is enough

    return build_report()
|
|
|
|
def execute_node_import_batch(self, nodes_batch, mode="skip"):
    """Import a batch of nodes, grouped by their first label.

    Args:
        nodes_batch: List of raw node payloads (flat or nested); each is
            normalised with ``_format_node_data``. Nodes without a label
            or without a ``name`` property are left out.
        mode: ``"update"`` merges on (label, name) and overwrites
            properties on match; ``"skip"`` merges and leaves existing
            nodes untouched; ``"strict"`` aborts when the precheck
            reports conflicts; ``"strict"`` and any other value create
            nodes unconditionally.

    Returns:
        ``{"success": True, "msg": ..., "count": int}`` where ``count``
        sums the per-label ``count(n)`` results, or a failure dict.
        Exceptions are printed and reported in ``msg``.
    """
    try:
        formatted_batch = [self._format_node_data(n) for n in nodes_batch]
        # Same string format as manually added nodes use for createTime.
        current_time_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        if mode == "strict":
            check = self.precheck_nodes_batch(nodes_batch)
            if check.get("conflicts"):
                return {"success": False, "msg": "严格模式下发现冲突,停止导入", "conflicts": check["conflicts"]}

        label_groups = {}
        for node in formatted_batch:
            lbls = node.get("labels")
            if not lbls:
                continue
            # The label is spliced into a backtick-quoted identifier, so
            # backticks coming from imported data are removed.
            lbl = str(lbls[0]).replace("`", "")
            if not lbl:
                continue
            props = node.get("properties")
            if props and props.get("name"):
                label_groups.setdefault(lbl, []).append(props)

        total_imported = 0
        with self.db.driver.session() as session:
            for lbl, batch_props in label_groups.items():
                if mode == "update":
                    cypher = f"""
                        UNWIND $batch AS props
                        MERGE (n:`{lbl}` {{name: props.name}})
                        ON CREATE SET n = props, n.createTime = $now
                        ON MATCH SET n += props, n.updateTime = $now
                        RETURN count(n) as cnt
                    """
                elif mode == "skip":
                    cypher = f"""
                        UNWIND $batch AS props
                        MERGE (n:`{lbl}` {{name: props.name}})
                        ON CREATE SET n = props, n.createTime = $now
                        RETURN count(n) as cnt
                    """
                else:
                    cypher = f"""
                        UNWIND $batch AS props
                        CREATE (n:`{lbl}`)
                        SET n = props, n.createTime = $now
                        RETURN count(n) as cnt
                    """

                res = session.run(cypher, {"batch": batch_props, "now": current_time_str})
                record = res.single()
                if record:
                    total_imported += record["cnt"]

        return {"success": True, "msg": f"成功处理 {total_imported} 个节点", "count": total_imported}
    except Exception as e:
        traceback.print_exc()
        return {"success": False, "msg": f"批次导入异常: {str(e)}"}
|
|
|
|
def precheck_rels_batch(self, rels_batch):
    """Validate a batch of relationships before import.

    Each raw item is normalised with ``_normalize_rel_data`` and then
    classified as:

    * invalid - a required field is missing, an endpoint node was not
      found, or the database returned no verdict;
    * conflict - the typed relationship already exists;
    * valid - everything else.

    Endpoints are matched by name plus, when the item carries one, the
    label, which is the same pattern ``execute_rel_import_batch`` uses.

    Args:
        rels_batch: List of raw relationship payloads.

    Returns:
        Dict with ``success``, ``conflicts``, ``invalid`` and ``summary``
        (``total``, ``valid``, ``conflict``, ``invalid``).
    """
    conflicts = []   # relationship already exists
    invalid = []     # malformed item or missing endpoint
    actual_valid_count = 0

    def quoted_label(value):
        # Type and labels are spliced into backtick-quoted identifiers;
        # backticks are removed so the value cannot leave the quotes.
        cleaned = (value or "").replace("`", "")
        return f":`{cleaned}`" if cleaned else ""

    for raw_item in rels_batch:
        item = self._normalize_rel_data(raw_item)
        rel_type = (item["rel_type"] or "").replace("`", "")

        # 1. Basic format check.
        if not item["source_name"] or not item["target_name"] or not rel_type:
            invalid.append({
                "source": item.get("source_name") or "未知",
                "target": item.get("target_name") or "未知",
                "reason": "格式错误:缺少必要字段",
            })
            continue

        # 2. Existence check against the database.
        s_pattern = quoted_label(item.get("source_label"))
        t_pattern = quoted_label(item.get("target_label"))
        cypher = f"""
            OPTIONAL MATCH (s{s_pattern} {{name: $s_name}})
            OPTIONAL MATCH (t{t_pattern} {{name: $t_name}})
            OPTIONAL MATCH (s)-[r:`{rel_type}`]->(t)
            RETURN s IS NOT NULL as hasS, t IS NOT NULL as hasT, r IS NOT NULL as hasR
        """
        rows = self.db.execute_read(cypher, {"s_name": item["source_name"], "t_name": item["target_name"]})

        if not rows:
            # Without a verdict the item cannot be counted as importable.
            invalid.append({
                "source": item["source_name"],
                "target": item["target_name"],
                "reason": "校验失败:数据库未返回结果",
            })
            continue

        # Several nodes may share a name, giving one row per pair; the
        # relationship counts as existing when any pair already has it.
        has_s = any(row["hasS"] for row in rows)
        has_t = any(row["hasT"] for row in rows)
        has_r = any(row["hasR"] for row in rows)

        if not has_s or not has_t:
            invalid.append({
                "source": item["source_name"],
                "target": item["target_name"],
                "reason": f"节点不存在(起点:{'√' if has_s else '×'}, 终点:{'√' if has_t else '×'})",
            })
        elif has_r:
            conflicts.append({
                "source": item["source_name"],
                "target": item["target_name"],
                "type": rel_type,
                "reason": "关系已存在",
            })
        else:
            actual_valid_count += 1

    return {
        "success": True,
        "conflicts": conflicts,
        "invalid": invalid,
        "summary": {
            "total": len(rels_batch),
            "valid": actual_valid_count,
            "conflict": len(conflicts),
            "invalid": len(invalid),
        },
    }
|
|
|
|
def execute_rel_import_batch(self, rels_batch, mode="skip"):
    """Import a batch of relationships and return them as path-like records.

    Each raw item is normalised with ``_normalize_rel_data``. Endpoints
    are matched by name (plus label when the item carries one) and the
    relationship is MERGEd, so an existing one is reused rather than
    duplicated. Items lacking a source name, target name or type are
    skipped, and so are items whose query yields no record.

    Args:
        rels_batch: List of raw relationship payloads.
        mode: ``"update"`` also overwrites ``r.label`` on an existing
            relationship; any other value leaves existing ones unchanged.

    Returns:
        ``{"success": True, "data": [...], "count": int}`` with one
        entry per imported item (``start``, ``end``, one-element
        ``segments``, ``length`` 1.0), or a failure dict. Exceptions are
        printed and reported in ``msg``.
    """
    def quoted_label(value):
        # Type and labels are spliced into backtick-quoted identifiers;
        # backticks are removed so the value cannot leave the quotes.
        cleaned = (value or "").replace("`", "")
        return f":`{cleaned}`" if cleaned else ""

    def node_payload(node, identity, element_id):
        # Fresh dict per call so start/end and segment copies stay independent.
        return {
            "identity": identity,
            "labels": list(node.labels),
            "properties": dict(node),
            "elementId": str(element_id),
        }

    try:
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        final_results = []

        with self.db.driver.session() as session:
            for raw_item in rels_batch:
                item = self._normalize_rel_data(raw_item)
                rel_type = (item["rel_type"] or "").replace("`", "")

                # Skip items that cannot be matched or typed.
                if not item["source_name"] or not item["target_name"] or not rel_type:
                    continue

                s_label_cypher = quoted_label(item['source_label'])
                t_label_cypher = quoted_label(item['target_label'])
                op = "ON MATCH SET r.label = $label" if mode == "update" else ""

                # NOTE(review): id() is kept for the numeric "identity"
                # fields; it looks deprecated in recent Neo4j - confirm.
                cypher = f"""
                    MATCH (s{s_label_cypher} {{name: $s_name}})
                    MATCH (t{t_label_cypher} {{name: $t_name}})
                    MERGE (s)-[r:`{rel_type}`]->(t)
                    ON CREATE SET r.label = $label, r.createTime = $now
                    {op}
                    RETURN s, t, r,
                           id(s) as s_id, id(t) as t_id, id(r) as r_id,
                           elementId(s) as s_eid, elementId(t) as t_eid, elementId(r) as r_eid
                """

                res = session.run(cypher, {
                    "s_name": item["source_name"],
                    "t_name": item["target_name"],
                    "label": item["rel_label"],
                    "now": now,
                })

                record = res.single()
                if not record:
                    continue

                s_node, t_node, r_rel = record["s"], record["t"], record["r"]
                s_id, t_id = record["s_id"], record["t_id"]
                s_eid, t_eid = record["s_eid"], record["t_eid"]

                final_results.append({
                    "start": node_payload(s_node, s_id, s_eid),
                    "end": node_payload(t_node, t_id, t_eid),
                    "segments": [{
                        "start": node_payload(s_node, s_id, s_eid),
                        "relationship": {
                            "identity": record["r_id"],
                            "start": s_id,
                            "end": t_id,
                            "type": r_rel.type,
                            "properties": dict(r_rel),
                            "elementId": str(record["r_eid"]),
                            "startNodeElementId": str(s_eid),
                            "endNodeElementId": str(t_eid),
                        },
                        # Always the queried id: the record has no
                        # "identity" column, so reading one would raise
                        # for nodes that own an "identity" property.
                        "end": node_payload(t_node, t_id, t_eid),
                    }],
                    "length": 1.0,
                })

        return {"success": True, "data": final_results, "count": len(final_results)}
    except Exception as e:
        traceback.print_exc()
        return {"success": False, "msg": f"导入执行失败: {str(e)}"}
|
|
|