Spaces:
Sleeping
Sleeping
| # socket_manager.py (Optimized) | |
| # - Non-blocking emit queue with background worker | |
| # - Rate-limited batching for frequent events | |
| # - Backwards-compatible init_socketio & emit_new_event API | |
| import threading | |
| import time | |
| import queue | |
| _emit_q = queue.Queue(maxsize=2000) | |
| _socketio = None | |
| _emit_lock = threading.Lock() | |
| _worker_thr = None | |
| _stop_worker = threading.Event() | |
| # batch/rate config | |
| _BATCH_INTERVAL = 0.5 # seconds between worker sends | |
| _BATCH_MAX = 10 # max events to bundle per emit | |
| def init_socketio(socketio): | |
| """Initialize global socketio and start background emit worker.""" | |
| global _socketio, _worker_thr | |
| _socketio = socketio | |
| print("✅ SocketIO initialized (thread-safe)") | |
| if _worker_thr is None or not _worker_thr.is_alive(): | |
| _worker_thr = threading.Thread(target=_emit_worker, daemon=True) | |
| _worker_thr.start() | |
| def _emit_worker(): | |
| """Background worker: drains _emit_q and emits aggregated payloads at intervals.""" | |
| last_send = 0.0 | |
| buffer = [] | |
| while not _stop_worker.is_set(): | |
| try: | |
| evt = _emit_q.get(timeout=_BATCH_INTERVAL) | |
| buffer.append(evt) | |
| except Exception: | |
| # timeout, flush if buffer exists | |
| pass | |
| now = time.time() | |
| if buffer and (now - last_send >= _BATCH_INTERVAL or len(buffer) >= _BATCH_MAX): | |
| payload = {"count": len(buffer), "items": buffer[:_BATCH_MAX]} | |
| try: | |
| if _socketio: | |
| # emit in background so worker isn't blocked on network | |
| _socketio.start_background_task(lambda: _socketio.emit("new_event", payload, namespace="/")) | |
| except Exception as e: | |
| print("⚠️ emit worker error:", e) | |
| buffer.clear() | |
| last_send = now | |
| # final flush on shutdown | |
| if buffer and _socketio: | |
| try: | |
| _socketio.start_background_task(lambda: _socketio.emit("new_event", {"count": len(buffer), "items": buffer}, namespace="/")) | |
| except Exception: | |
| pass | |
| def emit_new_event(evt): | |
| """Enqueue event for background emit. Non-blocking. | |
| Compatible with previous API: callers can pass full event dicts. | |
| """ | |
| try: | |
| _emit_q.put_nowait(evt) | |
| except queue.Full: | |
| # drop silently (prefer availability over backlog) | |
| return | |
| def shutdown_socket_manager(timeout=2): | |
| """Stop background worker gracefully.""" | |
| _stop_worker.set() | |
| if _worker_thr and _worker_thr.is_alive(): | |
| _worker_thr.join(timeout=timeout) | |