File size: 939 Bytes
a264c57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict

class CiTriggerEngine:
    """Keep a set of timestamped events and fire each one once it is due.

    Events are plain dicts. The code reads three keys from them:
    ``'id'`` (used as the key in ``events``), ``'timestamp'`` (an ISO 8601
    string parsed with ``datetime.fromisoformat``) and ``'назва'`` (printed
    by ``trigger``).

    Timestamps without a UTC offset are interpreted as UTC; timestamps with
    an offset are compared as absolute instants.
    """

    def __init__(self) -> None:
        # Pending events, keyed by each event's 'id' value.
        self.events: Dict[str, Any] = {}
        # process_inputs() keeps polling while this flag is true.
        self.active = True

    def add_input(self, data: Dict[str, Any]) -> None:
        """Register *data* as a pending event under ``data['id']``.

        An event already stored under the same id is replaced.

        Raises:
            KeyError: if *data* has no ``'id'`` key.
        """
        self.events[data['id']] = data
        print(f"[TRIGGER] Подію {data['id']} додано до черги.")

    def process_inputs(self) -> None:
        """Poll the pending events about once per second until stopped.

        The loop ends when ``self.active`` becomes false. The flag is
        re-checked right after each pass so that a stop requested during the
        pass does not cost an extra one-second sleep.
        """
        while self.active:
            self._process_due(datetime.now(timezone.utc))
            if not self.active:
                break
            time.sleep(1)

    def _process_due(self, now: datetime) -> None:
        """Fire and remove every event whose timestamp is at or before *now*.

        *now* must be timezone-aware. Events whose timestamp is missing or
        cannot be parsed are dropped with a message, and an exception raised
        by ``trigger`` is reported instead of propagated, so a single bad
        event cannot terminate the polling loop.
        """
        # Iterate over a snapshot: entries are removed during the pass.
        for eid, ev in list(self.events.items()):
            try:
                ts = datetime.fromisoformat(ev['timestamp'])
            except (KeyError, TypeError, ValueError) as exc:
                self.events.pop(eid, None)
                print(f"[TRIGGER] Dropping event {eid!r}: bad timestamp ({exc!r})")
                continue

            # Naive timestamps keep their historical meaning (UTC); without
            # this, comparing them with an aware 'now' raises TypeError.
            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)
            if now < ts:
                continue

            try:
                self.trigger(ev)
            except Exception as exc:  # worker-loop boundary: report, keep polling
                print(f"[TRIGGER] Event {eid!r} failed in trigger(): {exc!r}")
            # pop() rather than del: the entry may already be gone.
            self.events.pop(eid, None)

    def trigger(self, event) -> None:
        """Announce that *event* has fired by printing its ``'назва'`` value.

        Raises:
            KeyError: if *event* has no ``'назва'`` key.
        """
        print(f"[CiTrigger] ⚡ Активовано вхід: {event['назва']}")

# Module-wide engine instance, created at import time.
trigger_engine = CiTriggerEngine()

# Run the engine's polling loop on a daemon thread, so the thread does not
# keep the interpreter alive at exit.
threading.Thread(
    daemon=True,
    target=trigger_engine.process_inputs,
).start()