Spaces:
Sleeping
Sleeping
File size: 2,564 Bytes
0f8fe33 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# socket_manager.py (Optimized)
# - Non-blocking emit queue with background worker
# - Rate-limited batching for frequent events
# - Backwards-compatible init_socketio & emit_new_event API
import threading
import time
import queue
# Bounded queue of pending events: emit_new_event() enqueues, _emit_worker() drains.
_emit_q = queue.Queue(maxsize=2000)
# SocketIO object supplied through init_socketio(); None until then.
_socketio = None
# Lock intended for serialising changes to this module's shared state.
_emit_lock = threading.Lock()
# Handle of the background emit thread; None until init_socketio() starts one.
_worker_thr = None
# Set by shutdown_socket_manager() to make the worker loop exit.
_stop_worker = threading.Event()
# batch/rate config
_BATCH_INTERVAL = 0.5 # seconds between worker sends; also the worker's queue-wait timeout
_BATCH_MAX = 10 # max events to bundle per emit
def init_socketio(socketio):
    """Register the SocketIO object and make sure the emit worker is running.

    May be called repeatedly: the stored object is replaced each time, and a
    new worker thread is started only when none is currently alive.

    Args:
        socketio: Object the worker uses via ``start_background_task`` and
            ``emit`` (presumably a ``flask_socketio.SocketIO`` instance --
            confirm against the caller).
    """
    global _socketio, _worker_thr
    # Serialise initialisation so two concurrent callers cannot both see
    # "no live worker" and start a thread each.
    with _emit_lock:
        _socketio = socketio
        print("✅ SocketIO initialized (thread-safe)")
        if _worker_thr is None or not _worker_thr.is_alive():
            # shutdown_socket_manager() leaves the stop flag set; without
            # clearing it a restarted worker would exit on its first check.
            _stop_worker.clear()
            _worker_thr = threading.Thread(target=_emit_worker, daemon=True)
            _worker_thr.start()
def _send_batch(items):
    """Schedule one ``new_event`` emit carrying *items* as a background task."""
    sio = _socketio
    if not sio:
        return
    payload = {"count": len(items), "items": items}
    # Pass the payload as an argument instead of closing over a loop
    # variable: a closure is evaluated when the task runs, by which time the
    # worker loop may already have built the next batch.
    sio.start_background_task(sio.emit, "new_event", payload, namespace="/")


def _emit_worker():
    """Background worker: drain ``_emit_q`` and emit batched ``new_event`` payloads.

    Runs until ``_stop_worker`` is set. Each iteration waits up to
    ``_BATCH_INTERVAL`` seconds for one event and buffers it. The buffer is
    flushed as ``{"count": <n>, "items": [...]}`` once ``_BATCH_INTERVAL``
    seconds have passed since the previous flush or it holds ``_BATCH_MAX``
    events. Anything still buffered when the loop ends is flushed one last
    time. Errors raised while scheduling an emit are printed, never raised.
    """
    # -inf guarantees the very first buffered event is flushed immediately,
    # whatever the monotonic clock's starting value is.
    last_send = float("-inf")
    buffer = []
    while not _stop_worker.is_set():
        try:
            buffer.append(_emit_q.get(timeout=_BATCH_INTERVAL))
        except queue.Empty:
            # Nothing arrived in time; fall through so a partially filled
            # buffer is still flushed below.
            pass
        # Monotonic clock: interval arithmetic must not be disturbed by
        # wall-clock adjustments.
        now = time.monotonic()
        interval_elapsed = now - last_send >= _BATCH_INTERVAL
        if buffer and (interval_elapsed or len(buffer) >= _BATCH_MAX):
            batch = buffer[:_BATCH_MAX]
            # Remove only what was handed over so nothing is ever discarded.
            del buffer[:_BATCH_MAX]
            try:
                _send_batch(batch)
            except Exception as e:
                print("⚠️ emit worker error:", e)
            last_send = now
    # final flush on shutdown
    if buffer:
        try:
            _send_batch(list(buffer))
        except Exception as e:
            print("⚠️ emit worker error:", e)
def emit_new_event(evt):
    """Queue *evt* for the background emitter without ever blocking.

    Keeps the earlier calling convention: callers hand over a complete event
    dict. If the queue is already at capacity the event is discarded.
    """
    try:
        _emit_q.put(evt, block=False)
    except queue.Full:
        # Full queue: discard quietly (availability is preferred over backlog).
        pass
def shutdown_socket_manager(timeout=2):
    """Signal the emit worker to stop and wait up to *timeout* seconds for it."""
    _stop_worker.set()
    worker = _worker_thr
    if not worker or not worker.is_alive():
        # No worker was started, or it has already finished.
        return
    worker.join(timeout=timeout)
|