You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
86 lines
2.8 KiB
86 lines
2.8 KiB
# core/connection_manager.py
|
|
import serial
|
|
import serial.tools.list_ports
|
|
from utils.logger import logger
|
|
|
|
class SerialManager:
|
|
def __init__(self):
|
|
self.ser = None # 串口对象
|
|
self.is_connected = False
|
|
|
|
def get_available_ports(self):
|
|
"""
|
|
扫描并返回当前系统中所有可用的串口端口名
|
|
"""
|
|
ports = serial.tools.list_ports.comports()
|
|
return [port.device for port in ports]
|
|
|
|
def connect(self, port, baudrate=9600, bytesize=8, stopbits=1, parity='N'):
|
|
"""
|
|
连接指定的串口
|
|
"""
|
|
try:
|
|
# 如果已有连接,先关闭
|
|
if self.ser and self.ser.is_open:
|
|
self.ser.close()
|
|
|
|
# 设置校验位映射
|
|
parity_map = {
|
|
'N': serial.PARITY_NONE,
|
|
'E': serial.PARITY_EVEN,
|
|
'O': serial.PARITY_ODD
|
|
}
|
|
|
|
self.ser = serial.Serial(
|
|
port=port,
|
|
baudrate=baudrate,
|
|
bytesize=bytesize,
|
|
stopbits=stopbits,
|
|
parity=parity_map.get(parity, serial.PARITY_NONE),
|
|
timeout=1 # 设置读超时
|
|
)
|
|
|
|
if self.ser.is_open:
|
|
self.is_connected = True
|
|
logger.info(f"🎉 串口连接成功: {port} @ {baudrate}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 串口连接失败 {port}: {e}")
|
|
self.is_connected = False
|
|
return False
|
|
|
|
def disconnect(self):
|
|
"""
|
|
断开串口连接
|
|
"""
|
|
try:
|
|
if self.ser and self.ser.is_open:
|
|
self.ser.close()
|
|
self.is_connected = False
|
|
logger.info("🔌 串口已断开")
|
|
except Exception as e:
|
|
logger.error(f"❌ 断开串口时出错: {e}")
|
|
|
|
def send_command(self, command_bytes):
|
|
"""
|
|
发送数据到串口
|
|
:param command_bytes: bytes类型,例如 b'\x01\x02\xFF'
|
|
"""
|
|
if self.is_connected and self.ser and self.ser.is_open:
|
|
try:
|
|
# 添加这行日志,确认代码确实跑到了这里
|
|
hex_str = ' '.join(f'{b:02X}' for b in command_bytes)
|
|
logger.debug(f"准备调用系统API发送: {hex_str}")
|
|
|
|
# 调用写入
|
|
bytes_written = self.ser.write(command_bytes)
|
|
|
|
# 关键:打印实际写入的字节数
|
|
logger.info(f"✅ 发送成功,写入 {bytes_written} 个字节")
|
|
|
|
return True
|
|
except Exception as e:
|
|
# 这里要捕获具体的异常
|
|
logger.error(f"❌ 发送数据时发生异常: {e}")
|
|
return False
|