You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

179 lines
5.7 KiB

4 months ago
from typing import Dict, List, Any, Optional
3 months ago
from pypinyin import lazy_pinyin, Style
from util.neo4j_utils import Neo4jUtil, neo4j_client
4 months ago
3 months ago
def convert_node_to_g6_v5(neo4j_node: dict) -> dict:
    """Convert a flat Neo4j node dict into a G6 v5 node dict.

    Args:
        neo4j_node: Node properties as a dict; must contain an "id" key
            whose value is not None.

    Returns:
        A dict with the keys "id", "data" (every input field except "id"),
        "states" (empty list) and "combo" (None). When the input has neither
        a "name" nor a "label" field, "data" gets a "name" set to str(id).

    Raises:
        ValueError: If the node has no "id" or its "id" is None.
    """
    identifier = neo4j_node.get("id")
    if identifier is None:
        raise ValueError("节点必须包含 'id' 字段")

    # Everything except the id travels in the "data" payload.
    payload = {}
    for key, value in neo4j_node.items():
        if key != "id":
            payload[key] = value

    # Fall back to the id as a display name when nothing better is present.
    if not ("name" in payload or "label" in payload):
        payload["name"] = str(identifier)

    return dict(id=identifier, data=payload, states=[], combo=None)
3 months ago
def build_g6_graph_data_from_results(
    nodes: List[Dict[str, Any]],
    relationships: List[Dict[str, Any]]
) -> dict:
    """Build G6 v5 graph data from a node list and a relationship list.

    Args:
        nodes: Node dicts; each needs an "id". Entries whose id is missing
            (None or "") are skipped. None is treated as an empty list.
        relationships: Relationship dicts. Each one needs:
            - source: {"id": ..., ...}
            - target: {"id": ..., ...}
            The value stored as the edge's "relationship" is taken from the
            "type" key, falling back to the "relationship" key. Extra edge
            properties come from "relProps", then "properties", then any
            remaining top-level keys. None is treated as an empty list.

    Returns:
        {"nodes": [...], "edges": [...]} where every edge has the keys
        "source", "target", "type" (always "line"), "data" and "states".
    """

    def _has_id(value: Any) -> bool:
        # Only None / "" mean "no id". A plain truthiness test would also
        # reject 0, which convert_node_to_g6_v5 accepts as a valid id.
        return value is not None and value != ""

    g6_node_map = {}

    # Explicitly supplied nodes.
    for node in nodes or ():
        node_id = node.get("id")
        if _has_id(node_id):
            g6_node_map[node_id] = convert_node_to_g6_v5(node)

    g6_edges = []
    for rel in relationships or ():
        source_node = rel.get("source")
        target_node = rel.get("target")
        if not source_node or not target_node:
            continue

        source_id = source_node.get("id")
        target_id = target_node.get("id")
        if not (_has_id(source_id) and _has_id(target_id)):
            continue

        # Make sure both endpoints are in the graph even when they were not
        # passed in `nodes`.
        if source_id not in g6_node_map:
            g6_node_map[source_id] = convert_node_to_g6_v5(source_node)
        if target_id not in g6_node_map:
            g6_node_map[target_id] = convert_node_to_g6_v5(target_node)

        # Build the edge data.
        edge_data = {}
        # NOTE(review): when "relationship" holds a nested dict such as
        # {"type": ..., "properties": ...} it is stored as-is under
        # edge_data["relationship"]; confirm that is what consumers expect.
        rel_type_value = rel.get("type") or rel.get("relationship")
        if rel_type_value:
            edge_data["relationship"] = rel_type_value

        # Relationship properties: "relProps", then "properties", then any
        # remaining top-level keys.
        rel_props = (
            rel.get("relProps")
            or rel.get("properties")
            or {
                k: v
                for k, v in rel.items()
                if k not in ("source", "target", "type", "relationship")
            }
        )
        if isinstance(rel_props, dict):
            edge_data.update(rel_props)

        g6_edges.append({
            "source": source_id,
            "target": target_id,
            "type": "line",
            "data": edge_data,
            "states": [],
        })

    return {
        "nodes": list(g6_node_map.values()),
        "edges": g6_edges,
    }
3 months ago
def build_g6_subgraph_by_props(
    neo4j_util: Neo4jUtil,
    node_properties: Dict[str, Any],
    node_label: Optional[str] = None,
    direction: str = "both",
    rel_type: Optional[str] = None
) -> dict:
    """Build G6 graph data for the nodes matching the given properties.

    Queries ``neo4j_util.find_neighbors_with_relationships`` and converts the
    result with ``build_g6_graph_data_from_results``. When that query returns
    nothing, the first node returned by
    ``neo4j_util.find_nodes_with_element_id`` (if any) is included so the
    graph is not empty.

    Args:
        neo4j_util: Helper used to run both lookups.
        node_properties: Properties identifying the centre node(s).
        node_label: Optional label passed through to both lookups.
        direction: Passed through to the neighbour lookup.
        rel_type: Optional relationship type passed to the neighbour lookup.

    Returns:
        The dict produced by ``build_g6_graph_data_from_results``.
    """
    neighbor_rows = neo4j_util.find_neighbors_with_relationships(
        node_label=node_label,
        node_properties=node_properties,
        direction=direction,
        rel_type=rel_type,
    )

    # Collect each distinct endpoint once; the first occurrence wins.
    unique_nodes = {}
    for row in neighbor_rows:
        for side in ("source", "target"):
            endpoint = row[side]
            endpoint_id = endpoint.get("id")
            if endpoint_id:
                unique_nodes.setdefault(endpoint_id, endpoint)

    # No relationships found: still show the centre node when it exists.
    if not neighbor_rows:
        matches = neo4j_util.find_nodes_with_element_id(node_label, node_properties)
        if matches:
            center = matches[0]
            unique_nodes[center["id"]] = center

    # The neighbour rows are passed through unchanged as the relationship list.
    return build_g6_graph_data_from_results(
        list(unique_nodes.values()),
        neighbor_rows,
    )
def get_drug_names_from_neo4j():
    """Return the non-null ``name`` of every Drug node.

    Runs a read query through the module-level ``neo4j_client`` and prints
    the number of names loaded as a debug line.
    """
    query = "MATCH (d:Drug) WHERE d.name IS NOT NULL RETURN d.name AS name"
    rows = neo4j_client.execute_read(query)

    # The query already excludes nulls; filter again defensively.
    candidates = (row.get("name") for row in rows)
    names = [value for value in candidates if value is not None]

    print(f"[DEBUG] Loaded {len(names)} drug names from Neo4j")
    return names
def get_check_names_from_neo4j():
    """Return the non-null ``name`` of every Check node.

    Runs a read query through the module-level ``neo4j_client`` and prints
    the number of names loaded as a debug line.
    """
    query = "MATCH (d:Check) WHERE d.name IS NOT NULL RETURN d.name AS name"
    rows = neo4j_client.execute_read(query)

    # The query already excludes nulls; filter again defensively.
    candidates = (row.get("name") for row in rows)
    names = [value for value in candidates if value is not None]

    print(f"[DEBUG] Loaded {len(names)} check names from Neo4j")
    return names
3 months ago
def get_disease_names_from_neo4j():
    """Return the ``name`` of every Disease node that has a truthy name.

    Runs a read query through the module-level ``neo4j_client``; records
    whose name is missing, None or empty are dropped.
    """
    query = "MATCH (d:Disease) RETURN d.name AS name"
    names = []
    for row in neo4j_client.execute_read(query):
        if row.get("name"):
            names.append(row["name"])
    return names
3 months ago
def get_group_key(name: str) -> str:
    """Return the index-group key for a name.

    The name is scanned character by character (after stripping whitespace)
    and the first character that can be classified decides the group:

    - a digit                      -> "0-9"
    - an ASCII letter              -> that letter, upper-cased
    - a character in U+4E00-U+9FFF -> the upper-cased first letter of its
      pinyin, when that is in A-Z

    Characters that fit none of these are skipped. Empty, blank or non-string
    input, and names with no classifiable character, yield "其他".
    """
    fallback = "其他"

    if not (name and isinstance(name, str)):
        return fallback

    stripped = name.strip()
    if not stripped:
        return fallback

    for ch in stripped:
        if ch.isdigit():
            return "0-9"

        if ch.isascii() and ch.isalpha():
            return ch.upper()

        if not ('\u4e00' <= ch <= '\u9fff'):
            # Neither digit, ASCII letter nor CJK ideograph: try the next one.
            continue

        try:
            initial = lazy_pinyin(ch, style=Style.FIRST_LETTER)[0].upper()
            if 'A' <= initial <= 'Z':
                return initial
        except Exception:
            # Best effort: a failed pinyin lookup just moves on.
            continue

    return fallback