# redis_utils.py
"""Thin module-level wrappers around a single shared ``redis.Redis`` client.

Connection settings are read from environment variables once, at import
time. Every wrapper delegates directly to the shared client.

NOTE: this module defines functions named ``set`` and ``keys``. Inside this
module the name ``set`` therefore refers to the Redis wrapper, not the
builtin type. Callers should prefer ``import redis_utils`` and call
``redis_utils.set(...)`` rather than star-importing, which would shadow the
builtin in their namespace as well.
"""
import os
from typing import Any, Dict, List, Optional, Set, Union

import redis

# Configuration is read from environment variables (optional, and safer than
# hard-coding credentials). Defaults are given as strings because
# ``os.getenv`` returns strings; the numeric settings are converted with int().
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_DB = int(os.getenv("REDIS_DB", "0"))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)

# Single shared Redis client, created once when the module is first imported.
_client = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_DB,
    password=REDIS_PASSWORD,
    decode_responses=True,  # replies come back as str rather than bytes
    socket_connect_timeout=5,
    retry_on_timeout=True,
    health_check_interval=30,
)


# ========== Wrappers for common commands (operate on _client) ==========

def set(key: str, value: Any, ex: Optional[int] = None) -> bool:
    """Set the string value stored at ``key``.

    Args:
        key: Key to write.
        value: Value to store.
        ex: Optional expiry passed to the client as ``ex`` (seconds).

    Returns:
        True if the client reported success, False otherwise. The client's
        reply is coerced to ``bool`` so the declared return type holds.
    """
    return bool(_client.set(key, value, ex=ex))


def get(key: str) -> Optional[str]:
    """Return the string value stored at ``key``, or None if there is none."""
    return _client.get(key)


def delete(*keys: str) -> int:
    """Delete one or more keys and return the client's integer reply."""
    return _client.delete(*keys)


def exists(key: str) -> bool:
    """Return True if ``key`` exists."""
    # EXISTS replies with a count of matching keys; one key was passed.
    return _client.exists(key) == 1


def hset(name: str, key: str, value: Any) -> int:
    """Set field ``key`` of the hash ``name`` to ``value``."""
    return _client.hset(name, key, value)


def hget(name: str, key: str) -> Optional[str]:
    """Return field ``key`` of the hash ``name``, or None if absent."""
    return _client.hget(name, key)


def hgetall(name: str) -> Dict[str, str]:
    """Return every field/value pair of the hash ``name``."""
    return _client.hgetall(name)


def lpush(name: str, *values: Any) -> int:
    """Push ``values`` onto the left (head) of the list ``name``."""
    return _client.lpush(name, *values)


def rpop(name: str) -> Optional[str]:
    """Pop and return the right-most (tail) element of the list ``name``."""
    return _client.rpop(name)


def sadd(name: str, *values: Any) -> int:
    """Add ``values`` as members of the set ``name``."""
    return _client.sadd(name, *values)


def smembers(name: str) -> Set[str]:
    """Return all members of the set ``name``."""
    # The annotation uses typing.Set because the bare name ``set`` in this
    # module is the Redis wrapper defined above, not the builtin type.
    return _client.smembers(name)


def keys(pattern: str = "*") -> List[str]:
    """Return the keys matching ``pattern``.

    Use with care: KEYS walks the whole keyspace and can stall a busy
    server. Prefer incremental scanning for large databases.
    """
    return _client.keys(pattern)


def ttl(key: str) -> int:
    """Return the remaining time to live of ``key`` in seconds.

    Per the Redis TTL command, a negative reply means the key has no expiry
    (-1) or does not exist (-2).
    """
    return _client.ttl(key)


def expire(key: str, seconds: int) -> bool:
    """Set ``key`` to expire after ``seconds`` seconds."""
    return _client.expire(key, seconds)


def ping() -> bool:
    """Test the Redis connection.

    Returns:
        The client's ping result, or False if the call raised. This is a
        deliberate best-effort probe and never propagates an exception.
    """
    try:
        return _client.ping()
    except Exception:
        # Any failure (connection refused, timeout, auth error, ...) simply
        # means "not reachable" to callers of this health check.
        return False


# Optional: expose the underlying client (use with caution).
def get_raw_client() -> redis.Redis:
    """Return the shared ``redis.Redis`` client used by this module."""
    return _client