You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
76 lines
2.3 KiB
76 lines
2.3 KiB
# core/device_manager.py
|
|
import logging
|
|
|
|
# --- 模拟/真实驱动选择逻辑 ---
|
|
|
|
|
|
class MatrixSerialPort:
|
|
def __init__(self, port, baudrate):
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
self.is_open = False
|
|
print(f"【模拟】初始化 {port}")
|
|
|
|
# --- 必须要有这个方法,名字必须是 connect ---
|
|
def connect(self):
|
|
print(f"【模拟】成功连接到 {self.port}")
|
|
self.is_open = True
|
|
return True
|
|
|
|
# --- 必须要有这个方法,名字必须是 close ---
|
|
def close(self):
|
|
print(f"【模拟】断开 {self.port}")
|
|
self.is_open = False
|
|
|
|
def send_data(self, hex_str):
|
|
if self.is_open:
|
|
print(f"【模拟】发送指令: {hex_str}")
|
|
else:
|
|
print("【模拟】错误:设备未连接")
|
|
|
|
|
|
HARDWARE_AVAILABLE = False
|
|
logging.info("【模拟模式】运行在模拟环境")
|
|
|
|
class MatrixController:
|
|
def __init__(self, port='COM3', baudrate=9600):
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
# 实例化设备(此时会根据上面的逻辑,自动选择真实或模拟)
|
|
self.device = MatrixSerialPort(port, baudrate)
|
|
self.is_connected = False
|
|
|
|
def connect(self):
|
|
try:
|
|
# 注意这里:是 .connect()
|
|
print(self.device)
|
|
|
|
result = self.device.connect()
|
|
self.is_connected = result
|
|
return result
|
|
except Exception as e:
|
|
print(f"❌ 连接失败: {e}")
|
|
return False
|
|
|
|
def send_command(self, hex_string):
|
|
"""发送指令的方法"""
|
|
if not self.is_connected:
|
|
print("⚠️ 未连接设备,请先调用 connect()")
|
|
return False
|
|
try:
|
|
# 根据真实驱动或模拟驱动的不同,这里会自动适配
|
|
if hasattr(self.device, 'send_data'):
|
|
self.device.send_data(hex_string)
|
|
else:
|
|
# 兼容真实 serial 的 write 方式
|
|
self.device.write(bytes.fromhex(hex_string.replace(' ', '')))
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ 发送指令失败: {e}")
|
|
return False
|
|
|
|
def disconnect(self):
|
|
"""断开连接"""
|
|
self.device.close()
|
|
self.is_connected = False
|
|
print("🔌 已断开连接")
|