from robyn import jsonify, Response from python.app import app from datetime import datetime, timedelta import uuid import json from service.UserService import user_service # 临时存储token,用于会话管理 TEMP_TOKENS = {} def generate_token() -> str: """生成随机token""" return str(uuid.uuid4()) @app.post("/api/login") def login_route(request): """登录接口""" try: request_data = json.loads(request.body) if request.body else {} username = request_data.get("username", "").strip() password = request_data.get("password", "").strip() remember = request_data.get("remember", False) # 验证输入 if not username or not password: return Response( status_code=400, description=jsonify({"success": False, "message": "用户名和密码不能为空"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 验证用户 user = user_service.verify_user(username, password) if not user: return Response( status_code=401, description=jsonify({"success": False, "message": "用户名或密码错误"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 生成token并设置过期时间 token = generate_token() expires_at = datetime.now() + timedelta(days=7 if remember else 1) TEMP_TOKENS[token] = {"user": user, "expires_at": expires_at} return Response( status_code=200, description=jsonify({"success": True, "message": "登录成功", "token": token, "user": user}), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: return Response( status_code=500, description=jsonify({"success": False, "message": f"登录失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.post("/api/logout") def logout_route(request): """登出接口""" try: request_data = json.loads(request.body) if request.body else {} token = request_data.get("token", "") # 删除token TEMP_TOKENS.pop(token, None) return Response( status_code=200, description=jsonify({"success": True, "message": "登出成功"}), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: return Response( status_code=500, description=jsonify({"success": False, "message": f"登出失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.get("/api/userInfo") def user_info_route(request): """获取用户信息接口""" try: query_params = getattr(request, 'query_params', {}) token = query_params.get("token", "") # 验证token是否存在 if not token or token not in TEMP_TOKENS: return Response( status_code=401, description=jsonify({"success": False, "message": "未登录或登录已过期"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 检查token是否过期 if datetime.now() > TEMP_TOKENS[token]["expires_at"]: del TEMP_TOKENS[token] return Response( status_code=401, description=jsonify({"success": False, "message": "登录已过期"}), headers={"Content-Type": "application/json; charset=utf-8"} ) return Response( status_code=200, description=jsonify({"success": True, "user": TEMP_TOKENS[token]["user"]}), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: return Response( status_code=500, description=jsonify({"success": False, "message": f"获取用户信息失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.post("/api/updateAvatar") def update_avatar_route(request): """更新用户头像接口""" try: # 打印调试信息 print(f"请求类型: {type(request)}") print(f"请求属性: {dir(request)}") # 在Robyn中,文件上传的数据存储在request.files中 # 表单字段存储在request.form中 form_data = getattr(request, 'form', {}) files_data = getattr(request, 'files', {}) print(f"表单数据: {form_data}") print(f"文件数据: {files_data}") print(f"表单数据类型: {type(form_data)}") print(f"文件数据类型: {type(files_data)}") # 获取token - 尝试多种方式获取 token = None # 方法1: 从form_data中获取 if isinstance(form_data, dict) and "token" in form_data: token = form_data["token"] print(f"从form_data获取token: {token}") # 方法2: 如果form_data不是字典,尝试其他方式 if not token and hasattr(form_data, 'get'): token = form_data.get("token", "") print(f"通过form_data.get()获取token: {token}") # 方法3: 尝试从request的属性中直接获取 if not token: # 尝试从request中获取所有可能的属性 for attr_name in dir(request): if 'token' in attr_name.lower() or 'form' in attr_name.lower(): try: attr_value = getattr(request, attr_name) print(f"request.{attr_name}: {type(attr_value)} = {attr_value}") # 如果是字典类型,检查是否包含token if isinstance(attr_value, dict) and 'token' in attr_value: token = attr_value['token'] print(f"从request.{attr_name}获取token: {token}") break except Exception as e: print(f"访问request.{attr_name}时出错: {e}") # 方法4: 从查询参数中获取 if not token: query_params = getattr(request, 'query_params', {}) if isinstance(query_params, dict) and "token" in query_params: token = query_params["token"] print(f"从查询参数获取token: {token}") # 方法5: 从headers中获取 if not token: headers = getattr(request, 'headers', {}) if isinstance(headers, dict) and "Authorization" in headers: auth_header = headers["Authorization"] if auth_header.startswith("Bearer "): token = auth_header[7:] print(f"从Authorization头获取token: {token}") print(f"最终获取的token: {token}") print(f"TEMP_TOKENS中的keys: {list(TEMP_TOKENS.keys())}") # 验证token if not token or token not in TEMP_TOKENS: print(f"Token验证失败: {token}") return Response( status_code=401, description=jsonify({"success": False, "message": "未登录或登录已过期"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 检查token是否过期 if datetime.now() > TEMP_TOKENS[token]["expires_at"]: del TEMP_TOKENS[token] print("Token已过期") return Response( status_code=401, description=jsonify({"success": False, "message": "登录已过期"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 获取上传的文件 avatar_file = files_data.get("avatar") if isinstance(files_data, dict) else None # 如果没有通过"avatar"键获取到文件,尝试直接访问files_data的第一个元素 if not avatar_file and isinstance(files_data, dict) and len(files_data) > 0: # 获取第一个文件作为头像文件 first_key = list(files_data.keys())[0] avatar_file = files_data[first_key] print(f"通过备用方法获取文件,键名: {first_key}") if not avatar_file: print("未找到avatar文件") print(f"可用的文件键: {list(files_data.keys()) if isinstance(files_data, dict) else '无法获取键列表'}") return Response( status_code=400, description=jsonify({"success": False, "message": "未上传头像文件"}), headers={"Content-Type": "application/json; charset=utf-8"} ) print(f"获取的文件: {avatar_file}") print(f"文件属性: {dir(avatar_file) if avatar_file else '无文件'}") # 检查文件类型 - 使用多种方法验证 is_valid_image = False file_extension = "" # 方法1: 检查content_type content_type = getattr(avatar_file, 'content_type', '') if content_type and content_type.startswith("image/"): is_valid_image = True print(f"通过content_type验证: {content_type}") # 方法2: 检查文件名和扩展名 filename = getattr(avatar_file, 'filename', '') if filename: file_extension = os.path.splitext(filename)[1].lower() valid_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg'] if file_extension in valid_extensions: is_valid_image = True print(f"通过文件扩展名验证: {file_extension}") # 如果两种方法都失败,尝试读取文件头 if not is_valid_image: try: # 读取文件前几个字节来检测文件类型 file_content = avatar_file.read(1024) avatar_file.seek(0) # 重置文件指针 # 检查常见图片格式的文件头 if (file_content.startswith(b'\xFF\xD8\xFF') or # JPEG file_content.startswith(b'\x89PNG\r\n\x1a\n') or # PNG file_content.startswith(b'GIF87a') or # GIF file_content.startswith(b'GIF89a') or # GIF file_content.startswith(b'BM') or # BMP file_content.startswith(b'RIFF') and b'WEBP' in file_content[:12]): # WebP is_valid_image = True print(f"通过文件头验证成功") else: print(f"文件头验证失败,文件前16字节: {file_content[:16]}") except Exception as e: print(f"读取文件内容进行验证时出错: {e}") if not is_valid_image: print(f"文件类型验证失败 - content_type: {content_type}, filename: {filename}, extension: {file_extension}") return Response( status_code=400, description=jsonify({"success": False, "message": f"文件类型必须是图片 (支持的格式: JPG, PNG, GIF, BMP, WebP, SVG)"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 获取文件内容 file_content = avatar_file.file_data print(f"读取的文件大小: {len(file_content)} 字节") # 生成唯一的文件名 import time import random filename = getattr(avatar_file, 'filename', '') file_extension = filename.split('.')[-1] if '.' in filename else 'jpg' timestamp = int(time.time()) random_num = random.randint(1000, 9999) username = TEMP_TOKENS[token]["user"]["username"] new_filename = f"{username}_{timestamp}_{random_num}.{file_extension}" # 定义文件保存路径 avatar_dir = "D:/zhishitupu/MedKG/resource/avatar" file_path = f"{avatar_dir}/{new_filename}" print(f"保存文件到: {file_path}") # 确保目录存在 import os os.makedirs(avatar_dir, exist_ok=True) # 保存文件到磁盘 with open(file_path, "wb") as f: f.write(file_content) print(f"文件保存成功") # 更新用户头像到数据库,存储相对路径 avatar_relative_path = f"/resource/avatar/{new_filename}" success = user_service.update_user_avatar(username, avatar_relative_path) if not success: print("数据库更新失败") return Response( status_code=500, description=jsonify({"success": False, "message": "更新头像失败"}), headers={"Content-Type": "application/json; charset=utf-8"} ) print("数据库更新成功") # 更新token中的用户信息 TEMP_TOKENS[token]["user"]["avatar"] = avatar_relative_path return Response( status_code=200, description=jsonify({ "success": True, "message": "头像更新成功", "avatar": avatar_relative_path }), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: print(f"更新头像异常: {str(e)}") import traceback traceback.print_exc() return Response( status_code=500, description=jsonify({"success": False, "message": f"更新头像失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.post("/api/updatePassword") def update_password_route(request): """更新用户密码接口""" try: print("开始处理密码更新请求") # 解析请求数据 request_data = json.loads(request.body) if request.body else {} print(f"请求数据: {request_data}") token = request_data.get("token", "") current_password = request_data.get("currentPassword", "") new_password = request_data.get("newPassword", "") print(f"Token: {token}") print(f"当前密码长度: {len(current_password)}") print(f"新密码长度: {len(new_password)}") # 验证输入 if not current_password or not new_password: print("密码为空") return Response( status_code=400, description=jsonify({"success": False, "message": "当前密码和新密码不能为空"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 验证token if not token or token not in TEMP_TOKENS: print(f"Token验证失败: {token}") return Response( status_code=401, description=jsonify({"success": False, "message": "未登录或登录已过期"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 检查token是否过期 if datetime.now() > TEMP_TOKENS[token]["expires_at"]: del TEMP_TOKENS[token] print("Token已过期") return Response( status_code=401, description=jsonify({"success": False, "message": "登录已过期"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 获取用户信息 username = TEMP_TOKENS[token]["user"]["username"] print(f"用户名: {username}") # 验证当前密码 user = user_service.get_user_by_username(username) if not user: print("用户不存在") return Response( status_code=404, description=jsonify({"success": False, "message": "用户不存在"}), headers={"Content-Type": "application/json; charset=utf-8"} ) print(f"用户信息: {user}") # 验证密码 is_password_valid = user_service.verify_password(current_password, user["password"]) print(f"密码验证结果: {is_password_valid}") if not is_password_valid: print("当前密码不正确") return Response( status_code=401, description=jsonify({"success": False, "message": "当前密码不正确"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 更新密码 success = user_service.update_user_password(username, new_password) print(f"密码更新结果: {success}") if not success: print("密码更新失败") return Response( status_code=500, description=jsonify({"success": False, "message": "密码更新失败"}), headers={"Content-Type": "application/json; charset=utf-8"} ) print("密码更新成功") return Response( status_code=200, description=jsonify({"success": True, "message": "密码更新成功"}), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: print(f"密码更新异常: {str(e)}") import traceback traceback.print_exc() return Response( status_code=500, description=jsonify({"success": False, "message": f"密码更新失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.get("/api/test/db") def test_db_connection(request): """测试数据库连接""" try: # 检查数据库连接状态 is_connected = user_service.mysql.is_connected() print(f"数据库连接状态: {is_connected}") # 尝试查询用户表 sql = "SELECT COUNT(*) as user_count FROM users" result = user_service.mysql.execute_query(sql) print(f"查询结果: {result}") # 打印当前的token信息用于调试 print(f"当前TEMP_TOKENS: {TEMP_TOKENS}") return Response( status_code=200, description=jsonify({ "success": True, "message": "数据库连接测试成功", "is_connected": is_connected, "user_count": result[0]["user_count"] if result else 0, "tokens": list(TEMP_TOKENS.keys()) # 返回当前有效的token列表 }), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: print(f"数据库连接测试失败: {str(e)}") import traceback traceback.print_exc() return Response( status_code=500, description=jsonify({"success": False, "message": f"数据库连接测试失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.after_request("/") def add_cors_headers(response): """添加CORS头,支持跨域请求""" response.headers.update({ "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS", "Access-Control-Allow-Headers": "Content-Type, Authorization, X-Requested-With" }) return response