# core/device_manager.py import logging # --- 模拟/真实驱动选择逻辑 --- class MatrixSerialPort: def __init__(self, port, baudrate): self.port = port self.baudrate = baudrate self.is_open = False print(f"【模拟】初始化 {port}") # --- 必须要有这个方法,名字必须是 connect --- def connect(self): print(f"【模拟】成功连接到 {self.port}") self.is_open = True return True # --- 必须要有这个方法,名字必须是 close --- def close(self): print(f"【模拟】断开 {self.port}") self.is_open = False def send_data(self, hex_str): if self.is_open: print(f"【模拟】发送指令: {hex_str}") else: print("【模拟】错误:设备未连接") HARDWARE_AVAILABLE = False logging.info("【模拟模式】运行在模拟环境") class MatrixController: def __init__(self, port='COM3', baudrate=9600): self.port = port self.baudrate = baudrate # 实例化设备(此时会根据上面的逻辑,自动选择真实或模拟) self.device = MatrixSerialPort(port, baudrate) self.is_connected = False def connect(self): try: # 注意这里:是 .connect() print(self.device) result = self.device.connect() self.is_connected = result return result except Exception as e: print(f"❌ 连接失败: {e}") return False def send_command(self, hex_string): """发送指令的方法""" if not self.is_connected: print("⚠️ 未连接设备,请先调用 connect()") return False try: # 根据真实驱动或模拟驱动的不同,这里会自动适配 if hasattr(self.device, 'send_data'): self.device.send_data(hex_string) else: # 兼容真实 serial 的 write 方式 self.device.write(bytes.fromhex(hex_string.replace(' ', ''))) return True except Exception as e: print(f"❌ 发送指令失败: {e}") return False def disconnect(self): """断开连接""" self.device.close() self.is_connected = False print("🔌 已断开连接")