You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

486 lines
19 KiB

from robyn import jsonify, Response
from app import app
from datetime import datetime, timedelta
import uuid
import json
import os
import base64
from service.UserService import user_service
# 临时存储token,用于会话管理
TEMP_TOKENS = {}
def generate_token() -> str:
"""生成随机token"""
return str(uuid.uuid4())
@app.post("/api/login")
def login_route(request):
"""登录接口"""
try:
request_data = json.loads(request.body) if request.body else {}
username = request_data.get("username", "").strip()
password = request_data.get("password", "").strip()
remember = request_data.get("remember", False)
# 验证输入
if not username or not password:
return Response(
status_code=400,
description=jsonify({"success": False, "message": "用户名和密码不能为空"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 验证用户
user = user_service.verify_user(username, password)
if not user:
return Response(
status_code=401,
description=jsonify({"success": False, "message": "用户名或密码错误"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 生成token并设置过期时间
token = generate_token()
expires_at = datetime.now() + timedelta(days=7 if remember else 1)
TEMP_TOKENS[token] = {"user": user, "expires_at": expires_at}
return Response(
status_code=200,
description=jsonify({"success": True, "message": "登录成功", "token": token, "user": user}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
except Exception as e:
return Response(
status_code=500,
description=jsonify({"success": False, "message": f"登录失败: {str(e)}"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
@app.post("/api/logout")
def logout_route(request):
"""登出接口"""
try:
request_data = json.loads(request.body) if request.body else {}
token = request_data.get("token", "")
# 删除token
TEMP_TOKENS.pop(token, None)
return Response(
status_code=200,
description=jsonify({"success": True, "message": "登出成功"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
except Exception as e:
return Response(
status_code=500,
description=jsonify({"success": False, "message": f"登出失败: {str(e)}"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
@app.get("/api/userInfo")
def user_info_route(request):
"""获取用户信息接口"""
try:
query_params = getattr(request, 'query_params', {})
token = query_params.get("token", "")
# 验证token是否存在
if not token or token not in TEMP_TOKENS:
return Response(
status_code=401,
description=jsonify({"success": False, "message": "未登录或登录已过期"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 检查token是否过期
if datetime.now() > TEMP_TOKENS[token]["expires_at"]:
del TEMP_TOKENS[token]
return Response(
status_code=401,
description=jsonify({"success": False, "message": "登录已过期"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
return Response(
status_code=200,
description=jsonify({"success": True, "user": TEMP_TOKENS[token]["user"]}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
except Exception as e:
return Response(
status_code=500,
description=jsonify({"success": False, "message": f"获取用户信息失败: {str(e)}"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
@app.post("/api/updateAvatar")
def update_avatar_route(request):
"""更新用户头像接口"""
try:
# 打印调试信息
print(f"请求类型: {type(request)}")
print(f"请求属性: {dir(request)}")
# 在Robyn中,文件上传的数据存储在request.files中
# 表单字段存储在request.form中
form_data = getattr(request, 'form', {})
files_data = getattr(request, 'files', {})
print(f"表单数据: {form_data}")
print(f"文件数据: {files_data}")
print(f"表单数据类型: {type(form_data)}")
print(f"文件数据类型: {type(files_data)}")
# 获取token - 尝试多种方式获取
token = None
# 方法1: 从form_data中获取
if isinstance(form_data, dict) and "token" in form_data:
token = form_data["token"]
print(f"从form_data获取token: {token}")
# 方法2: 如果form_data不是字典,尝试其他方式
if not token and hasattr(form_data, 'get'):
token = form_data.get("token", "")
print(f"通过form_data.get()获取token: {token}")
# 方法3: 尝试从request的属性中直接获取
if not token:
# 尝试从request中获取所有可能的属性
for attr_name in dir(request):
if 'token' in attr_name.lower() or 'form' in attr_name.lower():
try:
attr_value = getattr(request, attr_name)
print(f"request.{attr_name}: {type(attr_value)} = {attr_value}")
# 如果是字典类型,检查是否包含token
if isinstance(attr_value, dict) and 'token' in attr_value:
token = attr_value['token']
print(f"从request.{attr_name}获取token: {token}")
break
except Exception as e:
print(f"访问request.{attr_name}时出错: {e}")
# 方法4: 从查询参数中获取
if not token:
query_params = getattr(request, 'query_params', {})
if isinstance(query_params, dict) and "token" in query_params:
token = query_params["token"]
print(f"从查询参数获取token: {token}")
# 方法5: 从headers中获取
if not token:
headers = getattr(request, 'headers', {})
if isinstance(headers, dict) and "Authorization" in headers:
auth_header = headers["Authorization"]
if auth_header.startswith("Bearer "):
token = auth_header[7:]
print(f"从Authorization头获取token: {token}")
print(f"最终获取的token: {token}")
print(f"TEMP_TOKENS中的keys: {list(TEMP_TOKENS.keys())}")
# 验证token
if not token or token not in TEMP_TOKENS:
print(f"Token验证失败: {token}")
return Response(
status_code=401,
description=jsonify({"success": False, "message": "未登录或登录已过期"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 检查token是否过期
if datetime.now() > TEMP_TOKENS[token]["expires_at"]:
del TEMP_TOKENS[token]
print("Token已过期")
return Response(
status_code=401,
description=jsonify({"success": False, "message": "登录已过期"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 获取上传的文件
avatar_file = files_data.get("avatar") if isinstance(files_data, dict) else None
# 如果没有通过"avatar"键获取到文件,尝试直接访问files_data的第一个元素
if not avatar_file and isinstance(files_data, dict) and len(files_data) > 0:
# 获取第一个文件作为头像文件
first_key = list(files_data.keys())[0]
avatar_file = files_data[first_key]
print(f"通过备用方法获取文件,键名: {first_key}")
if not avatar_file:
print("未找到avatar文件")
print(f"可用的文件键: {list(files_data.keys()) if isinstance(files_data, dict) else '无法获取键列表'}")
return Response(
status_code=400,
description=jsonify({"success": False, "message": "未上传头像文件"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
print(f"获取的文件: {avatar_file}")
print(f"文件属性: {dir(avatar_file) if avatar_file else '无文件'}")
# 检查文件类型 - 使用多种方法验证
is_valid_image = False
file_extension = ""
# 方法1: 检查content_type
content_type = getattr(avatar_file, 'content_type', '')
if content_type and content_type.startswith("image/"):
is_valid_image = True
print(f"通过content_type验证: {content_type}")
# 方法2: 检查文件名和扩展名
filename = getattr(avatar_file, 'filename', '')
if filename:
file_extension = os.path.splitext(filename)[1].lower()
valid_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg']
if file_extension in valid_extensions:
is_valid_image = True
print(f"通过文件扩展名验证: {file_extension}")
# 如果两种方法都失败,尝试读取文件头
if not is_valid_image:
try:
# 读取文件前几个字节来检测文件类型
file_content = avatar_file.read(1024)
avatar_file.seek(0) # 重置文件指针
# 检查常见图片格式的文件头
if (file_content.startswith(b'\xFF\xD8\xFF') or # JPEG
file_content.startswith(b'\x89PNG\r\n\x1a\n') or # PNG
file_content.startswith(b'GIF87a') or # GIF
file_content.startswith(b'GIF89a') or # GIF
file_content.startswith(b'BM') or # BMP
file_content.startswith(b'RIFF') and b'WEBP' in file_content[:12]): # WebP
is_valid_image = True
print(f"通过文件头验证成功")
else:
print(f"文件头验证失败,文件前16字节: {file_content[:16]}")
except Exception as e:
print(f"读取文件内容进行验证时出错: {e}")
if not is_valid_image:
print(f"文件类型验证失败 - content_type: {content_type}, filename: {filename}, extension: {file_extension}")
return Response(
status_code=400,
description=jsonify({"success": False, "message": f"文件类型必须是图片 (支持的格式: JPG, PNG, GIF, BMP, WebP, SVG)"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 获取文件内容
file_content = avatar_file.file_data
print(f"读取的文件大小: {len(file_content)} 字节")
# 生成唯一的文件名
import time
import random
filename = getattr(avatar_file, 'filename', '')
file_extension = filename.split('.')[-1] if '.' in filename else 'jpg'
timestamp = int(time.time())
random_num = random.randint(1000, 9999)
username = TEMP_TOKENS[token]["user"]["username"]
new_filename = f"{username}_{timestamp}_{random_num}.{file_extension}"
# 定义文件保存路径
avatar_dir = "D:/zhishitupu/MedKG/resource/avatar"
file_path = f"{avatar_dir}/{new_filename}"
print(f"保存文件到: {file_path}")
# 确保目录存在
import os
os.makedirs(avatar_dir, exist_ok=True)
# 保存文件到磁盘
with open(file_path, "wb") as f:
f.write(file_content)
print(f"文件保存成功")
# 更新用户头像到数据库,存储相对路径
avatar_relative_path = f"/resource/avatar/{new_filename}"
success = user_service.update_user_avatar(username, avatar_relative_path)
if not success:
print("数据库更新失败")
return Response(
status_code=500,
description=jsonify({"success": False, "message": "更新头像失败"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
print("数据库更新成功")
# 更新token中的用户信息
TEMP_TOKENS[token]["user"]["avatar"] = avatar_relative_path
return Response(
status_code=200,
description=jsonify({
"success": True,
"message": "头像更新成功",
"avatar": avatar_relative_path
}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
except Exception as e:
print(f"更新头像异常: {str(e)}")
import traceback
traceback.print_exc()
return Response(
status_code=500,
description=jsonify({"success": False, "message": f"更新头像失败: {str(e)}"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
@app.post("/api/updatePassword")
def update_password_route(request):
"""更新用户密码接口"""
try:
print("开始处理密码更新请求")
# 解析请求数据
request_data = json.loads(request.body) if request.body else {}
print(f"请求数据: {request_data}")
token = request_data.get("token", "")
current_password = request_data.get("currentPassword", "")
new_password = request_data.get("newPassword", "")
print(f"Token: {token}")
print(f"当前密码长度: {len(current_password)}")
print(f"新密码长度: {len(new_password)}")
# 验证输入
if not current_password or not new_password:
print("密码为空")
return Response(
status_code=400,
description=jsonify({"success": False, "message": "当前密码和新密码不能为空"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 验证token
if not token or token not in TEMP_TOKENS:
print(f"Token验证失败: {token}")
return Response(
status_code=401,
description=jsonify({"success": False, "message": "未登录或登录已过期"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 检查token是否过期
if datetime.now() > TEMP_TOKENS[token]["expires_at"]:
del TEMP_TOKENS[token]
print("Token已过期")
return Response(
status_code=401,
description=jsonify({"success": False, "message": "登录已过期"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 获取用户信息
username = TEMP_TOKENS[token]["user"]["username"]
print(f"用户名: {username}")
# 验证当前密码
user = user_service.get_user_by_username(username)
if not user:
print("用户不存在")
return Response(
status_code=404,
description=jsonify({"success": False, "message": "用户不存在"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
print(f"用户信息: {user}")
# 验证密码
is_password_valid = user_service.verify_password(current_password, user["password"])
print(f"密码验证结果: {is_password_valid}")
if not is_password_valid:
print("当前密码不正确")
return Response(
status_code=401,
description=jsonify({"success": False, "message": "当前密码不正确"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
# 更新密码
success = user_service.update_user_password(username, new_password)
print(f"密码更新结果: {success}")
if not success:
print("密码更新失败")
return Response(
status_code=500,
description=jsonify({"success": False, "message": "密码更新失败"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
print("密码更新成功")
return Response(
status_code=200,
description=jsonify({"success": True, "message": "密码更新成功"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
except Exception as e:
print(f"密码更新异常: {str(e)}")
import traceback
traceback.print_exc()
return Response(
status_code=500,
description=jsonify({"success": False, "message": f"密码更新失败: {str(e)}"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
@app.get("/api/test/db")
def test_db_connection(request):
"""测试数据库连接"""
try:
# 检查数据库连接状态
is_connected = user_service.mysql.is_connected()
print(f"数据库连接状态: {is_connected}")
# 尝试查询用户表
sql = "SELECT COUNT(*) as user_count FROM users"
result = user_service.mysql.execute_query(sql)
print(f"查询结果: {result}")
# 打印当前的token信息用于调试
print(f"当前TEMP_TOKENS: {TEMP_TOKENS}")
return Response(
status_code=200,
description=jsonify({
"success": True,
"message": "数据库连接测试成功",
"is_connected": is_connected,
"user_count": result[0]["user_count"] if result else 0,
"tokens": list(TEMP_TOKENS.keys()) # 返回当前有效的token列表
}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
except Exception as e:
print(f"数据库连接测试失败: {str(e)}")
import traceback
traceback.print_exc()
return Response(
status_code=500,
description=jsonify({"success": False, "message": f"数据库连接测试失败: {str(e)}"}),
headers={"Content-Type": "application/json; charset=utf-8"}
)
@app.after_request("/")
def add_cors_headers(response):
"""添加CORS头,支持跨域请求"""
response.headers.update({
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization, X-Requested-With"
})
return response