Spaces:
Running
Running
| """ | |
| Module-level session registry for AWM environments. | |
| Tracks active AWMEnvironment instances with idle timestamps, and runs a | |
| background daemon thread that periodically kills idle sessions. | |
| All config values are read from config.py: | |
| MAX_IDLE_TIME, CLEANUP_INTERVAL, ALLOWED_IDLE_SESSIONS | |
| """ | |
| import logging | |
| import threading | |
| import time | |
| import weakref | |
| from typing import TYPE_CHECKING, Any | |
| from .config import ALLOWED_IDLE_SESSIONS, CLEANUP_INTERVAL, MAX_IDLE_TIME | |
| if TYPE_CHECKING: | |
| from .awm_environment import AWMEnvironment | |
| logger = logging.getLogger(__name__) | |
| class SessionRegistry: | |
| """Thread-safe singleton tracking active AWMEnvironment instances.""" | |
| def __init__(self) -> None: | |
| self._lock = threading.Lock() | |
| # session_id -> {env: weakref, last_active: float, scenario: str|None} | |
| self._sessions: dict[str, dict[str, Any]] = {} | |
| self._daemon_started = False | |
| def register( | |
| self, session_id: str, env: "AWMEnvironment", scenario: str | None = None | |
| ) -> None: | |
| with self._lock: | |
| self._sessions[session_id] = { | |
| "env": weakref.ref(env), | |
| "last_active": time.monotonic(), | |
| "scenario": scenario, | |
| "registered_at": time.time(), | |
| } | |
| self._ensure_daemon() | |
| def unregister(self, session_id: str) -> None: | |
| with self._lock: | |
| self._sessions.pop(session_id, None) | |
| def touch(self, session_id: str) -> None: | |
| """Update last_active timestamp for a session.""" | |
| with self._lock: | |
| entry = self._sessions.get(session_id) | |
| if entry is not None: | |
| entry["last_active"] = time.monotonic() | |
| def active_count(self) -> int: | |
| with self._lock: | |
| return len(self._sessions) | |
| def get_stats(self) -> dict[str, Any]: | |
| """Return registry stats for the /stats endpoint.""" | |
| now = time.monotonic() | |
| with self._lock: | |
| idle_times = [] | |
| scenarios: dict[str, int] = {} | |
| for entry in self._sessions.values(): | |
| idle = now - entry["last_active"] | |
| idle_times.append(idle) | |
| sc = entry.get("scenario") or "unknown" | |
| scenarios[sc] = scenarios.get(sc, 0) + 1 | |
| return { | |
| "total_sessions": len(self._sessions), | |
| "max_idle_time_config": MAX_IDLE_TIME, | |
| "cleanup_interval_config": CLEANUP_INTERVAL, | |
| "allowed_idle_sessions_config": ALLOWED_IDLE_SESSIONS, | |
| "max_idle_s": round(max(idle_times), 1) if idle_times else 0, | |
| "scenarios": scenarios, | |
| } | |
| def cleanup_idle(self) -> int: | |
| """Kill sessions that have been idle longer than MAX_IDLE_TIME. | |
| Only triggers when total sessions exceed ALLOWED_IDLE_SESSIONS. | |
| Returns the number of sessions cleaned up. | |
| """ | |
| now = time.monotonic() | |
| to_remove: list[str] = [] | |
| with self._lock: | |
| if len(self._sessions) <= ALLOWED_IDLE_SESSIONS: | |
| return 0 | |
| for sid, entry in self._sessions.items(): | |
| idle = now - entry["last_active"] | |
| if idle > MAX_IDLE_TIME: | |
| to_remove.append(sid) | |
| cleaned = 0 | |
| for sid in to_remove: | |
| with self._lock: | |
| entry = self._sessions.pop(sid, None) | |
| if entry is None: | |
| continue | |
| env_ref = entry["env"] | |
| env = env_ref() | |
| if env is not None: | |
| try: | |
| env._cleanup_session() | |
| logger.info( | |
| f"[cleanup] Killed idle session {sid} " | |
| f"(scenario={entry.get('scenario')})" | |
| ) | |
| cleaned += 1 | |
| except Exception as e: | |
| logger.warning(f"[cleanup] Error cleaning session {sid}: {e}") | |
| if cleaned > 0: | |
| logger.info(f"[cleanup] Cleaned {cleaned} idle sessions") | |
| return cleaned | |
| def _ensure_daemon(self) -> None: | |
| """Start the cleanup daemon thread if not already running.""" | |
| if self._daemon_started: | |
| return | |
| self._daemon_started = True | |
| t = threading.Thread(target=self._daemon_loop, daemon=True, name="awm-cleanup") | |
| t.start() | |
| logger.info("[cleanup] Daemon thread started") | |
| def _daemon_loop(self) -> None: | |
| """Background loop that periodically cleans idle sessions.""" | |
| while True: | |
| time.sleep(CLEANUP_INTERVAL) | |
| try: | |
| self.cleanup_idle() | |
| except Exception as e: | |
| logger.error(f"[cleanup] Daemon error: {e}") | |
| # Prune dead weakrefs | |
| with self._lock: | |
| dead = [ | |
| sid | |
| for sid, entry in self._sessions.items() | |
| if entry["env"]() is None | |
| ] | |
| for sid in dead: | |
| del self._sessions[sid] | |
| # Module-level singleton | |
| registry = SessionRegistry() | |