|
|
|
@ -1,4 +1,4 @@ |
|
|
|
from robyn import jsonify, Response |
|
|
|
from robyn import jsonify, Response, Request |
|
|
|
from app import app |
|
|
|
from datetime import datetime, timedelta |
|
|
|
import uuid |
|
|
|
@ -14,11 +14,34 @@ def generate_token() -> str: |
|
|
|
"""生成随机token""" |
|
|
|
return str(uuid.uuid4()) |
|
|
|
|
|
|
|
def validate_token(token: str) -> dict: |
|
|
|
"""验证token并返回用户信息""" |
|
|
|
if not token or token not in TEMP_TOKENS: |
|
|
|
return None |
|
|
|
|
|
|
|
# 检查token是否过期 |
|
|
|
if datetime.now() > TEMP_TOKENS[token]["expires_at"]: |
|
|
|
del TEMP_TOKENS[token] |
|
|
|
return None |
|
|
|
|
|
|
|
return TEMP_TOKENS[token]["user"] |
|
|
|
|
|
|
|
@app.post("/api/login") |
|
|
|
def login_route(request): |
|
|
|
"""登录接口""" |
|
|
|
try: |
|
|
|
request_data = json.loads(request.body) if request.body else {} |
|
|
|
# 解析请求数据 |
|
|
|
request_data = {} |
|
|
|
if request.body: |
|
|
|
try: |
|
|
|
request_data = json.loads(request.body) |
|
|
|
except json.JSONDecodeError: |
|
|
|
pass |
|
|
|
|
|
|
|
# 如果JSON解析失败,尝试从form_data获取 |
|
|
|
if not request_data and hasattr(request, 'form_data'): |
|
|
|
request_data = getattr(request, 'form_data', {}) |
|
|
|
|
|
|
|
username = request_data.get("username", "").strip() |
|
|
|
password = request_data.get("password", "").strip() |
|
|
|
remember = request_data.get("remember", False) |
|
|
|
@ -85,26 +108,28 @@ def user_info_route(request): |
|
|
|
query_params = getattr(request, 'query_params', {}) |
|
|
|
token = query_params.get("token", "") |
|
|
|
|
|
|
|
# 验证token是否存在 |
|
|
|
if not token or token not in TEMP_TOKENS: |
|
|
|
# 验证token |
|
|
|
user = validate_token(token) |
|
|
|
if not user: |
|
|
|
return Response( |
|
|
|
status_code=401, |
|
|
|
description=jsonify({"success": False, "message": "未登录或登录已过期"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
# 检查token是否过期 |
|
|
|
if datetime.now() > TEMP_TOKENS[token]["expires_at"]: |
|
|
|
del TEMP_TOKENS[token] |
|
|
|
return Response( |
|
|
|
status_code=401, |
|
|
|
description=jsonify({"success": False, "message": "登录已过期"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
# 从数据库获取最新的用户信息 |
|
|
|
username = user["username"] |
|
|
|
db_user = user_service.get_user_info(username) |
|
|
|
if db_user: |
|
|
|
# 更新TEMP_TOKENS中的用户信息 |
|
|
|
TEMP_TOKENS[token]["user"] = db_user |
|
|
|
user_info = db_user |
|
|
|
else: |
|
|
|
user_info = user |
|
|
|
|
|
|
|
return Response( |
|
|
|
status_code=200, |
|
|
|
description=jsonify({"success": True, "user": TEMP_TOKENS[token]["user"]}), |
|
|
|
description=jsonify({"success": True, "user": user_info}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
except Exception as e: |
|
|
|
@ -115,140 +140,100 @@ def user_info_route(request): |
|
|
|
) |
|
|
|
|
|
|
|
@app.post("/api/updateAvatar") |
|
|
|
def update_avatar_route(request): |
|
|
|
async def update_avatar_route(request: Request): |
|
|
|
"""更新用户头像接口""" |
|
|
|
try: |
|
|
|
# 打印调试信息 |
|
|
|
print(f"请求类型: {type(request)}") |
|
|
|
print(f"请求属性: {dir(request)}") |
|
|
|
|
|
|
|
# 在Robyn中,文件上传的数据存储在request.files中 |
|
|
|
# 表单字段存储在request.form中 |
|
|
|
form_data = getattr(request, 'form', {}) |
|
|
|
files_data = getattr(request, 'files', {}) |
|
|
|
|
|
|
|
print(f"表单数据: {form_data}") |
|
|
|
print(f"文件数据: {files_data}") |
|
|
|
print(f"表单数据类型: {type(form_data)}") |
|
|
|
print(f"文件数据类型: {type(files_data)}") |
|
|
|
|
|
|
|
# 获取token - 尝试多种方式获取 |
|
|
|
# 从files中获取文件和token |
|
|
|
avatar_file = request.files.get('avatar') if hasattr(request, 'files') else None |
|
|
|
token = None |
|
|
|
|
|
|
|
# 方法1: 从form_data中获取 |
|
|
|
if isinstance(form_data, dict) and "token" in form_data: |
|
|
|
token = form_data["token"] |
|
|
|
print(f"从form_data获取token: {token}") |
|
|
|
|
|
|
|
# 方法2: 如果form_data不是字典,尝试其他方式 |
|
|
|
if not token and hasattr(form_data, 'get'): |
|
|
|
token = form_data.get("token", "") |
|
|
|
print(f"通过form_data.get()获取token: {token}") |
|
|
|
# 从form_data中获取token |
|
|
|
if hasattr(request, 'form_data'): |
|
|
|
token = request.form_data.get('token') |
|
|
|
|
|
|
|
# 如果files中没有直接找到'avatar',尝试获取第一个文件 |
|
|
|
if not avatar_file and hasattr(request, 'files') and request.files: |
|
|
|
first_key = list(request.files.keys())[0] |
|
|
|
avatar_file = request.files[first_key] |
|
|
|
|
|
|
|
# 方法3: 尝试从request的属性中直接获取 |
|
|
|
# 如果form_data中没有token,尝试从headers获取 |
|
|
|
if not token: |
|
|
|
# 尝试从request中获取所有可能的属性 |
|
|
|
for attr_name in dir(request): |
|
|
|
if 'token' in attr_name.lower() or 'form' in attr_name.lower(): |
|
|
|
try: |
|
|
|
attr_value = getattr(request, attr_name) |
|
|
|
print(f"request.{attr_name}: {type(attr_value)} = {attr_value}") |
|
|
|
|
|
|
|
# 如果是字典类型,检查是否包含token |
|
|
|
if isinstance(attr_value, dict) and 'token' in attr_value: |
|
|
|
token = attr_value['token'] |
|
|
|
print(f"从request.{attr_name}获取token: {token}") |
|
|
|
break |
|
|
|
except Exception as e: |
|
|
|
print(f"访问request.{attr_name}时出错: {e}") |
|
|
|
|
|
|
|
# 方法4: 从查询参数中获取 |
|
|
|
if not token: |
|
|
|
query_params = getattr(request, 'query_params', {}) |
|
|
|
if isinstance(query_params, dict) and "token" in query_params: |
|
|
|
token = query_params["token"] |
|
|
|
print(f"从查询参数获取token: {token}") |
|
|
|
|
|
|
|
# 方法5: 从headers中获取 |
|
|
|
if not token: |
|
|
|
headers = getattr(request, 'headers', {}) |
|
|
|
if isinstance(headers, dict) and "Authorization" in headers: |
|
|
|
auth_header = headers["Authorization"] |
|
|
|
if auth_header.startswith("Bearer "): |
|
|
|
token = auth_header[7:] |
|
|
|
print(f"从Authorization头获取token: {token}") |
|
|
|
|
|
|
|
print(f"最终获取的token: {token}") |
|
|
|
print(f"TEMP_TOKENS中的keys: {list(TEMP_TOKENS.keys())}") |
|
|
|
|
|
|
|
headers_dict = {} |
|
|
|
if hasattr(request, 'headers'): |
|
|
|
try: |
|
|
|
headers_dict = dict(request.headers) |
|
|
|
except: |
|
|
|
pass |
|
|
|
token = headers_dict.get('Authorization') or headers_dict.get('authorization') |
|
|
|
|
|
|
|
# 如果还是没有token,尝试从body中解析 |
|
|
|
if not token and hasattr(request, 'body'): |
|
|
|
try: |
|
|
|
body_data = json.loads(request.body) |
|
|
|
token = body_data.get('token') |
|
|
|
except: |
|
|
|
pass |
|
|
|
|
|
|
|
# 验证token |
|
|
|
if not token or token not in TEMP_TOKENS: |
|
|
|
print(f"Token验证失败: {token}") |
|
|
|
user = validate_token(token) |
|
|
|
if not user: |
|
|
|
return Response( |
|
|
|
status_code=401, |
|
|
|
description=jsonify({"success": False, "message": "未登录或登录已过期"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
# 检查token是否过期 |
|
|
|
if datetime.now() > TEMP_TOKENS[token]["expires_at"]: |
|
|
|
del TEMP_TOKENS[token] |
|
|
|
print("Token已过期") |
|
|
|
# 检查文件是否存在 |
|
|
|
if not avatar_file: |
|
|
|
return Response( |
|
|
|
status_code=401, |
|
|
|
description=jsonify({"success": False, "message": "登录已过期"}), |
|
|
|
status_code=400, |
|
|
|
description=jsonify({"success": False, "message": "未上传头像文件"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
# 获取上传的文件 |
|
|
|
avatar_file = files_data.get("avatar") if isinstance(files_data, dict) else None |
|
|
|
|
|
|
|
# 如果没有通过"avatar"键获取到文件,尝试直接访问files_data的第一个元素 |
|
|
|
if not avatar_file and isinstance(files_data, dict) and len(files_data) > 0: |
|
|
|
# 获取第一个文件作为头像文件 |
|
|
|
first_key = list(files_data.keys())[0] |
|
|
|
avatar_file = files_data[first_key] |
|
|
|
print(f"通过备用方法获取文件,键名: {first_key}") |
|
|
|
|
|
|
|
if not avatar_file: |
|
|
|
print("未找到avatar文件") |
|
|
|
print(f"可用的文件键: {list(files_data.keys()) if isinstance(files_data, dict) else '无法获取键列表'}") |
|
|
|
# 获取文件内容和文件名 |
|
|
|
file_content = None |
|
|
|
filename = "" |
|
|
|
|
|
|
|
# Robyn框架中,files字典的值通常是字节数据 |
|
|
|
if isinstance(avatar_file, bytes): |
|
|
|
file_content = avatar_file |
|
|
|
filename = list(request.files.keys())[0] if request.files else "avatar.jpg" |
|
|
|
elif isinstance(avatar_file, str): |
|
|
|
filename = avatar_file |
|
|
|
else: |
|
|
|
if avatar_file is not None: |
|
|
|
# 尝试获取文件内容的其他方式 |
|
|
|
if hasattr(avatar_file, 'content'): |
|
|
|
file_content = avatar_file.content |
|
|
|
elif hasattr(avatar_file, 'read'): |
|
|
|
file_content = avatar_file.read() |
|
|
|
filename = getattr(avatar_file, 'filename', 'avatar.jpg') |
|
|
|
|
|
|
|
# 如果我们有文件内容,继续处理 |
|
|
|
if not file_content: |
|
|
|
return Response( |
|
|
|
status_code=400, |
|
|
|
description=jsonify({"success": False, "message": "未上传头像文件"}), |
|
|
|
description=jsonify({"success": False, "message": "无法读取文件内容"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
print(f"获取的文件: {avatar_file}") |
|
|
|
print(f"文件属性: {dir(avatar_file) if avatar_file else '无文件'}") |
|
|
|
# 处理文件名 |
|
|
|
if not filename: |
|
|
|
filename = "avatar.jpg" |
|
|
|
|
|
|
|
file_extension = filename.split('.')[-1] if '.' in filename else 'jpg' |
|
|
|
|
|
|
|
# 检查文件类型 - 使用多种方法验证 |
|
|
|
# 验证文件类型 |
|
|
|
is_valid_image = False |
|
|
|
file_extension = "" |
|
|
|
|
|
|
|
# 方法1: 检查content_type |
|
|
|
content_type = getattr(avatar_file, 'content_type', '') |
|
|
|
if content_type and content_type.startswith("image/"): |
|
|
|
# 通过扩展名验证 |
|
|
|
if file_extension.lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg']: |
|
|
|
is_valid_image = True |
|
|
|
print(f"通过content_type验证: {content_type}") |
|
|
|
|
|
|
|
# 方法2: 检查文件名和扩展名 |
|
|
|
filename = getattr(avatar_file, 'filename', '') |
|
|
|
if filename: |
|
|
|
file_extension = os.path.splitext(filename)[1].lower() |
|
|
|
valid_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg'] |
|
|
|
if file_extension in valid_extensions: |
|
|
|
is_valid_image = True |
|
|
|
print(f"通过文件扩展名验证: {file_extension}") |
|
|
|
|
|
|
|
# 如果两种方法都失败,尝试读取文件头 |
|
|
|
|
|
|
|
# 如果扩展名验证失败,尝试读取文件头验证 |
|
|
|
if not is_valid_image: |
|
|
|
try: |
|
|
|
# 读取文件前几个字节来检测文件类型 |
|
|
|
file_content = avatar_file.read(1024) |
|
|
|
avatar_file.seek(0) # 重置文件指针 |
|
|
|
|
|
|
|
# 检查常见图片格式的文件头 |
|
|
|
if (file_content.startswith(b'\xFF\xD8\xFF') or # JPEG |
|
|
|
file_content.startswith(b'\x89PNG\r\n\x1a\n') or # PNG |
|
|
|
file_content.startswith(b'GIF87a') or # GIF |
|
|
|
@ -256,64 +241,47 @@ def update_avatar_route(request): |
|
|
|
file_content.startswith(b'BM') or # BMP |
|
|
|
file_content.startswith(b'RIFF') and b'WEBP' in file_content[:12]): # WebP |
|
|
|
is_valid_image = True |
|
|
|
print(f"通过文件头验证成功") |
|
|
|
else: |
|
|
|
print(f"文件头验证失败,文件前16字节: {file_content[:16]}") |
|
|
|
except Exception as e: |
|
|
|
print(f"读取文件内容进行验证时出错: {e}") |
|
|
|
except Exception: |
|
|
|
pass |
|
|
|
|
|
|
|
if not is_valid_image: |
|
|
|
print(f"文件类型验证失败 - content_type: {content_type}, filename: {filename}, extension: {file_extension}") |
|
|
|
return Response( |
|
|
|
status_code=400, |
|
|
|
description=jsonify({"success": False, "message": f"文件类型必须是图片 (支持的格式: JPG, PNG, GIF, BMP, WebP, SVG)"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
# 获取文件内容 |
|
|
|
file_content = avatar_file.file_data |
|
|
|
print(f"读取的文件大小: {len(file_content)} 字节") |
|
|
|
|
|
|
|
# 生成唯一的文件名 |
|
|
|
import time |
|
|
|
import random |
|
|
|
filename = getattr(avatar_file, 'filename', '') |
|
|
|
file_extension = filename.split('.')[-1] if '.' in filename else 'jpg' |
|
|
|
timestamp = int(time.time()) |
|
|
|
random_num = random.randint(1000, 9999) |
|
|
|
username = TEMP_TOKENS[token]["user"]["username"] |
|
|
|
username = user["username"] |
|
|
|
new_filename = f"{username}_{timestamp}_{random_num}.{file_extension}" |
|
|
|
|
|
|
|
# 定义文件保存路径 |
|
|
|
avatar_dir = "D:/zhishitupu/MedKG/resource/avatar" |
|
|
|
file_path = f"{avatar_dir}/{new_filename}" |
|
|
|
|
|
|
|
print(f"保存文件到: {file_path}") |
|
|
|
# 定义文件保存路径(使用相对路径) |
|
|
|
current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
|
|
|
avatar_dir = os.path.join(current_dir, "resource", "avatar") |
|
|
|
file_path = os.path.join(avatar_dir, new_filename) |
|
|
|
|
|
|
|
# 确保目录存在 |
|
|
|
import os |
|
|
|
os.makedirs(avatar_dir, exist_ok=True) |
|
|
|
|
|
|
|
# 保存文件到磁盘 |
|
|
|
with open(file_path, "wb") as f: |
|
|
|
f.write(file_content) |
|
|
|
|
|
|
|
print(f"文件保存成功") |
|
|
|
|
|
|
|
# 更新用户头像到数据库,存储相对路径 |
|
|
|
avatar_relative_path = f"/resource/avatar/{new_filename}" |
|
|
|
success = user_service.update_user_avatar(username, avatar_relative_path) |
|
|
|
|
|
|
|
if not success: |
|
|
|
print("数据库更新失败") |
|
|
|
return Response( |
|
|
|
status_code=500, |
|
|
|
description=jsonify({"success": False, "message": "更新头像失败"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
print("数据库更新成功") |
|
|
|
|
|
|
|
# 更新token中的用户信息 |
|
|
|
TEMP_TOKENS[token]["user"]["avatar"] = avatar_relative_path |
|
|
|
|
|
|
|
@ -327,9 +295,6 @@ def update_avatar_route(request): |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
except Exception as e: |
|
|
|
print(f"更新头像异常: {str(e)}") |
|
|
|
import traceback |
|
|
|
traceback.print_exc() |
|
|
|
return Response( |
|
|
|
status_code=500, |
|
|
|
description=jsonify({"success": False, "message": f"更新头像失败: {str(e)}"}), |
|
|
|
@ -340,23 +305,15 @@ def update_avatar_route(request): |
|
|
|
def update_password_route(request): |
|
|
|
"""更新用户密码接口""" |
|
|
|
try: |
|
|
|
print("开始处理密码更新请求") |
|
|
|
|
|
|
|
# 解析请求数据 |
|
|
|
request_data = json.loads(request.body) if request.body else {} |
|
|
|
print(f"请求数据: {request_data}") |
|
|
|
|
|
|
|
token = request_data.get("token", "") |
|
|
|
current_password = request_data.get("currentPassword", "") |
|
|
|
new_password = request_data.get("newPassword", "") |
|
|
|
|
|
|
|
print(f"Token: {token}") |
|
|
|
print(f"当前密码长度: {len(current_password)}") |
|
|
|
print(f"新密码长度: {len(new_password)}") |
|
|
|
|
|
|
|
# 验证输入 |
|
|
|
if not current_password or not new_password: |
|
|
|
print("密码为空") |
|
|
|
return Response( |
|
|
|
status_code=400, |
|
|
|
description=jsonify({"success": False, "message": "当前密码和新密码不能为空"}), |
|
|
|
@ -364,46 +321,29 @@ def update_password_route(request): |
|
|
|
) |
|
|
|
|
|
|
|
# 验证token |
|
|
|
if not token or token not in TEMP_TOKENS: |
|
|
|
print(f"Token验证失败: {token}") |
|
|
|
user = validate_token(token) |
|
|
|
if not user: |
|
|
|
return Response( |
|
|
|
status_code=401, |
|
|
|
description=jsonify({"success": False, "message": "未登录或登录已过期"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
# 检查token是否过期 |
|
|
|
if datetime.now() > TEMP_TOKENS[token]["expires_at"]: |
|
|
|
del TEMP_TOKENS[token] |
|
|
|
print("Token已过期") |
|
|
|
return Response( |
|
|
|
status_code=401, |
|
|
|
description=jsonify({"success": False, "message": "登录已过期"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
# 获取用户信息 |
|
|
|
username = TEMP_TOKENS[token]["user"]["username"] |
|
|
|
print(f"用户名: {username}") |
|
|
|
username = user["username"] |
|
|
|
|
|
|
|
# 验证当前密码 |
|
|
|
user = user_service.get_user_by_username(username) |
|
|
|
if not user: |
|
|
|
print("用户不存在") |
|
|
|
db_user = user_service.get_user_by_username(username) |
|
|
|
if not db_user: |
|
|
|
return Response( |
|
|
|
status_code=404, |
|
|
|
description=jsonify({"success": False, "message": "用户不存在"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
print(f"用户信息: {user}") |
|
|
|
|
|
|
|
# 验证密码 |
|
|
|
is_password_valid = user_service.verify_password(current_password, user["password"]) |
|
|
|
print(f"密码验证结果: {is_password_valid}") |
|
|
|
|
|
|
|
is_password_valid = user_service.verify_password(current_password, db_user["password"]) |
|
|
|
if not is_password_valid: |
|
|
|
print("当前密码不正确") |
|
|
|
return Response( |
|
|
|
status_code=401, |
|
|
|
description=jsonify({"success": False, "message": "当前密码不正确"}), |
|
|
|
@ -412,69 +352,25 @@ def update_password_route(request): |
|
|
|
|
|
|
|
# 更新密码 |
|
|
|
success = user_service.update_user_password(username, new_password) |
|
|
|
print(f"密码更新结果: {success}") |
|
|
|
|
|
|
|
if not success: |
|
|
|
print("密码更新失败") |
|
|
|
return Response( |
|
|
|
status_code=500, |
|
|
|
description=jsonify({"success": False, "message": "密码更新失败"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
print("密码更新成功") |
|
|
|
return Response( |
|
|
|
status_code=200, |
|
|
|
description=jsonify({"success": True, "message": "密码更新成功"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
except Exception as e: |
|
|
|
print(f"密码更新异常: {str(e)}") |
|
|
|
import traceback |
|
|
|
traceback.print_exc() |
|
|
|
return Response( |
|
|
|
status_code=500, |
|
|
|
description=jsonify({"success": False, "message": f"密码更新失败: {str(e)}"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
@app.get("/api/test/db") |
|
|
|
def test_db_connection(request): |
|
|
|
"""测试数据库连接""" |
|
|
|
try: |
|
|
|
# 检查数据库连接状态 |
|
|
|
is_connected = user_service.mysql.is_connected() |
|
|
|
print(f"数据库连接状态: {is_connected}") |
|
|
|
|
|
|
|
# 尝试查询用户表 |
|
|
|
sql = "SELECT COUNT(*) as user_count FROM users" |
|
|
|
result = user_service.mysql.execute_query(sql) |
|
|
|
print(f"查询结果: {result}") |
|
|
|
|
|
|
|
# 打印当前的token信息用于调试 |
|
|
|
print(f"当前TEMP_TOKENS: {TEMP_TOKENS}") |
|
|
|
|
|
|
|
return Response( |
|
|
|
status_code=200, |
|
|
|
description=jsonify({ |
|
|
|
"success": True, |
|
|
|
"message": "数据库连接测试成功", |
|
|
|
"is_connected": is_connected, |
|
|
|
"user_count": result[0]["user_count"] if result else 0, |
|
|
|
"tokens": list(TEMP_TOKENS.keys()) # 返回当前有效的token列表 |
|
|
|
}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
except Exception as e: |
|
|
|
print(f"数据库连接测试失败: {str(e)}") |
|
|
|
import traceback |
|
|
|
traceback.print_exc() |
|
|
|
return Response( |
|
|
|
status_code=500, |
|
|
|
description=jsonify({"success": False, "message": f"数据库连接测试失败: {str(e)}"}), |
|
|
|
headers={"Content-Type": "application/json; charset=utf-8"} |
|
|
|
) |
|
|
|
|
|
|
|
@app.after_request("/") |
|
|
|
def add_cors_headers(response): |
|
|
|
"""添加CORS头,支持跨域请求""" |
|
|
|
|