Spaces:
Paused
Paused
| import asyncio | |
| from enum import Enum, auto | |
| from typing import Any, Callable, Dict, Optional | |
| from playwright.async_api import TimeoutError | |
| from playwright.async_api import expect as expect_async | |
| from browser_utils.operations import save_error_snapshot | |
| from browser_utils.thinking_normalizer import ( | |
| format_directive_log, | |
| normalize_reasoning_effort_with_stream_check, | |
| ) | |
| from config import ( | |
| CLICK_TIMEOUT_MS, | |
| DEFAULT_THINKING_LEVEL_FLASH, | |
| DEFAULT_THINKING_LEVEL_PRO, | |
| ENABLE_THINKING_MODE_TOGGLE_SELECTOR, | |
| SET_THINKING_BUDGET_TOGGLE_SELECTOR, | |
| THINKING_BUDGET_INPUT_SELECTOR, | |
| THINKING_BUDGET_TOGGLE_OLD_ROOT_SELECTOR, | |
| THINKING_BUDGET_TOGGLE_PARENT_SELECTOR, | |
| THINKING_LEVEL_OPTION_HIGH_SELECTOR, | |
| THINKING_LEVEL_OPTION_LOW_SELECTOR, | |
| THINKING_LEVEL_OPTION_MEDIUM_SELECTOR, | |
| THINKING_LEVEL_OPTION_MINIMAL_SELECTOR, | |
| THINKING_LEVEL_SELECT_SELECTOR, | |
| THINKING_MODE_TOGGLE_OLD_ROOT_SELECTOR, | |
| THINKING_MODE_TOGGLE_PARENT_SELECTOR, | |
| ) | |
| from models import ClientDisconnectedError | |
| from .base import BaseController | |
class ThinkingCategory(Enum):
    """Classifies a model by the kind of thinking controls its UI exposes."""

    # No thinking UI at all (gemini-2.0-*, gemini-1.5-*).
    NON_THINKING = 1
    # Toggleable thinking mode plus a budget (gemini-2.5-flash*).
    THINKING_FLASH = 2
    # Always-on thinking with a budget toggle/slider (gemini-2.5-pro*).
    THINKING_PRO = 3
    # 2-level dropdown only (gemini-3-pro*).
    THINKING_LEVEL = 4
    # 4-level dropdown (gemini-3-flash*).
    THINKING_LEVEL_FLASH = 5
| class ThinkingController(BaseController): | |
| """Handles thinking mode and budget logic.""" | |
async def _handle_thinking_budget(
    self,
    request_params: Dict[str, Any],
    page_params_cache: Dict[str, Any],
    params_cache_lock: asyncio.Lock,
    model_id_to_use: Optional[str],
    check_client_disconnected: Callable,
    is_streaming: bool = True,
):
    """Apply the request's ``reasoning_effort`` to the page's thinking controls.

    The whole adjustment runs while holding ``params_cache_lock``. Depending on
    the model's thinking category (and on whether a level dropdown is actually
    present on the page) this drives the main thinking toggle, the level
    dropdown, or the budget toggle and budget input. Apart from the cache-hit
    shortcut, every path that finishes normally stores the handled value in
    ``page_params_cache["reasoning_effort"]`` so an identical follow-up
    request is skipped.

    Args:
        request_params: Request parameters; only ``"reasoning_effort"`` is read.
        page_params_cache: Cache of values already applied to the page; read
            and updated under ``params_cache_lock``.
        params_cache_lock: Lock guarding ``page_params_cache``.
        model_id_to_use: Model identifier used to pick the thinking category
            and the budget cap.
        check_client_disconnected: Callback forwarded to the UI helpers.
        is_streaming: Forwarded to
            ``normalize_reasoning_effort_with_stream_check``.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the task is
            cancelled.
    """
    reasoning_effort = request_params.get("reasoning_effort")

    def _should_enable_from_raw(rv: Any) -> bool:
        """Decide from the raw value alone whether thinking should be on."""
        if isinstance(rv, str):
            rs = rv.strip().lower()
            if rs in ("high", "medium", "low", "minimal", "-1"):
                return True
            if rs == "none":
                return False
            try:
                return int(rs) > 0
            except ValueError:
                return False
        if isinstance(rv, int):
            return rv > 0 or rv == -1
        return False

    def _level_from_budget(tokens: int, four_levels: bool) -> str:
        """Map a numeric token budget onto a dropdown level name."""
        if not four_levels:
            return "high" if tokens >= 8000 else "low"
        if tokens >= 16000:
            return "high"
        if tokens >= 8000:
            return "medium"
        if tokens >= 1024:
            return "low"
        return "minimal"

    def _resolve_level(rv: Any, four_levels: bool) -> Optional[str]:
        """Translate the raw value into a level name, or None if unparseable."""
        level: Optional[str] = None
        if isinstance(rv, str):
            rs = rv.strip().lower()
            if four_levels and rs in ("minimal", "low", "medium", "high"):
                level = rs
            elif not four_levels and rs in ("low", "minimal"):
                level = "low"
            elif rs in ("none", "-1") or (
                not four_levels and rs in ("high", "medium")
            ):
                level = "high"
            else:
                try:
                    level = _level_from_budget(int(rs), four_levels)
                except ValueError:
                    level = None
        elif isinstance(rv, int):
            # -1 is treated as "high" on both dropdown variants.
            level = "high" if rv == -1 else _level_from_budget(rv, four_levels)
        elif rv is None:
            # Nothing requested: fall back to the model-specific default.
            level = (
                DEFAULT_THINKING_LEVEL_FLASH
                if four_levels
                else DEFAULT_THINKING_LEVEL_PRO
            )

        # The 2-level dropdown only offers high/low. Normalise only a level
        # that was actually resolved: an unparseable value must stay None so
        # the caller keeps the current level instead of silently forcing "low".
        if (
            level is not None
            and not four_levels
            and level not in ("high", "low")
        ):
            level = "high" if level == "medium" else "low"
        return level

    level_categories = (
        ThinkingCategory.THINKING_LEVEL,
        ThinkingCategory.THINKING_LEVEL_FLASH,
    )

    try:
        async with params_cache_lock:
            if (
                "reasoning_effort" in page_params_cache
                and page_params_cache["reasoning_effort"] == reasoning_effort
            ):
                self.logger.debug(
                    f"[Thinking] Reasoning effort {reasoning_effort} matches cache, skipping"
                )
                return

            # Determine processing logic based on model category.
            category = self._get_thinking_category(model_id_to_use)
            if category == ThinkingCategory.NON_THINKING:
                self.logger.debug(
                    "[Thinking] This model does not support thinking mode, skipping config"
                )
                page_params_cache["reasoning_effort"] = reasoning_effort
                return

            directive = normalize_reasoning_effort_with_stream_check(
                reasoning_effort, is_streaming
            )
            self.logger.debug(
                f"[Thinking] Directive: {format_directive_log(directive)}"
            )

            # Resilient level check: trust a dropdown found on the page even
            # when the category derived from the model id does not expect one.
            actually_has_dropdown = await self._has_thinking_dropdown()
            is_level_category = category in level_categories
            uses_level = is_level_category or actually_has_dropdown
            if actually_has_dropdown and not is_level_category:
                self.logger.warning(
                    f"[Thinking] Detected level dropdown for model category {category}. Switching to level-based logic."
                )

            desired_enabled = directive.thinking_enabled or _should_enable_from_raw(
                reasoning_effort
            )
            # Level-based models with no explicit reasoning_effort default to
            # enabled so the default level still gets applied.
            if reasoning_effort is None and uses_level:
                desired_enabled = True

            has_main_toggle = category == ThinkingCategory.THINKING_FLASH
            if has_main_toggle:
                self.logger.info(
                    f"Setting main thinking toggle to: {'ON' if desired_enabled else 'OFF'}"
                )
                await self._control_thinking_mode_toggle(
                    should_be_enabled=desired_enabled,
                    check_client_disconnected=check_client_disconnected,
                )
            else:
                self.logger.info(
                    "This model has no main thinking toggle, skipping toggle setting."
                )

            # 1) Thinking disabled.
            if not desired_enabled:
                if is_level_category:
                    # Level-based models have no budget toggle to switch off.
                    page_params_cache["reasoning_effort"] = reasoning_effort
                    return
                if has_main_toggle:
                    # Flash/Flash Lite: once the main toggle is off the budget
                    # toggle is hidden, so there is nothing left to operate.
                    self.logger.info(
                        "Flash model main thinking toggle turned off, skipping budget toggle operation (hidden)"
                    )
                    page_params_cache["reasoning_effort"] = reasoning_effort
                    return
                # Legacy UI compatibility: make sure the budget toggle is off.
                await self._control_thinking_budget_toggle(
                    should_be_checked=False,
                    check_client_disconnected=check_client_disconnected,
                )
                page_params_cache["reasoning_effort"] = reasoning_effort
                return

            # 2) Thinking enabled: set a level or a budget based on model type.
            if uses_level:
                level_to_set = _resolve_level(
                    reasoning_effort,
                    category == ThinkingCategory.THINKING_LEVEL_FLASH,
                )
                if level_to_set is None:
                    self.logger.info(
                        "Unable to parse reasoning level, keeping current level."
                    )
                else:
                    await self._set_thinking_level(
                        level_to_set, check_client_disconnected
                    )
                page_params_cache["reasoning_effort"] = reasoning_effort
                return

            # Fallback path. desired_enabled is always True at this point, so
            # this branch is taken whenever the directive itself says thinking
            # is off. NOTE(review): confirm this is the intended trigger.
            if desired_enabled and not directive.thinking_enabled:
                self.logger.info("Attempting to turn off main thinking toggle...")
                success = await self._control_thinking_mode_toggle(
                    should_be_enabled=False,
                    check_client_disconnected=check_client_disconnected,
                )
                if not success:
                    self.logger.warning(
                        "Main thinking toggle unavailable, using fallback: Setting budget to 0"
                    )
                    await self._control_thinking_budget_toggle(
                        should_be_checked=True,
                        check_client_disconnected=check_client_disconnected,
                    )
                    await self._set_thinking_budget_value(
                        0, check_client_disconnected
                    )
                page_params_cache["reasoning_effort"] = reasoning_effort
                return

            # Scenario 2 & 3: enable thinking mode. Models with a main toggle
            # already had it set above.
            if not has_main_toggle:
                self.logger.info("Enabling main thinking toggle...")
                await self._control_thinking_mode_toggle(
                    should_be_enabled=True,
                    check_client_disconnected=check_client_disconnected,
                )

            if not directive.budget_enabled:
                # Scenario 2: thinking enabled, no budget limit.
                self.logger.info("Disabling manual budget limit...")
                await self._control_thinking_budget_toggle(
                    should_be_checked=False,
                    check_client_disconnected=check_client_disconnected,
                )
            else:
                # Scenario 3: thinking enabled with a budget limit, capped per
                # model family.
                value_to_set = directive.budget_value or 0
                model_lower = (model_id_to_use or "").lower()
                if "gemini-2.5-pro" in model_lower:
                    value_to_set = min(value_to_set, 32768)
                elif "flash" in model_lower:
                    # Also covers "flash-lite", which shares the same cap.
                    value_to_set = min(value_to_set, 24576)
                self.logger.info(
                    f"Enabling manual budget limit and setting budget value: {value_to_set} tokens"
                )
                await self._control_thinking_budget_toggle(
                    should_be_checked=True,
                    check_client_disconnected=check_client_disconnected,
                )
                await self._set_thinking_budget_value(
                    value_to_set, check_client_disconnected
                )
            page_params_cache["reasoning_effort"] = reasoning_effort
    except asyncio.CancelledError:
        self.logger.info(
            f"[{self.req_id}] Thinking budget adjustment task was cancelled."
        )
        raise
async def _has_thinking_dropdown(self) -> bool:
    """Report whether the thinking level dropdown exists on the page.

    Returns:
        False when the selector matches nothing or the lookup itself fails;
        True as soon as at least one element matches. The visibility wait is
        best effort only: a matching element that does not become visible
        within the timeout still counts as present.

    Raises:
        asyncio.CancelledError: Re-raised after being logged once.
    """
    try:
        dropdown = self.page.locator(THINKING_LEVEL_SELECT_SELECTOR)
        if await dropdown.count() == 0:
            return False
        try:
            await expect_async(dropdown.first).to_be_visible(timeout=2000)
        except asyncio.CancelledError:
            # Let the outer handler log it, so cancellation is reported once.
            raise
        except Exception:
            # Present in the DOM but not confirmed visible: still present.
            pass
        return True
    except asyncio.CancelledError:
        self.logger.info(f"[{self.req_id}] Thinking dropdown check cancelled.")
        raise
    except Exception:
        return False
def _get_thinking_category(self, model_id: Optional[str]) -> ThinkingCategory:
    """Map a model ID onto the thinking category its UI is expected to use.

    Matching is case-insensitive and substring based; a missing or empty ID,
    or one matching no known family, yields ``NON_THINKING``.
    """
    if not model_id:
        return ThinkingCategory.NON_THINKING

    name = model_id.lower()

    # Gemini 3 family: level dropdowns, with "flash" checked before "pro".
    if "gemini-3" in name:
        if "flash" in name:
            return ThinkingCategory.THINKING_LEVEL_FLASH
        if "pro" in name:
            return ThinkingCategory.THINKING_LEVEL

    if "gemini-2.5-pro" in name:
        return ThinkingCategory.THINKING_PRO

    flash_aliases = ("gemini-flash-latest", "gemini-flash-lite-latest")
    if "gemini-2.5-flash" in name or name in flash_aliases:
        return ThinkingCategory.THINKING_FLASH

    return ThinkingCategory.NON_THINKING
async def _set_thinking_level(
    self, level: str, check_client_disconnected: Callable
):
    """Select ``level`` in the thinking level dropdown and verify the result.

    Opens the dropdown, clicks the option matching ``level`` (unknown names
    fall back to the "high" option), waits for the listbox to close (pressing
    Escape if it does not) and compares the displayed value with ``level``.
    Failures are logged rather than raised, except as listed below.

    Args:
        level: Level name; matched case-insensitively against
            high / medium / low / minimal.
        check_client_disconnected: Callback forwarded to
            ``self._check_disconnect``.

    Raises:
        asyncio.CancelledError: Re-raised after logging.
        ClientDisconnectedError: Re-raised after logging.
    """
    level_lower = level.lower()
    option_selectors = {
        "high": THINKING_LEVEL_OPTION_HIGH_SELECTOR,
        "medium": THINKING_LEVEL_OPTION_MEDIUM_SELECTOR,
        "low": THINKING_LEVEL_OPTION_LOW_SELECTOR,
        "minimal": THINKING_LEVEL_OPTION_MINIMAL_SELECTOR,
    }
    target_option_selector = option_selectors.get(
        level_lower, THINKING_LEVEL_OPTION_HIGH_SELECTOR
    )

    try:
        trigger = self.page.locator(THINKING_LEVEL_SELECT_SELECTOR)
        await expect_async(trigger).to_be_visible(timeout=5000)
        await trigger.scroll_into_view_if_needed()
        await trigger.click(timeout=CLICK_TIMEOUT_MS)
        await self._check_disconnect(
            check_client_disconnected, "After opening Thinking Level"
        )

        option = self.page.locator(target_option_selector)
        await expect_async(option).to_be_visible(timeout=5000)
        await option.click(timeout=CLICK_TIMEOUT_MS)
        await asyncio.sleep(0.2)

        try:
            await expect_async(
                self.page.locator(
                    '[role="listbox"][aria-label="Thinking Level"], [role="listbox"][aria-label="Thinking level"]'
                ).first
            ).to_be_hidden(timeout=2000)
        except asyncio.CancelledError:
            # Logged once by the outer handler.
            raise
        except Exception:
            # The listbox did not close by itself: best-effort dismiss.
            try:
                await self.page.keyboard.press("Escape")
            except Exception:
                pass
            await asyncio.sleep(0.1)

        value_text = await trigger.locator(
            ".mat-mdc-select-value-text .mat-mdc-select-min-line"
        ).inner_text(timeout=3000)
        if value_text.strip().lower() == level_lower:
            self.logger.info(f"Thinking Level successfully set to {level}")
        else:
            self.logger.warning(
                f"Thinking Level verification failed, page value: {value_text}, expected: {level}"
            )
    except asyncio.CancelledError:
        # CancelledError derives from BaseException on Python >= 3.8, so it
        # needs its own clause; `except Exception` never sees it.
        self.logger.info(f"[{self.req_id}] Thinking level set cancelled.")
        raise
    except Exception as e:
        self.logger.error(f"Error setting Thinking Level: {e}")
        if isinstance(e, ClientDisconnectedError):
            raise
async def _set_thinking_budget_value(
    self, token_budget: int, check_client_disconnected: Callable
):
    """Write ``token_budget`` into the thinking budget input and verify it.

    Steps: wait for the input, run a page script that raises any lower
    ``max`` on the neighbouring inputs/sliders and writes the value, fill the
    input through Playwright, then verify. If verification fails and the
    input's ``max`` attribute is below the requested budget, the page maximum
    is written instead. Failures are logged rather than raised, except as
    listed below.

    Args:
        token_budget: Budget (in tokens) to set.
        check_client_disconnected: Callback forwarded to
            ``self._check_disconnect``.

    Raises:
        asyncio.CancelledError: Re-raised after logging.
        ClientDisconnectedError: Re-raised after logging.
    """
    # Lifts `max` where it is below the desired value, writes the value into
    # every input/range of the slider container and fires the DOM events.
    force_value_script = (
        "([selector, desired]) => {\n"
        " const num = Number(desired);\n"
        " const el = document.querySelector(selector);\n"
        " if (!el) return false;\n"
        " const container = el.closest('[data-test-slider]') || el.parentElement;\n"
        " const inputs = container ? container.querySelectorAll('input') : [el];\n"
        " const ranges = container ? container.querySelectorAll('input[type=\"range\"]') : [];\n"
        " inputs.forEach(inp => {\n"
        " try {\n"
        " if (Number.isFinite(num)) {\n"
        " const curMaxAttr = inp.getAttribute('max');\n"
        " const curMax = curMaxAttr ? Number(curMaxAttr) : undefined;\n"
        " if (curMax !== undefined && curMax < num) {\n"
        " inp.setAttribute('max', String(num));\n"
        " }\n"
        " if (inp.max && Number(inp.max) < num) {\n"
        " inp.max = String(num);\n"
        " }\n"
        " inp.value = String(num);\n"
        " inp.dispatchEvent(new Event('input', { bubbles: true }));\n"
        " inp.dispatchEvent(new Event('change', { bubbles: true }));\n"
        " inp.dispatchEvent(new Event('blur', { bubbles: true }));\n"
        " }\n"
        " } catch (_) {}\n"
        " });\n"
        " ranges.forEach(r => {\n"
        " try {\n"
        " if (Number.isFinite(num)) {\n"
        " const curMaxAttr = r.getAttribute('max');\n"
        " const curMax = curMaxAttr ? Number(curMaxAttr) : undefined;\n"
        " if (curMax !== undefined && curMax < num) {\n"
        " r.setAttribute('max', String(num));\n"
        " }\n"
        " if (r.max && Number(r.max) < num) {\n"
        " r.max = String(num);\n"
        " }\n"
        " r.value = String(num);\n"
        " r.dispatchEvent(new Event('input', { bubbles: true }));\n"
        " r.dispatchEvent(new Event('change', { bubbles: true }));\n"
        " }\n"
        " } catch (_) {}\n"
        " });\n"
        " return true;\n"
        "}"
    )
    # Simpler variant used for the page-maximum fallback: write value + events.
    write_value_script = (
        "([selector, desired]) => {\n"
        " const num = Number(desired);\n"
        " const el = document.querySelector(selector);\n"
        " if (!el) return false;\n"
        " const container = el.closest('[data-test-slider]') || el.parentElement;\n"
        " const inputs = container ? container.querySelectorAll('input') : [el];\n"
        " inputs.forEach(inp => {\n"
        " try { inp.value = String(num); inp.dispatchEvent(new Event('input', { bubbles: true })); inp.dispatchEvent(new Event('change', { bubbles: true })); } catch (_) {}\n"
        " });\n"
        " return true;\n"
        "}"
    )

    self.logger.info(f"Setting thinking budget value: {token_budget} tokens")
    budget_input_locator = self.page.locator(THINKING_BUDGET_INPUT_SELECTOR)
    try:
        await expect_async(budget_input_locator).to_be_visible(timeout=5000)
        await self._check_disconnect(
            check_client_disconnected,
            "Thinking budget adjustment - after input visible",
        )

        # Best effort: the regular fill below still runs if the script fails.
        try:
            await self.page.evaluate(
                force_value_script,
                [THINKING_BUDGET_INPUT_SELECTOR, token_budget],
            )
        except asyncio.CancelledError:
            raise
        except Exception:
            pass

        self.logger.info(f"Setting thinking budget to: {token_budget}")
        await budget_input_locator.fill(str(token_budget), timeout=5000)
        await self._check_disconnect(
            check_client_disconnected, "Thinking budget adjustment - after fill"
        )

        # Verify through Playwright's auto-retrying assertion first.
        try:
            await expect_async(budget_input_locator).to_have_value(
                str(token_budget), timeout=3000
            )
        except asyncio.CancelledError:
            raise
        except Exception:
            pass
        else:
            self.logger.info(
                f"Thinking budget successfully updated to: {token_budget}"
            )
            return

        # The assertion failed: compare numerically, since the page may show
        # the same number in a different textual form.
        new_value_str = await budget_input_locator.input_value(timeout=3000)
        try:
            new_value_int = int(new_value_str)
        except (TypeError, ValueError):
            new_value_int = -1
        if new_value_int == token_budget:
            self.logger.info(
                f"Thinking budget successfully updated to: {new_value_str}"
            )
            return

        # Fallback: if the page max is below the request, use the page max.
        try:
            page_max_str = await budget_input_locator.get_attribute("max")
            page_max_val = int(page_max_str) if page_max_str is not None else None
        except asyncio.CancelledError:
            raise
        except Exception:
            page_max_val = None

        if page_max_val is None or page_max_val >= token_budget:
            self.logger.warning(
                f"Thinking budget verification failed after update. Page shows: {new_value_str}, expected: {token_budget}"
            )
            return

        self.logger.warning(
            f"Page max budget is {page_max_val}, requested budget {token_budget} adjusted to {page_max_val}"
        )
        try:
            await self.page.evaluate(
                write_value_script,
                [THINKING_BUDGET_INPUT_SELECTOR, page_max_val],
            )
        except asyncio.CancelledError:
            raise
        except Exception:
            pass
        await budget_input_locator.fill(str(page_max_val), timeout=5000)
        try:
            await expect_async(budget_input_locator).to_have_value(
                str(page_max_val), timeout=2000
            )
        except asyncio.CancelledError:
            raise
        except Exception:
            pass
    except asyncio.CancelledError:
        # CancelledError derives from BaseException on Python >= 3.8, so it
        # needs its own clause; all inner handlers defer to this one so the
        # cancellation is logged exactly once.
        self.logger.info(f"[{self.req_id}] Thinking budget value set cancelled.")
        raise
    except Exception as e:
        self.logger.error(f"Error adjusting thinking budget: {e}")
        if isinstance(e, ClientDisconnectedError):
            raise
async def _control_thinking_mode_toggle(
    self, should_be_enabled: bool, check_client_disconnected: Callable
) -> bool:
    """Bring the main thinking toggle into the requested state.

    Reads the toggle's ``aria-checked`` attribute and clicks it only when the
    state differs. If the direct click fails, the parent element and then the
    legacy root's label are tried as click targets.

    Args:
        should_be_enabled: Desired toggle state.
        check_client_disconnected: Callback forwarded to
            ``self._check_disconnect``.

    Returns:
        True when the toggle ends up in the requested state, or when it does
        not exist and disabling was requested. False when it does not exist
        and enabling was requested, when verification after the click fails,
        or when an error occurs.

    Raises:
        asyncio.CancelledError: Re-raised after logging.
        ClientDisconnectedError: Re-raised after logging and saving a snapshot.
    """
    toggle_selector = ENABLE_THINKING_MODE_TOGGLE_SELECTOR
    self.logger.info(
        f"Controlling main thinking toggle, expected state: {'ON' if should_be_enabled else 'OFF'}..."
    )
    try:
        toggle_locator = self.page.locator(toggle_selector)
        if await toggle_locator.count() == 0:
            if not should_be_enabled:
                self.logger.info(
                    "Main thinking toggle not found (unsupported), skipping disable."
                )
                return True
            self.logger.warning(
                "Main thinking toggle not found (unsupported), cannot enable."
            )
            return False

        await expect_async(toggle_locator).to_be_visible(timeout=5000)
        # Scrolling is best effort; the click performs its own checks.
        try:
            await toggle_locator.scroll_into_view_if_needed()
        except asyncio.CancelledError:
            raise
        except Exception:
            pass
        await self._check_disconnect(
            check_client_disconnected, "Main thinking toggle - after visible"
        )

        is_checked_str = await toggle_locator.get_attribute("aria-checked")
        current_state_is_enabled = is_checked_str == "true"
        self.logger.info(
            f"Main thinking toggle current state: {is_checked_str} (Enabled: {current_state_is_enabled})"
        )
        if current_state_is_enabled == should_be_enabled:
            self.logger.info("Main thinking toggle already in expected state.")
            return True

        action = "enable" if should_be_enabled else "disable"
        self.logger.info(
            f"Main thinking toggle mismatch, clicking to {action} thinking mode..."
        )
        try:
            await toggle_locator.click(timeout=CLICK_TIMEOUT_MS)
        except asyncio.CancelledError:
            raise
        except Exception:
            # Direct click failed: try the parent, then the legacy label.
            # Errors from these attempts propagate to the outer handlers.
            alt_toggle = self.page.locator(THINKING_MODE_TOGGLE_PARENT_SELECTOR)
            if await alt_toggle.count() > 0:
                await alt_toggle.click(timeout=CLICK_TIMEOUT_MS)
            else:
                root = self.page.locator(THINKING_MODE_TOGGLE_OLD_ROOT_SELECTOR)
                label = root.locator("label.mdc-label")
                await expect_async(label).to_be_visible(timeout=2000)
                await label.click(timeout=CLICK_TIMEOUT_MS)
        await self._check_disconnect(
            check_client_disconnected,
            f"Main thinking toggle - after click {action}",
        )

        new_state_str = await toggle_locator.get_attribute("aria-checked")
        new_state_is_enabled = new_state_str == "true"
        if new_state_is_enabled == should_be_enabled:
            self.logger.info(
                f"Main thinking toggle successfully {action}d. New state: {new_state_str}"
            )
            return True
        self.logger.warning(
            f"Main thinking toggle {action} verification failed. Expected: {should_be_enabled}, Actual: {new_state_str}"
        )
        return False
    except asyncio.CancelledError:
        # CancelledError derives from BaseException on Python >= 3.8, so it
        # needs its own clause; inner handlers defer here so the cancellation
        # is logged exactly once.
        self.logger.info(f"[{self.req_id}] Thinking mode toggle control cancelled.")
        raise
    except TimeoutError:
        self.logger.warning(
            "Main thinking toggle element not found or invisible (unsupported)"
        )
        return False
    except Exception as e:
        self.logger.error(f"Error operating main thinking toggle: {e}")
        await save_error_snapshot(f"thinking_mode_toggle_error_{self.req_id}")
        if isinstance(e, ClientDisconnectedError):
            raise
        return False
async def _control_thinking_budget_toggle(
    self, should_be_checked: bool, check_client_disconnected: Callable
):
    """Bring the 'Thinking Budget' toggle into the requested state.

    Reads the toggle's ``aria-checked`` attribute and clicks it only when the
    state differs. If the direct click fails, the parent element and then the
    legacy root's label are tried as click targets. A missing toggle is only
    logged. Failures are logged rather than raised, except as listed below.

    Args:
        should_be_checked: Desired toggle state.
        check_client_disconnected: Callback forwarded to
            ``self._check_disconnect``.

    Raises:
        asyncio.CancelledError: Re-raised after logging.
        ClientDisconnectedError: Re-raised after logging.
    """
    toggle_selector = SET_THINKING_BUDGET_TOGGLE_SELECTOR
    self.logger.info(
        f"Controlling 'Thinking Budget' toggle, expected state: {'Checked' if should_be_checked else 'Unchecked'}..."
    )
    try:
        toggle_locator = self.page.locator(toggle_selector)
        if await toggle_locator.count() == 0:
            if not should_be_checked:
                self.logger.info(
                    "Thinking budget toggle not found, skipping disable."
                )
            else:
                self.logger.warning(
                    "Thinking budget toggle not found, cannot enable."
                )
            return

        await expect_async(toggle_locator).to_be_visible(timeout=5000)
        # Scrolling is best effort; the click performs its own checks.
        try:
            await toggle_locator.scroll_into_view_if_needed()
        except asyncio.CancelledError:
            raise
        except Exception:
            pass
        await self._check_disconnect(
            check_client_disconnected, "Thinking budget toggle - after visible"
        )

        is_checked_str = await toggle_locator.get_attribute("aria-checked")
        current_state_is_checked = is_checked_str == "true"
        self.logger.info(
            f"Thinking budget toggle current 'aria-checked': {is_checked_str} (Checked: {current_state_is_checked})"
        )
        if current_state_is_checked == should_be_checked:
            self.logger.info("'Thinking Budget' toggle already in expected state.")
            return

        action = "enable" if should_be_checked else "disable"
        self.logger.info(
            f"Thinking budget toggle mismatch, clicking to {action}..."
        )
        try:
            await toggle_locator.click(timeout=CLICK_TIMEOUT_MS)
        except asyncio.CancelledError:
            raise
        except Exception:
            # Direct click failed: try the parent, then the legacy label.
            # Errors from these attempts propagate to the outer handlers.
            alt_toggle = self.page.locator(THINKING_BUDGET_TOGGLE_PARENT_SELECTOR)
            if await alt_toggle.count() > 0:
                await alt_toggle.click(timeout=CLICK_TIMEOUT_MS)
            else:
                root = self.page.locator(THINKING_BUDGET_TOGGLE_OLD_ROOT_SELECTOR)
                label = root.locator("label.mdc-label")
                await expect_async(label).to_be_visible(timeout=2000)
                await label.click(timeout=CLICK_TIMEOUT_MS)
        await self._check_disconnect(
            check_client_disconnected,
            f"Thinking budget toggle - after click {action}",
        )

        # Give the UI a moment to reflect the click before re-reading state.
        await asyncio.sleep(0.5)
        new_state_str = await toggle_locator.get_attribute("aria-checked")
        new_state_is_checked = new_state_str == "true"
        if new_state_is_checked == should_be_checked:
            self.logger.info(
                f"'Thinking Budget' toggle successfully {action}d. New state: {new_state_str}"
            )
        else:
            self.logger.warning(
                f"'Thinking Budget' toggle verification failed after {action}. Expected: {should_be_checked}, Actual: {new_state_str}"
            )
    except asyncio.CancelledError:
        # CancelledError derives from BaseException on Python >= 3.8, so it
        # needs its own clause; inner handlers defer here so the cancellation
        # is logged exactly once.
        self.logger.info(
            f"[{self.req_id}] Thinking budget toggle control cancelled."
        )
        raise
    except Exception as e:
        self.logger.error(f"Error operating 'Thinking Budget toggle': {e}")
        if isinstance(e, ClientDisconnectedError):
            raise