# core/connection_manager.py import serial import serial.tools.list_ports from utils.logger import logger class SerialManager: def __init__(self): self.ser = None # 串口对象 self.is_connected = False def get_available_ports(self): """ 扫描并返回当前系统中所有可用的串口端口名 """ ports = serial.tools.list_ports.comports() return [port.device for port in ports] def connect(self, port, baudrate=9600, bytesize=8, stopbits=1, parity='N'): """ 连接指定的串口 """ try: # 如果已有连接,先关闭 if self.ser and self.ser.is_open: self.ser.close() # 设置校验位映射 parity_map = { 'N': serial.PARITY_NONE, 'E': serial.PARITY_EVEN, 'O': serial.PARITY_ODD } self.ser = serial.Serial( port=port, baudrate=baudrate, bytesize=bytesize, stopbits=stopbits, parity=parity_map.get(parity, serial.PARITY_NONE), timeout=1 # 设置读超时 ) if self.ser.is_open: self.is_connected = True logger.info(f"🎉 串口连接成功: {port} @ {baudrate}") return True except Exception as e: logger.error(f"❌ 串口连接失败 {port}: {e}") self.is_connected = False return False def disconnect(self): """ 断开串口连接 """ try: if self.ser and self.ser.is_open: self.ser.close() self.is_connected = False logger.info("🔌 串口已断开") except Exception as e: logger.error(f"❌ 断开串口时出错: {e}") def send_command(self, command_bytes): """ 发送数据到串口 :param command_bytes: bytes类型,例如 b'\x01\x02\xFF' """ if self.is_connected and self.ser and self.ser.is_open: try: # 添加这行日志,确认代码确实跑到了这里 hex_str = ' '.join(f'{b:02X}' for b in command_bytes) logger.debug(f"准备调用系统API发送: {hex_str}") # 调用写入 bytes_written = self.ser.write(command_bytes) # 关键:打印实际写入的字节数 logger.info(f"✅ 发送成功,写入 {bytes_written} 个字节") return True except Exception as e: # 这里要捕获具体的异常 logger.error(f"❌ 发送数据时发生异常: {e}") return False