import os from robyn import jsonify, Response, Request from app import app import json from service.UserService import user_service from util.token_utils import TokenManager from util.auth_interceptor import require_login, get_current_user @app.post("/api/login") def login_route(request): """登录接口""" try: # 解析请求数据 request_data = {} if request.body: try: request_data = json.loads(request.body) except json.JSONDecodeError: pass # 如果JSON解析失败,尝试从form_data获取 if not request_data and hasattr(request, 'form_data'): request_data = getattr(request, 'form_data', {}) username = request_data.get("username", "").strip() password = request_data.get("password", "").strip() remember = request_data.get("remember", False) # 验证输入 if not username or not password: return Response( status_code=400, description=jsonify({"success": False, "message": "用户名和密码不能为空"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 验证用户 user = user_service.verify_user(username, password) if not user: return Response( status_code=401, description=jsonify({"success": False, "message": "用户名或密码错误"}), headers={"Content-Type": "application/json; charset=utf-8"} ) token=TokenManager.generate_token() TokenManager.store_token(token,user, remember) return Response( status_code=200, description=jsonify({"success": True, "message": "登录成功", "token": token, "user": user}), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: return Response( status_code=500, description=jsonify({"success": False, "message": f"登录失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.post("/api/logout") def logout_route(request): """登出接口""" try: token=TokenManager.get_token_from_request(request) TokenManager.remove_token(token) return Response( status_code=200, description=jsonify({"success": True, "message": "登出成功"}), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: return Response( status_code=500, description=jsonify({"success": False, "message": f"登出失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.get("/api/userInfo") @require_login def user_info_route(request): """获取用户信息接口""" try: user = get_current_user() # 从数据库获取最新的用户信息 username = user["username"] db_user = user_service.get_user_info(username) if db_user: # 更新TEMP_TOKENS中的用户信息 user_info = db_user else: user_info = user return Response( status_code=200, description=jsonify({"success": True, "user": user_info}), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: return Response( status_code=500, description=jsonify({"success": False, "message": f"获取用户信息失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.post("/api/updateAvatar") @require_login async def update_avatar_route(request: Request): """更新用户头像接口""" try: user = get_current_user() # 从files中获取文件和token avatar_file = request.files.get('avatar') if hasattr(request, 'files') else None if not avatar_file and hasattr(request, 'files') and request.files: first_key = list(request.files.keys())[0] avatar_file = request.files[first_key] # 检查文件是否存在 if not avatar_file: return Response( status_code=400, description=jsonify({"success": False, "message": "未上传头像文件"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 获取文件内容和文件名 file_content = None filename = "" # Robyn框架中,files字典的值通常是字节数据 if isinstance(avatar_file, bytes): file_content = avatar_file filename = list(request.files.keys())[0] if request.files else "avatar.jpg" elif isinstance(avatar_file, str): filename = avatar_file else: if avatar_file is not None: # 尝试获取文件内容的其他方式 if hasattr(avatar_file, 'content'): file_content = avatar_file.content elif hasattr(avatar_file, 'read'): file_content = avatar_file.read() filename = getattr(avatar_file, 'filename', 'avatar.jpg') # 如果我们有文件内容,继续处理 if not file_content: return Response( status_code=400, description=jsonify({"success": False, "message": "无法读取文件内容"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 处理文件名 if not filename: filename = "avatar.jpg" file_extension = filename.split('.')[-1] if '.' in filename else 'jpg' # 验证文件类型 is_valid_image = False # 通过扩展名验证 if file_extension.lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg']: is_valid_image = True # 如果扩展名验证失败,尝试读取文件头验证 if not is_valid_image: try: if (file_content.startswith(b'\xFF\xD8\xFF') or # JPEG file_content.startswith(b'\x89PNG\r\n\x1a\n') or # PNG file_content.startswith(b'GIF87a') or # GIF file_content.startswith(b'GIF89a') or # GIF file_content.startswith(b'BM') or # BMP file_content.startswith(b'RIFF') and b'WEBP' in file_content[:12]): # WebP is_valid_image = True except Exception: pass if not is_valid_image: return Response( status_code=400, description=jsonify({"success": False, "message": f"文件类型必须是图片 (支持的格式: JPG, PNG, GIF, BMP, WebP, SVG)"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 生成唯一的文件名 import time import random timestamp = int(time.time()) random_num = random.randint(1000, 9999) username = user["username"] new_filename = f"{username}_{timestamp}_{random_num}.{file_extension}" # 定义文件保存路径(使用相对路径) current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) avatar_dir = os.path.join(current_dir, "resource", "avatar") file_path = os.path.join(avatar_dir, new_filename) # 确保目录存在 os.makedirs(avatar_dir, exist_ok=True) # 保存文件到磁盘 with open(file_path, "wb") as f: f.write(file_content) # 更新用户头像到数据库,存储相对路径 avatar_relative_path = f"/resource/avatar/{new_filename}" success = user_service.update_user_avatar(username, avatar_relative_path) if not success: return Response( status_code=500, description=jsonify({"success": False, "message": "更新头像失败"}), headers={"Content-Type": "application/json; charset=utf-8"} ) return Response( status_code=200, description=jsonify({ "success": True, "message": "头像更新成功", "avatar": avatar_relative_path }), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: return Response( status_code=500, description=jsonify({"success": False, "message": f"更新头像失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.post("/api/updatePassword") @require_login def update_password_route(request): """更新用户密码接口""" try: # 解析请求数据 request_data = json.loads(request.body) if request.body else {} current_password = request_data.get("currentPassword", "") new_password = request_data.get("newPassword", "") # 验证输入 # 获取当前用户 user = get_current_user() # 获取用户信息 username = user["username"] # 验证当前密码 db_user = user_service.get_user_by_username(username) if not db_user: return Response( status_code=404, description=jsonify({"success": False, "message": "用户不存在"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 验证密码 is_password_valid = user_service.verify_password(current_password, db_user["password"]) if not is_password_valid: return Response( status_code=401, description=jsonify({"success": False, "message": "当前密码不正确"}), headers={"Content-Type": "application/json; charset=utf-8"} ) # 更新密码 success = user_service.update_user_password(username, new_password) if not success: return Response( status_code=500, description=jsonify({"success": False, "message": "密码更新失败"}), headers={"Content-Type": "application/json; charset=utf-8"} ) return Response( status_code=200, description=jsonify({"success": True, "message": "密码更新成功"}), headers={"Content-Type": "application/json; charset=utf-8"} ) except Exception as e: return Response( status_code=500, description=jsonify({"success": False, "message": f"密码更新失败: {str(e)}"}), headers={"Content-Type": "application/json; charset=utf-8"} ) @app.after_request("/") def add_cors_headers(response): """添加CORS头,支持跨域请求""" response.headers.update({ "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS", "Access-Control-Allow-Headers": "Content-Type, Authorization, X-Requested-With" }) return response