|
|
import threading |
|
|
import time |
|
|
import logging |
|
|
import json |
|
|
from src.iot.thing import Thing, Parameter |
|
|
from src.iot.thing_manager import ThingManager |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class CountdownTimer(Thing): |
|
|
""" |
|
|
一个用于延迟执行命令的倒计时器设备。 |
|
|
""" |
|
|
DEFAULT_DELAY = 5 |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__("CountdownTimer", "一个用于延迟执行命令的倒计时器") |
|
|
|
|
|
self._timers = {} |
|
|
self._next_timer_id = 0 |
|
|
|
|
|
self._lock = threading.Lock() |
|
|
|
|
|
print(f"[虚拟设备] 倒计时器设备初始化完成") |
|
|
|
|
|
|
|
|
self.add_method( |
|
|
"StartCountdown", |
|
|
"启动一个倒计时,结束后执行指定命令", |
|
|
[ |
|
|
Parameter("command", "要执行的IoT命令 (JSON格式字符串)", "string", required=True), |
|
|
Parameter("delay", "延迟时间(秒),默认为5秒", "integer", required=False) |
|
|
], |
|
|
lambda params: self._start_countdown(params) |
|
|
) |
|
|
self.add_method( |
|
|
"CancelCountdown", |
|
|
"取消指定的倒计时", |
|
|
[Parameter("timer_id", "要取消的计时器ID", "integer", required=True)], |
|
|
lambda params: self._cancel_countdown(params) |
|
|
) |
|
|
|
|
|
def _execute_command(self, timer_id, command_str): |
|
|
"""计时器到期时执行的回调函数。""" |
|
|
|
|
|
with self._lock: |
|
|
if timer_id not in self._timers: |
|
|
|
|
|
logger.info(f"倒计时 {timer_id} 在执行前已被取消或不存在。") |
|
|
return |
|
|
del self._timers[timer_id] |
|
|
|
|
|
logger.info(f"倒计时 {timer_id} 结束,准备执行命令: {command_str}") |
|
|
|
|
|
try: |
|
|
|
|
|
command_dict = json.loads(command_str) |
|
|
|
|
|
thing_manager = ThingManager.get_instance() |
|
|
result = thing_manager.invoke(command_dict) |
|
|
logger.info(f"倒计时 {timer_id} 执行命令 '{command_str}' 结果: {result}") |
|
|
except json.JSONDecodeError: |
|
|
logger.error(f"倒计时 {timer_id}: 命令 '{command_str}' 格式错误,无法解析JSON。") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"倒计时 {timer_id} 执行命令 '{command_str}' 时出错: {e}", exc_info=True) |
|
|
|
|
|
|
|
|
def _start_countdown(self, params_dict): |
|
|
"""处理 StartCountdown 方法调用。注意: params 现在是 Parameter 对象的字典""" |
|
|
|
|
|
command_param = params_dict.get("command") |
|
|
delay_param = params_dict.get("delay") |
|
|
|
|
|
command_str = command_param.get_value() if command_param else None |
|
|
|
|
|
delay = delay_param.get_value() if delay_param and delay_param.get_value() is not None else self.DEFAULT_DELAY |
|
|
|
|
|
if not command_str: |
|
|
logger.error("启动倒计时失败:缺少 'command' 参数值。") |
|
|
return {"status": "error", "message": "缺少 'command' 参数值"} |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
if not isinstance(delay, int): |
|
|
delay = int(delay) |
|
|
|
|
|
if delay <= 0: |
|
|
logger.warning(f"提供的延迟时间 {delay} 无效,使用默认值 {self.DEFAULT_DELAY} 秒。") |
|
|
delay = self.DEFAULT_DELAY |
|
|
except (ValueError, TypeError): |
|
|
logger.warning(f"提供的延迟时间 '{delay}' 无效,使用默认值 {self.DEFAULT_DELAY} 秒。") |
|
|
delay = self.DEFAULT_DELAY |
|
|
|
|
|
|
|
|
try: |
|
|
json.loads(command_str) |
|
|
except json.JSONDecodeError: |
|
|
logger.error(f"启动倒计时失败:命令格式错误,无法解析JSON: {command_str}") |
|
|
return {"status": "error", "message": f"命令格式错误,无法解析JSON: {command_str}"} |
|
|
|
|
|
with self._lock: |
|
|
timer_id = self._next_timer_id |
|
|
self._next_timer_id += 1 |
|
|
timer = threading.Timer(delay, self._execute_command, args=[timer_id, command_str]) |
|
|
self._timers[timer_id] = timer |
|
|
timer.start() |
|
|
|
|
|
logger.info(f"启动倒计时 {timer_id},将在 {delay} 秒后执行命令: {command_str}") |
|
|
return {"status": "success", "message": f"倒计时 {timer_id} 已启动,将在 {delay} 秒后执行。", "timer_id": timer_id} |
|
|
|
|
|
def _cancel_countdown(self, params_dict): |
|
|
"""处理 CancelCountdown 方法调用。注意: params 现在是 Parameter 对象的字典""" |
|
|
timer_id_param = params_dict.get("timer_id") |
|
|
timer_id = timer_id_param.get_value() if timer_id_param else None |
|
|
|
|
|
if timer_id is None: |
|
|
logger.error("取消倒计时失败:缺少 'timer_id' 参数值。") |
|
|
return {"status": "error", "message": "缺少 'timer_id' 参数值"} |
|
|
|
|
|
try: |
|
|
|
|
|
if not isinstance(timer_id, int): |
|
|
timer_id = int(timer_id) |
|
|
except (ValueError, TypeError): |
|
|
logger.error(f"取消倒计时失败:无效的 'timer_id' {timer_id}。") |
|
|
return {"status": "error", "message": f"无效的 'timer_id': {timer_id}"} |
|
|
|
|
|
with self._lock: |
|
|
if timer_id in self._timers: |
|
|
timer = self._timers.pop(timer_id) |
|
|
timer.cancel() |
|
|
logger.info(f"倒计时 {timer_id} 已成功取消。") |
|
|
return {"status": "success", "message": f"倒计时 {timer_id} 已取消"} |
|
|
else: |
|
|
logger.warning(f"尝试取消不存在或已完成的倒计时 {timer_id}。") |
|
|
return {"status": "error", "message": f"找不到ID为 {timer_id} 的活动倒计时"} |
|
|
|
|
|
def cleanup(self): |
|
|
"""在应用程序关闭时清理所有活动的计时器。""" |
|
|
logger.info("正在清理倒计时器...") |
|
|
with self._lock: |
|
|
active_timer_ids = list(self._timers.keys()) |
|
|
for timer_id in active_timer_ids: |
|
|
if timer_id in self._timers: |
|
|
timer = self._timers.pop(timer_id) |
|
|
timer.cancel() |
|
|
logger.info(f"已取消后台计时器 {timer_id}") |
|
|
logger.info("倒计时器清理完成。") |
|
|
|
|
|
|
|
|
|