You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
100 lines
3.1 KiB
100 lines
3.1 KiB
import pandas as pd
|
|
from neo4j import GraphDatabase
|
|
import numpy as np
|
|
|
|
# === 配置 ===
|
|
EXCEL_PATH = r"C:\Users\hanyuqing\Desktop\最新国家医保ICD编码\最新国家医保ICD编码\ICD-10医保版数据.xlsx"
|
|
NEO4J_URI = "bolt://localhost:7687"
|
|
NEO4J_USER = "neo4j"
|
|
NEO4J_PASSWORD = "12345678"
|
|
|
|
# === 读取 Excel ===
|
|
df = pd.read_excel(
|
|
EXCEL_PATH,
|
|
header=1,
|
|
dtype=str, # 关键:防止 A00.0 变成 A00
|
|
engine='openpyxl'
|
|
)
|
|
|
|
# 清理列名
|
|
df.columns = df.columns.astype(str).str.strip()
|
|
|
|
# 必需列
|
|
required_cols = [
|
|
"条目(诊断)名称", "条目(诊断)代码",
|
|
"亚目名称", "亚目代码",
|
|
"章代码范围", "节代码范围", "类目代码"
|
|
]
|
|
|
|
# 检查列是否存在
|
|
missing = [col for col in required_cols if col not in df.columns]
|
|
if missing:
|
|
raise ValueError(f"缺少必要列: {missing}")
|
|
|
|
# 替换 NaN 为 None(便于后续判断)
|
|
df = df.replace({np.nan: None})
|
|
|
|
# === 构造 name 和 code 字段(按你的逻辑)===
|
|
def get_disease_name(row):
|
|
diag_name = row["条目(诊断)名称"]
|
|
subcat_name = row["亚目名称"]
|
|
# 如果诊断名称为空(None 或 空白),用亚目名称
|
|
if not diag_name or str(diag_name).strip() == "":
|
|
return str(subcat_name).strip() if subcat_name else None
|
|
return str(diag_name).strip()
|
|
|
|
def get_diagnosis_code(row):
|
|
code = row["条目(诊断)代码"]
|
|
# 如果为空,返回空字符串 ""
|
|
if not code or str(code).strip() == "":
|
|
return ""
|
|
return str(code).strip()
|
|
|
|
# 应用逻辑
|
|
df["_disease_name"] = df.apply(get_disease_name, axis=1)
|
|
df["_diagnosis_code"] = df.apply(get_diagnosis_code, axis=1)
|
|
|
|
# 过滤掉 name 仍为 None 的行(即 诊断名 + 亚目名 都为空)
|
|
df = df[df["_disease_name"].notna() & (df["_disease_name"] != "")]
|
|
|
|
print(f"✅ 共准备 {len(df)} 条疾病记录用于导入")
|
|
|
|
# === Neo4j 连接 ===
|
|
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
|
|
|
|
def upsert_disease(tx, record):
|
|
cypher = """
|
|
MERGE (d:Disease {name: $name})
|
|
ON CREATE SET
|
|
d.diagnosisCode = $diagnosisCode,
|
|
d.chapterRange = $chapterRange,
|
|
d.sectionRange = $sectionRange,
|
|
d.categoryCode = $categoryCode,
|
|
d.subcategoryCode = $subcategoryCode
|
|
ON MATCH SET
|
|
d.diagnosisCode = $diagnosisCode,
|
|
d.chapterRange = $chapterRange,
|
|
d.sectionRange = $sectionRange,
|
|
d.categoryCode = $categoryCode,
|
|
d.subcategoryCode = $subcategoryCode
|
|
"""
|
|
tx.run(cypher, {
|
|
"name": record["_disease_name"],
|
|
"diagnosisCode": record["_diagnosis_code"],
|
|
"chapterRange": record.get("章代码范围") or "",
|
|
"sectionRange": record.get("节代码范围") or "",
|
|
"categoryCode": record.get("类目代码") or "",
|
|
"subcategoryCode": record.get("亚目代码") or ""
|
|
})
|
|
|
|
# === 批量导入 ===
|
|
with driver.session() as session:
|
|
for idx, row in df.iterrows():
|
|
try:
|
|
session.execute_write(upsert_disease, row.to_dict())
|
|
except Exception as e:
|
|
print(f"❌ 第 {idx + 2} 行失败: {e}")
|
|
continue
|
|
|
|
print("✅ 数据导入完成!")
|
|
driver.close()
|