ci_inputs_system / ci_trigger_engine.py
Ihorog's picture
Create ci_trigger_engine.py
a264c57 verified
raw
history blame contribute delete
939 Bytes
import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
class CiTriggerEngine:
    """Time-based trigger queue polled by a worker loop.

    Events are dicts stored by their ``'id'`` key. Each event is expected to
    carry a ``'timestamp'`` (ISO 8601 string) that says when it becomes due;
    the default :meth:`trigger` additionally reads the ``'назва'`` key.

    Naive timestamps are interpreted as UTC; timestamps with an explicit
    offset are compared correctly against the current UTC time.
    """

    def __init__(self):
        # Pending events keyed by event id.
        self.events: Dict[str, Any] = {}
        # Polling loop keeps running while this flag is true.
        self.active = True
        # Guards ``events``: add_input() and the polling loop normally run
        # on different threads.
        self._lock = threading.Lock()

    def add_input(self, data: Dict[str, Any]) -> None:
        """Queue *data* under ``data['id']``, replacing any event with that id.

        Raises:
            KeyError: if *data* has no ``'id'`` key.
        """
        event_id = data['id']
        with self._lock:
            self.events[event_id] = data
        print(f"[TRIGGER] Подію {data['id']} додано до черги.")

    @staticmethod
    def _parse_timestamp(raw: Any) -> datetime:
        """Parse an ISO 8601 string into a timezone-aware datetime."""
        # ``fromisoformat`` only accepts a trailing "Z" from Python 3.11 on.
        if isinstance(raw, str) and raw.endswith("Z"):
            raw = raw[:-1] + "+00:00"
        parsed = datetime.fromisoformat(raw)
        if parsed.tzinfo is None:
            # Naive timestamps keep their historical meaning: UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def process_pending(self, now: Optional[datetime] = None) -> List[str]:
        """Run one pass over the queue and fire every event that is due.

        Events whose timestamp is missing or unparsable are removed and
        reported instead of aborting the pass; an exception raised by
        :meth:`trigger` is reported and does not affect other events.

        Args:
            now: Reference time; defaults to the current UTC time. A naive
                value is interpreted as UTC.

        Returns:
            Ids of the events that were triggered successfully in this pass.
        """
        if now is None:
            now = datetime.now(timezone.utc)
        elif now.tzinfo is None:
            now = now.replace(tzinfo=timezone.utc)

        due = []
        rejected = []
        with self._lock:
            for eid, ev in list(self.events.items()):
                try:
                    ts = self._parse_timestamp(ev['timestamp'])
                except (KeyError, TypeError, ValueError) as err:
                    # Drop it, otherwise it would fail again on every pass.
                    del self.events[eid]
                    rejected.append((eid, err))
                    continue
                if now >= ts:
                    del self.events[eid]
                    due.append((eid, ev))

        for eid, err in rejected:
            print(f"[TRIGGER] Подію {eid} пропущено через помилку: {err!r}")

        # Handlers run outside the lock so they may call add_input().
        fired = []
        for eid, ev in due:
            try:
                self.trigger(ev)
            except Exception as err:  # worker-loop boundary: report, keep going
                print(f"[TRIGGER] Подію {eid} пропущено через помилку: {err!r}")
            else:
                fired.append(eid)
        return fired

    def process_inputs(self, poll_interval: float = 1.0) -> None:
        """Poll the queue until ``active`` becomes false.

        Args:
            poll_interval: Seconds to sleep between passes.
        """
        while self.active:
            self.process_pending()
            time.sleep(poll_interval)

    def stop(self) -> None:
        """Ask the polling loop to exit after its current pass."""
        self.active = False

    def trigger(self, event) -> None:
        """Handle a due event; the default implementation prints its name."""
        print(f"[CiTrigger] ⚡ Активовано вхід: {event['назва']}")
# Module-level engine instance; its polling loop is started right away on a
# daemon thread, so it does not keep the interpreter alive at exit.
trigger_engine = CiTriggerEngine()
_worker = threading.Thread(daemon=True, target=trigger_engine.process_inputs)
_worker.start()