import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict, List


class CiTriggerEngine:
    """Hold timestamped events and fire each one once its time has come.

    Events are dicts stored in ``self.events`` under their ``'id'`` key.
    ``process_inputs`` polls that mapping and calls ``trigger`` for every
    event whose ``'timestamp'`` (an ISO-8601 string) is not in the future.
    """

    def __init__(self):
        # Pending events, keyed by the event's 'id'.
        self.events: Dict[str, Any] = {}
        # Set to False to make process_inputs() leave its loop.
        self.active = True
        # add_input() runs on caller threads while process_inputs() runs on
        # the worker thread, so all access to self.events is serialized.
        self._lock = threading.Lock()

    def add_input(self, data: Dict[str, Any]) -> None:
        """Queue *data* under ``data['id']``, replacing any event with that id.

        Raises:
            KeyError: if *data* has no ``'id'`` key.
        """
        with self._lock:
            self.events[data['id']] = data
        print(f"[TRIGGER] Подію {data['id']} додано до черги.")

    @staticmethod
    def _parse_timestamp(raw: Any) -> datetime:
        """Parse an ISO-8601 string into a timezone-aware datetime.

        Timestamps without an offset are taken to be UTC, matching the
        earlier behavior of comparing them against a naive UTC clock.
        """
        moment = datetime.fromisoformat(raw)
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        return moment

    def _take_due(self, now: datetime) -> List[Any]:
        """Remove and return every queued event whose timestamp is <= *now*.

        Events whose timestamp is missing or unparsable are reported and
        discarded; otherwise they would fail again on every poll.
        """
        due: List[Any] = []
        with self._lock:
            # Iterate over a snapshot because entries are deleted in the loop.
            for eid, ev in list(self.events.items()):
                try:
                    ts = self._parse_timestamp(ev['timestamp'])
                except (KeyError, TypeError, ValueError) as exc:
                    del self.events[eid]
                    print(f"[CiTrigger] Подію {eid} відхилено: {exc!r}")
                    continue
                if now >= ts:
                    del self.events[eid]
                    due.append(ev)
        return due

    def process_inputs(self, poll_interval: float = 1.0) -> None:
        """Poll the queue until ``self.active`` becomes false.

        Args:
            poll_interval: seconds to sleep between polls (default 1.0).
        """
        while self.active:
            # Aware "now" so it compares with both offset-bearing timestamps
            # and naive ones (which _parse_timestamp pins to UTC).
            now = datetime.now(timezone.utc)
            # Handlers run outside the lock so add_input() is never blocked
            # by a slow trigger().
            for ev in self._take_due(now):
                try:
                    self.trigger(ev)
                except Exception as exc:
                    # Worker-loop boundary: one failing handler must not
                    # stop every later event from firing.
                    print(f"[CiTrigger] Помилка обробника: {exc!r}")
            time.sleep(poll_interval)

    def trigger(self, event) -> None:
        """Announce that *event* fired.

        Raises:
            KeyError: if *event* has no ``'назва'`` key.
        """
        print(f"[CiTrigger] ⚡ Активовано вхід: {event['назва']}")


# Module-level engine; its polling loop starts on a daemon thread at import
# time.
trigger_engine = CiTriggerEngine()
threading.Thread(target=trigger_engine.process_inputs, daemon=True).start()