You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

392 lines
16 KiB

import json
import traceback
import datetime
import random
import time
from urllib.parse import unquote
from util.neo4j_utils import neo4j_client
class OperationService:
def __init__(self):
self.db = neo4j_client
# --- 0. 数据修复工具 ---
def fix_all_missing_node_ids(self):
try:
check_cypher = "MATCH (n) WHERE n.nodeId IS NULL OR n.nodeId = 0 OR n.nodeId = '0' RETURN count(n) as cnt"
res = self.db.execute_read(check_cypher)
if not res or res[0]['cnt'] == 0:
return {"success": True, "msg": "没有需要修复的节点"}
update_cypher = """
MATCH (n)
WHERE n.nodeId IS NULL OR n.nodeId = 0 OR n.nodeId = '0'
WITH n, toInteger(100000 + rand() * 899999) as newId
SET n.nodeId = newId
RETURN count(n) as fixedCount
"""
result = self.db.execute_write_and_return(update_cypher)
return {"success": True, "msg": f"修复完成,共处理 {result[0]['fixedCount']} 个节点"}
except Exception as e:
return {"success": False, "msg": f"修复失败: {str(e)}"}
# --- 1. 全局统计接口 ---
def get_kg_stats(self):
try:
today_str = datetime.datetime.now().strftime('%Y-%m-%d')
cypher = """
CALL () {
MATCH (n) RETURN count(n) AS totalNodes
}
CALL () {
MATCH ()-[r]->() RETURN count(r) AS totalRels
}
CALL () {
MATCH (n) WHERE n.createTime STARTS WITH $today RETURN count(n) AS todayNodes
}
RETURN totalNodes, totalRels, todayNodes
"""
results = self.db.execute_read(cypher, {"today": today_str})
if results:
return {
"success": True,
"data": {
"totalNodes": results[0]['totalNodes'],
"totalRels": results[0]['totalRels'],
"todayNodes": results[0]['todayNodes']
}
}
return {"success": False, "msg": "未能获取统计数据"}
except Exception as e:
print(f"Stats Error: {e}")
return {"success": False, "data": {"totalNodes": 0, "totalRels": 0, "todayNodes": 0}}
# --- 2. 节点查询 ---
def get_nodes_subset(self, page=1, page_size=20, name=None, label=None):
try:
skip_val = (int(page) - 1) * int(page_size)
limit_val = int(page_size)
conditions = []
params = {"skip": skip_val, "limit": limit_val}
if name:
decoded_name = unquote(str(name)).strip()
conditions.append("n.name CONTAINS $name")
params["name"] = decoded_name
if label and str(label).strip() and label not in ["全部", ""]:
# 使用标准的标签匹配语法
params["label"] = str(label).strip()
conditions.append("$label IN labels(n)")
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
cypher = f"""
MATCH (n)
{where_clause}
WITH count(n) AS total_count
MATCH (n)
{where_clause}
RETURN elementId(n) AS id, labels(n) AS labels, n.name AS name, n.nodeId AS nodeId, total_count AS total
ORDER BY coalesce(n.createTime, '0000-00-00') DESC, toInteger(coalesce(n.nodeId, 0)) DESC
SKIP $skip LIMIT $limit
"""
raw_data = self.db.execute_read(cypher, params)
if not raw_data:
return {"items": [], "total": 0}
items = []
for item in raw_data:
db_node_id = item.get("nodeId")
items.append({
"id": item["id"],
"nodeId": db_node_id if (db_node_id and db_node_id != 0 and db_node_id != '0') else item["id"],
"labels": item["labels"],
"name": item.get("name") or "N/A"
})
return {"items": items, "total": raw_data[0]['total']}
except Exception as e:
traceback.print_exc()
return {"items": [], "total": 0}
# --- 3. 关系查询 ---
def get_relationships_subset(self, page=1, page_size=20, source=None, target=None, rel_type=None):
try:
skip_val = (int(page) - 1) * int(page_size)
limit_val = int(page_size)
conditions = []
params = {"skip": skip_val, "limit": limit_val}
if source:
params["source"] = unquote(str(source)).strip()
conditions.append("a.name CONTAINS $source")
if target:
params["target"] = unquote(str(target)).strip()
conditions.append("b.name CONTAINS $target")
if rel_type and rel_type not in ["全部", ""]:
conditions.append("type(r) = $rel_type")
params["rel_type"] = str(rel_type).strip()
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
cypher = f"""
MATCH (a)-[r]->(b)
{where_clause}
WITH count(r) AS total_count
MATCH (a)-[r]->(b)
{where_clause}
RETURN elementId(r) as id,
type(r) as type,
r.label as label,
a.name as source,
b.name as target,
total_count
ORDER BY coalesce(r.createTime, '0000-00-00') DESC
SKIP $skip LIMIT $limit
"""
raw_data = self.db.execute_read(cypher, params)
if not raw_data:
return {"items": [], "total": 0}
items = []
for row in raw_data:
items.append({
"id": row["id"],
"type": row["type"],
"label": row["label"] if row.get("label") is not None else "",
"source": row["source"],
"target": row["target"]
})
return {"items": items, "total": raw_data[0]['total_count']}
except Exception as e:
traceback.print_exc()
return {"items": [], "total": 0}
# --- 4. 联想建议 ---
def suggest_nodes(self, keyword: str, label: str = None):
"""
修复后的建议逻辑:
1. 优化 Label 过滤语法,确保在 keyword 为空时也能根据 Label 返回数据。
2. 增加对空字符串的宽容处理。
"""
try:
kw = unquote(str(keyword or "")).strip()
lb = str(label).strip() if label and label not in ["全部", "", "null", "undefined"] else None
# 如果既没有关键词也没有标签,直接返回空
if not kw and not lb:
return []
params = {}
# 基础匹配语句,排除无意义节点
match_clause = "MATCH (n)"
if lb:
# 动态构建标签匹配,使用 :`label` 语法更高效且准确
match_clause = f"MATCH (n:`{lb}`)"
conditions = ["n.name <> '未命名'"]
if kw:
conditions.append("n.name CONTAINS $kw")
params["kw"] = kw
where_clause = "WHERE " + " AND ".join(conditions)
# 查询数据库
cypher = f"{match_clause} {where_clause} RETURN DISTINCT n.name as name LIMIT 15"
results = self.db.execute_read(cypher, params)
db_suggestions = [row["name"] for row in results if row.get("name")]
# 如果依然没有结果,尝试去掉 Label 限制进行全库模糊匹配(保底逻辑)
if not db_suggestions and kw and lb:
fallback_cypher = "MATCH (n) WHERE n.name CONTAINS $kw AND n.name <> '未命名' RETURN DISTINCT n.name as name LIMIT 5"
fallback_res = self.db.execute_read(fallback_cypher, {"kw": kw})
db_suggestions = [row["name"] for row in fallback_res if row.get("name")]
return db_suggestions
except Exception as e:
print(f"Suggest Error Trace: {traceback.format_exc()}")
return []
# --- 5. 节点管理 ---
def add_node(self, label: str, name: str):
try:
nm = unquote(str(name)).strip()
if not nm: return {"success": False, "msg": "名称不能为空"}
now = datetime.datetime.now()
create_time = now.strftime('%Y-%m-%d %H:%M:%S')
check_cypher = "MATCH (n) WHERE n.name = $name RETURN n LIMIT 1"
existing = self.db.execute_read(check_cypher, {"name": nm})
if existing:
return {"success": False, "msg": f"添加失败:已存在名为 '{nm}' 的节点"}
new_node_id = int(time.time() * 1000)
create_cypher = f"""
CREATE (n:`{label}` {{
name: $name,
nodeId: $nodeId,
createTime: $createTime
}})
RETURN n
"""
result = self.db.execute_write_and_return(create_cypher, {
"name": nm,
"nodeId": new_node_id,
"createTime": create_time
})
if result:
return {"success": True, "msg": "添加成功", "nodeId": new_node_id}
return {"success": False, "msg": "节点创建失败"}
except Exception as e:
return {"success": False, "msg": f"写入失败: {str(e)}"}
def update_node(self, node_id: str, name: str, label: str):
try:
nm = unquote(str(name)).strip()
if not nm: return {"success": False, "msg": "名称不能为空"}
check_name = "MATCH (n) WHERE n.name = $name AND elementId(n) <> $id RETURN n LIMIT 1"
existing = self.db.execute_read(check_name, {"name": nm, "id": node_id})
if existing:
return {"success": False, "msg": f"修改失败:库中已有其他名为 '{nm}' 的节点"}
cypher = f"""
MATCH (n) WHERE elementId(n) = $id
SET n.name = $name
WITH n
REMOVE n:Drug:Disease:Symptom:Entity:Medicine:Check:Food:Operation:CheckSubject:Complication:Diagnosis:Treatment:AdjuvantTherapy:adverseReactions:Department:DiseaseSite:RelatedDisease:RelatedSymptom:SpreadWay:Stage:Subject:SymptomAndSign:TreatmentPrograms:Type:Cause:Attribute:Indications:Ingredients:Pathogenesis:PathologicalType:Pathophysiology:Precautions:Prognosis:PrognosticSurvivalTime:DiseaseRatio:DrugTherapy:Infectious:MultipleGroups:DiseaseRate
WITH n
SET n:`{label}`
RETURN n
"""
result = self.db.execute_write_and_return(cypher, {"id": node_id, "name": nm})
if result:
return {"success": True, "msg": "节点修改成功"}
else:
return {"success": False, "msg": "找不到该节点或更新失败"}
except Exception as e:
return {"success": False, "msg": str(e)}
def delete_node(self, node_id: str):
try:
cypher = "MATCH (n) WHERE elementId(n) = $id DETACH DELETE n RETURN 1 as deleted"
result = self.db.execute_write_and_return(cypher, {"id": node_id})
if result:
return {"success": True, "msg": "删除成功"}
return {"success": False, "msg": "节点不存在或已被删除"}
except Exception as e:
return {"success": False, "msg": str(e)}
def get_all_labels(self):
cypher = "CALL db.labels()"
try:
results = self.db.execute_read(cypher)
labels = [list(row.values())[0] for row in results]
return labels if labels else ["Drug", "Disease", "Symptom"]
except:
return ["Drug", "Disease", "Symptom"]
# --- 6. 关系管理 ---
def get_all_relationship_types(self):
cypher = """
MATCH ()-[r]->()
RETURN DISTINCT type(r) AS type, r.label AS label
"""
try:
results = self.db.execute_read(cypher)
type_map = []
seen_types = set()
for row in results:
t_name = row["type"]
t_label = row["label"] if row.get("label") else t_name
if t_name not in seen_types:
type_map.append({
"type": t_name,
"label": t_label
})
seen_types.add(t_name)
return type_map if type_map else []
except Exception as e:
print(f"Fetch RelTypes Error: {e}")
return []
def add_relationship(self, source_name: str, target_name: str, rel_type: str, rel_label: str):
try:
s = unquote(str(source_name)).strip()
t = unquote(str(target_name)).strip()
l = str(rel_label).strip()
clean_rel_type = rel_type.strip().replace("`", "")
create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
check_nodes = """
OPTIONAL MATCH (a) WHERE a.name = $s
WITH a LIMIT 1
OPTIONAL MATCH (b) WHERE b.name = $t
WITH a, b LIMIT 1
RETURN a IS NOT NULL as hasA, b IS NOT NULL as hasB
"""
nodes_res = self.db.execute_read(check_nodes, {"s": s, "t": t})
if not nodes_res or not nodes_res[0]['hasA'] or not nodes_res[0]['hasB']:
err_msg = "添加失败: "
if not nodes_res[0]['hasA']: err_msg += f"起始节点'{s}'不存在; "
if not nodes_res[0]['hasB']: err_msg += f"结束节点'{t}'不存在"
return {"success": False, "msg": err_msg}
check_rel = f"MATCH (a {{name: $s}})-[r:`{clean_rel_type}`]->(b {{name: $t}}) RETURN r LIMIT 1"
existing_rel = self.db.execute_read(check_rel, {"s": s, "t": t})
if existing_rel:
return {"success": False, "msg": f"添加失败:已存在该关系"}
create_cypher = f"""
MATCH (a {{name: $s}}), (b {{name: $t}})
WITH a, b LIMIT 1
CREATE (a)-[r:`{clean_rel_type}` {{
label: $l,
createTime: $create_time
}}]->(b)
RETURN r
"""
result = self.db.execute_write_and_return(create_cypher, {
"s": s, "t": t, "l": l, "create_time": create_time
})
if result:
return {"success": True, "msg": "添加成功"}
return {"success": False, "msg": "关系创建失败"}
except Exception as e:
traceback.print_exc()
return {"success": False, "msg": f"数据库写入异常: {str(e)}"}
def update_relationship(self, rel_id: str, source_name: str, target_name: str, rel_type: str, rel_label: str):
try:
s = unquote(str(source_name)).strip()
t = unquote(str(target_name)).strip()
l = str(rel_label).strip()
find_old = "MATCH (a)-[r]->(b) WHERE elementId(r) = $id RETURN type(r) as type, a.name as s, b.name as t"
old = self.db.execute_read(find_old, {"id": rel_id})
if not old: return {"success": False, "msg": "修改失败:原关系不存在"}
if old[0]['s'] == s and old[0]['t'] == t and old[0]['type'] == rel_type:
update_cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id SET r.label = $l RETURN r"
self.db.execute_write_and_return(update_cypher, {"id": rel_id, "l": l})
return {"success": True, "msg": "修改成功"}
else:
self.delete_relationship(rel_id)
return self.add_relationship(s, t, rel_type, l)
except Exception as e:
traceback.print_exc()
return {"success": False, "msg": f"修改异常: {str(e)}"}
def delete_relationship(self, rel_id: str):
try:
cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id DELETE r RETURN 1"
result = self.db.execute_write_and_return(cypher, {"id": rel_id})
return {"success": True, "msg": "删除成功"} if result else {"success": False, "msg": "关系不存在"}
except Exception as e:
return {"success": False, "msg": f"删除失败: {str(e)}"}