# Spaces:
# Paused
# Paused
# File size: 14,934 Bytes
# a5784e9
"""
PageController Module
Encapsulates all complex logic for direct interaction with Playwright pages.
"""
import asyncio
import re
from typing import Any, Callable, Dict, List, Optional, Tuple
from playwright.async_api import Page as AsyncPage
from playwright.async_api import expect as expect_async
from config import (
CLEAR_CHAT_BUTTON_SELECTOR,
CLEAR_CHAT_CONFIRM_BUTTON_SELECTOR,
CLICK_TIMEOUT_MS,
DEFAULT_MAX_OUTPUT_TOKENS,
DEFAULT_STOP_SEQUENCES,
DEFAULT_TEMPERATURE,
DEFAULT_TOP_P,
EDIT_MESSAGE_BUTTON_SELECTOR,
ENABLE_URL_CONTEXT,
PROMPT_TEXTAREA_SELECTOR,
SUBMIT_BUTTON_SELECTOR,
UPLOAD_BUTTON_SELECTOR,
)
from models import ClientDisconnectedError, QuotaExceededError
from .initialization import enable_temporary_chat_mode
from .operations import (
_get_final_response_content,
_wait_for_response_completion,
check_quota_limit,
get_response_via_copy_button,
get_response_via_edit_button,
)
from .page_controller_modules.base import BaseController
from .page_controller_modules.chat import ChatController
from .page_controller_modules.function_calling import FunctionCallingController
from .page_controller_modules.input import InputController
from .page_controller_modules.parameters import ParameterController
from .page_controller_modules.response import ResponseController
from .page_controller_modules.thinking import ThinkingController
class PageController(
    ParameterController,
    InputController,
    ChatController,
    ResponseController,
    ThinkingController,
    FunctionCallingController,
    BaseController,
):
    """Encapsulates all operations for interacting with the AI Studio page.

    The class combines the controller mixins listed in its bases and adds the
    request-level orchestration on top of them: parameter adjustment, chat
    clearing, prompt submission and response retrieval.
    """

    # One [THINKING]...[/THINKING] block. DOTALL lets a block span several
    # lines; the non-greedy group keeps adjacent blocks separate.
    _THINKING_BLOCK_RE = re.compile(r"\[THINKING\](.*?)\[/THINKING\]", re.DOTALL)

    def __init__(self, page: AsyncPage, logger, req_id: str):
        """Store the Playwright page, the logger and the request id.

        Args:
            page: Playwright async page every operation acts on.
            logger: Logger used for all progress and warning messages.
            req_id: Request identifier prefixed to log and error messages.
        """
        self.page = page
        self.logger = logger
        self.req_id = req_id

    async def _check_disconnect(self, check_client_disconnected: Callable, stage: str):
        """Raise if the client-disconnect callback reports a disconnect.

        Args:
            check_client_disconnected: Callable invoked with ``stage``; a
                truthy return value means the client is gone.
            stage: Label of the current processing stage, included in the
                error message.

        Raises:
            ClientDisconnectedError: If the callback returns a truthy value.
        """
        if check_client_disconnected(stage):
            raise ClientDisconnectedError(
                f"[{self.req_id}] Client disconnected at stage: {stage}"
            )

    async def adjust_parameters(
        self,
        request_params: Dict[str, Any],
        page_params_cache: Dict[str, Any],
        params_cache_lock: asyncio.Lock,
        model_id_to_use: Optional[str],
        parsed_model_list: List[Dict[str, Any]],
        check_client_disconnected: Callable,
        is_streaming: bool = True,
    ):
        """Apply the request's generation parameters to the page.

        Runs, in order: temperature, max output tokens, stop sequences, top-p,
        tools-panel expansion, URL context, thinking budget and Google search.
        Values missing from ``request_params`` fall back to the ``DEFAULT_*``
        constants from ``config``.

        Args:
            request_params: Request parameters; the keys read here are
                ``temperature``, ``max_output_tokens``, ``stop`` and ``top_p``
                (the dict is also forwarded whole to the thinking-budget and
                Google-search helpers).
            page_params_cache: Cache object forwarded to the helpers.
            params_cache_lock: Lock forwarded alongside the cache.
            model_id_to_use: Model identifier forwarded to the helpers.
            parsed_model_list: Model list forwarded to the max-tokens helper.
            check_client_disconnected: Disconnect-check callback.
            is_streaming: Forwarded to the thinking-budget helper.

        Raises:
            ClientDisconnectedError: If the client is already disconnected
                when adjustment starts.
        """
        self.logger.info(f"[{self.req_id}] Adjusting parameters...")
        await self._check_disconnect(
            check_client_disconnected, "Start Parameter Adjustment"
        )

        temp = request_params.get("temperature", DEFAULT_TEMPERATURE)
        await self._adjust_temperature(
            temp, page_params_cache, params_cache_lock, check_client_disconnected
        )

        max_tokens = request_params.get("max_output_tokens", DEFAULT_MAX_OUTPUT_TOKENS)
        await self._adjust_max_tokens(
            max_tokens,
            page_params_cache,
            params_cache_lock,
            model_id_to_use,
            parsed_model_list,
            check_client_disconnected,
        )

        stop = request_params.get("stop", DEFAULT_STOP_SEQUENCES)
        await self._adjust_stop_sequences(
            stop, page_params_cache, params_cache_lock, check_client_disconnected
        )

        top_p = request_params.get("top_p", DEFAULT_TOP_P)
        await self._adjust_top_p(top_p, check_client_disconnected)

        await self._ensure_tools_panel_expanded(check_client_disconnected)

        # Force disable URL context if function calling is active
        is_fc_enabled = await self.is_function_calling_enabled(
            check_client_disconnected
        )
        if is_fc_enabled:
            await self._adjust_url_context(False, check_client_disconnected)
        elif ENABLE_URL_CONTEXT:
            await self._adjust_url_context(True, check_client_disconnected)

        await self._handle_thinking_budget(
            request_params,
            page_params_cache,
            params_cache_lock,
            model_id_to_use,
            check_client_disconnected,
            is_streaming,
        )
        await self._adjust_google_search(
            request_params, model_id_to_use, check_client_disconnected
        )

    async def clear_chat_history(self, check_client_disconnected: Callable):
        """Clear chat history and invalidate function calling cache.

        Clicks the clear-chat button when it is enabled, confirms the dialog
        if one appears within two seconds, then re-enables temporary chat mode.

        Args:
            check_client_disconnected: Disconnect-check callback. It is
                accepted for interface parity but not consulted here.
        """
        self.logger.info(f"[{self.req_id}] Clearing chat history...")
        # Invalidate FC cache since we're starting a new chat
        self.invalidate_fc_cache("new_chat")

        btn = self.page.locator(CLEAR_CHAT_BUTTON_SELECTOR)
        if await btn.is_enabled(timeout=5000):
            await btn.click(timeout=CLICK_TIMEOUT_MS)
            confirm = self.page.locator(CLEAR_CHAT_CONFIRM_BUTTON_SELECTOR)
            # Locator.is_visible() returns immediately (its timeout argument
            # is ignored), so a dialog that is still opening would be missed.
            # The assertion API actually waits for up to the timeout.
            try:
                await expect_async(confirm).to_be_visible(timeout=2000)
            except AssertionError:
                # No confirmation dialog appeared; nothing to confirm.
                pass
            else:
                await confirm.click(timeout=CLICK_TIMEOUT_MS)

        await enable_temporary_chat_mode(self.page)

    async def submit_prompt(
        self, prompt: str, image_list: List, check_client_disconnected: Callable
    ):
        """Submit prompt to the page with retries and keyboard fallbacks.

        Each attempt fills the prompt textarea, optionally uploads files, then
        tries the submit button, the Enter key and finally a key combination.
        A failed attempt reloads the page and is retried once.

        Args:
            prompt: Text written into the prompt textarea.
            image_list: Files to upload before submitting; skipped when empty.
            check_client_disconnected: Disconnect-check callback.

        Raises:
            QuotaExceededError: Propagated immediately from the quota check.
            ClientDisconnectedError: Propagated immediately; a disconnect is
                never retried.
            Exception: The last error once all attempts are exhausted.
        """
        max_retries = 2
        for attempt in range(max_retries):
            try:
                self.logger.info(
                    f"[{self.req_id}] Filling and submitting prompt (Attempt {attempt + 1}/{max_retries})..."
                )
                textarea = self.page.locator(PROMPT_TEXTAREA_SELECTOR)
                await expect_async(textarea).to_be_visible(timeout=10000)
                await self._check_disconnect(
                    check_client_disconnected, "After Input Visible"
                )

                # Fill textarea using centralized logic (inherited from InputController if possible, or direct)
                await textarea.evaluate(
                    "(el, t) => { el.value = t; el.dispatchEvent(new Event('input', {bubbles:true})); el.dispatchEvent(new Event('change', {bubbles:true})); }",
                    prompt,
                )
                await self._check_disconnect(
                    check_client_disconnected, "After Input Fill"
                )

                if image_list:
                    await self._open_upload_menu_and_choose_file(image_list)

                # Wait for submit button to be enabled
                submit = self.page.locator(SUBMIT_BUTTON_SELECTOR)
                button_clicked = False
                is_btn_enabled = False
                try:
                    await expect_async(submit).to_be_enabled(timeout=10000)
                    is_btn_enabled = True
                except Exception:
                    self.logger.warning(
                        f"[{self.req_id}] Submit button not enabled within timeout, trying keyboard fallback."
                    )
                await self._check_disconnect(
                    check_client_disconnected, "After Submit Button Check"
                )

                if is_btn_enabled:
                    try:
                        # Defensive workarounds before click: handle dialogs, backdrops and tooltips
                        await self._handle_post_upload_dialog()
                        await self._dismiss_backdrops()
                        if hasattr(self, "_dismiss_tooltip_overlays"):
                            await self._dismiss_tooltip_overlays()
                        await submit.click(timeout=5000)
                        button_clicked = True
                        self.logger.info(f"[{self.req_id}] Submit button clicked.")
                        await check_quota_limit(self.page, self.req_id)
                    except (QuotaExceededError, ClientDisconnectedError):
                        raise
                    except Exception as click_err:
                        self.logger.warning(
                            f"[{self.req_id}] Button click failed: {click_err}. Trying keyboard fallback."
                        )

                if not button_clicked:
                    # Keyboard fallbacks (using logic inherited from InputController)
                    self.logger.info(
                        f"[{self.req_id}] Attempting Enter key submission..."
                    )
                    if await self._try_enter_submit(
                        textarea, check_client_disconnected
                    ):
                        button_clicked = True
                    else:
                        self.logger.info(
                            f"[{self.req_id}] Attempting Combo key submission..."
                        )
                        if await self._try_combo_submit(
                            textarea, check_client_disconnected
                        ):
                            button_clicked = True

                if not button_clicked:
                    raise Exception(
                        "Failed to submit prompt via button or keyboard shortcuts."
                    )

                await self._check_disconnect(check_client_disconnected, "After Submit")
                return
            except (QuotaExceededError, ClientDisconnectedError):
                # Reloading the page cannot fix either condition, and a retry
                # after a disconnect would submit a prompt nobody is waiting
                # for, so both propagate without a retry.
                raise
            except Exception as e:
                self.logger.warning(
                    f"[{self.req_id}] Error during submit (Attempt {attempt + 1}): {e}"
                )
                if attempt >= max_retries - 1:
                    # Bare raise keeps the original traceback intact.
                    raise
                await self._safe_reload_page()
                await asyncio.sleep(2)

    async def _open_upload_menu_and_choose_file(self, files_list: List[str]) -> bool:
        """Upload files via menu.

        Opens the upload menu, picks the "Upload File" item (or "Upload a
        file" when the first label matches nothing) and hands ``files_list``
        to the resulting file chooser.

        Args:
            files_list: Paths passed to the file chooser's ``set_files``.

        Returns:
            Always ``True``; failures surface as exceptions.
        """
        await self.page.locator(UPLOAD_BUTTON_SELECTOR).first.click()
        btn = self.page.locator("div[role='menu'] button[role='menuitem']").filter(
            has_text="Upload File"
        )
        if await btn.count() == 0:
            btn = self.page.locator("div[role='menu'] button[role='menuitem']").filter(
                has_text="Upload a file"
            )
        async with self.page.expect_file_chooser() as fc_info:
            await btn.first.click()
        await (await fc_info.value).set_files(files_list)
        return True

    async def _safe_reload_page(self):
        """Reload the page and wait for ``domcontentloaded`` (30 s each)."""
        await self.page.reload(timeout=30000)
        await self.page.wait_for_load_state("domcontentloaded", timeout=30000)

    async def get_response(
        self,
        check_client_disconnected: Callable,
        prompt_length: int = 0,
        timeout: Optional[float] = None,
    ) -> str:
        """Retrieve response content.

        Waits for response completion, reads the final response content and,
        if that is empty or whitespace-only, falls back to
        ``verify_response_integrity``.

        Args:
            check_client_disconnected: Disconnect-check callback.
            prompt_length: Forwarded to the completion wait helper.
            timeout: Forwarded to the completion wait helper.

        Returns:
            The response text; on the fallback path only the ``content`` part
            of the verified result is returned.
        """
        submit_btn = self.page.locator(SUBMIT_BUTTON_SELECTOR)
        edit_btn = self.page.locator(EDIT_MESSAGE_BUTTON_SELECTOR)
        input_field = self.page.locator(PROMPT_TEXTAREA_SELECTOR)
        await _wait_for_response_completion(
            self.page,
            input_field,
            submit_btn,
            edit_btn,
            self.req_id,
            check_client_disconnected,
            None,
            prompt_length=prompt_length,
            timeout=timeout,
        )
        content = await _get_final_response_content(
            self.page, self.req_id, check_client_disconnected
        )
        if not content or not content.strip():
            verified = await self.verify_response_integrity(check_client_disconnected)
            return verified.get("content", "")
        return content

    async def verify_response_integrity(
        self, check_client_disconnected: Callable, trigger_reason: str = ""
    ) -> Dict[str, str]:
        """Re-extract the response after a short delay and split out thinking.

        Args:
            check_client_disconnected: Accepted for interface parity.
                NOTE(review): currently unused here; the extraction helpers
                are called with a no-op callback instead.
            trigger_reason: Currently unused.

        Returns:
            Dict with keys ``content`` and ``reasoning_content``.
        """
        await asyncio.sleep(1)
        final = await self._extract_complete_response_content()
        content, reasoning = self._separate_thinking_and_response(final)
        return {"content": content, "reasoning_content": reasoning}

    async def get_response_with_integrity_check(
        self,
        check_client_disconnected: Callable,
        prompt_length: int = 0,
        timeout: Optional[float] = None,
    ) -> Dict[str, Any]:
        """Retrieve response content with full integrity check and function calls.

        Args:
            check_client_disconnected: Disconnect-check callback.
            prompt_length: Forwarded to ``get_response``.
            timeout: Forwarded to ``get_response``.

        Returns:
            Dict with keys ``content``, ``reasoning_content``,
            ``recovery_method`` (always ``"direct"``), ``has_function_calls``
            and ``function_calls``.
        """
        content = await self.get_response(
            check_client_disconnected, prompt_length, timeout
        )
        # Parse function calls from DOM as well
        has_fc, function_calls, text_content = await self.parse_function_calls(
            check_client_disconnected
        )
        c, r = self._separate_thinking_and_response(content)
        result = {
            "content": c,
            "reasoning_content": r,
            "recovery_method": "direct",
            "has_function_calls": has_fc,
            "function_calls": function_calls,
        }
        if has_fc:
            # If function calls found, use the text content (with calls removed) as content
            # But we need to separate thinking from it too
            c_fc, r_fc = self._separate_thinking_and_response(text_content)
            result["content"] = c_fc
            # Keep original reasoning if not found in text_content
            if r_fc:
                result["reasoning_content"] = r_fc
        return result

    def _separate_thinking_and_response(self, content: str) -> Tuple[str, str]:
        """Split ``[THINKING]...[/THINKING]`` blocks from the visible answer.

        Args:
            content: Raw response text; may be empty or ``None``.

        Returns:
            ``(response, reasoning)``: the text with every thinking block
            removed and stripped, and the block bodies joined with newlines
            and stripped. Both are ``""`` for falsy input.
        """
        if not content:
            return "", ""
        thinking_blocks = self._THINKING_BLOCK_RE.findall(content)
        reasoning = "\n".join(thinking_blocks).strip()
        response = self._THINKING_BLOCK_RE.sub("", content).strip()
        return response, reasoning

    async def _emergency_stability_wait(
        self, check_client_disconnected: Callable
    ) -> bool:
        """Sleep for two seconds and report success.

        NOTE(review): this is a fixed delay, not a DOM-stability probe, and
        ``check_client_disconnected`` is not consulted.
        """
        await asyncio.sleep(2)
        return True

    async def _check_generation_activity(self) -> bool:
        """Check if generation is in progress.

        Returns:
            ``True`` when the "Stop generating" button is currently visible.
        """
        stop_btn = self.page.locator('button[aria-label="Stop generating"]')
        # is_visible() never waits; its timeout argument is ignored by
        # Playwright, so it is not passed.
        return await stop_btn.is_visible()

    async def _extract_dom_content(self) -> str:
        """Return the inner text of the last final-response element, or ""."""
        from config.selectors import FINAL_RESPONSE_SELECTOR

        elem = self.page.locator(FINAL_RESPONSE_SELECTOR).last
        return await elem.inner_text() if await elem.count() > 0 else ""

    async def _extract_complete_response_content(self) -> str:
        """Extract complete response content.

        Tries the edit-button helper, then the copy-button helper, then the
        raw DOM text, returning the first non-empty result.
        """
        # NOTE(review): both helpers receive a no-op disconnect callback, so a
        # client disconnect is not detected during this extraction.
        c = await get_response_via_edit_button(self.page, self.req_id, lambda x: None)
        if not c:
            c = await get_response_via_copy_button(
                self.page, self.req_id, lambda x: None
            )
        return c if c else await self._extract_dom_content()

    async def get_body_text_only_from_dom(self) -> str:
        """Extract body text only (delegates to ``_extract_dom_content``)."""
        return await self._extract_dom_content()
# End of module.