import pandas as pd from neo4j import GraphDatabase import numpy as np # === 配置 === EXCEL_PATH = r"C:\Users\hanyuqing\Desktop\最新国家医保ICD编码\最新国家医保ICD编码\ICD-10医保版数据.xlsx" NEO4J_URI = "bolt://localhost:7687" NEO4J_USER = "neo4j" NEO4J_PASSWORD = "12345678" # === 读取 Excel === df = pd.read_excel( EXCEL_PATH, header=1, dtype=str, # 关键:防止 A00.0 变成 A00 engine='openpyxl' ) # 清理列名 df.columns = df.columns.astype(str).str.strip() # 必需列 required_cols = [ "条目(诊断)名称", "条目(诊断)代码", "亚目名称", "亚目代码", "章代码范围", "节代码范围", "类目代码" ] # 检查列是否存在 missing = [col for col in required_cols if col not in df.columns] if missing: raise ValueError(f"缺少必要列: {missing}") # 替换 NaN 为 None(便于后续判断) df = df.replace({np.nan: None}) # === 构造 name 和 code 字段(按你的逻辑)=== def get_disease_name(row): diag_name = row["条目(诊断)名称"] subcat_name = row["亚目名称"] # 如果诊断名称为空(None 或 空白),用亚目名称 if not diag_name or str(diag_name).strip() == "": return str(subcat_name).strip() if subcat_name else None return str(diag_name).strip() def get_diagnosis_code(row): code = row["条目(诊断)代码"] # 如果为空,返回空字符串 "" if not code or str(code).strip() == "": return "" return str(code).strip() # 应用逻辑 df["_disease_name"] = df.apply(get_disease_name, axis=1) df["_diagnosis_code"] = df.apply(get_diagnosis_code, axis=1) # 过滤掉 name 仍为 None 的行(即 诊断名 + 亚目名 都为空) df = df[df["_disease_name"].notna() & (df["_disease_name"] != "")] print(f"✅ 共准备 {len(df)} 条疾病记录用于导入") # === Neo4j 连接 === driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) def upsert_disease(tx, record): cypher = """ MERGE (d:Disease {name: $name}) ON CREATE SET d.diagnosisCode = $diagnosisCode, d.chapterRange = $chapterRange, d.sectionRange = $sectionRange, d.categoryCode = $categoryCode, d.subcategoryCode = $subcategoryCode ON MATCH SET d.diagnosisCode = $diagnosisCode, d.chapterRange = $chapterRange, d.sectionRange = $sectionRange, d.categoryCode = $categoryCode, d.subcategoryCode = $subcategoryCode """ tx.run(cypher, { "name": record["_disease_name"], "diagnosisCode": record["_diagnosis_code"], "chapterRange": record.get("章代码范围") or "", "sectionRange": record.get("节代码范围") or "", "categoryCode": record.get("类目代码") or "", "subcategoryCode": record.get("亚目代码") or "" }) # === 批量导入 === with driver.session() as session: for idx, row in df.iterrows(): try: session.execute_write(upsert_disease, row.to_dict()) except Exception as e: print(f"❌ 第 {idx + 2} 行失败: {e}") continue print("✅ 数据导入完成!") driver.close()