From 93cf1c319642c55b859ffa31a156dcc2b5acd479 Mon Sep 17 00:00:00 2001 From: kaiser Date: Fri, 26 Dec 2025 16:20:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=BC=E6=8E=A5=E5=8A=9F=E8=83=BD=E6=8F=90?= =?UTF-8?q?=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/serial_protocol.py | 58 ++++--- main.py | 430 ++++++++++++++++++++++++++++++------------------ 2 files changed, 308 insertions(+), 180 deletions(-) diff --git a/core/serial_protocol.py b/core/serial_protocol.py index 641e0f7..5dffa9c 100644 --- a/core/serial_protocol.py +++ b/core/serial_protocol.py @@ -1,7 +1,6 @@ # core/serial_protocol.py import serial import serial.tools.list_ports -from utils.logger import logger class SerialProtocol: def __init__(self): @@ -32,7 +31,7 @@ class SerialProtocol: 0x80, # HEAD (假设) 0x74, # CMD (接命令) 0x46, # 起始位置 - 0xFF, 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐) + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐) ] return bytes(cmd_bytes) @@ -48,27 +47,44 @@ class SerialProtocol: ] return bytes(cmd_bytes) - def power_off_panel(self, panel_id): - """指定单屏关机""" - # 假设协议是: 80 74 19 [PanelNo] FF FF FF FF - # panel_id 是 1-12,转成16进制 - hex_panel = f"{panel_id:02X}" - cmd_str = f"80 74 19 {hex_panel} FF FF FF FF" - return bytes.fromhex(cmd_str) + def change_signal(self, big_pic_id): + """设置窗口信号源改变标志指令""" + # 根据你的协议文档,这里填入真实的十六进制指令 + # 示例: [0x80, 0x94, 0x5F, big_pic_id, 0xFF, 0xFF, 0xFF, 0xFF] + cmd_bytes = [ + 0x80, # HEAD (假设) + 0x94, # CMD (拼接命令) + 0x5F, # 起始位置 + big_pic_id, + 0xFF, 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐) + ] + return bytes(cmd_bytes) - def switch_layout(self, start_pos, cols, rows, source_type=0x33): - """切换拼接布局 (例如 2x2)""" - # 示例指令,具体根据你的附件协议调整 - return bytes.fromhex(f"80 D6 10 {start_pos:02X} {rows:02X} {cols:02X} 01 FF") + def send_signal(self, SourceType, SourceId): + """根据信号源切换标志切换信号源指令""" + # 根据你的协议文档,这里填入真实的十六进制指令 + # 示例: [0x80 0x94 0x81 SourceType SourceId 0xFF 0xFF 0xFF] + cmd_bytes = [ + 0x80, # HEAD (假设) + 0x94, # CMD (拼接命令) + 0x81, # 起始位置 + SourceType, + SourceId, + 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐) + ] + return bytes(cmd_bytes) - def create_mosaic_window(self, start_id, rows, cols, source_id): - """ - 生成拼接指令 - 硬件协议通常是:设置逻辑屏大小 -> 绑定信号源 -> 显示 - """ - # 示例伪代码 - cmd1 = bytes.fromhex(f"80 D6 10 {start_id} {rows} {cols} {source_id} FF") - return [cmd1] + def cancel_signal(self): + """设置窗口信号源改变标志指令""" + # 根据你的协议文档,这里填入真实的十六进制指令 + # 示例: [0x80 0x94 0x3A 0xFF 0xFF 0xFF 0xFF 0xFF] + cmd_bytes = [ + 0x80, # HEAD (假设) + 0x94, # CMD (拼接命令) + 0x3A, # 起始位置 + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF # 填充位 (根据协议长度补齐) + ] + return bytes(cmd_bytes) def calculate_mosaic_params(self, selected_widgets): """ diff --git a/main.py b/main.py index 114e17d..7b24c19 100644 --- a/main.py +++ b/main.py @@ -26,6 +26,7 @@ class ScreenLabel(QLabel): split_solution = [] click_labels = [] + big_pic_id = 0 """自定义 Label,增加了右键信号""" # 定义一个右键信号,携带当前屏幕的 ID @@ -85,18 +86,18 @@ class ScreenLabel(QLabel): if ScreenLabel.split_solution[self.screen_id - 1]: ScreenLabel.split_solution[self.screen_id - 1]['status'] = 0 - self.update_style(0) + self.update_style() # 强制刷新界面 self.update() - def update_style(self, status): + def update_style(self, status = 0): if status == 0: self.init_style() elif status == 1: # 选中状态:蓝色背景 self.setStyleSheet(""" - background-color: rgb(0, 170, 255); + background-color: rgb(0, 170, 255); /* 蓝色背景 */ color: white; border: 2px solid yellow; font-weight: bold; @@ -104,9 +105,9 @@ class ScreenLabel(QLabel): elif status == 2: # 锁定状态:绿色背景 self.setStyleSheet(""" - background-color: rgb(0, 255, 0); - color: white; - border: 2px solid yellow; + background-color: rgb(0, 255, 127); /* 绿色背景 */ + color: black; + border: 2px solid white; font-weight: bold; """) @@ -238,60 +239,48 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow): """处理右键点击事件""" print(f"屏幕 {screen_id} 被右键点击了") - if len(ScreenLabel.click_labels) > 1 and screen_id in ScreenLabel.click_labels: - """处理拼接屏幕事件""" - # 1. 创建菜单 - menu = QMenu(self) - menu.addAction(f"--- 屏幕 {screen_id} ---") - menu.addSeparator() - split_act = QAction("拼接屏幕", self) - menu.addAction(split_act) - - action = menu.exec_(QCursor.pos()) - if action == split_act: - self.set_split(screen_id) - else: - # 1. 创建菜单 - menu = QMenu(self) - menu.addAction(f"--- 屏幕 {screen_id} ---") - menu.addSeparator() - - # 2. 模拟信号源选项 - source_group = QActionGroup(self) # 使用 QActionGroup 来实现单选 - new_var_name = [] - available_sources = [ - {"type": 0x33, "id": 0x01, "name": "HDMI 1"}, - {"type": 0x35, "id": 0x01, "name": "DVI 1"}, - {"type": 0x32, "id": 0x01, "name": "AV 1"}, # SourceType=0x32, Sourceld=0x01 - {"type": 0x32, "id": 0x02, "name": "AV 2"}, - ] - - # 模拟信号源列表,实际可以从设备读取 - # available_sources = ["HDMI 1", "HDMI 2", "DP 1", "DP 2", "Camera 1", "Test Pattern"] - for item in available_sources: # 这里用 enumerate 替代手动 k+=1 - # 1. 创建 QAction 对象 - action = QAction(item['name'], self) - action.setData(item) - - # 2. 如果需要动态变量名存入全局(虽然不推荐,但如果你必须用) - new_var_name.append(action) - - # 3. 将 QAction 对象添加到菜单 (关键修改在这里) - menu.addAction(action) - - menu.addSeparator() - cancel_act = QAction("清除信号", self) - menu.addAction(cancel_act) - - # 3. 显示菜单并获取结果 - action = menu.exec_(QCursor.pos()) - - if action == cancel_act: - self.clear_screen(screen_id) - - for k, item in enumerate(available_sources, start=0): - if action == new_var_name[k]: - self.assign_source_to_screen(screen_id, item) + # 1. 创建菜单 + menu = QMenu(self) + menu.addAction(f"--- 屏幕 {screen_id} ---") + menu.addSeparator() + + # 2. 模拟信号源选项 + source_group = QActionGroup(self) # 使用 QActionGroup 来实现单选 + new_var_name = [] + available_sources = [ + {"type": 0x33, "id": 0x01, "name": "HDMI 1"}, + {"type": 0x35, "id": 0x01, "name": "DVI 1"}, + {"type": 0x32, "id": 0x01, "name": "AV 1"}, # SourceType=0x32, Sourceld=0x01 + {"type": 0x32, "id": 0x02, "name": "AV 2"}, + ] + + # 模拟信号源列表,实际可以从设备读取 + # available_sources = ["HDMI 1", "HDMI 2", "DP 1", "DP 2", "Camera 1", "Test Pattern"] + for item in available_sources: # 这里用 enumerate 替代手动 k+=1 + # 1. 创建 QAction 对象 + action = QAction(item['name'], self) + action.setData(item) + + # 2. 如果需要动态变量名存入全局(虽然不推荐,但如果你必须用) + new_var_name.append(action) + + # 3. 将 QAction 对象添加到菜单 (关键修改在这里) + menu.addAction(action) + + menu.addSeparator() + cancel_act = QAction("清除信号", self) + menu.addAction(cancel_act) + + # 3. 显示菜单并获取结果 + action = menu.exec_(QCursor.pos()) + + if action == cancel_act: + self.clear_screen(screen_id) + + for k, item in enumerate(available_sources, start = 0): + if action == new_var_name[k]: + self.assign_source_to_screen(screen_id, item) + def get_current_layout_params(self): """ @@ -321,146 +310,269 @@ class MatrixControlApp(QMainWindow, Ui_MainWindow): # 2. 【新增】通过串口管理器发送 # 这里的 self.serial_manager 来自于我们在 main.py 中的初始化 if hasattr(self, 'serial_manager') and self.serial_manager.is_connected: - # send_command 方法来自 SerialManager 类 - select_all_bytes = self.serial_protocol.select_all() + # 1. 获取点击的屏幕 + # screen_status = ScreenLabel.split_solution[screen_id - 1]['status'] + # screen_big_pic_id = ScreenLabel.split_solution[screen_id - 1]['big_pic_id'] + # + # if screen_status == 3: + # for split_screen in ScreenLabel.split_solution: + # if split_screen['status'] == 3 and split_screen['big_pic_id'] == screen_big_pic_id: + # self.serial_manager.cancel_all() + # break + + if len(ScreenLabel.click_labels) > 0 and screen_id in ScreenLabel.click_labels: + """ + 设置屏幕控件的拼接状态 + """ + # 获取选中的屏幕控件 + selected_widgets = [self.screen_labels[value - 1] for value in ScreenLabel.click_labels] + + # 生成拼接指令参数 + params = self.serial_protocol.calculate_mosaic_params(selected_widgets) + + # 2. 【新增】通过串口管理器发送 + # 这里的 self.serial_manager 来自于我们在 main.py 中的初始化 + if hasattr(self, 'serial_manager') and self.serial_manager.is_connected: + start = params['start'] + v_count = params['v_count'] + h_count = params['h_count'] + + # 计算拼接id + ScreenLabel.big_pic_id += 1 + + # 发送全部取消选择指令 + cancel_all_bytes = self.serial_protocol.cancel_all() + + if not cancel_all_bytes: + logger.error("指令封装失败,数据无效") + return + + # send_command 方法来自 SerialManager 类 + success = self.serial_manager.send_command(cancel_all_bytes) + + if success: + logger.info(f"✅ 取消选择指令发送成功") + else: + logger.error("取消选择指令发送失败") + + # 生成拼接指令 + mosaic_packet_bytes = self.serial_protocol.build_mosaic_packet(start, v_count, h_count, ScreenLabel.big_pic_id) + + if not mosaic_packet_bytes: + logger.error("指令封装失败,数据无效") + return + + # send_command 方法来自 SerialManager 类 + success = self.serial_manager.send_command(mosaic_packet_bytes) + + if success: + logger.info(f"✅ 拼接指令发送成功") + else: + logger.error("拼接指令发送失败") + + # 发送设置窗口信号源改变标志指令 + change_signal_bytes = self.serial_protocol.change_signal(ScreenLabel.big_pic_id) + + if not change_signal_bytes: + logger.error("指令封装失败,数据无效") + return + + # send_command 方法来自 SerialManager 类 + success = self.serial_manager.send_command(change_signal_bytes) + + if success: + logger.info(f"✅ 设置窗口信号源改变标志指令发送成功") + else: + logger.error("设置窗口信号源改变标志指令发送失败") + + # 发送信号源切换指令 + send_signal_bytes = self.serial_protocol.send_signal(source['type'], source['id']) + + if not send_signal_bytes: + logger.error("指令封装失败,数据无效") + return + + # send_command 方法来自 SerialManager 类 + success = self.serial_manager.send_command(send_signal_bytes) + + if success: + logger.info(f"✅ 设置窗口信号源改变标志指令发送成功") + else: + logger.error("设置窗口信号源改变标志指令发送失败") + + if success: + for i in ScreenLabel.click_labels: + ScreenLabel.split_solution[i - 1]["status"] = 2 + ScreenLabel.split_solution[i - 1]['source_type'] = source['type'] + ScreenLabel.split_solution[i - 1]['source_id'] = source['id'] + ScreenLabel.split_solution[i - 1]["big_pic_id"] = ScreenLabel.big_pic_id + + label = self.screen_labels[i - 1] # 数组下标从0开始 + label.setText(f"Screen {i}\n[分组_{ScreenLabel.big_pic_id}]\n{source['name']}\n(已分配)") + label.update_style(2) + + # 清空选中的屏幕控件 + ScreenLabel.click_labels = [] + + logger.info(f"✅ 指令发送成功") + else: + logger.error("指令发送失败") - print(select_all_bytes) + else: + logger.error("指令发送失败,未连接数据大屏") + return - if not select_all_bytes: - logger.error("指令封装失败,数据无效") - return + elif ScreenLabel.split_solution[screen_id - 1]['status'] == 2 and ScreenLabel.split_solution[screen_id - 1]['big_pic_id'] > 0: + """ + 解除屏幕拼接状态并切换信号 + """ + # 获取选中的屏幕控件 + ids = [] - # send_command 方法来自 SerialManager 类 - success = self.serial_manager.send_command(select_all_bytes) + for i in ScreenLabel.split_solution: + if i['status'] == 2 and i['big_pic_id'] == ScreenLabel.split_solution[screen_id - 1]['big_pic_id']: + ids.append(i['screen_id']) - if success: + selected_widgets = [self.screen_labels[value - 1] for value in ids] - click_labels = ScreenLabel.click_labels + # 生成拼接指令参数 + params = self.serial_protocol.calculate_mosaic_params(selected_widgets) - # 1. 查找对应的 Label 对象 - # label = self.screen_labels[screen_id - 1] # 数组下标从0开始 + # 2. 【新增】通过串口管理器发送 + # 这里的 self.serial_manager 来自于我们在 main.py 中的初始化 + if hasattr(self, 'serial_manager') and self.serial_manager.is_connected: + start = params['start'] + v_count = params['v_count'] + h_count = params['h_count'] - if click_labels and screen_id in click_labels: - for click_label in click_labels: - label = self.screen_labels[click_label - 1] # 数组下标从0开始 + # 计算拼接id + big_pic_id = ScreenLabel.split_solution[screen_id - 1]['big_pic_id'] - # 2. 修改显示文字和样式 - label.setText(f"Screen {click_label}\n[{source['name']}]\n(已分配)") - label.setStyleSheet(""" - background-color: rgb(0, 255, 127); /* 绿色背景 */ - color: black; - border: 2px solid white; - font-weight: bold; - """) + # 发送全部取消选择指令 + cancel_all_bytes = self.serial_protocol.cancel_all() - # 3. 这里调用核心控制器发送指令 - # self.matrix.send_layout_command([screen_id], source) + if not cancel_all_bytes: + logger.error("指令封装失败,数据无效") + return - if ScreenLabel.split_solution[click_label - 1]: - ScreenLabel.split_solution[click_label - 1]['status'] = 2 - ScreenLabel.split_solution[click_label - 1]['source_type'] = source['type'] - ScreenLabel.split_solution[click_label - 1]['source_id'] = source['id'] + # send_command 方法来自 SerialManager 类 + success = self.serial_manager.send_command(cancel_all_bytes) - print(f"【指令】屏幕 {click_label} 已切换至 {source['name']}") - else: - label = self.screen_labels[screen_id - 1] # 数组下标从0开始 + if success: + logger.info(f"✅ 取消选择指令发送成功") + else: + logger.error("取消选择指令发送失败") - # 2. 修改显示文字和样式 - label.setText(f"Screen {screen_id}\n[{source['name']}]\n(已分配)") - label.setStyleSheet(""" - background-color: rgb(0, 255, 127); /* 绿色背景 */ - color: black; - border: 2px solid white; - font-weight: bold; - """) + # 生成拼接指令 + mosaic_packet_bytes = self.serial_protocol.build_mosaic_packet(start, v_count, h_count, big_pic_id, 0x11) - # 3. 这里调用核心控制器发送指令 - # self.matrix.send_layout_command([screen_id], source) + if not mosaic_packet_bytes: + logger.error("指令封装失败,数据无效") + return - if ScreenLabel.split_solution[screen_id - 1]: - ScreenLabel.split_solution[screen_id - 1]['status'] = 2 - ScreenLabel.split_solution[screen_id - 1]['source_type'] = source['type'] - ScreenLabel.split_solution[screen_id - 1]['source_id'] = source['id'] + # send_command 方法来自 SerialManager 类 + success = self.serial_manager.send_command(mosaic_packet_bytes) - print(f"【指令】屏幕 {screen_id} 已切换至 {source['name']}") + if success: + logger.info(f"✅ 拼接指令发送成功") + else: + logger.error("拼接指令发送失败") - ScreenLabel.click_labels = [] + # 发送设置窗口信号源改变标志指令 + cancel_signal_bytes = self.serial_protocol.cancel_signal() - logger.info(f"✅ 指令发送成功") - else: - logger.error("指令发送失败") + if not cancel_signal_bytes: + logger.error("指令封装失败,数据无效") + return - else: - logger.error("指令发送失败,未连接数据大屏") - return + # send_command 方法来自 SerialManager 类 + success = self.serial_manager.send_command(cancel_signal_bytes) - # # 3. 这里调用核心控制器发送指令 - # # self.matrix.send_layout_command([screen_id], source) - # print(f"【指令】屏幕 {screen_id} 已切换至 {source}") + if success: + logger.info(f"✅ 取消设置窗口信号源改变标志指令发送成功") + else: + logger.error("取消设置窗口信号源改变标志指令发送失败") - def set_split(self, screen_id): - """ - 设置屏幕控件的拼接状态 - """ - # self.update_style() + # 发送信号源切换指令 + send_signal_bytes = self.serial_protocol.build_mosaic_packet(start, v_count, h_count, big_pic_id, 0x12) - # 获取选中的屏幕控件 - selected_widgets = [self.screen_labels[value - 1] for value in ScreenLabel.click_labels] + if not send_signal_bytes: + logger.error("指令封装失败,数据无效") + return - # 生成拼接指令参数 - params = self.serial_protocol.calculate_mosaic_params(selected_widgets) + # send_command 方法来自 SerialManager 类 + success = self.serial_manager.send_command(send_signal_bytes) - # 2. 【新增】通过串口管理器发送 - # 这里的 self.serial_manager 来自于我们在 main.py 中的初始化 - if hasattr(self, 'serial_manager') and self.serial_manager.is_connected: - start = params['start'] - v_count = params['v_count'] - h_count = params['h_count'] + if success: + logger.info(f"✅ 还原拼接前的信号源指令发送成功") + else: + logger.error("还原拼接前的信号源指令发送失败") - # 计算拼接id - big_pic_id = 1 - for i in range(start, start + v_count * h_count): - if ScreenLabel.split_solution[i]['status'] == 3 and ScreenLabel.split_solution[i]['big_pic_id'] > 0: - big_pic_id += 1 - break + if success: + for i in ids: + ScreenLabel.split_solution[i - 1]["status"] = 0 + ScreenLabel.split_solution[i - 1]['source_type'] = 0 + ScreenLabel.split_solution[i - 1]['source_id'] = 0 + ScreenLabel.split_solution[i - 1]["big_pic_id"] = 0 - # 发送全部取消选择指令 - cancel_all_bytes = self.serial_protocol.cancel_all() + label = self.screen_labels[i - 1] # 数组下标从0开始 + label.setText(f"Screen {i}\n(空闲)") + label.update_style(0) - if not cancel_all_bytes: - logger.error("指令封装失败,数据无效") - return + # 清空选中的屏幕控件 + ScreenLabel.click_labels = [] - # send_command 方法来自 SerialManager 类 - success = self.serial_manager.send_command(cancel_all_bytes) + logger.info(f"✅ 指令发送成功") + else: + logger.error("指令发送失败") - if success: - logger.info(f"✅ 取消选择指令发送成功") + else: + logger.error("指令发送失败,未连接数据大屏") + return else: - logger.error("取消选择指令发送失败") + label = self.screen_labels[screen_id - 1] # 数组下标从0开始 + + # 2. 修改显示文字和样式 + label.setText(f"Screen {screen_id}\n[{source['name']}]\n(已分配)") + label.update_style(2) + + # 3. 这里调用核心控制器发送指令 + # self.matrix.send_layout_command([screen_id], source) + + if ScreenLabel.split_solution[screen_id - 1]: + ScreenLabel.split_solution[screen_id - 1]['status'] = 2 + ScreenLabel.split_solution[screen_id - 1]['source_type'] = source['type'] + ScreenLabel.split_solution[screen_id - 1]['source_id'] = source['id'] + + print(f"【指令】屏幕 {screen_id} 已切换至 {source['name']}") - # 生成拼接指令 - mosaic_packet_bytes = self.serial_protocol.build_mosaic_packet(start, v_count, h_count, big_pic_id) + ScreenLabel.click_labels = [] - if not mosaic_packet_bytes: + + + + + + # send_command 方法来自 SerialManager 类 + select_all_bytes = self.serial_protocol.select_all() + + print(select_all_bytes) + + if not select_all_bytes: logger.error("指令封装失败,数据无效") return # send_command 方法来自 SerialManager 类 - success = self.serial_manager.send_command(mosaic_packet_bytes) + success = self.serial_manager.send_command(select_all_bytes) if success: - ScreenLabel.click_labels = [] - ids = [w.screen_id for w in selected_widgets] - for i in ids: - ScreenLabel.split_solution[i - 1]["status"] = 3 - ScreenLabel.split_solution[i - 1]["big_pic_id"] = big_pic_id + # 1. 查找对应的 Label 对象 + # label = self.screen_labels[screen_id - 1] # 数组下标从0开始 + - label = self.screen_labels[i - 1] # 数组下标从0开始 - label.setText(f"Screen {i}\n[分组_{big_pic_id}]\n(已拼接)") logger.info(f"✅ 指令发送成功") else: