File size: 7,162 Bytes
27e74f3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | import threading
import time
import logging
import json
from src.iot.thing import Thing, Parameter
from src.iot.thing_manager import ThingManager
logger = logging.getLogger(__name__)
class CountdownTimer(Thing):
"""
一个用于延迟执行命令的倒计时器设备。
"""
DEFAULT_DELAY = 5 # seconds
def __init__(self):
super().__init__("CountdownTimer", "一个用于延迟执行命令的倒计时器")
# 使用字典存储活动的计时器,键是 timer_id,值是 threading.Timer 对象
self._timers = {}
self._next_timer_id = 0
# 使用锁来保护对 _timers 和 _next_timer_id 的访问,确保线程安全
self._lock = threading.Lock()
print(f"[虚拟设备] 倒计时器设备初始化完成")
# 定义方法 - 使用 Parameter 对象
self.add_method(
"StartCountdown",
"启动一个倒计时,结束后执行指定命令",
[
Parameter("command", "要执行的IoT命令 (JSON格式字符串)", "string", required=True),
Parameter("delay", "延迟时间(秒),默认为5秒", "integer", required=False) # 使用 required=False 标记可选参数
],
lambda params: self._start_countdown(params)
)
self.add_method(
"CancelCountdown",
"取消指定的倒计时",
[Parameter("timer_id", "要取消的计时器ID", "integer", required=True)],
lambda params: self._cancel_countdown(params)
)
def _execute_command(self, timer_id, command_str):
"""计时器到期时执行的回调函数。"""
# 首先从活动计时器列表中移除自己
with self._lock:
if timer_id not in self._timers:
# 可能已经被取消
logger.info(f"倒计时 {timer_id} 在执行前已被取消或不存在。")
return
del self._timers[timer_id]
logger.info(f"倒计时 {timer_id} 结束,准备执行命令: {command_str}")
try:
# 命令应该是 JSON 格式的字符串,代表一个命令字典
command_dict = json.loads(command_str)
# 获取 ThingManager 单例并执行命令
thing_manager = ThingManager.get_instance()
result = thing_manager.invoke(command_dict)
logger.info(f"倒计时 {timer_id} 执行命令 '{command_str}' 结果: {result}")
except json.JSONDecodeError:
logger.error(f"倒计时 {timer_id}: 命令 '{command_str}' 格式错误,无法解析JSON。")
# 可以选择返回错误状态给调用者,但这在后台线程中较难实现
except Exception as e:
logger.error(f"倒计时 {timer_id} 执行命令 '{command_str}' 时出错: {e}", exc_info=True)
# 同上
def _start_countdown(self, params_dict):
"""处理 StartCountdown 方法调用。注意: params 现在是 Parameter 对象的字典"""
# 从 Parameter 对象字典中获取值
command_param = params_dict.get("command")
delay_param = params_dict.get("delay")
command_str = command_param.get_value() if command_param else None
# 处理可选参数 delay
delay = delay_param.get_value() if delay_param and delay_param.get_value() is not None else self.DEFAULT_DELAY
if not command_str:
logger.error("启动倒计时失败:缺少 'command' 参数值。")
return {"status": "error", "message": "缺少 'command' 参数值"}
# 验证延迟时间
try:
# 确保 delay 是整数类型
if not isinstance(delay, int):
delay = int(delay)
if delay <= 0:
logger.warning(f"提供的延迟时间 {delay} 无效,使用默认值 {self.DEFAULT_DELAY} 秒。")
delay = self.DEFAULT_DELAY
except (ValueError, TypeError):
logger.warning(f"提供的延迟时间 '{delay}' 无效,使用默认值 {self.DEFAULT_DELAY} 秒。")
delay = self.DEFAULT_DELAY
# 尝试解析命令字符串以进行早期验证
try:
json.loads(command_str)
except json.JSONDecodeError:
logger.error(f"启动倒计时失败:命令格式错误,无法解析JSON: {command_str}")
return {"status": "error", "message": f"命令格式错误,无法解析JSON: {command_str}"}
with self._lock:
timer_id = self._next_timer_id
self._next_timer_id += 1
timer = threading.Timer(delay, self._execute_command, args=[timer_id, command_str])
self._timers[timer_id] = timer
timer.start()
logger.info(f"启动倒计时 {timer_id},将在 {delay} 秒后执行命令: {command_str}")
return {"status": "success", "message": f"倒计时 {timer_id} 已启动,将在 {delay} 秒后执行。", "timer_id": timer_id}
def _cancel_countdown(self, params_dict):
"""处理 CancelCountdown 方法调用。注意: params 现在是 Parameter 对象的字典"""
timer_id_param = params_dict.get("timer_id")
timer_id = timer_id_param.get_value() if timer_id_param else None
if timer_id is None:
logger.error("取消倒计时失败:缺少 'timer_id' 参数值。")
return {"status": "error", "message": "缺少 'timer_id' 参数值"}
try:
# 确保 timer_id 是整数
if not isinstance(timer_id, int):
timer_id = int(timer_id)
except (ValueError, TypeError):
logger.error(f"取消倒计时失败:无效的 'timer_id' {timer_id}。")
return {"status": "error", "message": f"无效的 'timer_id': {timer_id}"}
with self._lock:
if timer_id in self._timers:
timer = self._timers.pop(timer_id)
timer.cancel()
logger.info(f"倒计时 {timer_id} 已成功取消。")
return {"status": "success", "message": f"倒计时 {timer_id} 已取消"}
else:
logger.warning(f"尝试取消不存在或已完成的倒计时 {timer_id}。")
return {"status": "error", "message": f"找不到ID为 {timer_id} 的活动倒计时"}
def cleanup(self):
"""在应用程序关闭时清理所有活动的计时器。"""
logger.info("正在清理倒计时器...")
with self._lock:
active_timer_ids = list(self._timers.keys()) # 创建键的副本以安全迭代
for timer_id in active_timer_ids:
if timer_id in self._timers:
timer = self._timers.pop(timer_id)
timer.cancel()
logger.info(f"已取消后台计时器 {timer_id}")
logger.info("倒计时器清理完成。")
# 注意:这个 cleanup 方法需要在应用程序关闭时被显式调用。
# ThingManager 或 Application 类可以负责在 shutdown 过程中调用其管理的 Things 的 cleanup 方法。 |