import json
import os
import uuid
from datetime import datetime, timedelta
from typing import Optional

from robyn import jsonify, Response, Request

from app import app
from service.UserService import user_service

# Temporary in-memory token store used for session management.
# Maps token string -> {"user": <user record>, "expires_at": <datetime>}.
TEMP_TOKENS = {}


def generate_token() -> str:
    """Return a new random session token (a UUID4 rendered as a string)."""
    return str(uuid.uuid4())


def validate_token(token: str) -> Optional[dict]:
    """Validate a session token and return the user stored with it.

    Args:
        token: Token string previously issued by the login route.

    Returns:
        The user object stored in ``TEMP_TOKENS`` for this token, or ``None``
        when the token is missing, not a string, unknown, or expired. Expired
        tokens are removed from ``TEMP_TOKENS`` as a side effect.
    """
    # Tokens are always stored as strings; rejecting other types up front also
    # avoids a TypeError when an unhashable value (e.g. a JSON list) is passed.
    if not isinstance(token, str) or not token:
        return None

    session = TEMP_TOKENS.get(token)
    if session is None:
        return None

    # Drop the session once it has expired.
    if datetime.now() > session["expires_at"]:
        TEMP_TOKENS.pop(token, None)
        return None

    return session["user"]


@app.post("/api/login")
def login_route(request):
    """Login endpoint.

    Reads ``username``, ``password`` and optional ``remember`` from the JSON
    body (falling back to ``request.form_data`` when the body yields nothing),
    verifies the credentials through ``user_service.verify_user`` and, on
    success, stores a new token in ``TEMP_TOKENS`` valid for 7 days when
    ``remember`` is set and 1 day otherwise.

    Returns:
        A JSON ``Response``: 200 with ``token`` and ``user`` on success, 400 when
        username or password is missing, 401 when verification fails, 500 on
        any unexpected error.
    """
    try:
        # Parse the request payload.
        request_data = {}
        if request.body:
            try:
                request_data = json.loads(request.body)
            except ValueError:
                # Malformed JSON (JSONDecodeError / UnicodeDecodeError): fall
                # through to the form-data fallback below.
                pass

        # If JSON parsing produced nothing, try the submitted form data.
        if not request_data and hasattr(request, 'form_data'):
            request_data = getattr(request, 'form_data', {})

        # A JSON array/scalar (or a missing form) has no ``get``; treat it as an
        # empty payload so the client gets a 400 instead of a 500.
        if not callable(getattr(request_data, "get", None)):
            request_data = {}

        raw_username = request_data.get("username", "")
        raw_password = request_data.get("password", "")
        # Non-string values (null, numbers, ...) are treated as missing.
        username = raw_username.strip() if isinstance(raw_username, str) else ""
        password = raw_password.strip() if isinstance(raw_password, str) else ""

        remember = request_data.get("remember", False)
        if isinstance(remember, str):
            # Form values arrive as strings, so "false"/"0" must not count as set.
            remember = remember.strip().lower() not in ("", "0", "false", "no", "off")

        # Validate input.
        if not username or not password:
            return Response(
                status_code=400,
                description=jsonify({"success": False, "message": "用户名和密码不能为空"}),
                headers={"Content-Type": "application/json; charset=utf-8"}
            )

        # Verify the credentials.
        user = user_service.verify_user(username, password)
        if not user:
            return Response(
                status_code=401,
                description=jsonify({"success": False, "message": "用户名或密码错误"}),
                headers={"Content-Type": "application/json; charset=utf-8"}
            )

        # Issue a token and record its expiry.
        token = generate_token()
        expires_at = datetime.now() + timedelta(days=7 if remember else 1)
        TEMP_TOKENS[token] = {"user": user, "expires_at": expires_at}

        return Response(
            status_code=200,
            description=jsonify({"success": True, "message": "登录成功", "token": token, "user": user}),
            headers={"Content-Type": "application/json; charset=utf-8"}
        )
    except Exception as e:
        # Top-level boundary: report any unexpected failure as a 500.
        return Response(
            status_code=500,
            description=jsonify({"success": False, "message": f"登录失败: {str(e)}"}),
            headers={"Content-Type": "application/json; charset=utf-8"}
        )
def _json_response(status_code, payload):
    """Build the JSON ``Response`` (UTF-8 content type) returned by every route below."""
    return Response(
        status_code=status_code,
        description=jsonify(payload),
        headers={"Content-Type": "application/json; charset=utf-8"}
    )


def _parse_json_object(body):
    """Decode ``body`` as JSON and return it when it is an object, else ``{}``.

    An empty body also yields ``{}``. Malformed JSON still raises, so the
    calling route reports it through its own error handler.
    """
    if not body:
        return {}
    data = json.loads(body)
    return data if isinstance(data, dict) else {}


def _sniff_image_extension(content):
    """Return the file extension implied by ``content``'s magic bytes, or ``None``."""
    if not isinstance(content, (bytes, bytearray)):
        return None
    if content.startswith(b'\xFF\xD8\xFF'):  # JPEG
        return 'jpg'
    if content.startswith(b'\x89PNG\r\n\x1a\n'):  # PNG
        return 'png'
    if content.startswith((b'GIF87a', b'GIF89a')):  # GIF
        return 'gif'
    if content.startswith(b'BM'):  # BMP
        return 'bmp'
    if content.startswith(b'RIFF') and b'WEBP' in content[:12]:  # WebP
        return 'webp'
    return None


def _extract_avatar_token(request):
    """Find the session token for an avatar upload: form data, then headers, then JSON body."""
    token = None

    # 1) Multipart form field.
    if hasattr(request, 'form_data'):
        token = request.form_data.get('token')

    # 2) Authorization header.
    if not token:
        headers_dict = {}
        if hasattr(request, 'headers'):
            try:
                headers_dict = dict(request.headers)
            except Exception:
                # Best effort: the headers object may not be convertible to a
                # dict; in that case simply skip the header lookup.
                headers_dict = {}
        token = headers_dict.get('Authorization') or headers_dict.get('authorization')
        # Accept the conventional "Bearer <token>" form as well as a raw token.
        if isinstance(token, str) and token[:7].lower() == 'bearer ':
            token = token[7:].strip()

    # 3) JSON body.
    if not token and hasattr(request, 'body'):
        try:
            body_data = json.loads(request.body)
        except (TypeError, ValueError):
            # Not JSON (e.g. a multipart payload): nothing to read here.
            body_data = None
        if isinstance(body_data, dict):
            token = body_data.get('token')

    # Tokens are stored as strings; anything else can never match.
    return token if isinstance(token, str) else None


@app.post("/api/logout")
def logout_route(request):
    """Logout endpoint: remove the token given in the JSON body from ``TEMP_TOKENS``.

    Returns 200 whether or not the token was known, 500 on unexpected errors
    (including a malformed JSON body).
    """
    try:
        request_data = _parse_json_object(request.body)
        token = request_data.get("token", "")

        # Delete the token; non-string values cannot be stored keys (and may be
        # unhashable), so they are ignored.
        if isinstance(token, str):
            TEMP_TOKENS.pop(token, None)

        return _json_response(200, {"success": True, "message": "登出成功"})
    except Exception as e:
        return _json_response(500, {"success": False, "message": f"登出失败: {str(e)}"})


@app.get("/api/userInfo")
def user_info_route(request):
    """User-info endpoint: return the user belonging to the ``token`` query parameter.

    The record is refreshed through ``user_service.get_user_info``; when that
    returns a value it replaces the copy cached in ``TEMP_TOKENS``, otherwise
    the cached copy is returned. Responds 401 for an invalid or expired token.
    """
    try:
        query_params = getattr(request, 'query_params', {})
        token = query_params.get("token", "")

        # Validate the token.
        user = validate_token(token)
        if not user:
            return _json_response(401, {"success": False, "message": "未登录或登录已过期"})

        # Fetch the latest user record.
        username = user["username"]
        db_user = user_service.get_user_info(username)

        if db_user:
            # Keep the cached session copy in sync.
            TEMP_TOKENS[token]["user"] = db_user
            user_info = db_user
        else:
            user_info = user

        return _json_response(200, {"success": True, "user": user_info})
    except Exception as e:
        return _json_response(500, {"success": False, "message": f"获取用户信息失败: {str(e)}"})


@app.post("/api/updateAvatar")
async def update_avatar_route(request: Request):
    """Avatar-upload endpoint.

    Locates the uploaded file in ``request.files`` (key ``'avatar'``, else the
    first entry) and the session token (form data, ``Authorization`` header or
    JSON body), checks that the upload looks like an image, writes it under
    ``<parent of this file's directory>/resource/avatar`` and records the
    relative path through ``user_service.update_user_avatar``.

    Returns:
        A JSON ``Response``: 200 with the new ``avatar`` path, 401 for an invalid
        token, 400 for a missing/unreadable/non-image upload, 500 otherwise.
    """
    import random
    import time

    try:
        # Locate the uploaded file, remembering the key it was stored under.
        files = request.files if hasattr(request, 'files') else None
        file_key = 'avatar'
        avatar_file = files.get('avatar') if files is not None else None

        # No 'avatar' entry: fall back to the first uploaded file.
        if not avatar_file and files:
            file_key = next(iter(files.keys()))
            avatar_file = files[file_key]

        # Validate the token.
        token = _extract_avatar_token(request)
        user = validate_token(token)
        if not user:
            return _json_response(401, {"success": False, "message": "未登录或登录已过期"})

        # Make sure a file was uploaded.
        if not avatar_file:
            return _json_response(400, {"success": False, "message": "未上传头像文件"})

        # Obtain the file content and name.
        file_content = None
        filename = ""
        if isinstance(avatar_file, bytes):
            # Raw bytes: the key the file was found under serves as its name.
            file_content = avatar_file
            filename = file_key
        elif isinstance(avatar_file, str):
            filename = avatar_file
        else:
            # Other upload objects: try the usual content accessors.
            if hasattr(avatar_file, 'content'):
                file_content = avatar_file.content
            elif hasattr(avatar_file, 'read'):
                file_content = avatar_file.read()
            filename = getattr(avatar_file, 'filename', 'avatar.jpg')

        if not file_content:
            return _json_response(400, {"success": False, "message": "无法读取文件内容"})

        # Derive the extension from the file name.
        if not isinstance(filename, str) or not filename:
            filename = "avatar.jpg"
        file_extension = filename.split('.')[-1] if '.' in filename else 'jpg'

        # Validate the file type: by extension first, then by magic bytes.
        # NOTE(review): an allowed extension is accepted without inspecting the
        # content, and SVG can carry script — confirm how avatars are served.
        if file_extension.lower() not in ('jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg'):
            sniffed_extension = _sniff_image_extension(file_content)
            if sniffed_extension is None:
                return _json_response(
                    400,
                    {"success": False, "message": "文件类型必须是图片 (支持的格式: JPG, PNG, GIF, BMP, WebP, SVG)"}
                )
            # The client-supplied extension is untrusted (it may be anything,
            # including path separators); save under the detected type instead.
            file_extension = sniffed_extension

        # Build a unique file name.
        timestamp = int(time.time())
        random_num = random.randint(1000, 9999)
        username = user["username"]
        new_filename = f"{username}_{timestamp}_{random_num}.{file_extension}"

        # Destination: resource/avatar under the parent of this file's directory.
        current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        avatar_dir = os.path.join(current_dir, "resource", "avatar")
        file_path = os.path.join(avatar_dir, new_filename)

        # Make sure the directory exists, then write the file.
        os.makedirs(avatar_dir, exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(file_content)

        # Record the relative path for the user.
        avatar_relative_path = f"/resource/avatar/{new_filename}"
        success = user_service.update_user_avatar(username, avatar_relative_path)
        if not success:
            return _json_response(500, {"success": False, "message": "更新头像失败"})

        # Keep the cached session copy in sync.
        TEMP_TOKENS[token]["user"]["avatar"] = avatar_relative_path

        return _json_response(200, {
            "success": True,
            "message": "头像更新成功",
            "avatar": avatar_relative_path
        })
    except Exception as e:
        return _json_response(500, {"success": False, "message": f"更新头像失败: {str(e)}"})


@app.post("/api/updatePassword")
def update_password_route(request):
    """Password-change endpoint.

    Expects ``token``, ``currentPassword`` and ``newPassword`` in the JSON body.
    Checks the current password with ``user_service.verify_password`` against
    the stored record, then calls ``user_service.update_user_password``.

    Returns:
        A JSON ``Response``: 200 on success, 400 when a password field is empty,
        401 for an invalid token or wrong current password, 404 when the user
        record is not found, 500 otherwise.
    """
    try:
        # Parse the request payload.
        request_data = _parse_json_object(request.body)
        token = request_data.get("token", "")
        if not isinstance(token, str):
            # Non-string tokens can never match (and may be unhashable).
            token = ""
        current_password = request_data.get("currentPassword", "")
        new_password = request_data.get("newPassword", "")

        # Validate input.
        if not current_password or not new_password:
            return _json_response(400, {"success": False, "message": "当前密码和新密码不能为空"})

        # Validate the token.
        user = validate_token(token)
        if not user:
            return _json_response(401, {"success": False, "message": "未登录或登录已过期"})

        username = user["username"]

        # Load the stored record to check the current password against.
        db_user = user_service.get_user_by_username(username)
        if not db_user:
            return _json_response(404, {"success": False, "message": "用户不存在"})

        # Verify the current password.
        is_password_valid = user_service.verify_password(current_password, db_user["password"])
        if not is_password_valid:
            return _json_response(401, {"success": False, "message": "当前密码不正确"})

        # Store the new password.
        success = user_service.update_user_password(username, new_password)
        if not success:
            return _json_response(500, {"success": False, "message": "密码更新失败"})

        return _json_response(200, {"success": True, "message": "密码更新成功"})
    except Exception as e:
        return _json_response(500, {"success": False, "message": f"密码更新失败: {str(e)}"})


@app.after_request("/")
def add_cors_headers(response):
    """Add CORS headers to the response so cross-origin requests are allowed."""
    response.headers.update({
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type, Authorization, X-Requested-With"
    })
    return response