File size: 7,162 Bytes
27e74f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import threading
import time
import logging
import json
from src.iot.thing import Thing, Parameter
from src.iot.thing_manager import ThingManager

logger = logging.getLogger(__name__)

class CountdownTimer(Thing):
    """
    一个用于延迟执行命令的倒计时器设备。
    """
    DEFAULT_DELAY = 5 # seconds

    def __init__(self):
        super().__init__("CountdownTimer", "一个用于延迟执行命令的倒计时器")
        # 使用字典存储活动的计时器,键是 timer_id,值是 threading.Timer 对象
        self._timers = {}
        self._next_timer_id = 0
        # 使用锁来保护对 _timers 和 _next_timer_id 的访问,确保线程安全
        self._lock = threading.Lock()

        print(f"[虚拟设备] 倒计时器设备初始化完成")

        # 定义方法 - 使用 Parameter 对象
        self.add_method(
            "StartCountdown",
            "启动一个倒计时,结束后执行指定命令",
            [
                Parameter("command", "要执行的IoT命令 (JSON格式字符串)", "string", required=True),
                Parameter("delay", "延迟时间(秒),默认为5秒", "integer", required=False) # 使用 required=False 标记可选参数
            ],
            lambda params: self._start_countdown(params)
        )
        self.add_method(
            "CancelCountdown",
            "取消指定的倒计时",
            [Parameter("timer_id", "要取消的计时器ID", "integer", required=True)],
            lambda params: self._cancel_countdown(params)
        )

    def _execute_command(self, timer_id, command_str):
        """计时器到期时执行的回调函数。"""
        # 首先从活动计时器列表中移除自己
        with self._lock:
            if timer_id not in self._timers:
                # 可能已经被取消
                logger.info(f"倒计时 {timer_id} 在执行前已被取消或不存在。")
                return
            del self._timers[timer_id]

        logger.info(f"倒计时 {timer_id} 结束,准备执行命令: {command_str}")

        try:
            # 命令应该是 JSON 格式的字符串,代表一个命令字典
            command_dict = json.loads(command_str)
            # 获取 ThingManager 单例并执行命令
            thing_manager = ThingManager.get_instance()
            result = thing_manager.invoke(command_dict)
            logger.info(f"倒计时 {timer_id} 执行命令 '{command_str}' 结果: {result}")
        except json.JSONDecodeError:
            logger.error(f"倒计时 {timer_id}: 命令 '{command_str}' 格式错误,无法解析JSON。")
            # 可以选择返回错误状态给调用者,但这在后台线程中较难实现
        except Exception as e:
            logger.error(f"倒计时 {timer_id} 执行命令 '{command_str}' 时出错: {e}", exc_info=True)
            # 同上

    def _start_countdown(self, params_dict):
        """处理 StartCountdown 方法调用。注意: params 现在是 Parameter 对象的字典"""
        # 从 Parameter 对象字典中获取值
        command_param = params_dict.get("command")
        delay_param = params_dict.get("delay")

        command_str = command_param.get_value() if command_param else None
        # 处理可选参数 delay
        delay = delay_param.get_value() if delay_param and delay_param.get_value() is not None else self.DEFAULT_DELAY

        if not command_str:
            logger.error("启动倒计时失败:缺少 'command' 参数值。")
            return {"status": "error", "message": "缺少 'command' 参数值"}

        # 验证延迟时间
        try:
            # 确保 delay 是整数类型
            if not isinstance(delay, int):
                delay = int(delay)

            if delay <= 0:
                logger.warning(f"提供的延迟时间 {delay} 无效,使用默认值 {self.DEFAULT_DELAY} 秒。")
                delay = self.DEFAULT_DELAY
        except (ValueError, TypeError):
            logger.warning(f"提供的延迟时间 '{delay}' 无效,使用默认值 {self.DEFAULT_DELAY} 秒。")
            delay = self.DEFAULT_DELAY

        # 尝试解析命令字符串以进行早期验证
        try:
            json.loads(command_str)
        except json.JSONDecodeError:
             logger.error(f"启动倒计时失败:命令格式错误,无法解析JSON: {command_str}")
             return {"status": "error", "message": f"命令格式错误,无法解析JSON: {command_str}"}

        with self._lock:
            timer_id = self._next_timer_id
            self._next_timer_id += 1
            timer = threading.Timer(delay, self._execute_command, args=[timer_id, command_str])
            self._timers[timer_id] = timer
            timer.start()

        logger.info(f"启动倒计时 {timer_id},将在 {delay} 秒后执行命令: {command_str}")
        return {"status": "success", "message": f"倒计时 {timer_id} 已启动,将在 {delay} 秒后执行。", "timer_id": timer_id}

    def _cancel_countdown(self, params_dict):
        """处理 CancelCountdown 方法调用。注意: params 现在是 Parameter 对象的字典"""
        timer_id_param = params_dict.get("timer_id")
        timer_id = timer_id_param.get_value() if timer_id_param else None

        if timer_id is None:
            logger.error("取消倒计时失败:缺少 'timer_id' 参数值。")
            return {"status": "error", "message": "缺少 'timer_id' 参数值"}

        try:
            # 确保 timer_id 是整数
            if not isinstance(timer_id, int):
                 timer_id = int(timer_id)
        except (ValueError, TypeError):
            logger.error(f"取消倒计时失败:无效的 'timer_id' {timer_id}。")
            return {"status": "error", "message": f"无效的 'timer_id': {timer_id}"}

        with self._lock:
            if timer_id in self._timers:
                timer = self._timers.pop(timer_id)
                timer.cancel()
                logger.info(f"倒计时 {timer_id} 已成功取消。")
                return {"status": "success", "message": f"倒计时 {timer_id} 已取消"}
            else:
                logger.warning(f"尝试取消不存在或已完成的倒计时 {timer_id}。")
                return {"status": "error", "message": f"找不到ID为 {timer_id} 的活动倒计时"}

    def cleanup(self):
        """在应用程序关闭时清理所有活动的计时器。"""
        logger.info("正在清理倒计时器...")
        with self._lock:
            active_timer_ids = list(self._timers.keys()) # 创建键的副本以安全迭代
            for timer_id in active_timer_ids:
                if timer_id in self._timers:
                    timer = self._timers.pop(timer_id)
                    timer.cancel()
                    logger.info(f"已取消后台计时器 {timer_id}")
        logger.info("倒计时器清理完成。")

# 注意:这个 cleanup 方法需要在应用程序关闭时被显式调用。
# ThingManager 或 Application 类可以负责在 shutdown 过程中调用其管理的 Things 的 cleanup 方法。