You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
203 lines
8.8 KiB
203 lines
8.8 KiB
from util.neo4j_utils import neo4j_client
|
|
from urllib.parse import unquote
|
|
import traceback
|
|
|
|
class OperationService:
|
|
def __init__(self):
|
|
self.db = neo4j_client
|
|
|
|
# --- 1. 节点查询:去掉强制命名的逻辑 ---
|
|
def get_nodes_subset(self, page: int = 1, page_size: int = 20):
|
|
skip_val = int((page - 1) * page_size)
|
|
limit_val = int(page_size)
|
|
# 去掉 Cypher 中的排序干扰,确保 ID 稳定
|
|
cypher = """
|
|
MATCH (n)
|
|
WITH count(n) AS total_count
|
|
MATCH (n)
|
|
RETURN elementId(n) AS id, labels(n) AS labels, n.name AS name, n.nodeId AS nodeId, total_count AS total
|
|
ORDER BY toInteger(n.nodeId) DESC
|
|
SKIP $skip LIMIT $limit
|
|
"""
|
|
try:
|
|
params = {"skip": skip_val, "limit": limit_val}
|
|
raw_data = self.db.execute_read(cypher, params)
|
|
if not raw_data: return {"items": [], "total": 0}
|
|
items = []
|
|
for item in raw_data:
|
|
items.append({
|
|
"id": item["id"],
|
|
"labels": item["labels"],
|
|
# 修复:不再强转为“未命名”,保留真实状态
|
|
"name": item.get("name"),
|
|
"nodeId": item.get("nodeId") or 0
|
|
})
|
|
return {"items": items, "total": raw_data[0]['total']}
|
|
except Exception as e:
|
|
print(f"Service Error (Nodes): {e}")
|
|
return {"items": [], "total": 0}
|
|
|
|
# --- 2. 关系查询:去掉 coalesce 强制命名 ---
|
|
def get_relationships_subset(self, page: int = 1, page_size: int = 20):
|
|
skip_val = int((page - 1) * page_size)
|
|
limit_val = int(page_size)
|
|
# 修复:移除 coalesce(..., "未知节点")
|
|
cypher = """
|
|
MATCH (a)-[r]->(b)
|
|
WITH count(r) AS total_count
|
|
MATCH (a)-[r]->(b)
|
|
RETURN elementId(r) as id,
|
|
type(r) as type,
|
|
r.label as label,
|
|
a.name as source,
|
|
b.name as target,
|
|
coalesce(r.createTime, 0) as createTime,
|
|
total_count
|
|
ORDER BY createTime DESC
|
|
SKIP $skip LIMIT $limit
|
|
"""
|
|
try:
|
|
params = {"skip": skip_val, "limit": limit_val}
|
|
raw_data = self.db.execute_read(cypher, params)
|
|
if not raw_data: return {"items": [], "total": 0}
|
|
items = []
|
|
for row in raw_data:
|
|
items.append({
|
|
"id": row["id"],
|
|
"type": row["type"],
|
|
"label": row["label"] if row["label"] is not None else "",
|
|
"source": row["source"],
|
|
"target": row["target"]
|
|
})
|
|
return {"items": items, "total": raw_data[0]['total_count']}
|
|
except Exception as e:
|
|
print(f"Service Error (Rels): {e}")
|
|
return {"items": [], "total": 0}
|
|
|
|
# --- 3. 联想建议:排除“未命名”干扰 ---
|
|
def suggest_nodes(self, keyword: str):
|
|
if not keyword: return []
|
|
try:
|
|
kw = unquote(str(keyword)).strip()
|
|
# 增加过滤:不返回名为“未命名”的建议
|
|
cypher = """
|
|
MATCH (n)
|
|
WHERE n.name CONTAINS $kw AND n.name <> '未命名'
|
|
RETURN DISTINCT n.name as name LIMIT 15
|
|
"""
|
|
results = self.db.execute_read(cypher, {"kw": kw})
|
|
db_suggestions = [row["name"] for row in results if row["name"]]
|
|
suffix_suggestions = [f"{kw}片", f"{kw}胶囊", f"{kw}注射液"]
|
|
final_res = list(dict.fromkeys(db_suggestions + suffix_suggestions))
|
|
return final_res[:15]
|
|
except:
|
|
return []
|
|
|
|
# --- 4. 节点管理 ---
|
|
def add_node(self, label: str, name: str):
|
|
try:
|
|
nm = str(name).strip()
|
|
if not nm: return {"success": False, "msg": "名称不能为空"}
|
|
|
|
check_cypher = "MATCH (n) WHERE n.name = $name RETURN n LIMIT 1"
|
|
existing = self.db.execute_read(check_cypher, {"name": nm})
|
|
if existing:
|
|
return {"success": False, "msg": f"添加失败:已存在名为 '{nm}' 的节点"}
|
|
|
|
create_cypher = f"CREATE (n:`{label}` {{name: $name, nodeId: timestamp()}}) RETURN elementId(n) as id"
|
|
self.db.execute_write(create_cypher, {"name": nm})
|
|
return {"success": True, "msg": "添加成功"}
|
|
except Exception as e:
|
|
return {"success": False, "msg": f"写入失败: {str(e)}"}
|
|
|
|
def update_node(self, node_id: str, name: str, label: str):
|
|
try:
|
|
nm = str(name).strip()
|
|
if not nm: return {"success": False, "msg": "名称不能为空"}
|
|
|
|
check_name = "MATCH (n) WHERE n.name = $name AND elementId(n) <> $id RETURN n LIMIT 1"
|
|
existing = self.db.execute_read(check_name, {"name": nm, "id": node_id})
|
|
if existing:
|
|
return {"success": False, "msg": f"修改失败:库中已有其他名为 '{nm}' 的节点"}
|
|
|
|
cypher = f"""
|
|
MATCH (n) WHERE elementId(n) = $id
|
|
SET n.name = $name, n.nodeId = timestamp()
|
|
WITH n
|
|
REMOVE n:Drug:Disease:Symptom:Entity
|
|
WITH n
|
|
SET n:`{label}`
|
|
RETURN n
|
|
"""
|
|
result = self.db.execute_write(cypher, {"id": node_id, "name": nm})
|
|
return {"success": True, "msg": "节点修改成功"} if result else {"success": False, "msg": "找不到该节点"}
|
|
except Exception as e:
|
|
return {"success": False, "msg": str(e)}
|
|
|
|
def delete_node(self, node_id: str):
|
|
cypher = "MATCH (n) WHERE elementId(n) = $id DETACH DELETE n"
|
|
return self.db.execute_write(cypher, {"id": node_id})
|
|
|
|
def get_all_labels(self):
|
|
cypher = "CALL db.labels()"
|
|
try:
|
|
results = self.db.execute_read(cypher)
|
|
labels = [list(row.values())[0] for row in results]
|
|
# 排除掉 Neo4j 默认的一些内部标签(如果有)
|
|
return labels if labels else ["Drug", "Disease", "Symptom"]
|
|
except:
|
|
return ["Drug", "Disease", "Symptom"]
|
|
|
|
# --- 5. 关系管理 ---
|
|
def add_relationship(self, source_name: str, target_name: str, rel_type: str, rel_label: str):
|
|
try:
|
|
s, t, l = str(source_name).strip(), str(target_name).strip(), str(rel_label).strip()
|
|
|
|
check_nodes = """
|
|
OPTIONAL MATCH (a) WHERE a.name = $s
|
|
OPTIONAL MATCH (b) WHERE b.name = $t
|
|
RETURN a IS NOT NULL as hasA, b IS NOT NULL as hasB
|
|
"""
|
|
exists = self.db.execute_read(check_nodes, {"s": s, "t": t})
|
|
|
|
if not exists or not exists[0]['hasA'] or not exists[0]['hasB']:
|
|
err_msg = "添加失败: "
|
|
if not exists[0]['hasA']: err_msg += f"起始节点'{s}'不存在; "
|
|
if not exists[0]['hasB']: err_msg += f"结束节点'{t}'不存在"
|
|
return {"success": False, "msg": err_msg}
|
|
|
|
cypher = f"""
|
|
MATCH (a {{name: $s}}), (b {{name: $t}})
|
|
MERGE (a)-[r:`{rel_type}`]->(b)
|
|
ON CREATE SET r.label = $l, r.createTime = timestamp()
|
|
ON MATCH SET r.label = $l, r.updateTime = timestamp()
|
|
RETURN r
|
|
"""
|
|
self.db.execute_write(cypher, {"s": s, "t": t, "l": l})
|
|
return {"success": True, "msg": "操作成功"}
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
return {"success": False, "msg": f"数据库写入异常: {str(e)}"}
|
|
|
|
def update_relationship(self, rel_id: str, source_name: str, target_name: str, rel_type: str, rel_label: str):
|
|
try:
|
|
s, t, l = str(source_name).strip(), str(target_name).strip(), str(rel_label).strip()
|
|
find_old = "MATCH (a)-[r]->(b) WHERE elementId(r) = $id RETURN type(r) as type, a.name as s, b.name as t"
|
|
old = self.db.execute_read(find_old, {"id": rel_id})
|
|
if not old: return {"success": False, "msg": "修改失败:原关系不存在"}
|
|
|
|
if old[0]['s'] != s or old[0]['t'] != t or old[0]['type'] != rel_type:
|
|
self.delete_relationship(rel_id)
|
|
return self.add_relationship(s, t, rel_type, l)
|
|
else:
|
|
update_cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id SET r.label = $l, r.updateTime = timestamp() RETURN r"
|
|
self.db.execute_write(update_cypher, {"id": rel_id, "l": l})
|
|
return {"success": True, "msg": "修改成功"}
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
return {"success": False, "msg": f"修改异常: {str(e)}"}
|
|
|
|
def delete_relationship(self, rel_id: str):
|
|
cypher = "MATCH ()-[r]->() WHERE elementId(r) = $id DELETE r"
|
|
return self.db.execute_write(cypher, {"id": rel_id})
|