""" Agent Session Manager Thread-safe singleton that tracks active agent interaction sessions. Each session maps a (user_id, instance_id) pair to an AgentSession containing the proxy, conversation history, and step count. Follows the same singleton pattern as ItemStateManager and UserStateManager. """ import threading import time import logging from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple from .base import AgentMessage, BaseAgentProxy logger = logging.getLogger(__name__) @dataclass class AgentSession: """An active agent interaction session.""" user_id: str instance_id: str proxy: BaseAgentProxy task_description: str proxy_context: dict = field(default_factory=dict) messages: List[AgentMessage] = field(default_factory=list) step_count: int = 0 started_at: float = field(default_factory=time.time) finished: bool = False class AgentSessionManager: """Thread-safe manager for active agent sessions.""" def __init__(self, config: dict): self.config = config self._sessions: Dict[Tuple[str, str], AgentSession] = {} self._lock = threading.RLock() def create_session( self, user_id: str, instance_id: str, proxy: BaseAgentProxy, task_description: str, ) -> AgentSession: """Create a new session for a user/instance pair.""" with self._lock: key = (user_id, instance_id) if key in self._sessions and not self._sessions[key].finished: logger.warning( f"Session already exists for {key}, returning existing" ) return self._sessions[key] proxy_context = proxy.start_session(task_description) session = AgentSession( user_id=user_id, instance_id=instance_id, proxy=proxy, task_description=task_description, proxy_context=proxy_context, ) self._sessions[key] = session logger.debug(f"Created agent session for {key}") return session def get_session( self, user_id: str, instance_id: str ) -> Optional[AgentSession]: """Get an active session, or None if not found.""" with self._lock: return self._sessions.get((user_id, instance_id)) def remove_session(self, user_id: str, instance_id: str): """Remove a session and clean up proxy resources.""" with self._lock: key = (user_id, instance_id) session = self._sessions.pop(key, None) if session: try: session.proxy.end_session(session.proxy_context) except Exception as e: logger.warning(f"Error ending proxy session for {key}: {e}") logger.debug(f"Removed agent session for {key}") # Singleton management _AGENT_SESSION_MANAGER: Optional[AgentSessionManager] = None _AGENT_SESSION_MANAGER_LOCK = threading.Lock() def init_agent_session_manager(config: dict) -> AgentSessionManager: """Initialize the singleton AgentSessionManager.""" global _AGENT_SESSION_MANAGER if _AGENT_SESSION_MANAGER is None: with _AGENT_SESSION_MANAGER_LOCK: if _AGENT_SESSION_MANAGER is None: _AGENT_SESSION_MANAGER = AgentSessionManager(config) return _AGENT_SESSION_MANAGER def get_agent_session_manager() -> AgentSessionManager: """Get the singleton AgentSessionManager.""" global _AGENT_SESSION_MANAGER if _AGENT_SESSION_MANAGER is None: raise ValueError("AgentSessionManager has not been initialized yet!") return _AGENT_SESSION_MANAGER def clear_agent_session_manager(): """Clear the singleton instance (for testing).""" global _AGENT_SESSION_MANAGER with _AGENT_SESSION_MANAGER_LOCK: _AGENT_SESSION_MANAGER = None