|
|
|
|
import json
|
|
|
|
|
import traceback
|
|
|
|
|
import datetime
|
|
|
|
|
from urllib.parse import unquote
|
|
|
|
|
from util.neo4j_utils import neo4j_client
|
|
|
|
|
|
|
|
|
|
class OperationService:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.db = neo4j_client
|
|
|
|
|
|
|
|
|
|
# --- 1. 全局统计接口 ---
|
|
|
|
|
def get_kg_stats(self):
|
|
|
|
|
try:
|
|
|
|
|
today_str = datetime.datetime.now().strftime('%Y-%m-%d')
|
|
|
|
|
cypher = """
|
|
|
|
|
CALL () {
|
|
|
|
|
MATCH (n) RETURN count(n) AS totalNodes
|
|
|
|
|
}
|
|
|
|
|
CALL () {
|
|
|
|
|
MATCH ()-[r]->() RETURN count(r) AS totalRels
|
|
|
|
|
}
|
|
|
|
|
CALL () {
|
|
|
|
|
MATCH (n) WHERE n.createTime STARTS WITH $today RETURN count(n) AS todayNodes
|
|
|
|
|
}
|
|
|
|
|
RETURN totalNodes, totalRels, todayNodes
|
|
|
|
|
"""
|
|
|
|
|
results = self.db.execute_read(cypher, {"today": today_str})
|
|
|
|
|
if results:
|
|
|
|
|
return {
|
|
|
|
|
"success": True,
|
|
|
|
|
"data": {
|
|
|
|
|
"totalNodes": results[0]['totalNodes'],
|
|
|
|
|
"totalRels": results[0]['totalRels'],
|
|
|
|
|
"todayNodes": results[0]['todayNodes']
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return {"success": False, "msg": "未能获取统计数据"}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"Stats Error: {e}")
|
|
|
|
|
return {"success": False, "data": {"totalNodes": 0, "totalRels": 0, "todayNodes": 0}}
|
|
|
|
|
|
|
|
|
|
# --- 2. 节点查询 ---
|
|
|
|
|
def get_nodes_subset(self, page=1, page_size=20, name=None, label=None):
|
|
|
|
|
try:
|
|
|
|
|
skip_val = (int(page) - 1) * int(page_size)
|
|
|
|
|
limit_val = int(page_size)
|
|
|
|
|
conditions = []
|
|
|
|
|
params = {"skip": skip_val, "limit": limit_val}
|
|
|
|
|
|
|
|
|
|
if name:
|
|
|
|
|
decoded_name = unquote(str(name)).strip()
|
|
|
|
|
conditions.append("n.name CONTAINS $name")
|
|
|
|
|
params["name"] = decoded_name
|
|
|
|
|
|
|
|
|
|
if label and str(label).strip() and label not in ["全部", ""]:
|
|
|
|
|
conditions.append("ANY(l IN labels(n) WHERE l = $label)")
|
|
|
|
|
params["label"] = str(label).strip()
|
|
|
|
|
|
|
|
|
|
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
|
|
|
|
|
|
|
|
|
|
# 核心 Cypher:同时返回 elementId(用于后端操作) 和 nodeId(用于前端展示)
|
|
|
|
|
cypher = f"""
|
|
|
|
|
MATCH (n)
|
|
|
|
|
{where_clause}
|
|
|
|
|
WITH count(n) AS total_count
|
|
|
|
|
MATCH (n)
|
|
|
|
|
{where_clause}
|
|
|
|
|
RETURN elementId(n) AS id, labels(n) AS labels, n.name AS name, n.nodeId AS nodeId, total_count AS total
|
|
|
|
|
ORDER BY coalesce(n.createTime, '0000-00-00') DESC, toInteger(coalesce(n.nodeId, 0)) DESC
|
|
|
|
|
SKIP $skip LIMIT $limit
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
raw_data = self.db.execute_read(cypher, params)
|
|
|
|
|
if not raw_data:
|
|
|
|
|
return {"items": [], "total": 0}
|
|
|
|
|
|
|
|
|
|
items = []
|
|
|
|
|
for item in raw_data:
|
|
|
|
|
items.append({
|
|
|
|
|
"id": item["id"], # 后端操作用的 elementId
|
|
|
|
|
"nodeId": item.get("nodeId") or item["id"], # 前端展示用的业务ID,若无则降级
|
|
|
|
|
"labels": item["labels"],
|
|
|
|
|
"name": item.get("name") or "N/A"
|
|
|
|
|
})
|
|
|
|
|
return {"items": items, "total": raw_data[0]['total']}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
return {"items": [], "total": 0}
|
|
|
|
|
|
|
|
|
|
# --- 3. 关系查询 ---
|
|
|
|
|
def get_relationships_subset(self, page=1, page_size=20, source=None, target=None, rel_type=None):
|
|
|
|
|
try:
|
|
|
|
|
skip_val = (int(page) - 1) * int(page_size)
|
|
|
|
|
limit_val = int(page_size)
|
|
|
|
|
conditions = []
|
|
|
|
|
params = {"skip": skip_val, "limit": limit_val}
|
|
|
|
|
|
|
|
|
|
if source:
|
|
|
|
|
params["source"] = unquote(str(source)).strip()
|
|
|
|
|
conditions.append("a.name CONTAINS $source")
|
|
|
|
|
if target:
|
|
|
|
|
params["target"] = unquote(str(target)).strip()
|
|
|
|
|
conditions.append("b.name CONTAINS $target")
|
|
|
|
|
if rel_type and rel_type not in ["全部", ""]:
|
|
|
|
|
conditions.append("type(r) = $rel_type")
|
|
|
|
|
params["rel_type"] = str(rel_type).strip()
|
|
|
|
|
|
|
|
|
|
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
|
|
|
|
|
|
|
|
|
|
cypher = f"""
|
|
|
|
|
MATCH (a)-[r]->(b)
|
|
|
|
|
{where_clause}
|
|
|
|
|
WITH count(r) AS total_count
|
|
|
|
|
MATCH (a)-[r]->(b)
|
|
|
|
|
{where_clause}
|
|
|
|
|
RETURN elementId(r) as id,
|
|
|
|
|
type(r) as type,
|
|
|
|
|
r.label as label,
|
|
|
|
|
a.name as source,
|
|
|
|
|
b.name as target,
|
|
|
|
|
total_count
|
|
|
|
|
ORDER BY coalesce(r.createTime, '0000-00-00') DESC
|
|
|
|
|
SKIP $skip LIMIT $limit
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
raw_data = self.db.execute_read(cypher, params)
|
|
|
|
|
if not raw_data:
|
|
|
|
|
return {"items": [], "total": 0}
|
|
|
|
|
|
|
|
|
|
items = []
|
|
|
|
|
for row in raw_data:
|
|
|
|
|
items.append({
|
|
|
|
|
"id": row["id"],
|
|
|
|
|
"type": row["type"],
|
|
|
|
|
"label": row["label"] if row.get("label") is not None else "",
|
|
|
|
|
"source": row["source"],
|
|
|
|
|
"target": row["target"]
|
|
|
|
|
})
|
|
|
|
|
return {"items": items, "total": raw_data[0]['total_count']}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
return {"items": [], "total": 0}
|
|
|
|
|
|
|
|
|
|
# --- 4. 联想建议 ---
|
|
|
|
|
def suggest_nodes(self, keyword: str):
|
|
|
|
|
if not keyword: return []
|
|
|
|
|
try:
|
|
|
|
|
kw = unquote(str(keyword)).strip()
|
|
|
|
|
cypher = "MATCH (n) WHERE n.name CONTAINS $kw AND n.name <> '未命名' RETURN DISTINCT n.name as name LIMIT 15"
|
|
|
|
|
results = self.db.execute_read(cypher, {"kw": kw})
|
|
|
|
|
db_suggestions = [row["name"] for row in results if row["name"]]
|
|
|
|
|
suffix_suggestions = [f"{kw}片", f"{kw}胶囊", f"{kw}注射液"]
|
|
|
|
|
final_res = list(dict.fromkeys(db_suggestions + suffix_suggestions))
|
|
|
|
|
return final_res[:15]
|
|
|
|
|
except:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
# --- 5. 节点管理 ---
|
|
|
|
|
def add_node(self, label: str, name: str):
|
|
|
|
|
try:
|
|
|
|
|
nm = unquote(str(name)).strip()
|
|
|
|
|
if not nm: return {"success": False, "msg": "名称不能为空"}
|
|
|
|
|
create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
|
|
|
|
|
|
# 查重
|
|
|
|
|
check_cypher = "MATCH (n) WHERE n.name = $name RETURN n LIMIT 1"
|
|
|
|
|
existing = self.db.execute_read(check_cypher, {"name": nm})
|
|
|
|
|
if existing:
|
|
|
|
|
return {"success": False, "msg": f"添加失败:已存在名为 '{nm}' 的节点"}
|
|
|
|
|
|
|
|
|
|
# nodeId 使用当前毫秒时间戳,确保其为业务可读的短ID
|
|
|
|
|
create_cypher = f"""
|
|
|
|
|
CREATE (n:`{label}` {{
|
|
|
|
|
name: $name,
|
|
|
|
|
nodeId: timestamp(),
|
|
|
|
|
createTime: $createTime
|
|
|
|
|
}})
|
|
|
|
|
RETURN n
|
|
|
|
|
"""
|
|
|
|
|
# 使用 write_and_return 确保能拿到返回对象,从而判断成功
|
|
|
|
|
result = self.db.execute_write_and_return(create_cypher, {"name": nm, "createTime": create_time})
|
|
|
|
|
if result:
|
|
|
|
|
return {"success": True, "msg": "添加成功"}
|
|
|
|
|
return {"success": False, "msg": "节点创建失败"}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {"success": False, "msg": f"写入失败: {str(e)}"}
|
|
|
|
|
|
|
|
|
|
def update_node(self, node_id: str, name: str, label: str):
|
|
|
|
|
try:
|
|
|
|
|
nm = unquote(str(name)).strip()
|
|
|
|
|
if not nm: return {"success": False, "msg": "名称不能为空"}
|
|
|
|
|
|
|
|
|
|
# 排除自身查重
|
|
|
|
|
check_name = "MATCH (n) WHERE n.name = $name AND elementId(n) <> $id RETURN n LIMIT 1"
|
|
|
|
|
existing = self.db.execute_read(check_name, {"name": nm, "id": node_id})
|
|
|
|
|
if existing:
|
|
|
|
|
return {"success": False, "msg": f"修改失败:库中已有其他名为 '{nm}' 的节点"}
|
|
|
|
|
|
|
|
|
|
# 修改标签需要先移除旧标签(Neo4j不支持直接覆盖所有标签)
|
|
|
|
|
cypher = f"""
|
|
|
|
|
MATCH (n) WHERE elementId(n) = $id
|
|
|
|
|
SET n.name = $name
|
|
|
|
|
WITH n
|
|
|
|
|
REMOVE n:Drug:Disease:Symptom:Entity:Medicine:Check:Food
|
|
|
|
|
WITH n
|
|
|
|
|
SET n:`{label}`
|
|
|
|
|
RETURN n
|
|
|
|
|
"""
|
|
|
|
|
result = self.db.execute_write_and_return(cypher, {"id": node_id, "name": nm})
|
|
|
|
|
if result:
|
|
|
|
|
return {"success": True, "msg": "节点修改成功"}
|
|
|
|
|
else:
|
|
|
|
|
return {"success": False, "msg": "找不到该节点或更新失败"}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {"success": False, "msg": str(e)}
|
|
|
|
|
|
|
|
|
|
def delete_node(self, node_id: str):
|
|
|
|
|
try:
|
|
|
|
|
# 使用 RETURN count(n) 来确认是否执行了删除
|
|
|
|
|
cypher = "MATCH (n) WHERE elementId(n) = $id DETACH DELETE n RETURN 1 as deleted"
|
|
|
|
|
result = self.db.execute_write_and_return(cypher, {"id": node_id})
|
|
|
|
|
if result:
|
|
|
|
|
return {"success": True, "msg": "删除成功"}
|
|
|
|
|
return {"success": False, "msg": "节点不存在或已被删除"}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {"success": False, "msg": str(e)}
|
|
|
|
|
|
|
|
|
|
def get_all_labels(self):
|
|
|
|
|
cypher = "CALL db.labels()"
|
|
|
|
|
try:
|
|
|
|
|
results = self.db.execute_read(cypher)
|
|
|
|
|
# 处理 Neo4j 返回的列表格式
|
|
|
|
|
labels = [list(row.values())[0] for row in results]
|
|
|
|
|
return labels if labels else ["Drug", "Disease", "Symptom"]
|
|
|
|
|
except:
|
|
|
|
|
return ["Drug", "Disease", "Symptom"]
|
|
|
|
|
|
|
|
|
|
# --- 6. 关系管理 ---
|
|
|
|
|
def add_relationship(self, source_name: str, target_name: str, rel_type: str, rel_label: str):
|
|
|
|
|
try:
|
|
|
|
|
# 1. 数据清洗
|
|
|
|
|
s = unquote(str(source_name)).strip()
|
|
|
|
|
t = unquote(str(target_name)).strip()
|
|
|
|
|
l = str(rel_label).strip()
|
|
|
|
|
clean_rel_type = rel_type.strip().replace("`", "")
|
|
|
|
|
create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
|
|
|
|
|
|
# 2. 预检:节点是否存在 (仿照 add_node 的 check_cypher)
|
|
|
|
|
# 使用 LIMIT 1 避免同名节点导致的笛卡尔积重复
|
|
|
|
|
check_nodes = """
|
|
|
|
|
OPTIONAL MATCH (a) WHERE a.name = $s
|
|
|
|
|
WITH a LIMIT 1
|
|
|
|
|
OPTIONAL MATCH (b) WHERE b.name = $t
|
|
|
|
|
WITH a, b LIMIT 1
|
|
|
|
|
RETURN a IS NOT NULL as hasA, b IS NOT NULL as hasB
|
|
|
|
|
"""
|
|
|
|
|
nodes_res = self.db.execute_read(check_nodes, {"s": s, "t": t})
|
|
|
|
|
|
|
|
|
|
if not nodes_res or not nodes_res[0]['hasA'] or not nodes_res[0]['hasB']:
|
|
|
|
|
err_msg = "添加失败: "
|
|
|
|
|
if not nodes_res[0]['hasA']: err_msg += f"起始节点'{s}'不存在; "
|
|
|
|
|
if not nodes_res[0]['hasB']: err_msg += f"结束节点'{t}'不存在"
|
|
|
|
|
return {"success": False, "msg": err_msg}
|
|
|
|
|
|
|
|
|
|
# 3. 查重:检查是否已存在相同关系 (仿照 add_node 的查重逻辑)
|
|
|
|
|
check_rel = f"MATCH (a {{name: $s}})-[r:`{clean_rel_type}`]->(b {{name: $t}}) RETURN r LIMIT 1"
|
|
|
|
|
existing_rel = self.db.execute_read(check_rel, {"s": s, "t": t})
|
|
|
|
|
if existing_rel:
|
|
|
|
|
return {"success": False, "msg": f"添加失败:已存在该关系"}
|
|
|
|
|
|
|
|
|
|
# 4. 写入:创建新关系
|
|
|
|
|
# 同样使用 WITH...LIMIT 1 确保即使有重名点也只连一对线
|
|
|
|
|
create_cypher = f"""
|
|
|
|
|
MATCH (a {{name: $s}}), (b {{name: $t}})
|
|
|
|
|
WITH a, b LIMIT 1
|
|
|
|
|
CREATE (a)-[r:`{clean_rel_type}` {{
|
|
|
|
|
label: $l,
|
|
|
|
|
createTime: $create_time
|
|
|
|
|
}}]->(b)
|
|
|
|
|
RETURN r
|
|
|
|
|
"""
|
|
|
|
|
result = self.db.execute_write_and_return(create_cypher, {
|
|
|
|
|
"s": s, "t": t, "l": l, "create_time": create_time
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if result:
|
|
|
|
|
return {"success": True, "msg": "添加成功"}
|
|
|
|
|
return {"success": False, "msg": "关系创建失败"}
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
return {"success": False, "msg": f"数据库写入异常: {str(e)}"}
|
|
|
|
|
|
|
|
|
|
def update_relationship(self, rel_id: str, source_name: str, target_name: str, rel_type: str, rel_label: str):
|
|
|
|
|
try:
|
|
|
|
|
s = unquote(str(source_name)).strip()
|
|
|
|
|
t = unquote(str(target_name)).strip()
|
|
|
|
|
l = str(rel_label).strip()
|
|
|
|
|
|
|
|
|
|
find_old = "MATCH (a)-[r]->(b) WHERE elementId(r) = $id RETURN type(r) as type, a.name as s, b.name as t"
|
|
|
|
|
old = self.db.execute_read(find_old, {"id": rel_id})
|
|
|
|
|
if not old: return {"success": False, "msg": "修改失败:原关系不存在"}
|
|
|
|
|
|
|
|
|
|
# 如果只是修改 label,不修改节点和类型,则直接 SET
|
|
|
|
|
if old[0]['s'] == s and old[0]['t'] == t and old[0]['type'] == rel_type:
|
|
|
|
|
update_cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id SET r.label = $l RETURN r"
|
|
|
|
|
self.db.execute_write_and_return(update_cypher, {"id": rel_id, "l": l})
|
|
|
|
|
return {"success": True, "msg": "修改成功"}
|
|
|
|
|
else:
|
|
|
|
|
# 涉及节点或类型变动,由于 Neo4j 不支持直接更改关系类型,需删掉重建
|
|
|
|
|
self.delete_relationship(rel_id)
|
|
|
|
|
return self.add_relationship(s, t, rel_type, l)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
return {"success": False, "msg": f"修改异常: {str(e)}"}
|
|
|
|
|
|
|
|
|
|
def delete_relationship(self, rel_id: str):
|
|
|
|
|
try:
|
|
|
|
|
cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id DELETE r RETURN 1"
|
|
|
|
|
result = self.db.execute_write_and_return(cypher, {"id": rel_id})
|
|
|
|
|
return {"success": True, "msg": "删除成功"} if result else {"success": False, "msg": "关系不存在"}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {"success": False, "msg": f"删除失败: {str(e)}"}
|