Yash030 Claude Opus 4.7 committed on
Commit
188ffa9
·
1 Parent(s): 84a115b

Fix stale sessions in admin dashboard and improve auto-routing health

Browse files

- Add session cleanup background task (runs every 60s) to remove inactive sessions
- Wire cleanup loop into AppRuntime startup/shutdown lifecycle
- Per-provider health parameters: NIM gets a tighter window (15s TTL, 2 failures), Zen a looser one (60s TTL, 5 failures); other providers keep the 30s/3-failure default

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

api/runtime.py CHANGED
@@ -4,6 +4,7 @@ from __future__ import annotations
4
 
5
  import asyncio
6
  import os
 
7
  from dataclasses import dataclass, field
8
  from typing import TYPE_CHECKING, Any
9
 
@@ -104,6 +105,7 @@ class AppRuntime:
104
  messaging_platform: MessagingPlatform | None = None
105
  message_handler: ClaudeMessageHandler | None = None
106
  cli_manager: CLISessionManager | None = None
 
107
 
108
  @classmethod
109
  def for_app(
@@ -135,6 +137,12 @@ class AppRuntime:
135
  # Pre-warm provider connections on startup for faster first request
136
  await self._warmup_providers()
137
  await self._start_messaging_if_configured()
 
 
 
 
 
 
138
  self._publish_state()
139
  except Exception as exc:
140
  log_startup_failure(self.settings, exc)
@@ -147,6 +155,11 @@ class AppRuntime:
147
 
148
  async def shutdown(self) -> None:
149
  verbose = self.settings.log_api_error_tracebacks
 
 
 
 
 
150
  if self.message_handler is not None:
151
  try:
152
  self.message_handler.session_store.flush_pending_save()
 
4
 
5
  import asyncio
6
  import os
7
+ from contextlib import suppress
8
  from dataclasses import dataclass, field
9
  from typing import TYPE_CHECKING, Any
10
 
 
105
  messaging_platform: MessagingPlatform | None = None
106
  message_handler: ClaudeMessageHandler | None = None
107
  cli_manager: CLISessionManager | None = None
108
+ _session_cleanup_task: asyncio.Task | None = field(default=None, init=False)
109
 
110
  @classmethod
111
  def for_app(
 
137
  # Pre-warm provider connections on startup for faster first request
138
  await self._warmup_providers()
139
  await self._start_messaging_if_configured()
140
+ # Start background session cleanup
141
+ from core.session_tracker import SessionTracker
142
+
143
+ self._session_cleanup_task = asyncio.create_task(
144
+ SessionTracker.get_instance().start_cleanup_loop()
145
+ )
146
  self._publish_state()
147
  except Exception as exc:
148
  log_startup_failure(self.settings, exc)
 
155
 
156
  async def shutdown(self) -> None:
157
  verbose = self.settings.log_api_error_tracebacks
158
+ # Cancel session cleanup task
159
+ if self._session_cleanup_task is not None:
160
+ self._session_cleanup_task.cancel()
161
+ with suppress(asyncio.CancelledError, asyncio.TimeoutError):
162
+ await asyncio.wait_for(self._session_cleanup_task, timeout=2.0)
163
  if self.message_handler is not None:
164
  try:
165
  self.message_handler.session_store.flush_pending_save()
core/session_tracker.py CHANGED
@@ -91,7 +91,7 @@ class SessionTracker:
91
  cls._instance = None
92
 
93
  def _cleanup_old_sessions(self) -> None:
94
- """Remove sessions with no recent activity."""
95
  now = time.monotonic()
96
  cutoff = now - (self._window_seconds * 2)
97
  to_remove = [
@@ -103,6 +103,19 @@ class SessionTracker:
103
  del self._sessions[sid]
104
  if sid in self._session_requests:
105
  del self._session_requests[sid]
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
  def _evict_lru_session(self) -> None:
108
  """Evict least recently used session when at capacity."""
 
91
  cls._instance = None
92
 
93
  def _cleanup_old_sessions(self) -> None:
94
+ """Remove sessions with no recent activity (must be called with lock held)."""
95
  now = time.monotonic()
96
  cutoff = now - (self._window_seconds * 2)
97
  to_remove = [
 
103
  del self._sessions[sid]
104
  if sid in self._session_requests:
105
  del self._session_requests[sid]
106
+ if to_remove:
107
+ logger.debug(
108
+ "SessionTracker: cleaned up {} stale sessions ({} remaining)",
109
+ len(to_remove),
110
+ len(self._sessions),
111
+ )
112
+
113
+ async def start_cleanup_loop(self, interval: float = 60.0) -> None:
114
+ """Background task: periodically clean up stale sessions."""
115
+ while True:
116
+ await asyncio.sleep(interval)
117
+ async with self._lock:
118
+ self._cleanup_old_sessions()
119
 
120
  def _evict_lru_session(self) -> None:
121
  """Evict least recently used session when at capacity."""
providers/rate_limit.py CHANGED
@@ -21,10 +21,25 @@ class ModelHealthTracker:
21
 
22
  _instance: ClassVar[ModelHealthTracker | None] = None
23
 
24
- def __init__(self, failure_ttl: float = 30.0, max_failures: int = 3) -> None:
 
 
 
 
 
 
 
 
 
25
  self._failure_ttl = failure_ttl
26
  self._max_failures = max_failures
 
 
 
 
27
  self._failures: dict[str, list[float]] = {}
 
 
28
 
29
  @classmethod
30
  def get_instance(cls) -> ModelHealthTracker:
@@ -32,6 +47,16 @@ class ModelHealthTracker:
32
  cls._instance = cls()
33
  return cls._instance
34
 
 
 
 
 
 
 
 
 
 
 
35
  def record_failure(self, model_ref: str) -> None:
36
  """Record a failure timestamp for a model."""
37
  now = time.monotonic()
@@ -44,16 +69,17 @@ class ModelHealthTracker:
44
  """Check if model has had fewer than max_failures in the TTL window."""
45
  if model_ref not in self._failures:
46
  return True
47
- cutoff = time.monotonic() - self._failure_ttl
 
48
  recent = [t for t in self._failures[model_ref] if t > cutoff]
49
  self._failures[model_ref] = recent
50
- healthy = len(recent) < self._max_failures
51
  if not healthy:
52
  logger.debug(
53
  "HEALTH: model '{}' is unhealthy ({} failures in {}s)",
54
  model_ref,
55
  len(recent),
56
- self._failure_ttl,
57
  )
58
  return healthy
59
 
@@ -61,7 +87,8 @@ class ModelHealthTracker:
61
  """Get number of recent failures for a model."""
62
  if model_ref not in self._failures:
63
  return 0
64
- cutoff = time.monotonic() - self._failure_ttl
 
65
  return len([t for t in self._failures[model_ref] if t > cutoff])
66
 
67
  def clear_failures(self, model_ref: str) -> None:
 
21
 
22
  _instance: ClassVar[ModelHealthTracker | None] = None
23
 
24
+ def __init__(
25
+ self,
26
+ failure_ttl: float = 30.0,
27
+ max_failures: int = 3,
28
+ *,
29
+ failure_ttl_nim: float = 15.0,
30
+ max_failures_nim: int = 2,
31
+ failure_ttl_zen: float = 60.0,
32
+ max_failures_zen: int = 5,
33
+ ) -> None:
34
  self._failure_ttl = failure_ttl
35
  self._max_failures = max_failures
36
+ self._failure_ttl_nim = failure_ttl_nim
37
+ self._max_failures_nim = max_failures_nim
38
+ self._failure_ttl_zen = failure_ttl_zen
39
+ self._max_failures_zen = max_failures_zen
40
  self._failures: dict[str, list[float]] = {}
41
+ self._failure_ttls: dict[str, float] = {}
42
+ self._max_failures_map: dict[str, int] = {}
43
 
44
  @classmethod
45
  def get_instance(cls) -> ModelHealthTracker:
 
47
  cls._instance = cls()
48
  return cls._instance
49
 
50
+ def _params_for(self, model_ref: str) -> tuple[float, int]:
51
+ """Return (failure_ttl, max_failures) for a model based on provider."""
52
+ if model_ref in self._failure_ttls:
53
+ return self._failure_ttls[model_ref], self._max_failures_map[model_ref]
54
+ if model_ref.startswith("zen/"):
55
+ return self._failure_ttl_zen, self._max_failures_zen
56
+ if model_ref.startswith("nvidia_nim/"):
57
+ return self._failure_ttl_nim, self._max_failures_nim
58
+ return self._failure_ttl, self._max_failures
59
+
60
  def record_failure(self, model_ref: str) -> None:
61
  """Record a failure timestamp for a model."""
62
  now = time.monotonic()
 
69
  """Check if model has had fewer than max_failures in the TTL window."""
70
  if model_ref not in self._failures:
71
  return True
72
+ ttl, max_f = self._params_for(model_ref)
73
+ cutoff = time.monotonic() - ttl
74
  recent = [t for t in self._failures[model_ref] if t > cutoff]
75
  self._failures[model_ref] = recent
76
+ healthy = len(recent) < max_f
77
  if not healthy:
78
  logger.debug(
79
  "HEALTH: model '{}' is unhealthy ({} failures in {}s)",
80
  model_ref,
81
  len(recent),
82
+ ttl,
83
  )
84
  return healthy
85
 
 
87
  """Get number of recent failures for a model."""
88
  if model_ref not in self._failures:
89
  return 0
90
+ ttl, _ = self._params_for(model_ref)
91
+ cutoff = time.monotonic() - ttl
92
  return len([t for t in self._failures[model_ref] if t > cutoff])
93
 
94
  def clear_failures(self, model_ref: str) -> None: