| import sys |
| import os |
| import logging |
| import threading |
| from pathlib import Path |
| from urllib.parse import urlparse |
|
|
| from PyQt5.QtCore import ( |
| Qt, QTimer, QPropertyAnimation, QRect, |
| QEvent, QObject |
| ) |
| from PyQt5.QtWidgets import ( |
| QApplication, QWidget, QVBoxLayout, |
| QHBoxLayout, QLabel, QPushButton, QSlider, QLineEdit, |
| QComboBox, QCheckBox, QMessageBox, QFrame, |
| QStackedWidget, QTabBar, QStyleOptionSlider, QStyle, |
| QGraphicsOpacityEffect, QSizePolicy, QScrollArea, QGridLayout |
| ) |
| from PyQt5.QtGui import ( |
| QPainter, QColor, QFont, QMouseEvent, QMovie, QBrush, QPen, |
| QLinearGradient, QTransform, QPainterPath |
| ) |
|
|
| from src.utils.config_manager import ConfigManager |
| import queue |
| import time |
| import numpy as np |
| from typing import Optional, Callable |
| from pynput import keyboard as pynput_keyboard |
| from abc import ABCMeta |
| from src.display.base_display import BaseDisplay |
| import json |
|
|
| |
| CONFIG_PATH = Path(__file__).parent.parent.parent / "config" / "config.json" |
|
|
|
|
| def restart_program(): |
| """使用 os.execv 重启当前 Python 程序。""" |
| try: |
| python = sys.executable |
| print(f"Attempting to restart with: {python} {sys.argv}") |
| |
| app = QApplication.instance() |
| if app: |
| app.quit() |
| |
| os.execv(python, [python] + sys.argv) |
| except Exception as e: |
| print(f"重启程序失败: {e}") |
| logging.getLogger("Display").error(f"重启程序失败: {e}", exc_info=True) |
| |
| sys.exit(1) |
|
|
|
|
| |
| class CombinedMeta(type(QObject), ABCMeta): |
| pass |
|
|
|
|
| class GuiDisplay(BaseDisplay, QObject, metaclass=CombinedMeta): |
| def __init__(self): |
| |
| super().__init__() |
| QObject.__init__(self) |
|
|
| |
| self.logger = logging.getLogger("Display") |
| |
| self.app = None |
| self.root = None |
| |
| |
| self.status_label = None |
| self.emotion_label = None |
| self.tts_text_label = None |
| self.volume_scale = None |
| self.manual_btn = None |
| self.abort_btn = None |
| self.auto_btn = None |
| self.mode_btn = None |
| self.mute = None |
| self.stackedWidget = None |
| self.nav_tab_bar = None |
| |
| |
| self.emotion_movie = None |
| |
| self.emotion_effect = None |
| self.emotion_animation = None |
| self.next_emotion_path = None |
| self.is_emotion_animating = False |
| |
| |
| self.volume_label = None |
| self.volume_control_available = False |
| self.volume_controller_failed = False |
| |
| |
| self.mic_visualizer = None |
| self.mic_timer = None |
| self.is_listening = False |
| |
| |
| self.wakeWordEnableSwitch = None |
| self.wakeWordsLineEdit = None |
| self.saveSettingsButton = None |
| |
| self.deviceIdLineEdit = None |
| self.wsProtocolComboBox = None |
| self.wsAddressLineEdit = None |
| self.wsTokenLineEdit = None |
| |
| self.otaProtocolComboBox = None |
| self.otaAddressLineEdit = None |
| |
| self.haProtocolComboBox = None |
| self.ha_server = None |
| self.ha_port = None |
| self.ha_key = None |
| self.Add_ha_devices = None |
|
|
| self.is_muted = False |
| self.pre_mute_volume = self.current_volume |
| |
| |
| self.auto_mode = False |
|
|
| |
| self.button_press_callback = None |
| self.button_release_callback = None |
| self.status_update_callback = None |
| self.text_update_callback = None |
| self.emotion_update_callback = None |
| self.mode_callback = None |
| self.auto_callback = None |
| self.abort_callback = None |
| self.send_text_callback = None |
|
|
| |
| self.update_queue = queue.Queue() |
|
|
| |
| self._running = True |
|
|
| |
| self.keyboard_listener = None |
|
|
| |
| self.last_mouse_pos = None |
| |
| |
| self.update_timer = None |
| self.volume_update_timer = None |
| |
| |
| self.current_effect = None |
| self.current_animation = None |
| self.animation = None |
| self.fade_widget = None |
| self.animated_widget = None |
| |
| |
| self.volume_control_available = (hasattr(self, 'volume_controller') and |
| self.volume_controller is not None) |
| |
| |
| self.get_current_volume() |
|
|
| |
| self.devices_list = [] |
| self.device_labels = {} |
| self.history_title = None |
| self.iot_card = None |
| self.ha_update_timer = None |
| self.device_states = {} |
|
|
| def eventFilter(self, source, event): |
| if source == self.volume_scale and event.type() == QEvent.MouseButtonPress: |
| if event.button() == Qt.LeftButton: |
| slider = self.volume_scale |
| opt = QStyleOptionSlider() |
| slider.initStyleOption(opt) |
| |
| |
| handle_rect = slider.style().subControlRect( |
| QStyle.CC_Slider, opt, QStyle.SC_SliderHandle, slider) |
| groove_rect = slider.style().subControlRect( |
| QStyle.CC_Slider, opt, QStyle.SC_SliderGroove, slider) |
|
|
| |
| if handle_rect.contains(event.pos()): |
| return False |
|
|
| |
| if slider.orientation() == Qt.Horizontal: |
| |
| if (event.pos().x() < groove_rect.left() or |
| event.pos().x() > groove_rect.right()): |
| return False |
| pos = event.pos().x() - groove_rect.left() |
| max_pos = groove_rect.width() |
| else: |
| if (event.pos().y() < groove_rect.top() or |
| event.pos().y() > groove_rect.bottom()): |
| return False |
| pos = groove_rect.bottom() - event.pos().y() |
| max_pos = groove_rect.height() |
|
|
| if max_pos > 0: |
| value_range = slider.maximum() - slider.minimum() |
| |
| new_value = slider.minimum() + round( |
| (value_range * pos) / max_pos) |
| |
| |
| slider.setValue(int(new_value)) |
| |
| return True |
| |
| return super().eventFilter(source, event) |
|
|
| def _setup_navigation(self): |
| """设置导航标签栏 (QTabBar)""" |
| |
| self.nav_tab_bar.addTab("聊天") |
| self.nav_tab_bar.addTab("设备管理") |
| self.nav_tab_bar.addTab("参数配置") |
|
|
| |
| self.nav_tab_bar.currentChanged.connect(self._on_navigation_index_changed) |
|
|
| |
| self.nav_tab_bar.setCurrentIndex(0) |
|
|
| def _on_navigation_index_changed(self, index: int): |
| """处理导航标签变化 (通过索引)""" |
| |
| index_to_routeKey = {0: "mainInterface", 1: "iotInterface", 2: "settingInterface"} |
| routeKey = index_to_routeKey.get(index) |
|
|
| if routeKey is None: |
| self.logger.warning(f"未知的导航索引: {index}") |
| return |
|
|
| target_index = index |
| if target_index == self.stackedWidget.currentIndex(): |
| return |
|
|
| current_widget = self.stackedWidget.currentWidget() |
| self.stackedWidget.setCurrentIndex(target_index) |
| new_widget = self.stackedWidget.currentWidget() |
|
|
| |
| if routeKey == "settingInterface": |
| self._load_settings() |
|
|
| |
| if routeKey == "iotInterface": |
| self._load_iot_devices() |
|
|
| def set_callbacks( |
| self, |
| press_callback: Optional[Callable] = None, |
| release_callback: Optional[Callable] = None, |
| status_callback: Optional[Callable] = None, |
| text_callback: Optional[Callable] = None, |
| emotion_callback: Optional[Callable] = None, |
| mode_callback: Optional[Callable] = None, |
| auto_callback: Optional[Callable] = None, |
| abort_callback: Optional[Callable] = None, |
| send_text_callback: Optional[Callable] = None, |
| ): |
| """设置回调函数""" |
| self.button_press_callback = press_callback |
| self.button_release_callback = release_callback |
| self.status_update_callback = status_callback |
| self.text_update_callback = text_callback |
| self.emotion_update_callback = emotion_callback |
| self.mode_callback = mode_callback |
| self.auto_callback = auto_callback |
| self.abort_callback = abort_callback |
| self.send_text_callback = send_text_callback |
|
|
| def _process_updates(self): |
| """处理更新队列""" |
| if not self._running: |
| return |
| |
| try: |
| while True: |
| try: |
| |
| update_func = self.update_queue.get_nowait() |
| update_func() |
| self.update_queue.task_done() |
| except queue.Empty: |
| break |
| except Exception as e: |
| self.logger.error(f"处理更新队列时发生错误: {e}") |
|
|
| def _on_manual_button_press(self): |
| """手动模式按钮按下事件处理""" |
| try: |
| |
| if self.manual_btn and self.manual_btn.isVisible(): |
| self.manual_btn.setText("松开以停止") |
|
|
| |
| if self.button_press_callback: |
| self.button_press_callback() |
| except Exception as e: |
| self.logger.error(f"按钮按下回调执行失败: {e}") |
|
|
| def _on_manual_button_release(self): |
| """手动模式按钮释放事件处理""" |
| try: |
| |
| if self.manual_btn and self.manual_btn.isVisible(): |
| self.manual_btn.setText("按住后说话") |
|
|
| |
| if self.button_release_callback: |
| self.button_release_callback() |
| except Exception as e: |
| self.logger.error(f"按钮释放回调执行失败: {e}") |
|
|
| def _on_auto_button_click(self): |
| """自动模式按钮点击事件处理""" |
| try: |
| if self.auto_callback: |
| self.auto_callback() |
| except Exception as e: |
| self.logger.error(f"自动模式按钮回调执行失败: {e}") |
|
|
| def _on_abort_button_click(self): |
| """处理中止按钮点击事件""" |
| if self.abort_callback: |
| self.abort_callback() |
|
|
| def _on_mode_button_click(self): |
| """对话模式切换按钮点击事件""" |
| try: |
| |
| if self.mode_callback: |
| |
| if not self.mode_callback(not self.auto_mode): |
| return |
|
|
| |
| self.auto_mode = not self.auto_mode |
|
|
| |
| if self.auto_mode: |
| |
| self.update_mode_button_status("自动对话") |
|
|
| |
| self.update_queue.put(self._switch_to_auto_mode) |
| else: |
| |
| self.update_mode_button_status("手动对话") |
|
|
| |
| self.update_queue.put(self._switch_to_manual_mode) |
|
|
| except Exception as e: |
| self.logger.error(f"模式切换按钮回调执行失败: {e}") |
|
|
| def _switch_to_auto_mode(self): |
| """切换到自动模式的UI更新""" |
| if self.manual_btn and self.auto_btn: |
| self.manual_btn.hide() |
| self.auto_btn.show() |
|
|
| def _switch_to_manual_mode(self): |
| """切换到手动模式的UI更新""" |
| if self.manual_btn and self.auto_btn: |
| self.auto_btn.hide() |
| self.manual_btn.show() |
|
|
| def update_status(self, status: str): |
| """更新状态文本 (只更新主状态)""" |
| full_status_text = f"状态: {status}" |
| self.update_queue.put(lambda: self._safe_update_label(self.status_label, full_status_text)) |
| |
| |
| if "聆听中" in status: |
| self.update_queue.put(self._start_mic_visualization) |
| elif "待命" in status or "说话中" in status: |
| self.update_queue.put(self._stop_mic_visualization) |
|
|
| def update_text(self, text: str): |
| """更新TTS文本""" |
| self.update_queue.put(lambda: self._safe_update_label(self.tts_text_label, text)) |
|
|
| def update_emotion(self, emotion_path: str): |
| """更新表情,使用GIF动画显示""" |
| |
| abs_path = os.path.abspath(emotion_path) |
| |
| |
| if hasattr(self, 'last_emotion_path') and self.last_emotion_path == abs_path: |
| return |
| |
| |
| self.last_emotion_path = abs_path |
| self.logger.info(f"设置表情GIF: {abs_path}") |
| self.update_queue.put(lambda: self._set_emotion_gif(self.emotion_label, abs_path)) |
| |
| def _set_emotion_gif(self, label, gif_path): |
| """设置GIF动画到标签,带淡入淡出效果""" |
| if not label or self.root.isHidden(): |
| return |
| |
| try: |
| |
| if not os.path.exists(gif_path): |
| self.logger.error(f"GIF文件不存在: {gif_path}") |
| label.setText("😊") |
| return |
| |
| |
| if (self.emotion_movie and |
| getattr(self.emotion_movie, '_gif_path', None) == gif_path and |
| self.emotion_movie.state() == QMovie.Running): |
| return |
| |
| |
| if self.is_emotion_animating: |
| self.next_emotion_path = gif_path |
| return |
| |
| self.logger.info(f"加载GIF文件: {gif_path}") |
| |
| |
| self.is_emotion_animating = True |
| |
| |
| if self.emotion_movie and label.movie() == self.emotion_movie: |
| |
| if not self.emotion_effect: |
| self.emotion_effect = QGraphicsOpacityEffect(label) |
| label.setGraphicsEffect(self.emotion_effect) |
| self.emotion_effect.setOpacity(1.0) |
| |
| |
| self.emotion_animation = QPropertyAnimation(self.emotion_effect, b"opacity") |
| self.emotion_animation.setDuration(180) |
| self.emotion_animation.setStartValue(1.0) |
| self.emotion_animation.setEndValue(0.25) |
| |
| |
| def on_fade_out_finished(): |
| try: |
| |
| if self.emotion_movie: |
| self.emotion_movie.stop() |
| |
| |
| self._set_new_emotion_gif(label, gif_path) |
| except Exception as e: |
| self.logger.error(f"淡出动画完成后设置GIF失败: {e}") |
| self.is_emotion_animating = False |
| |
| |
| self.emotion_animation.finished.connect(on_fade_out_finished) |
| |
| |
| self.emotion_animation.start() |
| else: |
| |
| self._set_new_emotion_gif(label, gif_path) |
| |
| except Exception as e: |
| self.logger.error(f"更新表情GIF动画失败: {e}") |
| |
| try: |
| label.setText("😊") |
| except Exception: |
| pass |
| self.is_emotion_animating = False |
| |
| def _set_new_emotion_gif(self, label, gif_path): |
| """设置新的GIF动画并执行淡入效果""" |
| try: |
| |
| movie = QMovie(gif_path) |
| if not movie.isValid(): |
| self.logger.error(f"无效的GIF文件: {gif_path}") |
| label.setText("😊") |
| self.is_emotion_animating = False |
| return |
| |
| |
| movie.setCacheMode(QMovie.CacheAll) |
| |
| |
| movie._gif_path = gif_path |
| |
| |
| movie.error.connect(lambda: self.logger.error(f"GIF播放错误: {movie.lastError()}")) |
| |
| |
| self.emotion_movie = movie |
| |
| |
| label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) |
| label.setAlignment(Qt.AlignCenter) |
| |
| |
| label.setMovie(movie) |
| |
| |
| movie.setSpeed(105) |
| |
| |
| if self.emotion_effect: |
| self.emotion_effect.setOpacity(0.0) |
| else: |
| self.emotion_effect = QGraphicsOpacityEffect(label) |
| label.setGraphicsEffect(self.emotion_effect) |
| self.emotion_effect.setOpacity(0.0) |
| |
| |
| movie.start() |
| |
| |
| self.emotion_animation = QPropertyAnimation(self.emotion_effect, b"opacity") |
| self.emotion_animation.setDuration(180) |
| self.emotion_animation.setStartValue(0.25) |
| self.emotion_animation.setEndValue(1.0) |
| |
| |
| def on_fade_in_finished(): |
| self.is_emotion_animating = False |
| |
| if self.next_emotion_path: |
| next_path = self.next_emotion_path |
| self.next_emotion_path = None |
| self._set_emotion_gif(label, next_path) |
| |
| |
| self.emotion_animation.finished.connect(on_fade_in_finished) |
| |
| |
| self.emotion_animation.start() |
| |
| except Exception as e: |
| self.logger.error(f"设置新的GIF动画失败: {e}") |
| self.is_emotion_animating = False |
| |
| try: |
| label.setText("😊") |
| except Exception: |
| pass |
|
|
| def _safe_update_label(self, label, text): |
| """安全地更新标签文本""" |
| if label and not self.root.isHidden(): |
| try: |
| label.setText(text) |
| except RuntimeError as e: |
| self.logger.error(f"更新标签失败: {e}") |
|
|
| def start_update_threads(self): |
| """启动更新线程""" |
| |
| self.last_emotion_path = None |
|
|
| def update_loop(): |
| while self._running: |
| try: |
| |
| if self.status_update_callback: |
| status = self.status_update_callback() |
| if status: |
| self.update_status(status) |
|
|
| |
| if self.text_update_callback: |
| text = self.text_update_callback() |
| if text: |
| self.update_text(text) |
|
|
| |
| if self.emotion_update_callback: |
| emotion = self.emotion_update_callback() |
| if emotion: |
| |
| self.update_emotion(emotion) |
|
|
| except Exception as e: |
| self.logger.error(f"更新失败: {e}") |
| time.sleep(0.1) |
|
|
| threading.Thread(target=update_loop, daemon=True).start() |
|
|
| def on_close(self): |
| """关闭窗口处理""" |
| self._running = False |
| if self.update_timer: |
| self.update_timer.stop() |
| if self.mic_timer: |
| self.mic_timer.stop() |
| if self.root: |
| self.root.close() |
| self.stop_keyboard_listener() |
|
|
| def start(self): |
| """启动GUI""" |
| try: |
| |
| self.app = QApplication.instance() |
| if self.app is None: |
| self.app = QApplication(sys.argv) |
| |
| |
| default_font = QFont("ASLantTermuxFont Mono", 12) |
| self.app.setFont(default_font) |
| |
| |
| from PyQt5 import uic |
| self.root = QWidget() |
| ui_path = Path(__file__).parent / "gui_display.ui" |
| if not ui_path.exists(): |
| self.logger.error(f"UI文件不存在: {ui_path}") |
| raise FileNotFoundError(f"UI文件不存在: {ui_path}") |
| |
| uic.loadUi(str(ui_path), self.root) |
|
|
| |
| self.status_label = self.root.findChild(QLabel, "status_label") |
| self.emotion_label = self.root.findChild(QLabel, "emotion_label") |
| self.tts_text_label = self.root.findChild(QLabel, "tts_text_label") |
| self.manual_btn = self.root.findChild(QPushButton, "manual_btn") |
| self.abort_btn = self.root.findChild(QPushButton, "abort_btn") |
| self.auto_btn = self.root.findChild(QPushButton, "auto_btn") |
| self.mode_btn = self.root.findChild(QPushButton, "mode_btn") |
| |
| |
| self.iot_card = self.root.findChild(QFrame, "iotPage") |
| if self.iot_card is None: |
| |
| self.iot_card = self.root.findChild(QFrame, "iot_card") |
| if self.iot_card is None: |
| |
| self.stackedWidget = self.root.findChild(QStackedWidget, "stackedWidget") |
| if self.stackedWidget and self.stackedWidget.count() > 1: |
| self.iot_card = self.stackedWidget.widget(1) |
| self.logger.info(f"使用 stackedWidget 的第2个页面作为 iot_card: {self.iot_card}") |
| else: |
| self.logger.warning("无法找到 iot_card,IOT设备功能将不可用") |
| else: |
| self.logger.info(f"找到 iot_card: {self.iot_card}") |
| |
| |
| self.audio_control_stack = self.root.findChild(QStackedWidget, "audio_control_stack") |
| self.volume_page = self.root.findChild(QWidget, "volume_page") |
| self.mic_page = self.root.findChild(QWidget, "mic_page") |
| |
| |
| self.volume_scale = self.root.findChild(QSlider, "volume_scale") |
| self.mute = self.root.findChild(QPushButton, "mute") |
| |
| if self.mute: |
| self.mute.setCheckable(True) |
| self.mute.clicked.connect(self._on_mute_click) |
| |
| |
| self.volume_label = self.root.findChild(QLabel, "volume_label") |
| if not self.volume_label and self.volume_scale: |
| |
| volume_layout = self.root.findChild(QHBoxLayout, "volume_layout") |
| if volume_layout: |
| self.volume_label = QLabel(f"{self.current_volume}%") |
| self.volume_label.setObjectName("volume_label") |
| self.volume_label.setMinimumWidth(40) |
| self.volume_label.setAlignment(Qt.AlignCenter) |
| volume_layout.addWidget(self.volume_label) |
| |
| |
| self.mic_visualizer_card = self.root.findChild(QFrame, "mic_visualizer_card") |
| self.mic_visualizer_widget = self.root.findChild(QWidget, "mic_visualizer_widget") |
| |
| if self.mic_visualizer_widget: |
| |
| self.mic_visualizer = MicrophoneVisualizer(self.mic_visualizer_widget) |
| |
| |
| layout = QVBoxLayout(self.mic_visualizer_widget) |
| layout.setContentsMargins(0, 0, 0, 0) |
| layout.addWidget(self.mic_visualizer) |
| |
| |
| self.mic_timer = QTimer() |
| self.mic_timer.timeout.connect(self._update_mic_visualizer) |
| |
| |
| volume_control_working = self.volume_control_available and not self.volume_controller_failed |
| if not volume_control_working: |
| self.logger.warning("系统不支持音量控制或控制失败,音量控制功能已禁用") |
| |
| if self.volume_scale: |
| self.volume_scale.setEnabled(False) |
| if self.mute: |
| self.mute.setEnabled(False) |
| if self.volume_label: |
| self.volume_label.setText("不可用") |
| else: |
| |
| if self.volume_scale: |
| self.volume_scale.setRange(0, 100) |
| self.volume_scale.setValue(self.current_volume) |
| self.volume_scale.valueChanged.connect(self._on_volume_change) |
| self.volume_scale.installEventFilter(self) |
| |
| if self.volume_label: |
| self.volume_label.setText(f"{self.current_volume}%") |
| |
| |
| self.wakeWordEnableSwitch = self.root.findChild(QCheckBox, "wakeWordEnableSwitch") |
| self.wakeWordsLineEdit = self.root.findChild(QLineEdit, "wakeWordsLineEdit") |
| self.saveSettingsButton = self.root.findChild(QPushButton, "saveSettingsButton") |
| |
| |
| self.deviceIdLineEdit = self.root.findChild(QLineEdit, "deviceIdLineEdit") |
| self.wsProtocolComboBox = self.root.findChild(QComboBox, "wsProtocolComboBox") |
| self.wsAddressLineEdit = self.root.findChild(QLineEdit, "wsAddressLineEdit") |
| self.wsTokenLineEdit = self.root.findChild(QLineEdit, "wsTokenLineEdit") |
| |
| self.haProtocolComboBox = self.root.findChild(QComboBox, "haProtocolComboBox") |
| self.ha_server = self.root.findChild(QLineEdit, "ha_server") |
| self.ha_port = self.root.findChild(QLineEdit, "ha_port") |
| self.ha_key = self.root.findChild(QLineEdit, "ha_key") |
| self.Add_ha_devices = self.root.findChild(QPushButton, "Add_ha_devices") |
|
|
| |
| self.otaProtocolComboBox = self.root.findChild(QComboBox, "otaProtocolComboBox") |
| self.otaAddressLineEdit = self.root.findChild(QLineEdit, "otaAddressLineEdit") |
|
|
| |
| if self.wsProtocolComboBox: |
| |
| self.wsProtocolComboBox.clear() |
| self.wsProtocolComboBox.addItems(["wss://", "ws://"]) |
| |
| |
| if self.otaProtocolComboBox: |
| self.otaProtocolComboBox.clear() |
| self.otaProtocolComboBox.addItems(["https://", "http://"]) |
|
|
| |
| if self.haProtocolComboBox: |
| self.haProtocolComboBox.clear() |
| self.haProtocolComboBox.addItems(["http://", "https://"]) |
|
|
| |
| self.stackedWidget = self.root.findChild(QStackedWidget, "stackedWidget") |
| self.nav_tab_bar = self.root.findChild(QTabBar, "nav_tab_bar") |
|
|
| |
| self._setup_navigation() |
|
|
| |
| if self.manual_btn: |
| self.manual_btn.pressed.connect(self._on_manual_button_press) |
| self.manual_btn.released.connect(self._on_manual_button_release) |
| if self.abort_btn: |
| self.abort_btn.clicked.connect(self._on_abort_button_click) |
| if self.auto_btn: |
| self.auto_btn.clicked.connect(self._on_auto_button_click) |
| |
| self.auto_btn.hide() |
| if self.mode_btn: |
| self.mode_btn.clicked.connect(self._on_mode_button_click) |
| |
| |
| self.text_input = self.root.findChild(QLineEdit, "text_input") |
| self.send_btn = self.root.findChild(QPushButton, "send_btn") |
| if self.text_input and self.send_btn: |
| self.send_btn.clicked.connect(self._on_send_button_click) |
| |
| self.text_input.returnPressed.connect(self._on_send_button_click) |
|
|
| |
| if self.saveSettingsButton: |
| self.saveSettingsButton.clicked.connect(self._save_settings) |
|
|
| |
| if self.Add_ha_devices: |
| self.Add_ha_devices.clicked.connect(self._on_add_ha_devices_click) |
|
|
| |
| self.root.mousePressEvent = self.mousePressEvent |
| self.root.mouseReleaseEvent = self.mouseReleaseEvent |
|
|
| |
| self.start_keyboard_listener() |
| |
| |
| self.start_update_threads() |
| |
| |
| self.update_timer = QTimer() |
| self.update_timer.timeout.connect(self._process_updates) |
| self.update_timer.start(100) |
| |
| |
| self.logger.info("开始启动GUI主循环") |
| self.root.show() |
| |
| |
| except Exception as e: |
| self.logger.error(f"GUI启动失败: {e}", exc_info=True) |
| |
| print(f"GUI启动失败: {e},请尝试使用CLI模式") |
| raise |
|
|
| def update_mode_button_status(self, text: str): |
| """更新模式按钮状态""" |
| self.update_queue.put(lambda: self._safe_update_button(self.mode_btn, text)) |
|
|
| def update_button_status(self, text: str): |
| """更新按钮状态 - 保留此方法以满足抽象基类要求""" |
| |
| if self.auto_mode: |
| self.update_queue.put(lambda: self._safe_update_button(self.auto_btn, text)) |
| else: |
| |
| |
| pass |
| |
| def _safe_update_button(self, button, text): |
| """安全地更新按钮文本""" |
| if button and not self.root.isHidden(): |
| try: |
| button.setText(text) |
| except RuntimeError as e: |
| self.logger.error(f"更新按钮失败: {e}") |
|
|
| def _on_volume_change(self, value): |
| """处理音量滑块变化,使用节流""" |
|
|
| def update_volume(): |
| self.update_volume(value) |
|
|
| |
| if hasattr(self, "volume_update_timer") and self.volume_update_timer and self.volume_update_timer.isActive(): |
| self.volume_update_timer.stop() |
|
|
| |
| self.volume_update_timer = QTimer() |
| self.volume_update_timer.setSingleShot(True) |
| self.volume_update_timer.timeout.connect(update_volume) |
| self.volume_update_timer.start(300) |
|
|
| def update_volume(self, volume: int): |
| """重写父类的update_volume方法,确保UI同步更新""" |
| |
| if not self.volume_control_available or self.volume_controller_failed: |
| return |
| |
| |
| super().update_volume(volume) |
| |
| |
| if not self.root.isHidden(): |
| try: |
| if self.volume_scale: |
| self.volume_scale.setValue(volume) |
| if self.volume_label: |
| self.volume_label.setText(f"{volume}%") |
| except RuntimeError as e: |
| self.logger.error(f"更新音量UI失败: {e}") |
|
|
| def start_keyboard_listener(self): |
| """启动键盘监听""" |
| try: |
|
|
| def on_press(key): |
| try: |
| |
| if key == pynput_keyboard.Key.f2 and not self.auto_mode: |
| if self.button_press_callback: |
| self.button_press_callback() |
| if self.manual_btn: |
| self.update_queue.put(lambda: self._safe_update_button(self.manual_btn, "松开以停止")) |
|
|
| |
| elif key == pynput_keyboard.Key.f3: |
| if self.abort_callback: |
| self.abort_callback() |
| except Exception as e: |
| self.logger.error(f"键盘事件处理错误: {e}") |
|
|
| def on_release(key): |
| try: |
| |
| if key == pynput_keyboard.Key.f2 and not self.auto_mode: |
| if self.button_release_callback: |
| self.button_release_callback() |
| if self.manual_btn: |
| self.update_queue.put(lambda: self._safe_update_button(self.manual_btn, "按住后说话")) |
| except Exception as e: |
| self.logger.error(f"键盘事件处理错误: {e}") |
|
|
| |
| self.keyboard_listener = pynput_keyboard.Listener( |
| on_press=on_press, on_release=on_release |
| ) |
| self.keyboard_listener.start() |
| self.logger.info("键盘监听器初始化成功") |
| except Exception as e: |
| self.logger.error(f"键盘监听器初始化失败: {e}") |
|
|
| def stop_keyboard_listener(self): |
| """停止键盘监听""" |
| if self.keyboard_listener: |
| try: |
| self.keyboard_listener.stop() |
| self.keyboard_listener = None |
| self.logger.info("键盘监听器已停止") |
| except Exception as e: |
| self.logger.error(f"停止键盘监听器失败: {e}") |
|
|
| def mousePressEvent(self, event: QMouseEvent): |
| """鼠标按下事件处理""" |
| if event.button() == Qt.LeftButton: |
| self.last_mouse_pos = event.pos() |
|
|
| def mouseReleaseEvent(self, event: QMouseEvent): |
| """鼠标释放事件处理 (修改为使用 QTabBar 索引)""" |
| if event.button() == Qt.LeftButton and self.last_mouse_pos is not None: |
| delta = event.pos().x() - self.last_mouse_pos.x() |
| self.last_mouse_pos = None |
|
|
| if abs(delta) > 100: |
| current_index = self.nav_tab_bar.currentIndex() if self.nav_tab_bar else 0 |
| tab_count = self.nav_tab_bar.count() if self.nav_tab_bar else 0 |
|
|
| if delta > 0 and current_index > 0: |
| new_index = current_index - 1 |
| if self.nav_tab_bar: self.nav_tab_bar.setCurrentIndex(new_index) |
| elif delta < 0 and current_index < tab_count - 1: |
| new_index = current_index + 1 |
| if self.nav_tab_bar: self.nav_tab_bar.setCurrentIndex(new_index) |
|
|
| def _on_mute_click(self): |
| """静音按钮点击事件处理 (使用 isChecked 状态)""" |
| try: |
| if not self.volume_control_available or self.volume_controller_failed or not self.mute: |
| return |
|
|
| self.is_muted = self.mute.isChecked() |
|
|
| if self.is_muted: |
| |
| self.pre_mute_volume = self.current_volume |
| self.update_volume(0) |
| self.mute.setText("取消静音") |
| if self.volume_label: |
| self.volume_label.setText("静音") |
| else: |
| |
| self.update_volume(self.pre_mute_volume) |
| self.mute.setText("点击静音") |
| if self.volume_label: |
| self.volume_label.setText(f"{self.pre_mute_volume}%") |
|
|
| except Exception as e: |
| self.logger.error(f"静音按钮点击事件处理失败: {e}") |
|
|
| def _load_settings(self): |
| """加载配置文件并更新设置页面UI (使用ConfigManager)""" |
| try: |
| |
| config_manager = ConfigManager.get_instance() |
| |
| |
| use_wake_word = config_manager.get_config("WAKE_WORD_OPTIONS.USE_WAKE_WORD", False) |
| wake_words = config_manager.get_config("WAKE_WORD_OPTIONS.WAKE_WORDS", []) |
| |
| if self.wakeWordEnableSwitch: |
| self.wakeWordEnableSwitch.setChecked(use_wake_word) |
|
|
| if self.wakeWordsLineEdit: |
| self.wakeWordsLineEdit.setText(", ".join(wake_words)) |
|
|
| |
| device_id = config_manager.get_config("SYSTEM_OPTIONS.DEVICE_ID", "") |
| websocket_url = config_manager.get_config("SYSTEM_OPTIONS.NETWORK.WEBSOCKET_URL", "") |
| websocket_token = config_manager.get_config("SYSTEM_OPTIONS.NETWORK.WEBSOCKET_ACCESS_TOKEN", "") |
| ota_url = config_manager.get_config("SYSTEM_OPTIONS.NETWORK.OTA_VERSION_URL", "") |
|
|
| if self.deviceIdLineEdit: |
| self.deviceIdLineEdit.setText(device_id) |
|
|
| |
| if websocket_url and self.wsProtocolComboBox and self.wsAddressLineEdit: |
| try: |
| parsed_url = urlparse(websocket_url) |
| protocol = parsed_url.scheme |
| |
| |
| address = parsed_url.netloc + parsed_url.path |
| |
| |
| if address.startswith(f"{protocol}://"): |
| address = address[len(f"{protocol}://"):] |
|
|
| index = self.wsProtocolComboBox.findText(f"{protocol}://", Qt.MatchFixedString) |
| if index >= 0: |
| self.wsProtocolComboBox.setCurrentIndex(index) |
| else: |
| self.logger.warning(f"未知的 WebSocket 协议: {protocol}") |
| self.wsProtocolComboBox.setCurrentIndex(0) |
|
|
| self.wsAddressLineEdit.setText(address) |
| except Exception as e: |
| self.logger.error(f"解析 WebSocket URL 时出错: {websocket_url} - {e}") |
| self.wsProtocolComboBox.setCurrentIndex(0) |
| self.wsAddressLineEdit.clear() |
|
|
| if self.wsTokenLineEdit: |
| self.wsTokenLineEdit.setText(websocket_token) |
|
|
| |
| if ota_url and self.otaProtocolComboBox and self.otaAddressLineEdit: |
| try: |
| parsed_url = urlparse(ota_url) |
| protocol = parsed_url.scheme |
| |
| |
| address = parsed_url.netloc + parsed_url.path |
| |
| |
| if address.startswith(f"{protocol}://"): |
| address = address[len(f"{protocol}://"):] |
| |
| if protocol == "https": |
| self.otaProtocolComboBox.setCurrentIndex(0) |
| elif protocol == "http": |
| self.otaProtocolComboBox.setCurrentIndex(1) |
| else: |
| self.logger.warning(f"未知的OTA协议: {protocol}") |
| self.otaProtocolComboBox.setCurrentIndex(0) |
| |
| self.otaAddressLineEdit.setText(address) |
| except Exception as e: |
| self.logger.error(f"解析OTA URL时出错: {ota_url} - {e}") |
| self.otaProtocolComboBox.setCurrentIndex(0) |
| self.otaAddressLineEdit.clear() |
|
|
| |
| ha_options = config_manager.get_config("HOME_ASSISTANT", {}) |
| ha_url = ha_options.get("URL", "") |
| ha_token = ha_options.get("TOKEN", "") |
| |
| |
| if ha_url and self.haProtocolComboBox and self.ha_server: |
| try: |
| parsed_url = urlparse(ha_url) |
| protocol = parsed_url.scheme |
| port = parsed_url.port |
| |
| address = parsed_url.netloc |
| if ":" in address: |
| address = address.split(":")[0] |
| |
| |
| if protocol == "https": |
| self.haProtocolComboBox.setCurrentIndex(1) |
| else: |
| self.haProtocolComboBox.setCurrentIndex(0) |
| |
| |
| self.ha_server.setText(address) |
| |
| |
| if port and self.ha_port: |
| self.ha_port.setText(str(port)) |
| except Exception as e: |
| self.logger.error(f"解析Home Assistant URL时出错: {ha_url} - {e}") |
| |
| self.haProtocolComboBox.setCurrentIndex(0) |
| self.ha_server.clear() |
| |
| |
| if self.ha_key: |
| self.ha_key.setText(ha_token) |
|
|
| except Exception as e: |
| self.logger.error(f"加载配置文件时出错: {e}", exc_info=True) |
| QMessageBox.critical(self.root, "错误", f"加载设置失败: {e}") |
|
|
| def _save_settings(self): |
| """保存设置页面的更改到配置文件 (使用ConfigManager)""" |
| try: |
| |
| config_manager = ConfigManager.get_instance() |
| |
| |
| |
| use_wake_word = self.wakeWordEnableSwitch.isChecked() if self.wakeWordEnableSwitch else False |
| wake_words_text = self.wakeWordsLineEdit.text() if self.wakeWordsLineEdit else "" |
| wake_words = [word.strip() for word in wake_words_text.split(',') if word.strip()] |
| |
| |
| new_device_id = self.deviceIdLineEdit.text() if self.deviceIdLineEdit else "" |
| selected_protocol_text = self.wsProtocolComboBox.currentText() if self.wsProtocolComboBox else "wss://" |
| selected_protocol = selected_protocol_text.replace("://", "") |
| new_ws_address = self.wsAddressLineEdit.text() if self.wsAddressLineEdit else "" |
| new_ws_token = self.wsTokenLineEdit.text() if self.wsTokenLineEdit else "" |
| |
| |
| selected_ota_protocol_text = self.otaProtocolComboBox.currentText() if self.otaProtocolComboBox else "https://" |
| selected_ota_protocol = selected_ota_protocol_text.replace("://", "") |
| new_ota_address = self.otaAddressLineEdit.text() if self.otaAddressLineEdit else "" |
| |
| |
| if new_ws_address.startswith('/'): |
| new_ws_address = new_ws_address[1:] |
| |
| |
| new_websocket_url = f"{selected_protocol}://{new_ws_address}" |
| if new_websocket_url and not new_websocket_url.endswith('/'): |
| new_websocket_url += '/' |
| |
| |
| new_ota_url = f"{selected_ota_protocol}://{new_ota_address}" |
| if new_ota_url and not new_ota_url.endswith('/'): |
| new_ota_url += '/' |
| |
| |
| ha_protocol = self.haProtocolComboBox.currentText().replace("://", "") if self.haProtocolComboBox else "http" |
| ha_server = self.ha_server.text() if self.ha_server else "" |
| ha_port = self.ha_port.text() if self.ha_port else "" |
| ha_key = self.ha_key.text() if self.ha_key else "" |
| |
| |
| if ha_server: |
| ha_url = f"{ha_protocol}://{ha_server}" |
| if ha_port: |
| ha_url += f":{ha_port}" |
| else: |
| ha_url = "" |
| |
| |
| current_config = config_manager._config.copy() |
| |
| |
| try: |
| import json |
| config_path = Path(__file__).parent.parent.parent / "config" / "config.json" |
| if config_path.exists(): |
| with open(config_path, 'r', encoding='utf-8') as f: |
| disk_config = json.load(f) |
| |
| |
| if ("HOME_ASSISTANT" in disk_config and |
| "DEVICES" in disk_config["HOME_ASSISTANT"]): |
| |
| latest_devices = disk_config["HOME_ASSISTANT"]["DEVICES"] |
| self.logger.info(f"从配置文件读取了 {len(latest_devices)} 个设备") |
| else: |
| latest_devices = [] |
| else: |
| latest_devices = [] |
| except Exception as e: |
| self.logger.error(f"读取配置文件中的设备列表失败: {e}") |
| |
| if "HOME_ASSISTANT" in current_config and "DEVICES" in current_config["HOME_ASSISTANT"]: |
| latest_devices = current_config["HOME_ASSISTANT"]["DEVICES"] |
| else: |
| latest_devices = [] |
| |
| |
| |
| if "WAKE_WORD_OPTIONS" not in current_config: |
| current_config["WAKE_WORD_OPTIONS"] = {} |
| current_config["WAKE_WORD_OPTIONS"]["USE_WAKE_WORD"] = use_wake_word |
| current_config["WAKE_WORD_OPTIONS"]["WAKE_WORDS"] = wake_words |
| |
| |
| if "SYSTEM_OPTIONS" not in current_config: |
| current_config["SYSTEM_OPTIONS"] = {} |
| current_config["SYSTEM_OPTIONS"]["DEVICE_ID"] = new_device_id |
| |
| if "NETWORK" not in current_config["SYSTEM_OPTIONS"]: |
| current_config["SYSTEM_OPTIONS"]["NETWORK"] = {} |
| current_config["SYSTEM_OPTIONS"]["NETWORK"]["WEBSOCKET_URL"] = new_websocket_url |
| current_config["SYSTEM_OPTIONS"]["NETWORK"]["WEBSOCKET_ACCESS_TOKEN"] = new_ws_token |
| current_config["SYSTEM_OPTIONS"]["NETWORK"]["OTA_VERSION_URL"] = new_ota_url |
| |
| |
| if "HOME_ASSISTANT" not in current_config: |
| current_config["HOME_ASSISTANT"] = {} |
| current_config["HOME_ASSISTANT"]["URL"] = ha_url |
| current_config["HOME_ASSISTANT"]["TOKEN"] = ha_key |
| |
| |
| current_config["HOME_ASSISTANT"]["DEVICES"] = latest_devices |
| |
| |
| save_success = config_manager._save_config(current_config) |
| |
| if save_success: |
| self.logger.info("设置已成功保存到 config.json") |
| reply = QMessageBox.question(self.root, "保存成功", |
| "设置已保存。\n部分设置需要重启应用程序才能生效。\n\n是否立即重启?", |
| QMessageBox.Yes | QMessageBox.No, QMessageBox.No) |
|
|
| if reply == QMessageBox.Yes: |
| self.logger.info("用户选择重启应用程序。") |
| restart_program() |
| else: |
| raise Exception("保存配置文件失败") |
| |
| except Exception as e: |
| self.logger.error(f"保存设置时发生未知错误: {e}", exc_info=True) |
| QMessageBox.critical(self.root, "错误", f"保存设置失败: {e}") |
|
|
| def _on_add_ha_devices_click(self): |
| """处理添加Home Assistant设备按钮点击事件""" |
| try: |
| self.logger.info("启动Home Assistant设备管理器...") |
| |
| |
| current_dir = os.path.dirname(os.path.abspath(__file__)) |
| |
| project_root = os.path.dirname(os.path.dirname(current_dir)) |
| |
| |
| script_path = os.path.join(project_root, "scripts", "ha_device_manager_ui.py") |
| |
| if not os.path.exists(script_path): |
| self.logger.error(f"设备管理器脚本不存在: {script_path}") |
| QMessageBox.critical(self.root, "错误", "设备管理器脚本不存在") |
| return |
| |
| |
| cmd = [sys.executable, script_path] |
| |
| |
| import subprocess |
| subprocess.Popen(cmd) |
| |
| except Exception as e: |
| self.logger.error(f"启动Home Assistant设备管理器失败: {e}", exc_info=True) |
| QMessageBox.critical(self.root, "错误", f"启动设备管理器失败: {e}") |
|
|
| def _update_mic_visualizer(self): |
| """更新麦克风可视化""" |
| if not self.is_listening or not self.mic_visualizer: |
| return |
| |
| try: |
| |
| volume_level = self._get_current_mic_level() |
| |
| |
| self.mic_visualizer.set_volume(min(1.0, volume_level)) |
| except Exception as e: |
| self.logger.error(f"更新麦克风可视化失败: {e}") |
| |
| def _get_current_mic_level(self): |
| """获取当前麦克风音量级别""" |
| try: |
| from src.application import Application |
| app = Application.get_instance() |
| if app and hasattr(app, 'audio_codec') and app.audio_codec: |
| |
| if hasattr(app.audio_codec, 'input_stream') and app.audio_codec.input_stream: |
| |
| try: |
| |
| available = app.audio_codec.input_stream.get_read_available() |
| if available > 0: |
| |
| chunk_size = min(1024, available) |
| audio_data = app.audio_codec.input_stream.read( |
| chunk_size, |
| exception_on_overflow=False |
| ) |
| |
| |
| audio_array = np.frombuffer(audio_data, dtype=np.int16) |
| |
| |
| |
| |
| rms = np.sqrt(np.mean(np.square(audio_array.astype(np.float32)))) |
| |
| |
| volume = min(1.0, rms / 32768 * 10) |
| |
| |
| if hasattr(self, '_last_volume'): |
| |
| self._last_volume = self._last_volume * 0.7 + volume * 0.3 |
| else: |
| self._last_volume = volume |
| |
| return self._last_volume |
| except Exception as e: |
| self.logger.debug(f"读取麦克风数据失败: {e}") |
| except Exception as e: |
| self.logger.debug(f"获取麦克风音量失败: {e}") |
| |
| |
| if hasattr(self, '_last_volume'): |
| |
| self._last_volume *= 0.9 |
| return self._last_volume |
| else: |
| self._last_volume = 0.0 |
| return self._last_volume |
|
|
| def _start_mic_visualization(self): |
| """开始麦克风可视化""" |
| if self.mic_visualizer and self.mic_timer and self.audio_control_stack: |
| self.is_listening = True |
| |
| |
| self.audio_control_stack.setCurrentWidget(self.mic_page) |
| |
| |
| if not self.mic_timer.isActive(): |
| self.mic_timer.start(50) |
| |
| def _stop_mic_visualization(self): |
| """停止麦克风可视化""" |
| self.is_listening = False |
| |
| |
| if self.mic_timer and self.mic_timer.isActive(): |
| self.mic_timer.stop() |
| |
| if self.mic_visualizer: |
| self.mic_visualizer.set_volume(0.0) |
| |
| if hasattr(self, '_last_volume'): |
| self._last_volume = 0.0 |
|
|
| |
| if self.audio_control_stack: |
| self.audio_control_stack.setCurrentWidget(self.volume_page) |
|
|
| def _on_send_button_click(self): |
| """处理发送文本按钮点击事件""" |
| if not self.text_input or not self.send_text_callback: |
| return |
| |
| text = self.text_input.text().strip() |
| if not text: |
| return |
| |
| |
| self.text_input.clear() |
| |
| |
| from src.application import Application |
| app = Application.get_instance() |
| if app and app.loop: |
| import asyncio |
| asyncio.run_coroutine_threadsafe( |
| self.send_text_callback(text), |
| app.loop |
| ) |
| else: |
| self.logger.error("应用程序实例或事件循环不可用") |
|
|
| def _load_iot_devices(self): |
| """加载并显示Home Assistant设备列表""" |
| try: |
| |
| if hasattr(self, 'devices_list') and self.devices_list: |
| for widget in self.devices_list: |
| widget.deleteLater() |
| self.devices_list = [] |
| |
| |
| self.device_labels = {} |
| |
| |
| if self.iot_card: |
| |
| title_text = "" |
| if self.history_title: |
| title_text = self.history_title.text() |
| |
| |
| self.history_title = None |
| |
| |
| old_layout = self.iot_card.layout() |
| if old_layout: |
| |
| while old_layout.count(): |
| item = old_layout.takeAt(0) |
| widget = item.widget() |
| if widget: |
| widget.deleteLater() |
| |
| |
| new_layout = old_layout |
| else: |
| |
| new_layout = QVBoxLayout() |
| self.iot_card.setLayout(new_layout) |
| |
| |
| new_layout.setContentsMargins(2, 2, 2, 2) |
| new_layout.setSpacing(2) |
| |
| |
| self.history_title = QLabel(title_text) |
| self.history_title.setFont(QFont(self.app.font().family(), 12)) |
| self.history_title.setAlignment(Qt.AlignCenter) |
| self.history_title.setContentsMargins(5, 2, 0, 2) |
| self.history_title.setMaximumHeight(25) |
| new_layout.addWidget(self.history_title) |
| |
| |
| try: |
| with open(CONFIG_PATH, 'r', encoding='utf-8') as f: |
| config_data = json.load(f) |
| |
| devices = config_data.get("HOME_ASSISTANT", {}).get("DEVICES", []) |
| |
| |
| self.history_title.setText(f"已连接设备 ({len(devices)})") |
| |
| |
| scroll_area = QScrollArea() |
| scroll_area.setWidgetResizable(True) |
| scroll_area.setFrameShape(QFrame.NoFrame) |
| scroll_area.setStyleSheet("background: transparent;") |
| |
| |
| container = QWidget() |
| container.setStyleSheet("background: transparent;") |
| |
| |
| grid_layout = QGridLayout(container) |
| grid_layout.setContentsMargins(3, 3, 3, 3) |
| grid_layout.setSpacing(8) |
| grid_layout.setAlignment(Qt.AlignTop) |
| |
| |
| cards_per_row = 3 |
| |
| |
| for i, device in enumerate(devices): |
| entity_id = device.get('entity_id', '') |
| friendly_name = device.get('friendly_name', '') |
| |
| |
| location = friendly_name |
| device_name = "" |
| if ',' in friendly_name: |
| parts = friendly_name.split(',', 1) |
| location = parts[0].strip() |
| device_name = parts[1].strip() |
| |
| |
| device_card = QFrame() |
| device_card.setMinimumHeight(90) |
| device_card.setMaximumHeight(150) |
| device_card.setMinimumWidth(200) |
| device_card.setProperty("entity_id", entity_id) |
| |
| device_card.setStyleSheet(""" |
| QFrame { |
| border-radius: 5px; |
| background-color: rgba(255, 255, 255, 0.7); |
| border: none; |
| } |
| """) |
| |
| card_layout = QVBoxLayout(device_card) |
| card_layout.setContentsMargins(10, 8, 10, 8) |
| card_layout.setSpacing(2) |
| |
| |
| device_name_label = QLabel(f"<b>{device_name}</b>") |
| device_name_label.setFont(QFont(self.app.font().family(), 14)) |
| device_name_label.setWordWrap(True) |
| device_name_label.setMinimumHeight(20) |
| device_name_label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Minimum) |
| card_layout.addWidget(device_name_label) |
| |
| |
| location_label = QLabel(f"{location}") |
| location_label.setFont(QFont(self.app.font().family(), 12)) |
| location_label.setStyleSheet("color: #666666;") |
| card_layout.addWidget(location_label) |
| |
| |
| line = QFrame() |
| line.setFrameShape(QFrame.HLine) |
| line.setFrameShadow(QFrame.Sunken) |
| line.setStyleSheet("background-color: #E0E0E0;") |
| line.setMaximumHeight(1) |
| card_layout.addWidget(line) |
| |
| |
| state_text = "未知" |
| if "light" in entity_id: |
| state_text = "关闭" |
| status_display = f"状态: {state_text}" |
| elif "sensor" in entity_id: |
| if "temperature" in entity_id: |
| state_text = "0℃" |
| status_display = state_text |
| elif "humidity" in entity_id: |
| state_text = "0%" |
| status_display = state_text |
| else: |
| state_text = "正常" |
| status_display = f"状态: {state_text}" |
| elif "switch" in entity_id: |
| state_text = "关闭" |
| status_display = f"状态: {state_text}" |
| elif "button" in entity_id: |
| state_text = "可用" |
| status_display = f"状态: {state_text}" |
| else: |
| status_display = state_text |
| |
| |
| state_label = QLabel(status_display) |
| state_label.setFont(QFont(self.app.font().family(), 14)) |
| state_label.setStyleSheet("color: #2196F3; border: none;") |
| card_layout.addWidget(state_label) |
| |
| |
| self.device_labels[entity_id] = state_label |
| |
| |
| row = i // cards_per_row |
| col = i % cards_per_row |
| |
| |
| grid_layout.addWidget(device_card, row, col) |
| |
| |
| self.devices_list.append(device_card) |
| |
| |
| container.setLayout(grid_layout) |
| scroll_area.setWidget(container) |
| |
| |
| new_layout.addWidget(scroll_area) |
| |
| |
| scroll_area.setStyleSheet(""" |
| QScrollArea { |
| border: none; |
| background-color: transparent; |
| } |
| QScrollBar:vertical { |
| border: none; |
| background-color: #F5F5F5; |
| width: 8px; |
| border-radius: 4px; |
| } |
| QScrollBar::handle:vertical { |
| background-color: #BDBDBD; |
| border-radius: 4px; |
| } |
| QScrollBar::add-line:vertical, QScrollBar::sub-line:vertical { |
| height: 0px; |
| } |
| """) |
| |
| |
| if self.ha_update_timer and self.ha_update_timer.isActive(): |
| self.ha_update_timer.stop() |
| |
| |
| self.ha_update_timer = QTimer() |
| self.ha_update_timer.timeout.connect(self._update_device_states) |
| self.ha_update_timer.start(1000) |
| |
| |
| self._update_device_states() |
| |
| except Exception as e: |
| |
| self.logger.error(f"读取设备配置失败: {e}") |
| self.history_title = QLabel("加载设备配置失败") |
| self.history_title.setFont(QFont(self.app.font().family(), 14, QFont.Bold)) |
| self.history_title.setAlignment(Qt.AlignCenter) |
| new_layout.addWidget(self.history_title) |
| |
| error_label = QLabel(f"错误信息: {str(e)}") |
| error_label.setWordWrap(True) |
| error_label.setStyleSheet("color: red;") |
| new_layout.addWidget(error_label) |
| |
| except Exception as e: |
| self.logger.error(f"加载IOT设备失败: {e}", exc_info=True) |
| try: |
| |
| old_layout = self.iot_card.layout() |
| |
| |
| if old_layout: |
| while old_layout.count(): |
| item = old_layout.takeAt(0) |
| widget = item.widget() |
| if widget: |
| widget.deleteLater() |
| |
| |
| new_layout = old_layout |
| else: |
| |
| new_layout = QVBoxLayout() |
| self.iot_card.setLayout(new_layout) |
| |
| self.history_title = QLabel("加载设备失败") |
| self.history_title.setFont(QFont(self.app.font().family(), 14, QFont.Bold)) |
| self.history_title.setAlignment(Qt.AlignCenter) |
| new_layout.addWidget(self.history_title) |
| |
| error_label = QLabel(f"错误信息: {str(e)}") |
| error_label.setWordWrap(True) |
| error_label.setStyleSheet("color: red;") |
| new_layout.addWidget(error_label) |
| |
| except Exception as e2: |
| self.logger.error(f"恢复界面失败: {e2}", exc_info=True) |
|
|
| def _update_device_states(self): |
| """更新Home Assistant设备状态""" |
| |
| if not self.stackedWidget or self.stackedWidget.currentIndex() != 1: |
| return |
| |
| |
| try: |
| with open(CONFIG_PATH, 'r', encoding='utf-8') as f: |
| config_data = json.load(f) |
| |
| ha_options = config_data.get("HOME_ASSISTANT", {}) |
| ha_url = ha_options.get("URL", "") |
| ha_token = ha_options.get("TOKEN", "") |
| |
| if not ha_url or not ha_token: |
| self.logger.warning("Home Assistant URL或Token未配置,无法更新设备状态") |
| return |
| |
| |
| for entity_id, label in self.device_labels.items(): |
| threading.Thread( |
| target=self._fetch_device_state, |
| args=(ha_url, ha_token, entity_id, label), |
| daemon=True |
| ).start() |
| |
| except Exception as e: |
| self.logger.error(f"更新Home Assistant设备状态失败: {e}", exc_info=True) |
| |
| def _fetch_device_state(self, ha_url, ha_token, entity_id, label): |
| """获取单个设备的状态""" |
| import requests |
| |
| try: |
| |
| api_url = f"{ha_url}/api/states/{entity_id}" |
| headers = { |
| "Authorization": f"Bearer {ha_token}", |
| "Content-Type": "application/json" |
| } |
| |
| |
| response = requests.get(api_url, headers=headers, timeout=5) |
| |
| if response.status_code == 200: |
| state_data = response.json() |
| state = state_data.get("state", "unknown") |
| |
| |
| self.device_states[entity_id] = state |
| |
| |
| self._update_device_ui(entity_id, state, label) |
| else: |
| self.logger.warning(f"获取设备状态失败: {entity_id}, 状态码: {response.status_code}") |
| |
| except requests.RequestException as e: |
| self.logger.error(f"请求Home Assistant API失败: {e}") |
| except Exception as e: |
| self.logger.error(f"处理设备状态时出错: {e}") |
| |
| def _update_device_ui(self, entity_id, state, label): |
| """更新设备UI显示""" |
| |
| self.update_queue.put(lambda: self._safe_update_device_label(entity_id, state, label)) |
| |
| def _safe_update_device_label(self, entity_id, state, label): |
| """安全地更新设备状态标签""" |
| if not label or self.root.isHidden(): |
| return |
| |
| try: |
| display_state = state |
| |
| |
| if "light" in entity_id or "switch" in entity_id: |
| if state == "on": |
| display_state = "状态: 开启" |
| label.setStyleSheet("color: #4CAF50; border: none;") |
| else: |
| display_state = "状态: 关闭" |
| label.setStyleSheet("color: #9E9E9E; border: none;") |
| elif "temperature" in entity_id: |
| try: |
| temp = float(state) |
| display_state = f"{temp:.1f}℃" |
| label.setStyleSheet("color: #FF9800; border: none;") |
| except ValueError: |
| display_state = state |
| elif "humidity" in entity_id: |
| try: |
| humidity = float(state) |
| display_state = f"{humidity:.0f}%" |
| label.setStyleSheet("color: #03A9F4; border: none;") |
| except ValueError: |
| display_state = state |
| elif "battery" in entity_id: |
| try: |
| battery = float(state) |
| display_state = f"{battery:.0f}%" |
| |
| if battery < 20: |
| label.setStyleSheet("color: #F44336; border: none;") |
| else: |
| label.setStyleSheet("color: #4CAF50; border: none;") |
| except ValueError: |
| display_state = state |
| else: |
| display_state = f"状态: {state}" |
| label.setStyleSheet("color: #2196F3; border: none;") |
| |
| |
| label.setText(f"{display_state}") |
| except RuntimeError as e: |
| self.logger.error(f"更新设备状态标签失败: {e}") |
|
|
| class MicrophoneVisualizer(QFrame): |
| """麦克风音量可视化组件 - 波形显示版""" |
| |
| def __init__(self, parent=None): |
| super().__init__(parent) |
| self.setMinimumHeight(50) |
| self.setFrameShape(QFrame.NoFrame) |
| |
| |
| self.current_volume = 0.0 |
| self.target_volume = 0.0 |
| |
| |
| self.history_max = 30 |
| self.volume_history = [0.0] * self.history_max |
| |
| |
| self.animation_timer = QTimer() |
| self.animation_timer.timeout.connect(self._update_animation) |
| self.animation_timer.start(16) |
| |
| |
| self.min_color = QColor(80, 150, 255) |
| self.max_color = QColor(255, 100, 100) |
| self.current_color = self.min_color.name() |
| |
| |
| self.current_status = "安静" |
| self.target_status = "安静" |
| self.status_hold_count = 0 |
| self.status_threshold = 5 |
| |
| |
| self.setStyleSheet("background-color: transparent;") |
| |
| def set_volume(self, volume): |
| """设置当前音量,范围0.0-1.0""" |
| |
| volume = max(0.0, min(1.0, volume)) |
| self.target_volume = volume |
| |
| |
| self.volume_history.append(volume) |
| if len(self.volume_history) > self.history_max: |
| self.volume_history.pop(0) |
| |
| |
| volume_percent = int(volume * 100) |
| |
| |
| if volume_percent < 5: |
| new_status = "静音" |
| elif volume_percent < 20: |
| new_status = "安静" |
| elif volume_percent < 50: |
| new_status = "正常" |
| elif volume_percent < 75: |
| new_status = "较大" |
| else: |
| new_status = "很大" |
| |
| |
| if new_status == self.target_status: |
| |
| self.status_hold_count += 1 |
| else: |
| |
| self.target_status = new_status |
| self.status_hold_count = 0 |
| |
| |
| if self.status_hold_count >= self.status_threshold: |
| self.current_status = self.target_status |
| |
| self.update() |
| |
| def _update_animation(self): |
| """更新动画效果""" |
| |
| self.current_volume += (self.target_volume - self.current_volume) * 0.3 |
|
|
| |
| r = int(self.min_color.red() + (self.max_color.red() - self.min_color.red()) * self.current_volume) |
| g = int(self.min_color.green() + (self.max_color.green() - self.min_color.green()) * self.current_volume) |
| b = int(self.min_color.blue() + (self.max_color.blue() - self.min_color.blue()) * self.current_volume) |
| self.current_color = QColor(r, g, b).name() |
|
|
| self.update() |
| |
| def paintEvent(self, event): |
| """绘制事件""" |
| super().paintEvent(event) |
| |
| painter = QPainter(self) |
| painter.setRenderHint(QPainter.Antialiasing) |
| |
| try: |
| |
| rect = self.rect() |
| |
| |
| self._draw_waveform(painter, rect) |
| |
| |
| small_font = painter.font() |
| small_font.setPointSize(10) |
| painter.setFont(small_font) |
| painter.setPen(QColor(100, 100, 100)) |
| |
| |
| status_rect = QRect(rect.left(), rect.bottom() - 20, rect.width(), 20) |
| |
| |
| status_text = f"声音: {self.current_status}" |
| |
| painter.drawText(status_rect, Qt.AlignCenter, status_text) |
| except Exception as e: |
| self.logger.error(f"绘制波形图失败: {e}") if hasattr(self, 'logger') else None |
| finally: |
| painter.end() |
| |
| def _draw_waveform(self, painter, rect): |
| """绘制波形图""" |
| |
| if len(self.volume_history) < 2: |
| return |
| |
| |
| wave_rect = QRect(rect.left() + 10, rect.top() + 10, |
| rect.width() - 20, rect.height() - 40) |
| |
| |
| bg_color = QColor(240, 240, 240, 30) |
| painter.setPen(Qt.NoPen) |
| painter.setBrush(QBrush(bg_color)) |
| painter.drawRoundedRect(wave_rect, 5, 5) |
| |
| |
| wave_pen = QPen(QColor(self.current_color)) |
| wave_pen.setWidth(2) |
| painter.setPen(wave_pen) |
| |
| |
| history_len = len(self.volume_history) |
| point_interval = wave_rect.width() / (history_len - 1) |
| |
| |
| path = QPainterPath() |
| |
| |
| start_x = wave_rect.left() |
| mid_y = wave_rect.top() + wave_rect.height() / 2 |
| |
| |
| amplitude_factor = 0.8 |
| min_amplitude = 0.1 |
| |
| |
| vol = self.volume_history[0] |
| amp = max(min_amplitude, vol) * amplitude_factor |
| start_y = mid_y - amp * wave_rect.height() / 2 |
| |
| path.moveTo(start_x, start_y) |
| |
| |
| for i in range(1, history_len): |
| x = start_x + i * point_interval |
| |
| |
| vol = self.volume_history[i] |
| |
| |
| amp = max(min_amplitude, vol) * amplitude_factor |
| |
| |
| wave_phase = i / 2.0 |
| sine_factor = 0.08 * amp |
| sine_wave = sine_factor * np.sin(wave_phase) |
| |
| y = mid_y - (amp * wave_rect.height() / 2 + sine_wave * wave_rect.height()) |
| |
| |
| if i > 1: |
| |
| ctrl_x = start_x + (i - 0.5) * point_interval |
| prev_vol = self.volume_history[i-1] |
| prev_amp = max(min_amplitude, prev_vol) * amplitude_factor |
| prev_sine = sine_factor * np.sin((i-1) / 2.0) |
| ctrl_y = mid_y - (prev_amp * wave_rect.height() / 2 + prev_sine * wave_rect.height()) |
| path.quadTo(ctrl_x, ctrl_y, x, y) |
| else: |
| |
| path.lineTo(x, y) |
| |
| |
| painter.drawPath(path) |
| |
| |
| |
| gradient = QLinearGradient( |
| wave_rect.left(), wave_rect.top() + wave_rect.height(), |
| wave_rect.left(), wave_rect.top() |
| ) |
| |
| |
| gradient.setColorAt(0, QColor(self.current_color).lighter(140)) |
| gradient.setColorAt(0.5, QColor(self.current_color)) |
| gradient.setColorAt(1, QColor(self.current_color).darker(140)) |
| |
| |
| painter.save() |
| |
| |
| reflect_path = QPainterPath(path) |
| |
| transform = QTransform() |
| transform.translate(0, wave_rect.height() / 4) |
| reflect_path = transform.map(reflect_path) |
| |
| |
| reflect_pen = QPen() |
| reflect_pen.setWidth(1) |
| reflect_pen.setColor(QColor(self.current_color).lighter(160)) |
| painter.setPen(reflect_pen) |
| |
| |
| painter.setOpacity(0.3) |
| |
| |
| painter.drawPath(reflect_path) |
| |
| |
| painter.restore() |
| |
| |
| if self.current_volume > 0.1: |
| percent_text = f"{int(self.current_volume * 100)}%" |
| painter.setPen(QColor(self.current_color).darker(120)) |
| |
| |
| font = painter.font() |
| font.setPointSize(8 + int(self.current_volume * 4)) |
| font.setBold(True) |
| painter.setFont(font) |
| |
| |
| right_edge = wave_rect.right() - 40 |
| y_position = mid_y - amp * wave_rect.height() / 3 |
| |
| |
| |
| painter.drawText(int(right_edge), int(y_position), percent_text) |