"""Event-driven CDP session management.

Manages CDP sessions by listening to Target.attachedToTarget and
Target.detachedFromTarget events, ensuring the session pool always reflects
the current browser state.
"""

import asyncio
from collections.abc import Coroutine
from typing import TYPE_CHECKING, Any

from cdp_use.cdp.target import AttachedToTargetEvent, DetachedFromTargetEvent, SessionID, TargetID

if TYPE_CHECKING:
    from browser_use.browser.session import BrowserSession, CDPSession


class SessionManager:
    """Event-driven CDP session manager.

    Automatically synchronizes the CDP session pool with browser state via CDP events.

    Key features:
    - Sessions added/removed automatically via Target attach/detach events
    - Multiple sessions can attach to the same target
    - Targets only removed when ALL sessions detach
    - No stale sessions - pool always reflects browser reality
    """

    def __init__(self, browser_session: 'BrowserSession'):
        """Bind the manager to a browser session and create empty tracking state.

        Args:
            browser_session: Owner of the root CDP client, the session pool
                (``_cdp_session_pool``), ``agent_focus`` and the event bus
                that this manager reads and updates.
        """
        self.browser_session = browser_session
        self.logger = browser_session.logger

        # Target -> set of sessions attached to it
        self._target_sessions: dict[TargetID, set[SessionID]] = {}

        # Session -> target mapping for reverse lookup
        self._session_to_target: dict[SessionID, TargetID] = {}

        # Target -> type cache (page, iframe, worker, etc.) - set once per target
        self._target_types: dict[TargetID, str] = {}

        # Serializes coroutine access to the tracking dicts and the pool.
        # asyncio.Lock protects against interleaving coroutines only; it is
        # not a thread lock.
        self._lock = asyncio.Lock()

        # Lock for recovery to prevent concurrent recovery attempts
        self._recovery_lock = asyncio.Lock()

        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set[asyncio.Task[Any]] = set()

    def _spawn_background(self, coro: Coroutine[Any, Any, Any]) -> None:
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)

    def _on_background_task_done(self, task: 'asyncio.Task[Any]') -> None:
        """Release a finished background task and log any exception it raised."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            self.logger.error(f'[SessionManager] Background task failed: {type(error).__name__}: {error}')

    async def start_monitoring(self) -> None:
        """Start monitoring Target attach/detach events.

        Registers CDP event handlers to keep the session pool synchronized
        with browser state.

        Raises:
            RuntimeError: If the browser session has no root CDP client yet.
        """
        if not self.browser_session._cdp_client_root:
            raise RuntimeError('CDP client not initialized')

        # Capture cdp_client_root in closure to avoid type errors
        cdp_client = self.browser_session._cdp_client_root

        # Register synchronous event handlers (CDP requirement)
        def on_attached(event: AttachedToTargetEvent, session_id: SessionID | None = None):
            event_session_id = event['sessionId']
            target_type = event['targetInfo'].get('type', 'unknown')

            # Enable auto-attach for this session's children
            async def _enable_auto_attach():
                try:
                    await cdp_client.send.Target.setAutoAttach(
                        params={'autoAttach': True, 'waitForDebuggerOnStart': False, 'flatten': True},
                        session_id=event_session_id,
                    )
                    self.logger.debug(
                        f'[SessionManager] Auto-attach enabled for {target_type} session {event_session_id[:8]}...'
                    )
                except Exception as e:
                    error_str = str(e)
                    # Expected for short-lived targets (workers, temp iframes) that detach before task executes
                    if '-32001' in error_str or 'Session with given id not found' in error_str:
                        self.logger.debug(
                            f'[SessionManager] Auto-attach skipped for {target_type} session {event_session_id[:8]}... '
                            f'(already detached - normal for short-lived targets)'
                        )
                    else:
                        self.logger.debug(f'[SessionManager] Auto-attach failed for {target_type}: {e}')

            # Schedule auto-attach and pool management
            self._spawn_background(_enable_auto_attach())
            self._spawn_background(self._handle_target_attached(event))

        def on_detached(event: DetachedFromTargetEvent, session_id: SessionID | None = None):
            self._spawn_background(self._handle_target_detached(event))

        cdp_client.register.Target.attachedToTarget(on_attached)
        cdp_client.register.Target.detachedFromTarget(on_detached)

        self.logger.debug('[SessionManager] Event monitoring started')

    async def get_session_for_target(self, target_id: TargetID) -> 'CDPSession | None':
        """Get the current valid session for a target.

        Args:
            target_id: Target ID to get session for

        Returns:
            CDPSession if exists, None if target has detached
        """
        async with self._lock:
            return self.browser_session._cdp_session_pool.get(target_id)

    async def validate_session(self, target_id: TargetID) -> bool:
        """Check if a target still has active sessions.

        Args:
            target_id: Target ID to validate

        Returns:
            True if target has active sessions, False if it should be removed
        """
        async with self._lock:
            # Missing key and empty set both mean "no active sessions".
            return bool(self._target_sessions.get(target_id))

    async def clear(self) -> None:
        """Clear all session tracking for cleanup."""
        async with self._lock:
            self._target_sessions.clear()
            self._session_to_target.clear()
            self._target_types.clear()

        self.logger.info('[SessionManager] Cleared all session tracking')

    async def is_target_valid(self, target_id: TargetID) -> bool:
        """Check if a target is still valid and has active sessions.

        Equivalent to :meth:`validate_session`; kept as a separate name for
        callers that use it.

        Args:
            target_id: Target ID to validate

        Returns:
            True if target is valid and has active sessions, False otherwise
        """
        return await self.validate_session(target_id)

    async def _handle_target_attached(self, event: AttachedToTargetEvent) -> None:
        """Handle Target.attachedToTarget event.

        Records the session in the tracking maps and creates or updates the
        pooled CDPSession for the target. Scheduled from the handler
        registered in ``start_monitoring``.
        """
        target_info = event['targetInfo']
        target_id = target_info['targetId']
        session_id = event['sessionId']
        # Same fallback as the on_attached handler so both log the same type.
        target_type = target_info.get('type', 'unknown')
        waiting_for_debugger = event.get('waitingForDebugger', False)

        self.logger.debug(
            f'[SessionManager] Target attached: {target_id[:8]}... (session={session_id[:8]}..., '
            f'type={target_type}, waitingForDebugger={waiting_for_debugger})'
        )

        async with self._lock:
            # Track this session for the target
            self._target_sessions.setdefault(target_id, set()).add(session_id)
            self._session_to_target[session_id] = target_id

            # Cache target type (set once per target)
            self._target_types.setdefault(target_id, target_type)

            pool = self.browser_session._cdp_session_pool

            # Create CDPSession wrapper and add to pool
            if target_id not in pool:
                # Imported lazily; the module-level import is TYPE_CHECKING-only.
                from browser_use.browser.session import CDPSession

                assert self.browser_session._cdp_client_root is not None, 'Root CDP client required'

                pool[target_id] = CDPSession(
                    cdp_client=self.browser_session._cdp_client_root,
                    target_id=target_id,
                    session_id=session_id,
                    title=target_info.get('title', 'Unknown title'),
                    url=target_info.get('url', 'about:blank'),
                )

                self.logger.debug(
                    f'[SessionManager] Created session for target {target_id[:8]}... '
                    f'(pool size: {len(pool)})'
                )
            else:
                # Update existing session with new session_id
                existing = pool[target_id]
                existing.session_id = session_id
                existing.title = target_info.get('title', existing.title)
                existing.url = target_info.get('url', existing.url)

        # Resume execution if waiting for debugger. Done after releasing the
        # lock so a slow CDP round-trip does not block other handlers.
        if waiting_for_debugger:
            try:
                assert self.browser_session._cdp_client_root is not None
                await self.browser_session._cdp_client_root.send.Runtime.runIfWaitingForDebugger(session_id=session_id)
                self.logger.debug(f'[SessionManager] Resumed execution for session {session_id[:8]}...')
            except Exception as e:
                self.logger.warning(f'[SessionManager] Failed to resume execution: {e}')

    async def _handle_target_detached(self, event: DetachedFromTargetEvent) -> None:
        """Handle Target.detachedFromTarget event.

        Removes the session from tracking. When it was the last session of a
        target, the target is dropped from the pool, a TabClosedEvent is
        dispatched for page/tab targets, and agent focus is recovered if it
        pointed at that target. When other sessions remain and the pooled
        CDPSession was using the detached session id, it is re-pointed to one
        of the remaining sessions.
        """
        session_id = event['sessionId']
        target_id = event.get('targetId')  # May be empty

        # If targetId not in event, look it up via session mapping
        if not target_id:
            async with self._lock:
                target_id = self._session_to_target.get(session_id)

        if not target_id:
            self.logger.warning(f'[SessionManager] Session detached but target unknown (session={session_id[:8]}...)')
            return

        agent_focus_lost = False
        target_fully_removed = False
        target_type = None

        async with self._lock:
            sessions = self._target_sessions.get(target_id)

            if sessions is not None:
                # Remove this session from target's session set
                sessions.discard(session_id)
                remaining_sessions = len(sessions)

                self.logger.debug(
                    f'[SessionManager] Session detached: target={target_id[:8]}... '
                    f'session={session_id[:8]}... (remaining={remaining_sessions})'
                )

                pool = self.browser_session._cdp_session_pool

                # Only remove target when NO sessions remain
                if remaining_sessions == 0:
                    self.logger.debug(
                        f'[SessionManager] No sessions remain for target {target_id[:8]}..., removing from pool'
                    )

                    target_fully_removed = True

                    # Check if agent_focus points to this target
                    focus = self.browser_session.agent_focus
                    agent_focus_lost = bool(focus and focus.target_id == target_id)

                    # Remove from pool
                    if target_id in pool:
                        pool.pop(target_id)
                        self.logger.debug(
                            f'[SessionManager] Removed target {target_id[:8]}... from pool '
                            f'(pool size: {len(pool)})'
                        )

                    # Clean up tracking
                    del self._target_sessions[target_id]
                else:
                    # The pooled CDPSession stores whichever session attached
                    # last. If that is the one that just detached, switch it
                    # to a session that is still attached so the pool does
                    # not hand out a dead session id.
                    pooled = pool.get(target_id)
                    if pooled is not None and pooled.session_id == session_id:
                        pooled.session_id = next(iter(sessions))
                        self.logger.debug(
                            f'[SessionManager] Pooled session for target {target_id[:8]}... '
                            f'switched to remaining session {pooled.session_id[:8]}...'
                        )
            else:
                # Target not tracked - already removed or never attached
                self.logger.debug(
                    f'[SessionManager] Session detached from untracked target: target={target_id[:8]}... '
                    f'session={session_id[:8]}... (target was already removed or attach event was missed)'
                )

            # Get target type before cleaning up cache (needed for TabClosedEvent dispatch)
            target_type = self._target_types.get(target_id)

            # Clean up target type cache if target fully removed
            if target_id not in self._target_sessions:
                self._target_types.pop(target_id, None)

            # Remove from reverse mapping
            self._session_to_target.pop(session_id, None)

        # Dispatch TabClosedEvent only for page/tab targets that are fully removed
        # (not iframes/workers or partial detaches)
        if target_fully_removed:
            if target_type in ('page', 'tab'):
                from browser_use.browser.events import TabClosedEvent

                self.browser_session.event_bus.dispatch(TabClosedEvent(target_id=target_id))
                self.logger.debug(f'[SessionManager] Dispatched TabClosedEvent for page target {target_id[:8]}...')
            elif target_type:
                self.logger.debug(
                    f'[SessionManager] Target {target_id[:8]}... fully removed (type={target_type}) - not dispatching TabClosedEvent'
                )

        # Auto-recover agent_focus outside the lock to avoid blocking other operations
        if agent_focus_lost:
            await self._recover_agent_focus(target_id)

    async def _wait_for_session(
        self, target_id: TargetID, attempts: int = 20, interval: float = 0.1
    ) -> 'CDPSession | None':
        """Poll the pool until a session for ``target_id`` appears.

        Sleeps ``interval`` seconds before each of up to ``attempts`` lookups
        (2 seconds in total with the defaults).

        Returns:
            The pooled CDPSession, or None if none appeared in time.
        """
        for _ in range(attempts):
            await asyncio.sleep(interval)
            session = await self.get_session_for_target(target_id)
            if session:
                return session
        return None

    async def _recover_agent_focus(self, crashed_target_id: TargetID) -> None:
        """Auto-recover agent_focus when the focused target crashes/detaches.

        Uses recovery lock to prevent concurrent recovery attempts from
        creating multiple emergency tabs. Any exception raised during
        recovery is logged and swallowed.

        Args:
            crashed_target_id: The target ID that was lost
        """
        # Prevent concurrent recovery attempts
        async with self._recovery_lock:
            # Check if another recovery already fixed agent_focus
            focus = self.browser_session.agent_focus
            if focus and focus.target_id != crashed_target_id:
                self.logger.debug(
                    f'[SessionManager] Agent focus already recovered by concurrent operation '
                    f'(now: {focus.target_id[:8]}...), skipping recovery'
                )
                return

            self.logger.warning(
                f'[SessionManager] Agent focus target {crashed_target_id[:8]}... detached! '
                f'Auto-recovering by switching to another target...'
            )

            try:
                # Try to find another valid page target. The lost target is
                # filtered out explicitly: if the browser still lists it, its
                # session is already gone from the pool and waiting on it
                # would only burn the whole timeout.
                all_pages = await self.browser_session._cdp_get_all_pages()
                candidate_pages = [page for page in (all_pages or []) if page['targetId'] != crashed_target_id]

                new_target_id = None
                is_existing_tab = False

                if candidate_pages:
                    # Switch to most recent page that's not the crashed one
                    new_target_id = candidate_pages[-1]['targetId']
                    is_existing_tab = True
                    self.logger.info(f'[SessionManager] Switching agent_focus to existing tab {new_target_id[:8]}...')
                else:
                    # No pages exist - create a new one
                    self.logger.warning('[SessionManager] No tabs remain! Creating new tab for agent...')
                    new_target_id = await self.browser_session._cdp_create_new_page('about:blank')
                    self.logger.info(f'[SessionManager] Created new tab {new_target_id[:8]}... for agent')

                    # Dispatch TabCreatedEvent so watchdogs can initialize
                    from browser_use.browser.events import TabCreatedEvent

                    self.browser_session.event_bus.dispatch(TabCreatedEvent(url='about:blank', target_id=new_target_id))

                # Wait for attach event to create session, then update agent_focus
                new_session = await self._wait_for_session(new_target_id)  # Wait up to 2 seconds

                if new_session:
                    self.browser_session.agent_focus = new_session
                    self.logger.info(f'[SessionManager] ✅ Agent focus recovered: {new_target_id[:8]}...')

                    # Visually activate the tab in browser (only for existing tabs)
                    if is_existing_tab:
                        try:
                            assert self.browser_session._cdp_client_root is not None
                            await self.browser_session._cdp_client_root.send.Target.activateTarget(
                                params={'targetId': new_target_id}
                            )
                            self.logger.debug(f'[SessionManager] Activated tab {new_target_id[:8]}... in browser UI')
                        except Exception as e:
                            self.logger.debug(f'[SessionManager] Failed to activate tab visually: {e}')

                    # Dispatch focus changed event
                    from browser_use.browser.events import AgentFocusChangedEvent

                    self.browser_session.event_bus.dispatch(
                        AgentFocusChangedEvent(target_id=new_target_id, url=new_session.url)
                    )
                    return

                # Recovery failed - create emergency fallback tab
                self.logger.error(
                    f'[SessionManager] ❌ Failed to get session for {new_target_id[:8]}... after 2s, creating emergency fallback tab'
                )

                fallback_target_id = await self.browser_session._cdp_create_new_page('about:blank')
                self.logger.warning(f'[SessionManager] Created emergency fallback tab {fallback_target_id[:8]}...')

                # Try one more time with fallback
                fallback_session = await self._wait_for_session(fallback_target_id)
                if fallback_session:
                    self.browser_session.agent_focus = fallback_session
                    self.logger.warning(
                        f'[SessionManager] ⚠️ Agent focus set to emergency fallback: {fallback_target_id[:8]}...'
                    )

                    from browser_use.browser.events import AgentFocusChangedEvent, TabCreatedEvent

                    self.browser_session.event_bus.dispatch(
                        TabCreatedEvent(url='about:blank', target_id=fallback_target_id)
                    )
                    self.browser_session.event_bus.dispatch(
                        AgentFocusChangedEvent(target_id=fallback_target_id, url='about:blank')
                    )
                    return

                # Complete failure - this should never happen
                self.logger.critical(
                    '[SessionManager] 🚨 CRITICAL: Failed to recover agent_focus even with fallback! Agent may be in broken state.'
                )

            except Exception as e:
                self.logger.error(f'[SessionManager] ❌ Error during agent_focus recovery: {type(e).__name__}: {e}')