File size: 5,173 Bytes
d57737f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
Module-level session registry for AWM environments.

Tracks active AWMEnvironment instances with idle timestamps, and runs a
background daemon thread that periodically kills idle sessions.

All config values are read from config.py:
    MAX_IDLE_TIME, CLEANUP_INTERVAL, ALLOWED_IDLE_SESSIONS
"""

import logging
import threading
import time
import weakref
from typing import TYPE_CHECKING, Any

from .config import ALLOWED_IDLE_SESSIONS, CLEANUP_INTERVAL, MAX_IDLE_TIME

if TYPE_CHECKING:
    from .awm_environment import AWMEnvironment

logger = logging.getLogger(__name__)


class SessionRegistry:
    """Thread-safe singleton tracking active AWMEnvironment instances."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # session_id -> {env: weakref, last_active: float, scenario: str|None}
        self._sessions: dict[str, dict[str, Any]] = {}
        self._daemon_started = False

    def register(
        self, session_id: str, env: "AWMEnvironment", scenario: str | None = None
    ) -> None:
        with self._lock:
            self._sessions[session_id] = {
                "env": weakref.ref(env),
                "last_active": time.monotonic(),
                "scenario": scenario,
                "registered_at": time.time(),
            }
        self._ensure_daemon()

    def unregister(self, session_id: str) -> None:
        with self._lock:
            self._sessions.pop(session_id, None)

    def touch(self, session_id: str) -> None:
        """Update last_active timestamp for a session."""
        with self._lock:
            entry = self._sessions.get(session_id)
            if entry is not None:
                entry["last_active"] = time.monotonic()

    @property
    def active_count(self) -> int:
        with self._lock:
            return len(self._sessions)

    def get_stats(self) -> dict[str, Any]:
        """Return registry stats for the /stats endpoint."""
        now = time.monotonic()
        with self._lock:
            idle_times = []
            scenarios: dict[str, int] = {}
            for entry in self._sessions.values():
                idle = now - entry["last_active"]
                idle_times.append(idle)
                sc = entry.get("scenario") or "unknown"
                scenarios[sc] = scenarios.get(sc, 0) + 1

            return {
                "total_sessions": len(self._sessions),
                "max_idle_time_config": MAX_IDLE_TIME,
                "cleanup_interval_config": CLEANUP_INTERVAL,
                "allowed_idle_sessions_config": ALLOWED_IDLE_SESSIONS,
                "max_idle_s": round(max(idle_times), 1) if idle_times else 0,
                "scenarios": scenarios,
            }

    def cleanup_idle(self) -> int:
        """Kill sessions that have been idle longer than MAX_IDLE_TIME.

        Only triggers when total sessions exceed ALLOWED_IDLE_SESSIONS.
        Returns the number of sessions cleaned up.
        """
        now = time.monotonic()
        to_remove: list[str] = []

        with self._lock:
            if len(self._sessions) <= ALLOWED_IDLE_SESSIONS:
                return 0

            for sid, entry in self._sessions.items():
                idle = now - entry["last_active"]
                if idle > MAX_IDLE_TIME:
                    to_remove.append(sid)

        cleaned = 0
        for sid in to_remove:
            with self._lock:
                entry = self._sessions.pop(sid, None)
            if entry is None:
                continue

            env_ref = entry["env"]
            env = env_ref()
            if env is not None:
                try:
                    env._cleanup_session()
                    logger.info(
                        f"[cleanup] Killed idle session {sid} "
                        f"(scenario={entry.get('scenario')})"
                    )
                    cleaned += 1
                except Exception as e:
                    logger.warning(f"[cleanup] Error cleaning session {sid}: {e}")

        if cleaned > 0:
            logger.info(f"[cleanup] Cleaned {cleaned} idle sessions")

        return cleaned

    def _ensure_daemon(self) -> None:
        """Start the cleanup daemon thread if not already running."""
        if self._daemon_started:
            return
        self._daemon_started = True
        t = threading.Thread(target=self._daemon_loop, daemon=True, name="awm-cleanup")
        t.start()
        logger.info("[cleanup] Daemon thread started")

    def _daemon_loop(self) -> None:
        """Background loop that periodically cleans idle sessions."""
        while True:
            time.sleep(CLEANUP_INTERVAL)
            try:
                self.cleanup_idle()
            except Exception as e:
                logger.error(f"[cleanup] Daemon error: {e}")

            # Prune dead weakrefs
            with self._lock:
                dead = [
                    sid
                    for sid, entry in self._sessions.items()
                    if entry["env"]() is None
                ]
                for sid in dead:
                    del self._sessions[sid]


# Module-level singleton
registry = SessionRegistry()