Speedofmastery's picture
Merge Landrun + Browser-Use + Chromium with AI agent support (without binary files)
d7b3d84
"""Event-driven CDP session management.
Manages CDP sessions by listening to Target.attachedToTarget and Target.detachedFromTarget
events, ensuring the session pool always reflects the current browser state.
"""
import asyncio
from typing import TYPE_CHECKING
from cdp_use.cdp.target import AttachedToTargetEvent, DetachedFromTargetEvent, SessionID, TargetID
if TYPE_CHECKING:
from browser_use.browser.session import BrowserSession, CDPSession
class SessionManager:
	"""Event-driven CDP session manager.

	Automatically synchronizes the CDP session pool with browser state via CDP events.

	Key features:
	- Sessions added/removed automatically via Target attach/detach events
	- Multiple sessions can attach to the same target
	- Targets only removed when ALL sessions detach
	- If the session a pooled CDPSession points at detaches while other sessions
	  remain, the pooled CDPSession is re-pointed at a surviving session
	- No stale sessions - pool always reflects browser reality
	"""

	def __init__(self, browser_session: 'BrowserSession'):
		self.browser_session = browser_session
		self.logger = browser_session.logger

		# Target -> set of sessions attached to it
		self._target_sessions: dict[TargetID, set[SessionID]] = {}

		# Session -> target mapping for reverse lookup
		self._session_to_target: dict[SessionID, TargetID] = {}

		# Target -> type cache (page, iframe, worker, etc.) - types are immutable
		self._target_types: dict[TargetID, str] = {}

		# Strong references to fire-and-forget tasks spawned from the synchronous CDP
		# event handlers. The event loop only keeps weak references to tasks, so without
		# this set a pending task could be garbage-collected before it finishes.
		self._background_tasks: set[asyncio.Task[None]] = set()

		# asyncio lock (coroutine-level mutual exclusion, NOT thread-safety) guarding the
		# tracking dicts above and the browser session's CDP session pool
		self._lock = asyncio.Lock()

		# Serializes recovery so concurrent detaches cannot each create an emergency tab
		self._recovery_lock = asyncio.Lock()

	def _spawn_background_task(self, coro) -> 'asyncio.Task[None]':
		"""Schedule a coroutine as a task and hold a strong reference until it finishes.

		Args:
			coro: Coroutine object to run in the background.

		Returns:
			The created task.
		"""
		task = asyncio.create_task(coro)
		self._background_tasks.add(task)
		task.add_done_callback(self._on_background_task_done)
		return task

	def _on_background_task_done(self, task: 'asyncio.Task[None]') -> None:
		"""Release a finished background task and log any exception it raised.

		Retrieving the exception here prevents asyncio's "Task exception was never
		retrieved" message and makes the failure visible in this manager's log.
		"""
		self._background_tasks.discard(task)
		if task.cancelled():
			return
		exc = task.exception()
		if exc is not None:
			self.logger.warning(f'[SessionManager] Background task failed: {type(exc).__name__}: {exc}')

	async def start_monitoring(self) -> None:
		"""Start monitoring Target attach/detach events.

		Registers CDP event handlers to keep the session pool synchronized with browser state.

		Raises:
			RuntimeError: If the browser session's root CDP client is not initialized.
		"""
		# Capture the root client once; the closures below use this reference
		cdp_client = self.browser_session._cdp_client_root
		if not cdp_client:
			raise RuntimeError('CDP client not initialized')

		# Event handlers are registered as plain (synchronous) callbacks, so each one
		# only schedules the actual async work as a tracked background task.
		def on_attached(event: 'AttachedToTargetEvent', session_id: 'SessionID | None' = None):
			event_session_id = event['sessionId']
			target_type = event['targetInfo'].get('type', 'unknown')

			# Enable auto-attach for this session's children
			async def _enable_auto_attach():
				try:
					await cdp_client.send.Target.setAutoAttach(
						params={'autoAttach': True, 'waitForDebuggerOnStart': False, 'flatten': True}, session_id=event_session_id
					)
					self.logger.debug(f'[SessionManager] Auto-attach enabled for {target_type} session {event_session_id[:8]}...')
				except Exception as e:
					error_str = str(e)
					# Expected for short-lived targets (workers, temp iframes) that detach before task executes
					if '-32001' in error_str or 'Session with given id not found' in error_str:
						self.logger.debug(
							f'[SessionManager] Auto-attach skipped for {target_type} session {event_session_id[:8]}... '
							f'(already detached - normal for short-lived targets)'
						)
					else:
						self.logger.debug(f'[SessionManager] Auto-attach failed for {target_type}: {e}')

			# Schedule auto-attach and pool management
			self._spawn_background_task(_enable_auto_attach())
			self._spawn_background_task(self._handle_target_attached(event))

		def on_detached(event: 'DetachedFromTargetEvent', session_id: 'SessionID | None' = None):
			self._spawn_background_task(self._handle_target_detached(event))

		cdp_client.register.Target.attachedToTarget(on_attached)
		cdp_client.register.Target.detachedFromTarget(on_detached)

		self.logger.debug('[SessionManager] Event monitoring started')

	async def get_session_for_target(self, target_id: 'TargetID') -> 'CDPSession | None':
		"""Get the current valid session for a target.

		Args:
			target_id: Target ID to get session for

		Returns:
			CDPSession if exists, None if target has detached
		"""
		async with self._lock:
			return self.browser_session._cdp_session_pool.get(target_id)

	async def validate_session(self, target_id: 'TargetID') -> bool:
		"""Check if a target still has active sessions.

		Args:
			target_id: Target ID to validate

		Returns:
			True if target has active sessions, False if it should be removed
		"""
		async with self._lock:
			# Missing target and empty session set both mean "no active sessions"
			return bool(self._target_sessions.get(target_id))

	async def clear(self) -> None:
		"""Clear all session tracking for cleanup.

		Only the manager's own tracking dicts are cleared; the browser session's
		CDP session pool is not touched here.
		"""
		async with self._lock:
			self._target_sessions.clear()
			self._session_to_target.clear()
			self._target_types.clear()

		self.logger.info('[SessionManager] Cleared all session tracking')

	async def is_target_valid(self, target_id: 'TargetID') -> bool:
		"""Check if a target is still valid and has active sessions.

		Equivalent to validate_session(); kept as a separate public name for callers.

		Args:
			target_id: Target ID to validate

		Returns:
			True if target is valid and has active sessions, False otherwise
		"""
		return await self.validate_session(target_id)

	async def _handle_target_attached(self, event: 'AttachedToTargetEvent') -> None:
		"""Handle a Target.attachedToTarget event.

		This is the ONLY place where sessions are added to the pool. The first session
		for a target creates its CDPSession wrapper. Later sessions for the same target
		update the existing wrapper's session_id/title/url in place. If the target is
		paused waiting for a debugger, execution is resumed after the pool is updated.

		Args:
			event: The attachedToTarget event payload.
		"""
		target_info = event['targetInfo']
		target_id = target_info['targetId']
		session_id = event['sessionId']
		target_type = target_info.get('type', 'unknown')
		waiting_for_debugger = event.get('waitingForDebugger', False)

		self.logger.debug(
			f'[SessionManager] Target attached: {target_id[:8]}... (session={session_id[:8]}..., '
			f'type={target_type}, waitingForDebugger={waiting_for_debugger})'
		)

		async with self._lock:
			# Track this session for the target (and the reverse mapping)
			self._target_sessions.setdefault(target_id, set()).add(session_id)
			self._session_to_target[session_id] = target_id

			# Target types are immutable, so only the first observed value is cached
			self._target_types.setdefault(target_id, target_type)

			pool = self.browser_session._cdp_session_pool
			if target_id not in pool:
				from browser_use.browser.session import CDPSession

				assert self.browser_session._cdp_client_root is not None, 'Root CDP client required'

				pool[target_id] = CDPSession(
					cdp_client=self.browser_session._cdp_client_root,
					target_id=target_id,
					session_id=session_id,
					title=target_info.get('title', 'Unknown title'),
					url=target_info.get('url', 'about:blank'),
				)
				self.logger.debug(f'[SessionManager] Created session for target {target_id[:8]}... (pool size: {len(pool)})')
			else:
				# Point the existing wrapper at the newest session and refresh its metadata
				existing = pool[target_id]
				existing.session_id = session_id
				existing.title = target_info.get('title', existing.title)
				existing.url = target_info.get('url', existing.url)

		# Resume outside the lock: this is a round-trip to the browser
		if waiting_for_debugger:
			try:
				assert self.browser_session._cdp_client_root is not None
				await self.browser_session._cdp_client_root.send.Runtime.runIfWaitingForDebugger(session_id=session_id)
				self.logger.debug(f'[SessionManager] Resumed execution for session {session_id[:8]}...')
			except Exception as e:
				self.logger.warning(f'[SessionManager] Failed to resume execution: {e}')

	async def _handle_target_detached(self, event: 'DetachedFromTargetEvent') -> None:
		"""Handle a Target.detachedFromTarget event.

		This is the ONLY place where sessions are removed from the pool. A target is
		dropped from the pool only when its last session detaches. If the detached
		session was the one the pooled CDPSession pointed at and other sessions
		remain, the pooled CDPSession is re-pointed at a surviving session.

		A fully removed page/tab target dispatches TabClosedEvent. If the removed
		target held agent_focus, focus recovery runs after the lock is released.

		Args:
			event: The detachedFromTarget event payload (targetId may be absent).
		"""
		session_id = event['sessionId']

		agent_focus_lost = False
		target_fully_removed = False
		target_type = None

		async with self._lock:
			# targetId may be absent from the event; fall back to the reverse mapping.
			# Resolving it in the same critical section as the removal avoids a window
			# in which another handler could change the mapping in between.
			target_id = event.get('targetId') or self._session_to_target.get(session_id)
			if not target_id:
				self.logger.warning(f'[SessionManager] Session detached but target unknown (session={session_id[:8]}...)')
				return

			sessions = self._target_sessions.get(target_id)
			if sessions is not None:
				sessions.discard(session_id)
				remaining_sessions = len(sessions)

				self.logger.debug(
					f'[SessionManager] Session detached: target={target_id[:8]}... '
					f'session={session_id[:8]}... (remaining={remaining_sessions})'
				)

				if remaining_sessions == 0:
					# Only remove target when NO sessions remain
					self.logger.debug(f'[SessionManager] No sessions remain for target {target_id[:8]}..., removing from pool')
					target_fully_removed = True

					focus = self.browser_session.agent_focus
					agent_focus_lost = bool(focus) and focus.target_id == target_id

					pool = self.browser_session._cdp_session_pool
					if target_id in pool:
						pool.pop(target_id)
						self.logger.debug(
							f'[SessionManager] Removed target {target_id[:8]}... from pool (pool size: {len(pool)})'
						)

					del self._target_sessions[target_id]
				else:
					# Attach overwrites the pooled session_id with the newest session, so the
					# pooled wrapper may still reference the session that just detached.
					# Re-point it at a surviving session so callers never use a dead id.
					pooled = self.browser_session._cdp_session_pool.get(target_id)
					if pooled is not None and pooled.session_id == session_id:
						replacement = next(iter(sessions))
						pooled.session_id = replacement
						self.logger.debug(
							f'[SessionManager] Re-pointed target {target_id[:8]}... from detached session '
							f'{session_id[:8]}... to surviving session {replacement[:8]}...'
						)
			else:
				# Target not tracked - already removed or never attached
				self.logger.debug(
					f'[SessionManager] Session detached from untracked target: target={target_id[:8]}... '
					f'session={session_id[:8]}... (target was already removed or attach event was missed)'
				)

			# Read the type before dropping the cache entry (needed for TabClosedEvent dispatch)
			target_type = self._target_types.get(target_id)
			if target_id not in self._target_sessions:
				self._target_types.pop(target_id, None)

			self._session_to_target.pop(session_id, None)

		# Dispatch TabClosedEvent only for page/tab targets that are fully removed (not iframes/workers or partial detaches)
		if target_fully_removed:
			if target_type in ('page', 'tab'):
				from browser_use.browser.events import TabClosedEvent

				self.browser_session.event_bus.dispatch(TabClosedEvent(target_id=target_id))
				self.logger.debug(f'[SessionManager] Dispatched TabClosedEvent for page target {target_id[:8]}...')
			elif target_type:
				self.logger.debug(
					f'[SessionManager] Target {target_id[:8]}... fully removed (type={target_type}) - not dispatching TabClosedEvent'
				)

		# Auto-recover agent_focus outside the lock to avoid blocking other operations
		if agent_focus_lost:
			await self._recover_agent_focus(target_id)

	async def _wait_for_session(
		self, target_id: 'TargetID', attempts: int = 20, interval: float = 0.1
	) -> 'CDPSession | None':
		"""Poll the session pool until a session for the target appears.

		The pool is checked once immediately, then after each of `attempts` sleeps of
		`interval` seconds (about 2 seconds with the defaults).

		Args:
			target_id: Target whose session is awaited.
			attempts: Number of sleep-then-check rounds after the initial check.
			interval: Seconds to sleep between checks.

		Returns:
			The pooled CDPSession, or None if none appeared in time.
		"""
		for attempt in range(attempts + 1):
			if attempt:
				await asyncio.sleep(interval)
			session = await self.get_session_for_target(target_id)
			if session:
				return session
		return None

	async def _recover_agent_focus(self, crashed_target_id: 'TargetID') -> None:
		"""Auto-recover agent_focus when the focused target crashes/detaches.

		Switches to the most recent page other than the crashed one. If no other page
		exists, it creates a new tab. If no session appears for the chosen target, it
		creates an emergency fallback tab. Uses the recovery lock so concurrent
		recovery attempts cannot create multiple emergency tabs. All errors are logged
		rather than raised.

		Args:
			crashed_target_id: The target ID that was lost
		"""
		async with self._recovery_lock:
			# Check if another recovery already fixed agent_focus
			current_focus = self.browser_session.agent_focus
			if current_focus and current_focus.target_id != crashed_target_id:
				self.logger.debug(
					f'[SessionManager] Agent focus already recovered by concurrent operation '
					f'(now: {current_focus.target_id[:8]}...), skipping recovery'
				)
				return

			self.logger.warning(
				f'[SessionManager] Agent focus target {crashed_target_id[:8]}... detached! '
				f'Auto-recovering by switching to another target...'
			)

			try:
				all_pages = await self.browser_session._cdp_get_all_pages()
				# The crashed target may still be listed (e.g. a session detached while the
				# page itself is still open); never switch focus back to it.
				candidates = [page for page in (all_pages or []) if page['targetId'] != crashed_target_id]

				is_existing_tab = bool(candidates)
				if is_existing_tab:
					# Switch to most recent page that's not the crashed one
					new_target_id = candidates[-1]['targetId']
					self.logger.info(f'[SessionManager] Switching agent_focus to existing tab {new_target_id[:8]}...')
				else:
					# No other pages exist - create a new one
					self.logger.warning('[SessionManager] No tabs remain! Creating new tab for agent...')
					new_target_id = await self.browser_session._cdp_create_new_page('about:blank')
					self.logger.info(f'[SessionManager] Created new tab {new_target_id[:8]}... for agent')

					# Dispatch TabCreatedEvent so watchdogs can initialize
					from browser_use.browser.events import TabCreatedEvent

					self.browser_session.event_bus.dispatch(TabCreatedEvent(url='about:blank', target_id=new_target_id))

				# Wait for the attach event to create the session, then update agent_focus
				new_session = await self._wait_for_session(new_target_id)
				if new_session:
					self.browser_session.agent_focus = new_session
					self.logger.info(f'[SessionManager] ✅ Agent focus recovered: {new_target_id[:8]}...')

					# Visually activate the tab in browser (only for existing tabs)
					if is_existing_tab:
						try:
							assert self.browser_session._cdp_client_root is not None
							await self.browser_session._cdp_client_root.send.Target.activateTarget(params={'targetId': new_target_id})
							self.logger.debug(f'[SessionManager] Activated tab {new_target_id[:8]}... in browser UI')
						except Exception as e:
							self.logger.debug(f'[SessionManager] Failed to activate tab visually: {e}')

					from browser_use.browser.events import AgentFocusChangedEvent

					self.browser_session.event_bus.dispatch(AgentFocusChangedEvent(target_id=new_target_id, url=new_session.url))
					return

				# Recovery failed - create emergency fallback tab
				self.logger.error(
					f'[SessionManager] ❌ Failed to get session for {new_target_id[:8]}... after 2s, creating emergency fallback tab'
				)
				fallback_target_id = await self.browser_session._cdp_create_new_page('about:blank')
				self.logger.warning(f'[SessionManager] Created emergency fallback tab {fallback_target_id[:8]}...')

				from browser_use.browser.events import AgentFocusChangedEvent, TabCreatedEvent

				# Announce the tab as soon as it exists, as the primary path does
				self.browser_session.event_bus.dispatch(TabCreatedEvent(url='about:blank', target_id=fallback_target_id))

				fallback_session = await self._wait_for_session(fallback_target_id)
				if fallback_session:
					self.browser_session.agent_focus = fallback_session
					self.logger.warning(f'[SessionManager] ⚠️ Agent focus set to emergency fallback: {fallback_target_id[:8]}...')
					self.browser_session.event_bus.dispatch(
						AgentFocusChangedEvent(target_id=fallback_target_id, url='about:blank')
					)
					return

				# Complete failure - this should never happen
				self.logger.critical(
					'[SessionManager] 🚨 CRITICAL: Failed to recover agent_focus even with fallback! Agent may be in broken state.'
				)

			except Exception as e:
				self.logger.error(f'[SessionManager] ❌ Error during agent_focus recovery: {type(e).__name__}: {e}')