File size: 2,564 Bytes
0f8fe33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# socket_manager.py (Optimized)
# - Non-blocking emit queue with background worker
# - Rate-limited batching for frequent events
# - Backwards-compatible init_socketio & emit_new_event API

import threading
import time
import queue

_emit_q = queue.Queue(maxsize=2000)
_socketio = None
_emit_lock = threading.Lock()
_worker_thr = None
_stop_worker = threading.Event()

# batch/rate config
_BATCH_INTERVAL = 0.5  # seconds between worker sends
_BATCH_MAX = 10        # max events to bundle per emit


def init_socketio(socketio):
    """Initialize global socketio and start background emit worker."""
    global _socketio, _worker_thr
    _socketio = socketio
    print("✅ SocketIO initialized (thread-safe)")
    if _worker_thr is None or not _worker_thr.is_alive():
        _worker_thr = threading.Thread(target=_emit_worker, daemon=True)
        _worker_thr.start()


def _emit_worker():
    """Background worker: drains _emit_q and emits aggregated payloads at intervals."""
    last_send = 0.0
    buffer = []
    while not _stop_worker.is_set():
        try:
            evt = _emit_q.get(timeout=_BATCH_INTERVAL)
            buffer.append(evt)
        except Exception:
            # timeout, flush if buffer exists
            pass

        now = time.time()
        if buffer and (now - last_send >= _BATCH_INTERVAL or len(buffer) >= _BATCH_MAX):
            payload = {"count": len(buffer), "items": buffer[:_BATCH_MAX]}
            try:
                if _socketio:
                    # emit in background so worker isn't blocked on network
                    _socketio.start_background_task(lambda: _socketio.emit("new_event", payload, namespace="/"))
            except Exception as e:
                print("⚠️ emit worker error:", e)
            buffer.clear()
            last_send = now

    # final flush on shutdown
    if buffer and _socketio:
        try:
            _socketio.start_background_task(lambda: _socketio.emit("new_event", {"count": len(buffer), "items": buffer}, namespace="/"))
        except Exception:
            pass


def emit_new_event(evt):
    """Enqueue event for background emit. Non-blocking.

    Compatible with previous API: callers can pass full event dicts.
    """
    try:
        _emit_q.put_nowait(evt)
    except queue.Full:
        # drop silently (prefer availability over backlog)
        return


def shutdown_socket_manager(timeout=2):
    """Stop background worker gracefully."""
    _stop_worker.set()
    if _worker_thr and _worker_thr.is_alive():
        _worker_thr.join(timeout=timeout)