Spaces:
Paused
Paused
| """ | |
| Coding Agent Runner Manager | |
| Singleton manager for CodingAgentRunner sessions. | |
| Mirrors AgentRunnerManager pattern. | |
| """ | |
| import logging | |
| import threading | |
| import time | |
| import uuid | |
| from typing import Dict, List, Optional | |
| from .coding_agent_runner import CodingAgentRunner, CodingAgentConfig, CodingAgentState | |
| logger = logging.getLogger(__name__) | |
| class CodingAgentRunnerManager: | |
| """Singleton manager for coding agent sessions.""" | |
| _instance = None | |
| _lock = threading.Lock() | |
| def __init__(self, max_sessions: int = 10, session_ttl: int = 3600): | |
| self._sessions: Dict[str, CodingAgentRunner] = {} | |
| self._session_keys: Dict[str, str] = {} # user:instance -> session_id | |
| self._max_sessions = max_sessions | |
| self._session_ttl = session_ttl | |
| self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) | |
| self._cleanup_thread.start() | |
| def get_instance(cls, **kwargs) -> "CodingAgentRunnerManager": | |
| if cls._instance is None: | |
| with cls._lock: | |
| if cls._instance is None: | |
| cls._instance = cls(**kwargs) | |
| return cls._instance | |
| def clear_instance(cls): | |
| with cls._lock: | |
| if cls._instance: | |
| for runner in cls._instance._sessions.values(): | |
| runner.cleanup() | |
| cls._instance = None | |
| def create_session(self, user_id: str, instance_id: str, | |
| config: CodingAgentConfig, trace_dir: str = "") -> CodingAgentRunner: | |
| """Create a new coding agent session.""" | |
| key = f"{user_id}:{instance_id}" | |
| # Check for existing active session | |
| if key in self._session_keys: | |
| existing = self._sessions.get(self._session_keys[key]) | |
| if existing and existing.state in (CodingAgentState.RUNNING, CodingAgentState.PAUSED): | |
| return existing | |
| if len(self._sessions) >= self._max_sessions: | |
| self._evict_oldest() | |
| session_id = str(uuid.uuid4()) | |
| runner = CodingAgentRunner(session_id, config, trace_dir) | |
| self._sessions[session_id] = runner | |
| self._session_keys[key] = session_id | |
| logger.info(f"Created coding agent session {session_id} for {key}") | |
| return runner | |
| def get_session(self, session_id: str) -> Optional[CodingAgentRunner]: | |
| return self._sessions.get(session_id) | |
| def get_session_by_key(self, user_id: str, instance_id: str) -> Optional[CodingAgentRunner]: | |
| key = f"{user_id}:{instance_id}" | |
| sid = self._session_keys.get(key) | |
| if sid: | |
| return self._sessions.get(sid) | |
| return None | |
| def remove_session(self, session_id: str) -> None: | |
| runner = self._sessions.pop(session_id, None) | |
| if runner: | |
| runner.cleanup() | |
| # Remove from keys | |
| self._session_keys = { | |
| k: v for k, v in self._session_keys.items() if v != session_id | |
| } | |
| def list_sessions(self) -> List[Dict]: | |
| return [r.get_state_summary() for r in self._sessions.values()] | |
| def _evict_oldest(self): | |
| """Remove the oldest completed/error session.""" | |
| for sid, runner in sorted(self._sessions.items()): | |
| if runner.state in (CodingAgentState.COMPLETED, CodingAgentState.ERROR): | |
| self.remove_session(sid) | |
| return | |
| def _cleanup_loop(self): | |
| """Background cleanup of expired sessions.""" | |
| while True: | |
| time.sleep(60) | |
| expired = [] | |
| for sid, runner in list(self._sessions.items()): | |
| if runner.state in (CodingAgentState.COMPLETED, CodingAgentState.ERROR): | |
| if time.time() - runner._started_at > self._session_ttl: | |
| expired.append(sid) | |
| for sid in expired: | |
| self.remove_session(sid) | |
| logger.debug(f"Cleaned up expired session {sid}") | |