Browse Source

拼接功能提交

master
王鹏 3 months ago
parent
commit
93cf1c3196
  1. 58
      core/serial_protocol.py
  2. 276
      main.py

58
core/serial_protocol.py

@ -1,7 +1,6 @@
# core/serial_protocol.py
import serial
import serial.tools.list_ports
from utils.logger import logger
class SerialProtocol:
def __init__(self):
@ -32,7 +31,7 @@ class SerialProtocol:
0x80, # HEAD (假设)
0x74, # CMD (接命令)
0x46, # 起始位置
0xFF, 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐)
0xFF, 0xFF, 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐)
]
return bytes(cmd_bytes)
@ -48,27 +47,44 @@ class SerialProtocol:
]
return bytes(cmd_bytes)
def power_off_panel(self, panel_id):
"""指定单屏关机"""
# 假设协议是: 80 74 19 [PanelNo] FF FF FF FF
# panel_id 是 1-12,转成16进制
hex_panel = f"{panel_id:02X}"
cmd_str = f"80 74 19 {hex_panel} FF FF FF FF"
return bytes.fromhex(cmd_str)
def change_signal(self, big_pic_id):
"""设置窗口信号源改变标志指令"""
# 根据你的协议文档,这里填入真实的十六进制指令
# 示例: [0x80, 0x94, 0x5F, big_pic_id, 0xFF, 0xFF, 0xFF, 0xFF]
cmd_bytes = [
0x80, # HEAD (假设)
0x94, # CMD (拼接命令)
0x5F, # 起始位置
big_pic_id,
0xFF, 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐)
]
return bytes(cmd_bytes)
def switch_layout(self, start_pos, cols, rows, source_type=0x33):
"""切换拼接布局 (例如 2x2)"""
# 示例指令,具体根据你的附件协议调整
return bytes.fromhex(f"80 D6 10 {start_pos:02X} {rows:02X} {cols:02X} 01 FF")
def send_signal(self, SourceType, SourceId):
"""根据信号源切换标志切换信号源指令"""
# 根据你的协议文档,这里填入真实的十六进制指令
# 示例: [0x80 0x94 0x81 SourceType SourceId 0xFF 0xFF 0xFF]
cmd_bytes = [
0x80, # HEAD (假设)
0x94, # CMD (拼接命令)
0x81, # 起始位置
SourceType,
SourceId,
0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐)
]
return bytes(cmd_bytes)
def create_mosaic_window(self, start_id, rows, cols, source_id):
"""
生成拼接指令
硬件协议通常是设置逻辑屏大小 -> 绑定信号源 -> 显示
"""
# 示例伪代码
cmd1 = bytes.fromhex(f"80 D6 10 {start_id} {rows} {cols} {source_id} FF")
return [cmd1]
def cancel_signal(self):
"""设置窗口信号源改变标志指令"""
# 根据你的协议文档,这里填入真实的十六进制指令
# 示例: [0x80 0x94 0x3A 0xFF 0xFF 0xFF 0xFF 0xFF]
cmd_bytes = [
0x80, # HEAD (假设)
0x94, # CMD (拼接命令)
0x3A, # 起始位置
0xFF, 0xFF, 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐)
]
return bytes(cmd_bytes)
def calculate_mosaic_params(self, selected_widgets):
"""

276
main.py

@ -26,6 +26,7 @@ class ScreenLabel(QLabel):
split_solution = []
click_labels = []
big_pic_id = 0
"""自定义 Label,增加了右键信号"""
# 定义一个右键信号,携带当前屏幕的 ID
@ -85,18 +86,18 @@ class ScreenLabel(QLabel):
if ScreenLabel.split_solution[self.screen_id - 1]:
ScreenLabel.split_solution[self.screen_id - 1]['status'] = 0
self.update_style(0)
self.update_style()
# 强制刷新界面
self.update()
def update_style(self, status):
def update_style(self, status = 0):
if status == 0:
self.init_style()
elif status == 1:
# 选中状态:蓝色背景
self.setStyleSheet("""
background-color: rgb(0, 170, 255);
background-color: rgb(0, 170, 255); /* 蓝色背景 */
color: white;
border: 2px solid yellow;
font-weight: bold;
@ -104,9 +105,9 @@ class ScreenLabel(QLabel):
elif status == 2:
# 锁定状态:绿色背景
self.setStyleSheet("""
background-color: rgb(0, 255, 0);
color: white;
border: 2px solid yellow;
background-color: rgb(0, 255, 127); /* 绿色背景 */
color: black;
border: 2px solid white;
font-weight: bold;
""")
@ -238,19 +239,6 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow):
"""处理右键点击事件"""
print(f"屏幕 {screen_id} 被右键点击了")
if len(ScreenLabel.click_labels) > 1 and screen_id in ScreenLabel.click_labels:
"""处理拼接屏幕事件"""
# 1. 创建菜单
menu = QMenu(self)
menu.addAction(f"--- 屏幕 {screen_id} ---")
menu.addSeparator()
split_act = QAction("拼接屏幕", self)
menu.addAction(split_act)
action = menu.exec_(QCursor.pos())
if action == split_act:
self.set_split(screen_id)
else:
# 1. 创建菜单
menu = QMenu(self)
menu.addAction(f"--- 屏幕 {screen_id} ---")
@ -289,10 +277,11 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow):
if action == cancel_act:
self.clear_screen(screen_id)
for k, item in enumerate(available_sources, start=0):
for k, item in enumerate(available_sources, start = 0):
if action == new_var_name[k]:
self.assign_source_to_screen(screen_id, item)
def get_current_layout_params(self):
"""
获取当前的拼接布局参数
@ -321,69 +310,108 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow):
# 2. 【新增】通过串口管理器发送
# 这里的 self.serial_manager 来自于我们在 main.py 中的初始化
if hasattr(self, 'serial_manager') and self.serial_manager.is_connected:
# send_command 方法来自 SerialManager 类
select_all_bytes = self.serial_protocol.select_all()
# 1. 获取点击的屏幕
# screen_status = ScreenLabel.split_solution[screen_id - 1]['status']
# screen_big_pic_id = ScreenLabel.split_solution[screen_id - 1]['big_pic_id']
#
# if screen_status == 3:
# for split_screen in ScreenLabel.split_solution:
# if split_screen['status'] == 3 and split_screen['big_pic_id'] == screen_big_pic_id:
# self.serial_manager.cancel_all()
# break
print(select_all_bytes)
if len(ScreenLabel.click_labels) > 0 and screen_id in ScreenLabel.click_labels:
"""
设置屏幕控件的拼接状态
"""
# 获取选中的屏幕控件
selected_widgets = [self.screen_labels[value - 1] for value in ScreenLabel.click_labels]
if not select_all_bytes:
# 生成拼接指令参数
params = self.serial_protocol.calculate_mosaic_params(selected_widgets)
# 2. 【新增】通过串口管理器发送
# 这里的 self.serial_manager 来自于我们在 main.py 中的初始化
if hasattr(self, 'serial_manager') and self.serial_manager.is_connected:
start = params['start']
v_count = params['v_count']
h_count = params['h_count']
# 计算拼接id
ScreenLabel.big_pic_id += 1
# 发送全部取消选择指令
cancel_all_bytes = self.serial_protocol.cancel_all()
if not cancel_all_bytes:
logger.error("指令封装失败,数据无效")
return
# send_command 方法来自 SerialManager 类
success = self.serial_manager.send_command(select_all_bytes)
success = self.serial_manager.send_command(cancel_all_bytes)
if success:
logger.info(f"✅ 取消选择指令发送成功")
else:
logger.error("取消选择指令发送失败")
click_labels = ScreenLabel.click_labels
# 生成拼接指令
mosaic_packet_bytes = self.serial_protocol.build_mosaic_packet(start, v_count, h_count, ScreenLabel.big_pic_id)
# 1. 查找对应的 Label 对象
# label = self.screen_labels[screen_id - 1] # 数组下标从0开始
if not mosaic_packet_bytes:
logger.error("指令封装失败,数据无效")
return
if click_labels and screen_id in click_labels:
for click_label in click_labels:
label = self.screen_labels[click_label - 1] # 数组下标从0开始
# send_command 方法来自 SerialManager 类
success = self.serial_manager.send_command(mosaic_packet_bytes)
# 2. 修改显示文字和样式
label.setText(f"Screen {click_label}\n[{source['name']}]\n(已分配)")
label.setStyleSheet("""
background-color: rgb(0, 255, 127); /* 绿色背景 */
color: black;
border: 2px solid white;
font-weight: bold;
""")
if success:
logger.info(f"✅ 拼接指令发送成功")
else:
logger.error("拼接指令发送失败")
# 3. 这里调用核心控制器发送指令
# self.matrix.send_layout_command([screen_id], source)
# 发送设置窗口信号源改变标志指令
change_signal_bytes = self.serial_protocol.change_signal(ScreenLabel.big_pic_id)
if not change_signal_bytes:
logger.error("指令封装失败,数据无效")
return
if ScreenLabel.split_solution[click_label - 1]:
ScreenLabel.split_solution[click_label - 1]['status'] = 2
ScreenLabel.split_solution[click_label - 1]['source_type'] = source['type']
ScreenLabel.split_solution[click_label - 1]['source_id'] = source['id']
# send_command 方法来自 SerialManager 类
success = self.serial_manager.send_command(change_signal_bytes)
print(f"【指令】屏幕 {click_label} 已切换至 {source['name']}")
if success:
logger.info(f"✅ 设置窗口信号源改变标志指令发送成功")
else:
label = self.screen_labels[screen_id - 1] # 数组下标从0开始
logger.error("设置窗口信号源改变标志指令发送失败")
# 2. 修改显示文字和样式
label.setText(f"Screen {screen_id}\n[{source['name']}]\n(已分配)")
label.setStyleSheet("""
background-color: rgb(0, 255, 127); /* 绿色背景 */
color: black;
border: 2px solid white;
font-weight: bold;
""")
# 发送信号源切换指令
send_signal_bytes = self.serial_protocol.send_signal(source['type'], source['id'])
# 3. 这里调用核心控制器发送指令
# self.matrix.send_layout_command([screen_id], source)
if not send_signal_bytes:
logger.error("指令封装失败,数据无效")
return
if ScreenLabel.split_solution[screen_id - 1]:
ScreenLabel.split_solution[screen_id - 1]['status'] = 2
ScreenLabel.split_solution[screen_id - 1]['source_type'] = source['type']
ScreenLabel.split_solution[screen_id - 1]['source_id'] = source['id']
# send_command 方法来自 SerialManager 类
success = self.serial_manager.send_command(send_signal_bytes)
print(f"【指令】屏幕 {screen_id} 已切换至 {source['name']}")
if success:
logger.info(f"✅ 设置窗口信号源改变标志指令发送成功")
else:
logger.error("设置窗口信号源改变标志指令发送失败")
if success:
for i in ScreenLabel.click_labels:
ScreenLabel.split_solution[i - 1]["status"] = 2
ScreenLabel.split_solution[i - 1]['source_type'] = source['type']
ScreenLabel.split_solution[i - 1]['source_id'] = source['id']
ScreenLabel.split_solution[i - 1]["big_pic_id"] = ScreenLabel.big_pic_id
label = self.screen_labels[i - 1] # 数组下标从0开始
label.setText(f"Screen {i}\n[分组_{ScreenLabel.big_pic_id}]\n{source['name']}\n(已分配)")
label.update_style(2)
# 清空选中的屏幕控件
ScreenLabel.click_labels = []
logger.info(f"✅ 指令发送成功")
@ -394,18 +422,18 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow):
logger.error("指令发送失败,未连接数据大屏")
return
# # 3. 这里调用核心控制器发送指令
# # self.matrix.send_layout_command([screen_id], source)
# print(f"【指令】屏幕 {screen_id} 已切换至 {source}")
def set_split(self, screen_id):
elif ScreenLabel.split_solution[screen_id - 1]['status'] == 2 and ScreenLabel.split_solution[screen_id - 1]['big_pic_id'] > 0:
"""
设置屏幕控件的拼接状态
解除屏幕拼接状态并切换信号
"""
# self.update_style()
# 获取选中的屏幕控件
selected_widgets = [self.screen_labels[value - 1] for value in ScreenLabel.click_labels]
ids = []
for i in ScreenLabel.split_solution:
if i['status'] == 2 and i['big_pic_id'] == ScreenLabel.split_solution[screen_id - 1]['big_pic_id']:
ids.append(i['screen_id'])
selected_widgets = [self.screen_labels[value - 1] for value in ids]
# 生成拼接指令参数
params = self.serial_protocol.calculate_mosaic_params(selected_widgets)
@ -418,11 +446,7 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow):
h_count = params['h_count']
# 计算拼接id
big_pic_id = 1
for i in range(start, start + v_count * h_count):
if ScreenLabel.split_solution[i]['status'] == 3 and ScreenLabel.split_solution[i]['big_pic_id'] > 0:
big_pic_id += 1
break
big_pic_id = ScreenLabel.split_solution[screen_id - 1]['big_pic_id']
# 发送全部取消选择指令
cancel_all_bytes = self.serial_protocol.cancel_all()
@ -440,7 +464,7 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow):
logger.error("取消选择指令发送失败")
# 生成拼接指令
mosaic_packet_bytes = self.serial_protocol.build_mosaic_packet(start, v_count, h_count, big_pic_id)
mosaic_packet_bytes = self.serial_protocol.build_mosaic_packet(start, v_count, h_count, big_pic_id, 0x11)
if not mosaic_packet_bytes:
logger.error("指令封装失败,数据无效")
@ -450,17 +474,105 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow):
success = self.serial_manager.send_command(mosaic_packet_bytes)
if success:
logger.info(f"✅ 拼接指令发送成功")
else:
logger.error("拼接指令发送失败")
ScreenLabel.click_labels = []
# 发送设置窗口信号源改变标志指令
cancel_signal_bytes = self.serial_protocol.cancel_signal()
ids = [w.screen_id for w in selected_widgets]
if not cancel_signal_bytes:
logger.error("指令封装失败,数据无效")
return
# send_command 方法来自 SerialManager 类
success = self.serial_manager.send_command(cancel_signal_bytes)
if success:
logger.info(f"✅ 取消设置窗口信号源改变标志指令发送成功")
else:
logger.error("取消设置窗口信号源改变标志指令发送失败")
# 发送信号源切换指令
send_signal_bytes = self.serial_protocol.build_mosaic_packet(start, v_count, h_count, big_pic_id, 0x12)
if not send_signal_bytes:
logger.error("指令封装失败,数据无效")
return
# send_command 方法来自 SerialManager 类
success = self.serial_manager.send_command(send_signal_bytes)
if success:
logger.info(f"✅ 还原拼接前的信号源指令发送成功")
else:
logger.error("还原拼接前的信号源指令发送失败")
if success:
for i in ids:
ScreenLabel.split_solution[i - 1]["status"] = 3
ScreenLabel.split_solution[i - 1]["big_pic_id"] = big_pic_id
ScreenLabel.split_solution[i - 1]["status"] = 0
ScreenLabel.split_solution[i - 1]['source_type'] = 0
ScreenLabel.split_solution[i - 1]['source_id'] = 0
ScreenLabel.split_solution[i - 1]["big_pic_id"] = 0
label = self.screen_labels[i - 1] # 数组下标从0开始
label.setText(f"Screen {i}\n[分组_{big_pic_id}]\n(已拼接)")
label.setText(f"Screen {i}\n(空闲)")
label.update_style(0)
# 清空选中的屏幕控件
ScreenLabel.click_labels = []
logger.info(f"✅ 指令发送成功")
else:
logger.error("指令发送失败")
else:
logger.error("指令发送失败,未连接数据大屏")
return
else:
label = self.screen_labels[screen_id - 1] # 数组下标从0开始
# 2. 修改显示文字和样式
label.setText(f"Screen {screen_id}\n[{source['name']}]\n(已分配)")
label.update_style(2)
# 3. 这里调用核心控制器发送指令
# self.matrix.send_layout_command([screen_id], source)
if ScreenLabel.split_solution[screen_id - 1]:
ScreenLabel.split_solution[screen_id - 1]['status'] = 2
ScreenLabel.split_solution[screen_id - 1]['source_type'] = source['type']
ScreenLabel.split_solution[screen_id - 1]['source_id'] = source['id']
print(f"【指令】屏幕 {screen_id} 已切换至 {source['name']}")
ScreenLabel.click_labels = []
# send_command 方法来自 SerialManager 类
select_all_bytes = self.serial_protocol.select_all()
print(select_all_bytes)
if not select_all_bytes:
logger.error("指令封装失败,数据无效")
return
# send_command 方法来自 SerialManager 类
success = self.serial_manager.send_command(select_all_bytes)
if success:
# 1. 查找对应的 Label 对象
# label = self.screen_labels[screen_id - 1] # 数组下标从0开始
logger.info(f"✅ 指令发送成功")
else:

Loading…
Cancel
Save