import json
import traceback
import datetime
import random
import time
from urllib.parse import unquote

from util.neo4j_utils import neo4j_client


class OperationService:
    """CRUD, statistics, export and batch-import operations on the Neo4j knowledge graph.

    Most public methods catch their own exceptions and report failure through the
    returned dict (or an empty / default value). ``precheck_nodes_batch`` and
    ``precheck_rels_batch`` let database errors propagate to the caller.
    """

    # Filter values the front end sends to mean "no filter".
    _NO_FILTER_VALUES = ("全部", "", "null", "undefined")

    def __init__(self):
        # Shared client from util.neo4j_utils; used via execute_read,
        # execute_write_and_return and its raw ``driver`` for streaming sessions.
        self.db = neo4j_client

    # --- 0. Internal helpers: escaping, filters, format normalisation ---

    @staticmethod
    def _quote_name(name) -> str:
        """Return *name* as a backtick-quoted Cypher identifier (label / relationship type).

        Labels and relationship types cannot be passed as query parameters, so they are
        interpolated into the query text. Embedded backticks are doubled, which is
        Cypher's escape for a backtick inside a quoted name, so the value cannot break
        out of the identifier and inject query text.
        """
        return "`" + str(name).replace("`", "``") + "`"

    @classmethod
    def _label_pattern(cls, label) -> str:
        """Return ``:`label``` for use inside a node pattern, or "" when *label* is empty/None."""
        return f":{cls._quote_name(label)}" if label else ""

    @classmethod
    def _clean_filter(cls, value):
        """Strip a front-end filter value; return None when it means "no filter"."""
        if value is None:
            return None
        text = str(value).strip()
        return None if text in cls._NO_FILTER_VALUES else text

    @staticmethod
    def _page_window(page, page_size):
        """Convert a 1-based *page* and *page_size* into ``(skip, limit)``.

        Pages below 1 are treated as page 1 and negative sizes as 0, so the query never
        receives a negative SKIP/LIMIT. Raises ValueError/TypeError for non-integer input.
        """
        limit_val = max(int(page_size), 0)
        skip_val = (max(int(page), 1) - 1) * limit_val
        return skip_val, limit_val

    @staticmethod
    def _graph_node_payload(node, identity, element_id):
        """Build the node dict (identity/labels/properties/elementId) used in import results."""
        return {
            "identity": identity,
            "labels": list(node.labels),
            "properties": dict(node),
            "elementId": str(element_id),
        }

    def _format_node_data(self, node):
        """Normalise a node sent by the front end in flat or nested JSON form.

        Accepted shapes:
          * nested: ``{"labels": [...], "properties": {...}}``
          * flat:   ``{"label": "Drug", "name": ..., "nodeId": ..., ...}``

        Returns ``{"labels": [...], "properties": {...}}``. Labels may be given as
        ``labels`` or ``label``, as a string or a list. None/blank labels are dropped
        and the rest are stripped, so a node whose only label is blank gets
        ``labels == []`` and is reported as missing a label instead of producing
        invalid Cypher later.
        """
        # 1. Labels.
        raw_labels = node.get("labels") or node.get("label")
        if isinstance(raw_labels, str):
            candidates = [raw_labels]
        elif isinstance(raw_labels, list):
            candidates = raw_labels
        else:
            candidates = []
        labels = [str(lbl).strip() for lbl in candidates if lbl is not None and str(lbl).strip()]

        # 2. Properties: a nested "properties" dict wins; otherwise pack every key
        #    except the structural ones.
        if isinstance(node.get("properties"), dict):
            props = node["properties"]
        else:
            props = {
                k: v for k, v in node.items()
                if k not in ("label", "labels", "identity", "elementId")
            }
        return {"labels": labels, "properties": props}

    def _normalize_rel_data(self, item):
        """Normalise one relationship record from an import file.

        Supports:
          * the nested path format written by ``export_relationships_to_json``:
            ``start`` / ``end`` node objects plus either ``segments[0].relationship``
            or a top-level ``relationship`` object;
          * a flat format whose keys may use several aliases (see ``alias_map``).

        Returns a dict with ``source_name``, ``source_label``, ``target_name``,
        ``target_label``, ``rel_type`` and ``rel_label``. Names and type are None when
        missing. Labels and ``rel_label`` default to "".
        """
        def clean_str(val):
            return str(val).strip() if val is not None else None

        def first_label(node):
            # Only the first label is used, for precise matching. A plain string is
            # treated as a single label instead of being indexed character by character.
            raw = node.get("labels") or node.get("label")
            if isinstance(raw, str):
                return raw.strip()
            if isinstance(raw, (list, tuple)) and raw:
                return clean_str(raw[0]) or ""
            return ""

        # 1. Nested format: start/end are node objects.
        if isinstance(item.get("start"), dict) and isinstance(item.get("end"), dict):
            s_node = item["start"]
            e_node = item["end"]
            # Node properties live under "properties" when present, else on the object itself.
            s_props = s_node["properties"] if isinstance(s_node.get("properties"), dict) else s_node
            e_props = e_node["properties"] if isinstance(e_node.get("properties"), dict) else e_node

            segments = item.get("segments")
            if segments and isinstance(segments[0], dict):
                rel_obj = segments[0].get("relationship") or {}
            else:
                rel_obj = item.get("relationship") or {}
            if not isinstance(rel_obj, dict):
                rel_obj = {}
            r_props = rel_obj["properties"] if isinstance(rel_obj.get("properties"), dict) else rel_obj

            return {
                "source_name": clean_str(s_props.get("name")),
                "source_label": first_label(s_node),
                "target_name": clean_str(e_props.get("name")),
                "target_label": first_label(e_node),
                "rel_type": clean_str(rel_obj.get("type")),
                "rel_label": clean_str(r_props.get("label") or r_props.get("name") or ""),
            }

        # 2. Flat format: the first non-empty, non-dict value among the aliases wins.
        alias_map = {
            "source": ["source_name", "source", "start_name", "起点"],
            "target": ["target_name", "target", "end_name", "终点"],
            "type": ["rel_type", "type", "relationship"],
            "label": ["rel_label", "label", "关系标签"],
        }

        def find_value(keys):
            for k in keys:
                val = item.get(k)
                if val and not isinstance(val, dict):
                    return val
            return None

        return {
            "source_name": clean_str(find_value(alias_map["source"])),
            "source_label": clean_str(item.get("source_label") or ""),
            "target_name": clean_str(find_value(alias_map["target"])),
            "target_label": clean_str(item.get("target_label") or ""),
            "rel_type": clean_str(find_value(alias_map["type"])),
            "rel_label": clean_str(find_value(alias_map["label"])) or "",
        }

    # --- 0. Data repair tools ---

    def fix_all_missing_node_ids(self):
        """Assign a random 6-digit nodeId to every node whose nodeId is null, 0 or '0'.

        NOTE(review): ids are drawn independently per node and are not checked for
        collisions with existing nodeIds.
        """
        try:
            check_cypher = "MATCH (n) WHERE n.nodeId IS NULL OR n.nodeId = 0 OR n.nodeId = '0' RETURN count(n) as cnt"
            res = self.db.execute_read(check_cypher)
            if not res or res[0]['cnt'] == 0:
                return {"success": True, "msg": "没有需要修复的节点"}

            update_cypher = """
                MATCH (n)
                WHERE n.nodeId IS NULL OR n.nodeId = 0 OR n.nodeId = '0'
                WITH n, toInteger(100000 + rand() * 899999) as newId
                SET n.nodeId = newId
                RETURN count(n) as fixedCount
            """
            result = self.db.execute_write_and_return(update_cypher)
            fixed = result[0]['fixedCount'] if result else 0
            return {"success": True, "msg": f"修复完成,共处理 {fixed} 个节点"}
        except Exception as e:
            return {"success": False, "msg": f"修复失败: {str(e)}"}

    # --- 1. Global statistics ---

    def get_kg_stats(self):
        """Return total node count, total relationship count and nodes created today.

        "Today" is matched as a string prefix of ``createTime`` (format
        ``YYYY-MM-DD ...``, as written by ``add_node`` and the importers). The query
        uses the ``CALL () { ... }`` scoped-subquery form, which needs a Neo4j version
        that supports it.
        """
        try:
            today_str = datetime.datetime.now().strftime('%Y-%m-%d')
            cypher = """
            CALL () { MATCH (n) RETURN count(n) AS totalNodes }
            CALL () { MATCH ()-[r]->() RETURN count(r) AS totalRels }
            CALL () {
                MATCH (n)
                WHERE n.createTime STARTS WITH $today
                RETURN count(n) AS todayNodes
            }
            RETURN totalNodes, totalRels, todayNodes
            """
            results = self.db.execute_read(cypher, {"today": today_str})
            if results:
                return {
                    "success": True,
                    "data": {
                        "totalNodes": results[0]['totalNodes'],
                        "totalRels": results[0]['totalRels'],
                        "todayNodes": results[0]['todayNodes']
                    }
                }
            return {"success": False, "msg": "未能获取统计数据"}
        except Exception as e:
            print(f"Stats Error: {e}")
            return {"success": False, "data": {"totalNodes": 0, "totalRels": 0, "todayNodes": 0}}

    # --- 2. Node queries ---

    def get_nodes_subset(self, page=1, page_size=20, name=None, label=None):
        """Return one page of nodes (newest first) together with the total match count.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: rows per page.
            name: optional URL-encoded substring matched against ``n.name``.
            label: optional label; "全部", "", "null" and "undefined" mean no filter.

        Returns:
            ``{"items": [...], "total": int}``; ``{"items": [], "total": 0}`` on error.
            Each item's ``nodeId`` falls back to the element id when the stored
            nodeId is missing or 0.
        """
        try:
            skip_val, limit_val = self._page_window(page, page_size)

            conditions = []
            params = {"skip": skip_val, "limit": limit_val}

            if name:
                params["name"] = unquote(str(name)).strip()
                conditions.append("n.name CONTAINS $name")

            label_filter = self._clean_filter(label)
            if label_filter:
                params["label"] = label_filter
                conditions.append("$label IN labels(n)")

            where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""

            # Count first, then re-match to fetch the page so every row carries the total.
            cypher = f"""
            MATCH (n) {where_clause}
            WITH count(n) AS total_count
            MATCH (n) {where_clause}
            RETURN elementId(n) AS id,
                   labels(n) AS labels,
                   n.name AS name,
                   n.nodeId AS nodeId,
                   total_count AS total
            ORDER BY coalesce(n.createTime, '0000-00-00') DESC, toInteger(coalesce(n.nodeId, 0)) DESC
            SKIP $skip LIMIT $limit
            """
            raw_data = self.db.execute_read(cypher, params)
            if not raw_data:
                return {"items": [], "total": 0}

            items = []
            for row in raw_data:
                db_node_id = row.get("nodeId")
                has_business_id = bool(db_node_id) and db_node_id != 0 and db_node_id != '0'
                items.append({
                    "id": row["id"],
                    "nodeId": db_node_id if has_business_id else row["id"],
                    "labels": row["labels"],
                    "name": row.get("name") or "N/A"
                })
            return {"items": items, "total": raw_data[0]['total']}
        except Exception:
            traceback.print_exc()
            return {"items": [], "total": 0}

    # --- 3. Relationship queries ---

    def get_relationships_subset(self, page=1, page_size=20, source=None, target=None, rel_type=None):
        """Return one page of relationships (newest first) together with the total count.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: rows per page.
            source / target: optional URL-encoded substrings of the endpoint names.
            rel_type: optional exact relationship type; "全部", "", "null" and
                "undefined" mean no filter.

        Returns:
            ``{"items": [...], "total": int}``; ``{"items": [], "total": 0}`` on error.
        """
        try:
            skip_val, limit_val = self._page_window(page, page_size)

            conditions = []
            params = {"skip": skip_val, "limit": limit_val}

            if source:
                params["source"] = unquote(str(source)).strip()
                conditions.append("a.name CONTAINS $source")
            if target:
                params["target"] = unquote(str(target)).strip()
                conditions.append("b.name CONTAINS $target")

            rel_filter = self._clean_filter(rel_type)
            if rel_filter:
                conditions.append("type(r) = $rel_type")
                params["rel_type"] = rel_filter

            where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""

            cypher = f"""
            MATCH (a)-[r]->(b) {where_clause}
            WITH count(r) AS total_count
            MATCH (a)-[r]->(b) {where_clause}
            RETURN elementId(r) as id,
                   type(r) as type,
                   r.label as label,
                   a.name as source,
                   b.name as target,
                   total_count
            ORDER BY coalesce(r.createTime, '0000-00-00') DESC
            SKIP $skip LIMIT $limit
            """
            raw_data = self.db.execute_read(cypher, params)
            if not raw_data:
                return {"items": [], "total": 0}

            items = [
                {
                    "id": row["id"],
                    "type": row["type"],
                    "label": row["label"] if row.get("label") is not None else "",
                    "source": row["source"],
                    "target": row["target"]
                }
                for row in raw_data
            ]
            return {"items": items, "total": raw_data[0]['total_count']}
        except Exception:
            traceback.print_exc()
            return {"items": [], "total": 0}

    # --- 4. Autocomplete suggestions ---

    def suggest_nodes(self, keyword: str, label: str = None):
        """Return up to 15 distinct node names for autocomplete.

        Works with a keyword, a label, or both. Nodes named '未命名' (unnamed) are
        excluded. If a keyword+label search finds nothing, it retries without the
        label and returns up to 5 names from the whole graph.
        Returns [] on error or when neither keyword nor label is given.
        """
        try:
            kw = unquote(str(keyword or "")).strip()
            lb = self._clean_filter(label)

            if not kw and not lb:
                return []

            params = {}
            # Label goes into the pattern (escaped) so the label index can be used.
            match_clause = f"MATCH (n{self._label_pattern(lb)})"

            conditions = ["n.name <> '未命名'"]
            if kw:
                conditions.append("n.name CONTAINS $kw")
                params["kw"] = kw

            where_clause = "WHERE " + " AND ".join(conditions)
            cypher = f"{match_clause} {where_clause} RETURN DISTINCT n.name as name LIMIT 15"
            results = self.db.execute_read(cypher, params)
            db_suggestions = [row["name"] for row in results if row.get("name")]

            # Fallback: drop the label restriction and search the whole graph.
            if not db_suggestions and kw and lb:
                fallback_cypher = "MATCH (n) WHERE n.name CONTAINS $kw AND n.name <> '未命名' RETURN DISTINCT n.name as name LIMIT 5"
                fallback_res = self.db.execute_read(fallback_cypher, {"kw": kw})
                db_suggestions = [row["name"] for row in fallback_res if row.get("name")]

            return db_suggestions
        except Exception:
            print(f"Suggest Error Trace: {traceback.format_exc()}")
            return []

    # --- 5. Node management ---

    def add_node(self, label: str, name: str):
        """Create a node with *label* and *name*; names are unique across all labels.

        The new node gets ``nodeId`` = current time in milliseconds and a
        ``createTime`` string. Returns ``{"success", "msg"[, "nodeId"]}``.
        """
        try:
            nm = unquote(str(name)).strip()
            if not nm:
                return {"success": False, "msg": "名称不能为空"}
            lbl = str(label).strip() if label is not None else ""
            if not lbl:
                return {"success": False, "msg": "标签不能为空"}

            create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            check_cypher = "MATCH (n) WHERE n.name = $name RETURN n LIMIT 1"
            existing = self.db.execute_read(check_cypher, {"name": nm})
            if existing:
                return {"success": False, "msg": f"添加失败:已存在名为 '{nm}' 的节点"}

            new_node_id = int(time.time() * 1000)

            create_cypher = f"""
            CREATE (n:{self._quote_name(lbl)} {{
                name: $name,
                nodeId: $nodeId,
                createTime: $createTime
            }})
            RETURN n
            """
            result = self.db.execute_write_and_return(create_cypher, {
                "name": nm,
                "nodeId": new_node_id,
                "createTime": create_time
            })
            if result:
                return {"success": True, "msg": "添加成功", "nodeId": new_node_id}
            return {"success": False, "msg": "节点创建失败"}
        except Exception as e:
            return {"success": False, "msg": f"写入失败: {str(e)}"}

    def update_node(self, node_id: str, name: str, label: str):
        """Rename the node with element id *node_id* and replace all its labels with *label*.

        Fails if another node already has the new name. All current labels are
        removed before the new one is set (not just a fixed list of known labels),
        so retyping a node never leaves a stale label behind.
        """
        try:
            nm = unquote(str(name)).strip()
            if not nm:
                return {"success": False, "msg": "名称不能为空"}
            lbl = str(label).strip() if label is not None else ""
            if not lbl:
                return {"success": False, "msg": "标签不能为空"}

            check_name = "MATCH (n) WHERE n.name = $name AND elementId(n) <> $id RETURN n LIMIT 1"
            existing = self.db.execute_read(check_name, {"name": nm, "id": node_id})
            if existing:
                return {"success": False, "msg": f"修改失败:库中已有其他名为 '{nm}' 的节点"}

            current = self.db.execute_read(
                "MATCH (n) WHERE elementId(n) = $id RETURN labels(n) AS labels", {"id": node_id})
            if not current:
                return {"success": False, "msg": "找不到该节点或更新失败"}

            # Labels cannot be parameters, so build an escaped REMOVE for the current set.
            old_labels = current[0].get("labels") or []
            remove_clause = (
                "WITH n REMOVE n" + "".join(":" + self._quote_name(x) for x in old_labels)
                if old_labels else ""
            )
            cypher = f"""
            MATCH (n) WHERE elementId(n) = $id
            SET n.name = $name
            {remove_clause}
            WITH n
            SET n:{self._quote_name(lbl)}
            RETURN n
            """
            result = self.db.execute_write_and_return(cypher, {"id": node_id, "name": nm})
            if result:
                return {"success": True, "msg": "节点修改成功"}
            return {"success": False, "msg": "找不到该节点或更新失败"}
        except Exception as e:
            return {"success": False, "msg": str(e)}

    def delete_node(self, node_id: str):
        """Delete the node with element id *node_id* together with all its relationships."""
        try:
            cypher = "MATCH (n) WHERE elementId(n) = $id DETACH DELETE n RETURN 1 as deleted"
            result = self.db.execute_write_and_return(cypher, {"id": node_id})
            if result:
                return {"success": True, "msg": "删除成功"}
            return {"success": False, "msg": "节点不存在或已被删除"}
        except Exception as e:
            return {"success": False, "msg": str(e)}

    def get_all_labels(self):
        """Return all labels in the database, or a default list when none/on error."""
        cypher = "CALL db.labels()"
        try:
            results = self.db.execute_read(cypher)
            labels = [list(row.values())[0] for row in results]
            return labels if labels else ["Drug", "Disease", "Symptom"]
        except Exception:
            return ["Drug", "Disease", "Symptom"]

    # --- 6. Relationship management ---

    def get_all_relationship_types(self):
        """Return ``[{"type": ..., "label": ...}]`` with one entry per relationship type.

        The display label is the first non-empty ``r.label`` seen for the type, falling
        back to the type name itself. Returns [] on error.
        """
        cypher = """
        MATCH ()-[r]->()
        RETURN DISTINCT type(r) AS type, r.label AS label
        """
        try:
            results = self.db.execute_read(cypher)
            # dict keeps first-seen order of types; a later labelled row replaces an
            # earlier unlabelled one.
            label_by_type = {}
            for row in results:
                t_name = row["type"]
                t_label = row.get("label")
                if t_name not in label_by_type or (t_label and not label_by_type[t_name]):
                    label_by_type[t_name] = t_label
            return [
                {"type": t_name, "label": t_label if t_label else t_name}
                for t_name, t_label in label_by_type.items()
            ]
        except Exception as e:
            print(f"Fetch RelTypes Error: {e}")
            return []

    def add_relationship(self, source_name: str, target_name: str, rel_type: str, rel_label: str):
        """Create ``(source)-[:rel_type {label, createTime}]->(target)`` between nodes found by name.

        Backticks are stripped from *rel_type*. Fails when the type is empty, either
        endpoint does not exist, or the same typed relationship already exists.
        """
        try:
            s = unquote(str(source_name)).strip()
            t = unquote(str(target_name)).strip()
            l = str(rel_label).strip() if rel_label is not None else ""
            clean_rel_type = str(rel_type or "").strip().replace("`", "")
            if not clean_rel_type:
                return {"success": False, "msg": "关系类型不能为空"}
            rel_name = self._quote_name(clean_rel_type)
            create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            check_nodes = """
            OPTIONAL MATCH (a) WHERE a.name = $s
            WITH a LIMIT 1
            OPTIONAL MATCH (b) WHERE b.name = $t
            WITH a, b LIMIT 1
            RETURN a IS NOT NULL as hasA, b IS NOT NULL as hasB
            """
            nodes_res = self.db.execute_read(check_nodes, {"s": s, "t": t})
            # An empty result is treated as "both endpoints missing" instead of crashing.
            has_a = bool(nodes_res and nodes_res[0]['hasA'])
            has_b = bool(nodes_res and nodes_res[0]['hasB'])
            if not (has_a and has_b):
                err_msg = "添加失败: "
                if not has_a:
                    err_msg += f"起始节点'{s}'不存在; "
                if not has_b:
                    err_msg += f"结束节点'{t}'不存在"
                return {"success": False, "msg": err_msg}

            check_rel = f"MATCH (a {{name: $s}})-[r:{rel_name}]->(b {{name: $t}}) RETURN r LIMIT 1"
            existing_rel = self.db.execute_read(check_rel, {"s": s, "t": t})
            if existing_rel:
                return {"success": False, "msg": "添加失败:已存在该关系"}

            create_cypher = f"""
            MATCH (a {{name: $s}}), (b {{name: $t}})
            WITH a, b LIMIT 1
            CREATE (a)-[r:{rel_name} {{
                label: $l,
                createTime: $create_time
            }}]->(b)
            RETURN r
            """
            result = self.db.execute_write_and_return(create_cypher, {
                "s": s, "t": t, "l": l, "create_time": create_time
            })
            if result:
                return {"success": True, "msg": "添加成功"}
            return {"success": False, "msg": "关系创建失败"}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "msg": f"数据库写入异常: {str(e)}"}

    def update_relationship(self, rel_id: str, source_name: str, target_name: str, rel_type: str, rel_label: str):
        """Update the relationship with element id *rel_id*.

        If endpoints and type are unchanged, only ``r.label`` is updated in place.
        Otherwise a new relationship is created via ``add_relationship``, because a
        relationship's type and endpoints cannot be changed in place. The old one is
        deleted only after the new one was created successfully, so a failed add
        (missing node, duplicate, empty type) leaves the original untouched.
        """
        try:
            s = unquote(str(source_name)).strip()
            t = unquote(str(target_name)).strip()
            l = str(rel_label).strip() if rel_label is not None else ""
            # Normalise exactly as add_relationship does so the comparison is meaningful.
            clean_rel_type = str(rel_type or "").strip().replace("`", "")

            find_old = "MATCH (a)-[r]->(b) WHERE elementId(r) = $id RETURN type(r) as type, a.name as s, b.name as t"
            old = self.db.execute_read(find_old, {"id": rel_id})
            if not old:
                return {"success": False, "msg": "修改失败:原关系不存在"}

            prev = old[0]
            if prev['s'] == s and prev['t'] == t and prev['type'] == clean_rel_type:
                update_cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id SET r.label = $l RETURN r"
                self.db.execute_write_and_return(update_cypher, {"id": rel_id, "l": l})
                return {"success": True, "msg": "修改成功"}

            created = self.add_relationship(s, t, clean_rel_type, l)
            if created.get("success"):
                self.delete_relationship(rel_id)
            return created
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "msg": f"修改异常: {str(e)}"}

    def delete_relationship(self, rel_id: str):
        """Delete the relationship with element id *rel_id*."""
        try:
            cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id DELETE r RETURN 1"
            result = self.db.execute_write_and_return(cypher, {"id": rel_id})
            return {"success": True, "msg": "删除成功"} if result else {"success": False, "msg": "关系不存在"}
        except Exception as e:
            return {"success": False, "msg": f"删除失败: {str(e)}"}

    # --- 7. Export ---

    def export_nodes_to_json(self, label=None, name=None):
        """Export all matching nodes (no limit) as a list of node dicts.

        Args:
            label: optional label; "全部", "", "null" and "undefined" mean no filter.
            name: optional URL-encoded substring of ``n.name``; "null"/"undefined" are ignored.

        Each item has a sequential ``identity`` (export index, not the database id),
        ``elementId``, ``labels`` and ``properties``.
        """
        try:
            conditions = []
            params = {}

            if name and str(name).strip() and name not in ["null", "undefined"]:
                params["name"] = unquote(str(name)).strip()
                conditions.append("n.name CONTAINS $name")

            # Use the stripped, escaped label in the pattern.
            label_cypher = self._label_pattern(self._clean_filter(label))

            where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""

            cypher = f"""
            MATCH (n{label_cypher})
            {where_clause}
            RETURN elementId(n) AS elementId,
                   labels(n) AS labels,
                   properties(n) AS properties
            """

            export_items = []
            # Stream through a raw session so large exports are consumed row by row.
            with self.db.driver.session() as session:
                result = session.run(cypher, params)
                for index, row in enumerate(result):
                    export_items.append({
                        "identity": index,
                        "elementId": row.get("elementId"),
                        "labels": row.get("labels"),
                        "properties": row.get("properties")
                    })

            return {"success": True, "data": export_items, "count": len(export_items)}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "msg": f"导出节点失败: {str(e)}"}

    def export_relationships_to_json(self, source=None, target=None, rel_type=None):
        """Export all matching relationships (no limit) as single-segment path dicts.

        Each item has ``start``, ``end``, ``segments`` (one segment holding the
        relationship) and ``length`` 1.0. ``identity`` values are synthetic: nodes of
        row *i* get ``2*i`` / ``2*i+1`` and the relationship gets *i*.
        """
        try:
            conditions = []
            params = {}

            if source:
                params["source"] = unquote(str(source)).strip()
                conditions.append("a.name CONTAINS $source")
            if target:
                params["target"] = unquote(str(target)).strip()
                conditions.append("b.name CONTAINS $target")

            rel_filter = self._clean_filter(rel_type)
            if rel_filter:
                params["rel_type"] = rel_filter
                conditions.append("type(r) = $rel_type")

            where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""

            cypher = f"""
            MATCH (a)-[r]->(b)
            {where_clause}
            RETURN
                {{ elementId: elementId(a), labels: labels(a), properties: properties(a) }} AS start_node,
                {{ elementId: elementId(b), labels: labels(b), properties: properties(b) }} AS end_node,
                {{
                    type: type(r),
                    properties: properties(r),
                    elementId: elementId(r),
                    startNodeElementId: elementId(a),
                    endNodeElementId: elementId(b)
                }} AS rel_info
            """

            export_items = []
            with self.db.driver.session() as session:
                result = session.run(cypher, params)
                for index, record in enumerate(result):
                    s = record["start_node"]
                    e = record["end_node"]
                    r = record["rel_info"]

                    node_id_base = index * 2
                    s["identity"] = node_id_base
                    e["identity"] = node_id_base + 1
                    r["identity"] = index
                    r["start"] = s["identity"]
                    r["end"] = e["identity"]

                    export_items.append({
                        "start": s,
                        "end": e,
                        "segments": [{"start": s, "relationship": r, "end": e}],
                        "length": 1.0
                    })

            return {"success": True, "data": export_items, "count": len(export_items)}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "msg": f"导出关系失败: {str(e)}"}

    # --- 8. Node import ---

    def precheck_nodes_batch(self, nodes_batch):
        """Dry-run validation of a node import batch; performs no writes.

        Each node is normalised with ``_format_node_data`` and must have a name, at
        least one label and a nodeId, otherwise it is listed under ``invalid``. Each
        valid node is then checked against the database:
          1. ``nodeId_duplicate``: a node with the same nodeId exists;
          2. else ``logic_key_duplicate``: a node with the same name already carries
             one of the incoming labels.

        Returns ``{"success": True, "conflicts", "invalid", "summary"}``. Database
        errors propagate to the caller.
        """
        conflicts = []
        invalid_data = []
        valid_nodes = []

        # 1. In-memory cleaning: normalise the format, then require name/label/nodeId.
        for index, raw_node in enumerate(nodes_batch):
            node = self._format_node_data(raw_node)
            props = node["properties"]
            name = props.get("name")
            labels = node["labels"]
            node_id = props.get("nodeId")

            reasons = []
            if not name:
                reasons.append("缺少 name")
            if not labels:
                reasons.append("缺少 label")
            if node_id is None:
                reasons.append("缺少 nodeId")
            if reasons:
                invalid_data.append({
                    "index": index,
                    "name": name or "未知",
                    "reason": " | ".join(reasons)
                })
                continue
            valid_nodes.append(node)

        if not valid_nodes:
            return {
                "success": True,
                "conflicts": conflicts,
                "invalid": invalid_data,
                "summary": {"total": len(nodes_batch), "valid": 0, "conflict": 0}
            }

        # 2. Batch lookups against the database.
        all_node_ids = [n["properties"]["nodeId"] for n in valid_nodes]
        all_names = [n["properties"]["name"] for n in valid_nodes]

        id_results = self.db.execute_read(
            "MATCH (n) WHERE n.nodeId IN $ids RETURN n.nodeId as nodeId, n.name as name",
            {"ids": all_node_ids})
        db_node_ids = {row["nodeId"] for row in id_results}

        name_results = self.db.execute_read(
            "MATCH (n) WHERE n.name IN $names RETURN n.name as name, labels(n) as labels",
            {"names": all_names})
        # (name, label) tuples avoid the ambiguity of joining them with "_".
        db_name_keys = {(str(row['name']), lbl) for row in name_results for lbl in row['labels']}

        # 3. Build the conflict report; nodeId conflicts take priority.
        for node in valid_nodes:
            p = node["properties"]
            n_id, name, labels = p["nodeId"], p["name"], node["labels"]

            if n_id in db_node_ids:
                conflicts.append({
                    "name": name, "label": labels[0], "nodeId": n_id,
                    "reason": f"业务ID冲突: 已存在 nodeId={n_id}",
                    "type": "nodeId_duplicate"
                })
                continue

            for lbl in labels:
                if (str(name), lbl) in db_name_keys:
                    conflicts.append({
                        "name": name, "label": lbl, "nodeId": n_id,
                        "reason": f"逻辑主键冲突: {lbl} 下已存在名称 '{name}'",
                        "type": "logic_key_duplicate"
                    })
                    break

        return {
            "success": True,
            "conflicts": conflicts,
            "invalid": invalid_data,
            "summary": {"total": len(nodes_batch), "valid": len(valid_nodes), "conflict": len(conflicts)}
        }

    def execute_node_import_batch(self, nodes_batch, mode="skip"):
        """Import a batch of nodes, grouped by their first label.

        Modes:
          * "update": MERGE on (first label, name); new nodes get all properties plus
            createTime, existing nodes get properties merged in and updateTime set.
          * "skip": MERGE on (first label, name); existing nodes are left untouched.
          * anything else (e.g. "strict"): CREATE unconditionally. "strict" first runs
            ``precheck_nodes_batch`` and aborts if it reports any conflict.

        Nodes without a label or a name are ignored; labels after the first are not
        applied. The reported count is the number of rows processed, which in
        "skip"/"update" mode includes nodes that already existed.
        """
        try:
            formatted_batch = [self._format_node_data(n) for n in nodes_batch]
            # Same string format as add_node so createTime sorts/filters consistently.
            current_time_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            if mode == "strict":
                check = self.precheck_nodes_batch(nodes_batch)
                if check.get("conflicts"):
                    return {"success": False, "msg": "严格模式下发现冲突,停止导入", "conflicts": check["conflicts"]}

            label_groups = {}
            for node in formatted_batch:
                lbls = node.get("labels")
                if not lbls:
                    continue
                batch_props = label_groups.setdefault(lbls[0], [])
                props = node.get("properties")
                if props and props.get("name"):
                    batch_props.append(props)

            total_imported = 0
            with self.db.driver.session() as session:
                for lbl, batch_props in label_groups.items():
                    if not batch_props:
                        continue
                    label_name = self._quote_name(lbl)

                    if mode == "update":
                        cypher = f"""
                        UNWIND $batch AS props
                        MERGE (n:{label_name} {{name: props.name}})
                        ON CREATE SET n = props, n.createTime = $now
                        ON MATCH SET n += props, n.updateTime = $now
                        RETURN count(n) as cnt
                        """
                    elif mode == "skip":
                        cypher = f"""
                        UNWIND $batch AS props
                        MERGE (n:{label_name} {{name: props.name}})
                        ON CREATE SET n = props, n.createTime = $now
                        RETURN count(n) as cnt
                        """
                    else:
                        cypher = f"""
                        UNWIND $batch AS props
                        CREATE (n:{label_name})
                        SET n = props, n.createTime = $now
                        RETURN count(n) as cnt
                        """

                    res = session.run(cypher, {"batch": batch_props, "now": current_time_str})
                    record = res.single()
                    if record:
                        total_imported += record["cnt"]

            return {"success": True, "msg": f"成功处理 {total_imported} 个节点", "count": total_imported}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "msg": f"批次导入异常: {str(e)}"}

    # --- 9. Relationship import ---

    def precheck_rels_batch(self, rels_batch):
        """Dry-run validation of a relationship import batch; performs no writes.

        Each record is normalised with ``_normalize_rel_data``. Records are classified:
          * invalid: missing source/target name or type, or an endpoint not found.
            Endpoints are matched by name plus label when a label is given, exactly as
            ``execute_rel_import_batch`` matches them.
          * conflict: the typed relationship already exists between the endpoints.
          * valid: everything else (counted in ``summary.valid``).

        Database errors propagate to the caller.
        """
        conflicts = []  # relationship already exists
        invalid = []    # malformed record or missing endpoint
        actual_valid_count = 0

        for raw_item in rels_batch:
            item = self._normalize_rel_data(raw_item)

            # 1. Basic format validation.
            if not item["source_name"] or not item["target_name"] or not item["rel_type"]:
                invalid.append({
                    "source": item.get("source_name") or "未知",
                    "target": item.get("target_name") or "未知",
                    "reason": "格式错误:缺少必要字段"
                })
                continue

            # 2. Existence checks in the database.
            cypher = f"""
            OPTIONAL MATCH (s{self._label_pattern(item['source_label'])} {{name: $s_name}})
            OPTIONAL MATCH (t{self._label_pattern(item['target_label'])} {{name: $t_name}})
            OPTIONAL MATCH (s)-[r:{self._quote_name(item['rel_type'])}]->(t)
            RETURN s IS NOT NULL as hasS, t IS NOT NULL as hasT, r IS NOT NULL as hasR
            """
            res = self.db.execute_read(cypher, {"s_name": item["source_name"], "t_name": item["target_name"]})
            if not res:
                continue

            rec = res[0]
            if not rec["hasS"] or not rec["hasT"]:
                invalid.append({
                    "source": item["source_name"],
                    "target": item["target_name"],
                    "reason": f"节点不存在(起点:{'√' if rec['hasS'] else '×'}, 终点:{'√' if rec['hasT'] else '×'})"
                })
            elif rec["hasR"]:
                conflicts.append({
                    "source": item["source_name"],
                    "target": item["target_name"],
                    "type": item["rel_type"],
                    "reason": "关系已存在"
                })
            else:
                actual_valid_count += 1

        return {
            "success": True,
            "conflicts": conflicts,
            "invalid": invalid,
            "summary": {
                "total": len(rels_batch),
                "valid": actual_valid_count,  # records that can actually be imported
                "conflict": len(conflicts),
                "invalid": len(invalid)
            }
        }

    def execute_rel_import_batch(self, rels_batch, mode="skip"):
        """Import relationships one by one with MERGE and return them as path dicts.

        Endpoints are matched by name, plus label when given. On create the
        relationship gets ``label`` and ``createTime``; in "update" mode an existing
        relationship's ``label`` is overwritten. Records with missing fields or
        endpoints are skipped silently. Each returned item contains the internal
        ids (``identity``) and element ids of both nodes and the relationship.
        """
        try:
            now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            final_results = []

            with self.db.driver.session() as session:
                for raw_item in rels_batch:
                    item = self._normalize_rel_data(raw_item)

                    if not item["source_name"] or not item["target_name"] or not item["rel_type"]:
                        continue

                    s_pattern = self._label_pattern(item['source_label'])
                    t_pattern = self._label_pattern(item['target_label'])
                    rel_name = self._quote_name(item['rel_type'])
                    on_match = "ON MATCH SET r.label = $label" if mode == "update" else ""

                    cypher = f"""
                    MATCH (s{s_pattern} {{name: $s_name}})
                    MATCH (t{t_pattern} {{name: $t_name}})
                    MERGE (s)-[r:{rel_name}]->(t)
                    ON CREATE SET r.label = $label, r.createTime = $now
                    {on_match}
                    RETURN s, t, r,
                           id(s) as s_id, id(t) as t_id, id(r) as r_id,
                           elementId(s) as s_eid, elementId(t) as t_eid, elementId(r) as r_eid
                    """

                    res = session.run(cypher, {
                        "s_name": item["source_name"],
                        "t_name": item["target_name"],
                        "label": item["rel_label"],
                        "now": now
                    })
                    record = res.single()
                    if not record:
                        continue

                    s_node, t_node, r_rel = record["s"], record["t"], record["r"]
                    s_id, t_id = record["s_id"], record["t_id"]
                    s_eid, t_eid = record["s_eid"], record["t_eid"]

                    final_results.append({
                        "start": self._graph_node_payload(s_node, s_id, s_eid),
                        "end": self._graph_node_payload(t_node, t_id, t_eid),
                        "segments": [{
                            "start": self._graph_node_payload(s_node, s_id, s_eid),
                            "relationship": {
                                "identity": record["r_id"],
                                "start": s_id,
                                "end": t_id,
                                "type": r_rel.type,
                                "properties": dict(r_rel),
                                "elementId": str(record["r_eid"]),
                                "startNodeElementId": str(s_eid),
                                "endNodeElementId": str(t_eid)
                            },
                            # Always the target's internal id. Reading an "identity"
                            # key from the record raised KeyError whenever the node
                            # had an "identity" property.
                            "end": self._graph_node_payload(t_node, t_id, t_eid)
                        }],
                        "length": 1.0
                    })

            return {"success": True, "data": final_results, "count": len(final_results)}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "msg": f"导入执行失败: {str(e)}"}