# --- browser_utils/initialization/core.py --- import asyncio import logging import os from typing import Any, Dict, Optional, Tuple from playwright.async_api import ( Browser as AsyncBrowser, ) from playwright.async_api import ( BrowserContext as AsyncBrowserContext, ) from playwright.async_api import ( Error as PlaywrightAsyncError, ) from playwright.async_api import ( Page as AsyncPage, ) from playwright.async_api import ( expect as expect_async, ) from config import ( AI_STUDIO_URL_PATTERN, INPUT_SELECTOR, MODEL_NAME_SELECTOR, USER_INPUT_END_MARKER_SERVER, USER_INPUT_START_MARKER_SERVER, GlobalState, ) from config.selector_utils import ( INPUT_WRAPPER_SELECTORS, ) from .auth import wait_for_model_list_and_handle_auth_save from .debug import setup_debug_listeners from .network import setup_network_interception_and_scripts logger = logging.getLogger("AIStudioProxyServer") async def _wait_for_shutdown(): """Helper to wait for GlobalState.IS_SHUTTING_DOWN event.""" loop = asyncio.get_running_loop() await loop.run_in_executor(None, GlobalState.IS_SHUTTING_DOWN.wait) async def initialize_page_logic( # pragma: no cover browser: AsyncBrowser, storage_state_path: Optional[str] = None ) -> Tuple[AsyncPage, bool]: """ Initialize page logic, connecting to existing browser Args: browser: Playwright browser instance storage_state_path: Optional authentication file path. If provided, it will be prioritized. """ logger.debug("[Init] Initializing page logic") temp_context: Optional[AsyncBrowserContext] = None storage_state_path_to_use: Optional[str] = None launch_mode = os.environ.get("LAUNCH_MODE", "debug") loop = asyncio.get_running_loop() # Prioritize the passed storage_state_path if storage_state_path: if os.path.exists(storage_state_path): storage_state_path_to_use = storage_state_path logger.debug(f"Using specified auth file: {storage_state_path_to_use}") else: logger.error(f"Specified auth file does not exist: {storage_state_path}") # If it's a clearly specified path but does not exist, it should be an error raise RuntimeError( f"Specified auth file does not exist: {storage_state_path}" ) else: # Fall back to existing environment variable logic if launch_mode == "headless" or launch_mode == "virtual_headless": # Check for Auto-Auth Rotation on Startup if ( os.environ.get("AUTO_AUTH_ROTATION_ON_STARTUP", "false").lower() == "true" ): logger.info( " 🤖 Auto-Auth Rotation on Startup is ENABLED. Selecting profile..." ) try: # Local import to avoid circular dependencies from browser_utils.auth_rotation import ( _get_next_profile, check_profile_cookie_health, ) next_profile = _get_next_profile() if next_profile: os.environ["ACTIVE_AUTH_JSON_PATH"] = next_profile logger.info(f" ✅ Auto-selected profile: {next_profile}") # Check cookie health of selected profile health = check_profile_cookie_health(next_profile) if health["health_status"] == "critical": logger.warning( " ⚠️ Selected profile has expired authentication cookies. " "Consider refreshing by logging in again in debug mode." ) else: logger.warning( " ⚠️ Auto-Auth Rotation: No available profiles found. Continuing with environment defaults." ) except ImportError: logger.error( " ❌ Auto-Auth Rotation failed: Could not import auth_rotation module." ) except Exception as e: logger.error( f" ❌ Error during Auto-Auth Rotation on Startup: {e}", exc_info=True, ) auth_filename = os.environ.get("ACTIVE_AUTH_JSON_PATH") logger.info( f"[DEBUG] Headless Init: ACTIVE_AUTH_JSON_PATH='{auth_filename}'" ) if auth_filename: constructed_path = auth_filename if os.path.exists(constructed_path): storage_state_path_to_use = constructed_path else: logger.error( f"{launch_mode} mode auth file invalid or does not exist: '{constructed_path}'" ) # DIAGNOSTIC: Check if we should have rotated logger.info( f"[DEBUG] Auth file missing. Auto-Rotation Flag: {os.environ.get('AUTO_AUTH_ROTATION_ON_STARTUP')}" ) raise RuntimeError( f"{launch_mode} mode auth file invalid: '{constructed_path}'" ) else: logger.error( f"{launch_mode} mode requires ACTIVE_AUTH_JSON_PATH environment variable, but it's not set or is empty." ) raise RuntimeError( f"{launch_mode} mode requires ACTIVE_AUTH_JSON_PATH." ) elif launch_mode == "debug": logger.info( "Debug mode: Attempting to load auth file from environment variable ACTIVE_AUTH_JSON_PATH..." ) auth_filepath_from_env = os.environ.get("ACTIVE_AUTH_JSON_PATH") if auth_filepath_from_env and os.path.exists(auth_filepath_from_env): storage_state_path_to_use = auth_filepath_from_env logger.debug( f"Auth file to be used in debug mode (from environment variable): {storage_state_path_to_use}" ) elif auth_filepath_from_env: logger.warning( f"The file pointed to by ACTIVE_AUTH_JSON_PATH in debug mode does not exist: '{auth_filepath_from_env}'. Auth file not loaded." ) else: logger.info( "Auth file not provided via environment variable in debug mode. Current browser state will be used." ) elif launch_mode == "direct_debug_no_browser": logger.info( "direct_debug_no_browser mode: storage_state not loaded, no browser operations performed." ) else: logger.warning( f"Warning: Unknown launch mode '{launch_mode}'. storage_state not loaded." ) try: # Consolidate into one log message auth_file = ( os.path.basename(storage_state_path_to_use) if storage_state_path_to_use else None ) context_options: Dict[str, Any] = {"viewport": {"width": 460, "height": 800}} if storage_state_path_to_use: context_options["storage_state"] = storage_state_path_to_use from api_utils.server_state import state state.current_auth_profile_path = storage_state_path_to_use logger.info( f" (Using storage_state='{os.path.basename(storage_state_path_to_use)}')" ) else: from api_utils.server_state import state state.current_auth_profile_path = None logger.info(" (Not using storage_state)") # Proxy settings need to be retrieved from the server module from api_utils.server_state import state if state.PLAYWRIGHT_PROXY_SETTINGS: context_options["proxy"] = state.PLAYWRIGHT_PROXY_SETTINGS logger.debug( f"[Browser] Context configured with proxy: {state.PLAYWRIGHT_PROXY_SETTINGS['server']}" ) context_options["ignore_https_errors"] = True # Single consolidated log if auth_file: logger.info(f"[Browser] Context created (Auth: {auth_file})") else: logger.debug("[Browser] Context created (No Auth)") temp_context = await browser.new_context(**context_options) # Set up network interception and script injection await setup_network_interception_and_scripts(temp_context) found_page: Optional[AsyncPage] = None pages = temp_context.pages target_url_base = f"https://{AI_STUDIO_URL_PATTERN}" target_full_url = f"{target_url_base}prompts/new_chat" login_url_pattern = "accounts.google.com" current_url = "" # Import _handle_model_list_response - delayed import to avoid circular dependency from browser_utils.operations import _handle_model_list_response for p_iter in pages: try: page_url_to_check = p_iter.url if ( not p_iter.is_closed() and target_url_base in page_url_to_check and "/prompts/" in page_url_to_check ): found_page = p_iter current_url = page_url_to_check logger.debug(f"Found opened AI Studio page: {current_url}") if found_page: logger.debug( f"Adding model list response listener to existing page {found_page.url}." ) found_page.on("response", _handle_model_list_response) # Setup debug listeners for error snapshots setup_debug_listeners(found_page) break except PlaywrightAsyncError as pw_err_url: logger.warning(f"Playwright error checking page URL: {pw_err_url}") except AttributeError as attr_err_url: logger.warning(f"Attribute error checking page URL: {attr_err_url}") except asyncio.CancelledError: raise except Exception as e_url_check: logger.warning( f" Other unexpected error checking page URL: {e_url_check} (Type: {type(e_url_check).__name__})" ) if not found_page: logger.info(f"[Navigation] Opening new page: {target_full_url}") found_page = await temp_context.new_page() if found_page: logger.debug( "Adding model list response listener to new page (before navigation)." ) found_page.on("response", _handle_model_list_response) # Setup debug listeners for error snapshots setup_debug_listeners(found_page) try: await found_page.goto( target_full_url, wait_until="domcontentloaded", timeout=90000 ) current_url = found_page.url logger.debug( f"New page navigation attempt complete. Current URL: {current_url}" ) except asyncio.CancelledError: raise except Exception as new_page_nav_err: # Import save_error_snapshot function from browser_utils.operations import save_error_snapshot await save_error_snapshot("init_new_page_nav_fail") error_str = str(new_page_nav_err) if "NS_ERROR_NET_INTERRUPT" in error_str: logger.error( "\n" + "=" * 30 + " Network Navigation Error Tips " + "=" * 30 ) logger.error( f"Navigation to '{target_full_url}' failed with network interrupt error (NS_ERROR_NET_INTERRUPT)." ) logger.error( "This usually means the connection was unexpectedly disconnected while the browser was trying to load the page." ) logger.error("Possible causes and troubleshooting suggestions:") logger.error( " 1. Network connection: Please check if your local network connection is stable and try to access the target URL in a normal browser." ) logger.error( " 2. AI Studio service: Confirm if the aistudio.google.com service itself is available." ) logger.error( " 3. Firewall/Proxy/VPN: Check local firewall, antivirus, proxy, or VPN settings." ) logger.error( " 4. Camoufox service: Confirm if the launch_camoufox.py script is running normally." ) logger.error( " 5. System resource issues: Ensure the system has enough memory and CPU resources." ) logger.error("=" * 74 + "\n") raise RuntimeError( f"Failed to navigate to new page: {new_page_nav_err}" ) from new_page_nav_err if login_url_pattern in current_url: if launch_mode == "headless": logger.error( "Detected redirect to login page in headless mode, authentication may have expired. Please update the auth file." ) raise RuntimeError( "Auth failed in headless mode, auth file update required." ) else: print(f"\n{'=' * 20} Action Required {'=' * 20}", flush=True) login_prompt = " Detected login may be required. If the browser shows a login page, please complete the Google login in the browser window, then press Enter here to continue..." # NEW: If SUPPRESS_LOGIN_WAIT is set, skip waiting for user input. if os.environ.get("SUPPRESS_LOGIN_WAIT", "").lower() in ( "1", "true", "yes", ): logger.info( "SUPPRESS_LOGIN_WAIT flag detected, skipping wait for user input." ) else: print(USER_INPUT_START_MARKER_SERVER, flush=True) await loop.run_in_executor(None, input, login_prompt) print(USER_INPUT_END_MARKER_SERVER, flush=True) logger.info("Checking login status...") try: await found_page.wait_for_url( f"**/{AI_STUDIO_URL_PATTERN}**", timeout=180000 ) current_url = found_page.url if login_url_pattern in current_url: logger.error( "Page still seems to be on the login page after manual login attempt." ) raise RuntimeError( "Still on login page after manual login attempt." ) logger.info( "Login successful! Please do not operate the browser window and wait for further instructions." ) # Call auth save logic after successful login if os.environ.get("AUTO_SAVE_AUTH", "false").lower() == "true": await wait_for_model_list_and_handle_auth_save( temp_context, launch_mode, loop ) except asyncio.CancelledError: raise except Exception as wait_login_err: from browser_utils.operations import save_error_snapshot await save_error_snapshot("init_login_wait_fail") logger.error( f"Failed to detect AI Studio URL after login prompt or error saving status: {wait_login_err}", exc_info=True, ) raise RuntimeError( f"Failed to detect AI Studio URL after login prompt: {wait_login_err}" ) from wait_login_err elif target_url_base not in current_url or "/prompts/" not in current_url: from browser_utils.operations import save_error_snapshot await save_error_snapshot("init_unexpected_page") logger.error( f"Unexpected page URL after initial navigation: {current_url}. Expected it to contain '{target_url_base}' and '/prompts/'." ) raise RuntimeError( f"Unexpected page after initial navigation: {current_url}." ) await found_page.bring_to_front() try: # Use centralized selector fallback logic to find input container # Supports current and old UI structures (ms-prompt-input-wrapper / ms-chunk-editor / ms-prompt-box) # Use find_first_visible_locator to wait for element visibility, solving timing issues in headless mode from config.selector_utils import find_first_visible_locator # Wrap in a way that respects the shutdown signal async def find_locator_task(): return await find_first_visible_locator( found_page, INPUT_WRAPPER_SELECTORS, description="Input Container", timeout_per_selector=30000, ) find_task = asyncio.create_task(find_locator_task()) shutdown_task = asyncio.create_task(_wait_for_shutdown()) done, pending = await asyncio.wait( [find_task, shutdown_task], return_when=asyncio.FIRST_COMPLETED ) if shutdown_task in done: logger.info( "🛑 Shutdown signal received during initialization. Aborting." ) find_task.cancel() raise RuntimeError("Initialization aborted due to shutdown signal.") shutdown_task.cancel() input_wrapper_locator, matched_selector = await find_task if not input_wrapper_locator: raise RuntimeError( "Could not find input container element. Tried selectors: " + ", ".join(INPUT_WRAPPER_SELECTORS) ) # Container confirmed visible by find_first_visible_locator, check input box directly await expect_async(found_page.locator(INPUT_SELECTOR)).to_be_visible( timeout=10000 ) logger.debug( f"[Selector] Input area located and visible ({matched_selector})" ) model_name_locator = found_page.locator(MODEL_NAME_SELECTOR) try: model_name_on_page = await model_name_locator.first.inner_text( timeout=5000 ) except PlaywrightAsyncError as e: logger.error(f"Error getting model name (model_name_locator): {e}") raise result_page_instance = found_page result_page_ready = True logger.info( f"[Page] Logic initialization successful | Current Model: {model_name_on_page}" ) return result_page_instance, result_page_ready except asyncio.CancelledError: raise except Exception as input_visible_err: from browser_utils.operations import save_error_snapshot await save_error_snapshot("init_fail_input_timeout") logger.error( f"Page initialization failed: core input area did not become visible within expected time. Last URL was {found_page.url}", exc_info=True, ) raise RuntimeError( f"Page initialization failed: core input area did not become visible within expected time. Last URL was {found_page.url}" ) from input_visible_err except asyncio.CancelledError: logger.warning("Page initialization cancelled.") if temp_context: try: await temp_context.close() except asyncio.CancelledError: raise except Exception: pass raise except Exception as e_init_page: logger.critical( f"Serious unexpected error during page logic initialization: {e_init_page}", exc_info=True, ) if temp_context: try: logger.info( " Attempting to close temporary browser context due to initialization error." ) # [ID-04] Optimize Browser Lifecycle Management: Add timeout to context close await asyncio.wait_for(temp_context.close(), timeout=2.0) logger.info(" Temporary browser context closed.") except asyncio.TimeoutError: logger.warning( " 🚨 [ID-04] Browser context close timeout (2s), skipping forced close to speed up shutdown." ) except asyncio.CancelledError: raise except Exception as close_err: logger.warning(f"Error closing temporary browser context: {close_err}") from browser_utils.operations import save_error_snapshot await save_error_snapshot("init_unexpected_error") raise RuntimeError( f"Unexpected page initialization error: {e_init_page}" ) from e_init_page async def close_page_logic() -> Tuple[None, bool]: # pragma: no cover """Close page logic""" # Need to access global variables from api_utils.server_state import state logger.info("--- Running page logic shutdown --- ") if state.page_instance and not state.page_instance.is_closed(): try: # [ID-04] Optimize Browser Lifecycle Management: 2-second timeout for graceful close await asyncio.wait_for(state.page_instance.close(), timeout=2.0) logger.info(" Page closed") except asyncio.TimeoutError: logger.warning( " 🚨 [ID-04] Browser page close timeout (2s), skipping forced close to speed up shutdown." ) except PlaywrightAsyncError as pw_err: logger.warning(f"Playwright error closing page: {pw_err}") except asyncio.CancelledError: raise except Exception as other_err: logger.error( f" Unexpected error closing page: {other_err} (Type: {type(other_err).__name__})", exc_info=True, ) state.page_instance = None state.is_page_ready = False logger.info("Page logic state reset.") return None, False async def signal_camoufox_shutdown() -> None: # pragma: no cover """Send shutdown signal to Camoufox server""" logger.info( "Attempting to send shutdown signal to Camoufox server (this feature may have been handled by parent process)..." ) ws_endpoint = os.environ.get("CAMOUFOX_WS_ENDPOINT") if not ws_endpoint: logger.warning( "Could not send shutdown signal: CAMOUFOX_WS_ENDPOINT environment variable not found." ) return # Need to access global browser instance from api_utils.server_state import state if not state.browser_instance or not state.browser_instance.is_connected(): logger.warning( "Browser instance disconnected or not initialized, skipping shutdown signal send." ) return try: await asyncio.sleep(0.2) logger.info("(Simulated) Shutdown signal handled.") except asyncio.CancelledError: raise except Exception as e: logger.error( f"Captured exception while sending shutdown signal: {e}", exc_info=True ) async def _is_temporary_chat_active(page: AsyncPage) -> bool: """Best-effort check for whether Temporary chat is currently enabled. Supports both the legacy toggle-class signal and the newer UI variants: - menu item checkmark (`data-test-incognito-checkmark`) - header indicator (`ms-incognito-mode-indicator`) """ try: # Newer UI: header indicator appears only when Temporary chat is active. indicator_locator = page.locator( "ms-incognito-mode-indicator [data-test-id='main-text'], " "ms-incognito-mode-indicator .main-text" ) indicator_count = await indicator_locator.count() for i in range(indicator_count): try: indicator_item = indicator_locator.nth(i) if not await indicator_item.is_visible(timeout=500): continue indicator_text = (await indicator_item.inner_text()).strip().lower() if indicator_text == "temporary chat": return True except Exception: continue # New menu UI: active state is represented by a checkmark inside the menu item. try: checkmark_locator = page.locator("[data-test-incognito-checkmark]") if await checkmark_locator.count() > 0: return True except Exception: pass # Menu button variants (legacy and gray-release UI). toggle_locator = page.locator( "button[data-test-incognito-toggle], " 'button[aria-label="Temporary chat toggle"], ' 'button[aria-label="Toggle temporary chat"]' ) if await toggle_locator.count() > 0: # Legacy/alternative signals. for attr_name in ("aria-pressed", "aria-checked"): try: attr_value = await toggle_locator.get_attribute(attr_name) if attr_value and attr_value.lower() == "true": return True except Exception: pass try: button_classes = await toggle_locator.get_attribute("class") or "" if "ms-button-active" in button_classes: return True except Exception: pass return False except Exception: return False async def enable_temporary_chat_mode(page: AsyncPage) -> bool: # pragma: no cover """ Check and enable "Temporary chat" mode in the AI Studio interface. Supports both direct UI visibility and collapsed menu visibility. """ incognito_selector = ( "button[data-test-incognito-toggle], " 'button[aria-label="Temporary chat toggle"], ' 'button[aria-label="Toggle temporary chat"]' ) menu_trigger_selector = 'button[aria-label="View more actions"]' incognito_locator = page.locator(incognito_selector) menu_trigger = page.locator(menu_trigger_selector) async def _close_menu_if_needed() -> None: """Best-effort menu cleanup that tolerates UI variants without a menu trigger.""" try: if await menu_trigger.count() == 0: return if await menu_trigger.is_visible(): if await menu_trigger.get_attribute("aria-expanded") == "true": logger.debug("[UI] Closing menu to restore UI state") await page.keyboard.press("Escape") except Exception: pass try: if await _is_temporary_chat_active(page): logger.debug("[UI] Temporary chat mode already active") await _close_menu_if_needed() return True # Fast path logger.debug("[UI] Searching for temporary chat button (Fast path)") try: await incognito_locator.wait_for(state="visible", timeout=3000) except Exception: # Fallback logger.debug("[UI] Button not visible, attempting to open menu") if await menu_trigger.is_visible(): await menu_trigger.click() # Wait for the menu item to appear await incognito_locator.wait_for(state="visible", timeout=5000) else: logger.warning("[UI] Neither button nor menu trigger found") return False if await _is_temporary_chat_active(page): logger.debug("[UI] Temporary chat mode already active") else: logger.debug("[UI] Enabling temporary chat mode") await incognito_locator.click(timeout=5000, force=True) await asyncio.sleep(1) enabled = await _is_temporary_chat_active(page) # Recovery: Close menu if still expanded await _close_menu_if_needed() return enabled except asyncio.CancelledError: raise except Exception as e: logger.warning(f"[UI] Error in temporary chat mode: {e}") # Final safety attempt to clear any stuck UI await _close_menu_if_needed() return await _is_temporary_chat_active(page)