import robyn from robyn import Robyn, jsonify, Response from typing import Optional, List, Any, Dict from util.neo4j_utils import Neo4jUtil from util.neo4j_utils import neo4j_client def convert_node_to_g6_v5(neo4j_node: dict) -> dict: node_id = neo4j_node.get("id") if node_id is None: raise ValueError("节点必须包含 'id' 字段") data = {k: v for k, v in neo4j_node.items() if k != "id"} if "name" not in data and "label" not in data: data["name"] = str(node_id) return { "id": node_id, "data": data, "states": [], "combo": None } def build_g6_graph_data_from_results( nodes: List[Dict[str, Any]], relationships: List[Dict[str, Any]] ) -> dict: """ 通用方法:根据节点列表和关系列表构建 G6 v5 图数据 Args: nodes: 节点列表,每个节点需含 "id" relationships: 关系列表,每个关系需含: - source: {"id": ..., ...} - target: {"id": ..., ...} - relationship: {"type": str, "properties": dict} 或直接扁平化字段 Returns: {"nodes": [...], "edges": [...]} """ g6_node_map = {} # 处理显式传入的节点 for node in nodes: node_id = node.get("id") if node_id: g6_node_map[node_id] = convert_node_to_g6_v5(node) g6_edges = [] for rel in relationships: source_node = rel.get("source") target_node = rel.get("target") if not source_node or not target_node: continue source_id = source_node.get("id") target_id = target_node.get("id") if not source_id or not target_id: continue # 确保 source/target 节点也加入图中(即使未在 nodes 中显式提供) if source_id not in g6_node_map: g6_node_map[source_id] = convert_node_to_g6_v5(source_node) if target_id not in g6_node_map: g6_node_map[target_id] = convert_node_to_g6_v5(target_node) # 构建 edge data edge_data = {} rel_type_str = rel.get("type") or rel.get("relationship") # 兼容不同结构 if rel_type_str: edge_data["relationship"] = rel_type_str # 尝试从 relProps 或 properties 或顶层提取关系属性 rel_props = ( rel.get("relProps") or rel.get("properties") or {k: v for k, v in rel.items() if k not in ("source", "target", "type", "relationship")} ) if isinstance(rel_props, dict): edge_data.update(rel_props) g6_edge = { "source": source_id, "target": target_id, "type": "line", "data": edge_data, "states": [] } g6_edges.append(g6_edge) return { "nodes": list(g6_node_map.values()), "edges": g6_edges } def build_g6_subgraph_by_props( neo4j_util: Neo4jUtil, node_label: str, node_properties: Dict[str, Any], direction: str = "both", rel_type: Optional[str] = None ) -> dict: neighbor_list = neo4j_util.find_neighbors_with_relationships( node_label=node_label, node_properties=node_properties, direction=direction, rel_type=rel_type ) # 提取所有唯一节点 node_dict = {} for item in neighbor_list: for key in ["source", "target"]: n = item[key] nid = n.get("id") if nid and nid not in node_dict: node_dict[nid] = n # 如果没找到关系,但中心节点存在,也要包含它 if not neighbor_list: center_nodes = neo4j_util.find_nodes_with_element_id(node_label, node_properties) if center_nodes: n = center_nodes[0] node_dict[n["id"]] = n nodes = list(node_dict.values()) relationships = neighbor_list # 结构已兼容 return build_g6_graph_data_from_results(nodes, relationships) # ====================== # Robyn App # ====================== app = Robyn(__file__) @app.get("/getData") def get_data(): try: graph_data = build_g6_subgraph_by_props( neo4j_client, node_label="Disease", node_properties={"name": "偏头痛"}, direction="both", rel_type=None ) return Response( status_code=200, description=jsonify(graph_data), headers={"Content-Type": "text/plain; charset=utf-8"} ) except Exception as e: return jsonify({"error": str(e)}), 500 if __name__ == "__main__": app.start(host="0.0.0.0", port=8088)