Spaces:
Paused
Paused
| import asyncio | |
| from typing import Callable, List | |
| from playwright.async_api import TimeoutError | |
| from playwright.async_api import expect as expect_async | |
| from browser_utils.operations import save_error_snapshot | |
| from config import ( | |
| CDK_OVERLAY_CONTAINER_SELECTOR, | |
| PROMPT_TEXTAREA_SELECTOR, | |
| RESPONSE_CONTAINER_SELECTOR, | |
| SUBMIT_BUTTON_SELECTOR, | |
| UPLOAD_BUTTON_SELECTOR, | |
| ) | |
| from config.selector_utils import ( | |
| AUTOSIZE_WRAPPER_SELECTORS, | |
| build_combined_selector, | |
| ) | |
| from logging_utils import set_request_id | |
| from models import ClientDisconnectedError | |
| from .base import BaseController | |
class InputController(BaseController):
    """Handles prompt input and submission.

    The methods below use ``self.page``, ``self.logger``, ``self.req_id`` and
    ``self._check_disconnect``; none of them is defined in this class, so they
    are expected to be provided by ``BaseController``.
    """

    async def submit_prompt(
        self, prompt: str, image_list: List, check_client_disconnected: Callable
    ):
        """Submit prompt to the page.

        Steps: fill the prompt textarea, optionally upload attachments, wait
        for the submit button to become enabled, then submit by clicking the
        button. If the click fails, fall back to the Enter key and finally to
        a Meta/Control+Enter combo.

        Args:
            prompt: Text to place into the prompt textarea.
            image_list: Files to attach; an empty (or ``None``) value skips
                the upload step.
            check_client_disconnected: Callable forwarded to
                ``self._check_disconnect`` at each checkpoint.

        Raises:
            Exception: When button, Enter and combo submission all fail, or
                when any step raises. Every error is logged and re-raised; a
                snapshot is saved unless it is a ``ClientDisconnectedError``.
        """
        set_request_id(self.req_id)
        self.logger.debug(f"[Input] Filling prompt ({len(prompt)} chars)")
        prompt_textarea_locator = self.page.locator(PROMPT_TEXTAREA_SELECTOR)
        submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
        try:
            await expect_async(prompt_textarea_locator).to_be_visible(timeout=5000)
            await self._check_disconnect(
                check_client_disconnected, "After Input Visible"
            )
            await self._fill_prompt(prompt_textarea_locator, prompt)
            await self._check_disconnect(check_client_disconnected, "After Input Fill")

            # Attachment upload. A failed upload is only logged; the prompt is
            # still submitted without the attachments.
            if image_list:
                ok = await self._open_upload_menu_and_choose_file(image_list)
                if not ok:
                    self.logger.error(
                        "Error during file upload: Failed to set files via menu method"
                    )

            await self._wait_for_submit_enabled(
                submit_button_locator, check_client_disconnected
            )
            await self._check_disconnect(
                check_client_disconnected, "After Submit Button Enabled"
            )
            await asyncio.sleep(0.3)

            # Try clicking button first, then Enter, then Combo keys
            button_clicked = await self._click_submit_button(submit_button_locator)
            if not button_clicked:
                self.logger.info(
                    "Button submit failed, attempting Enter key submission..."
                )
                submitted_successfully = await self._try_enter_submit(
                    prompt_textarea_locator, check_client_disconnected
                )
                if not submitted_successfully:
                    self.logger.info(
                        "Enter submission failed, attempting combo key submission..."
                    )
                    combo_ok = await self._try_combo_submit(
                        prompt_textarea_locator, check_client_disconnected
                    )
                    if not combo_ok:
                        self.logger.error("Combo key submission also failed.")
                        raise Exception(
                            "Submit failed: Button, Enter, and Combo key all failed"
                        )
            await self._check_disconnect(check_client_disconnected, "After Submit")
        except asyncio.CancelledError:
            raise
        except Exception as e_input_submit:
            self.logger.error(
                f"Error during input and submit process: {e_input_submit}"
            )
            # A client disconnect is not a page problem, so no snapshot for it.
            if not isinstance(e_input_submit, ClientDisconnectedError):
                await save_error_snapshot(f"input_submit_error_{self.req_id}")
            raise

    async def _fill_prompt(self, prompt_textarea_locator, prompt: str) -> None:
        """Write ``prompt`` into the textarea and mirror it onto the autosize wrapper."""
        # Fill text using JavaScript and fire the events the page listens for.
        await prompt_textarea_locator.evaluate(
            """
            (element, text) => {
                element.value = text;
                element.dispatchEvent(new Event('input', { bubbles: true, cancelable: true }));
                element.dispatchEvent(new Event('change', { bubbles: true, cancelable: true }));
            }
            """,
            prompt,
        )

        # Use centralized selectors supporting new and old UI structures:
        # the first two entries target the .text-wrapper element, the rest the
        # legacy ms-autosize-textarea element.
        autosize_target = self.page.locator(
            build_combined_selector(AUTOSIZE_WRAPPER_SELECTORS[:2])
        )
        if await autosize_target.count() == 0:
            autosize_target = self.page.locator(
                build_combined_selector(AUTOSIZE_WRAPPER_SELECTORS[2:])
            )
        if await autosize_target.count() > 0:
            try:
                await autosize_target.first.evaluate(
                    '(element, text) => { element.setAttribute("data-value", text); }',
                    prompt,
                )
            except asyncio.CancelledError:
                raise
            except Exception as autosize_err:
                # Best effort: the wrapper attribute is not required to submit.
                self.logger.debug(f"autosize wrapper update skipped: {autosize_err}")

    async def _wait_for_submit_enabled(
        self, submit_button_locator, check_client_disconnected: Callable
    ) -> None:
        """Poll until the submit button is enabled or the configured timeout elapses.

        Raises:
            TimeoutError: If the button is not enabled within
                ``SUBMIT_BUTTON_ENABLE_TIMEOUT_MS`` milliseconds.
        """
        # Wait for submit button to be enabled (using configurable fast-fail timeout)
        from config.timeouts import SUBMIT_BUTTON_ENABLE_TIMEOUT_MS

        wait_timeout_ms_submit_enabled = SUBMIT_BUTTON_ENABLE_TIMEOUT_MS
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        self.logger.debug(
            f"[Input] Waiting for submit button (max {wait_timeout_ms_submit_enabled}ms)"
        )
        try:
            while True:
                await self._check_disconnect(
                    check_client_disconnected, "Waiting for Submit Button Enabled"
                )
                try:
                    # Use short timeout polling to respond to interruption signals
                    if await submit_button_locator.is_enabled(timeout=500):
                        self.logger.debug("[Input] Submit button enabled")
                        return
                except asyncio.CancelledError:
                    raise
                except Exception:
                    # Ignore temporary errors (e.g. element not present yet)
                    pass
                elapsed_ms = (loop.time() - start_time) * 1000
                if elapsed_ms > wait_timeout_ms_submit_enabled:
                    raise TimeoutError(
                        f"Submit button not enabled within {wait_timeout_ms_submit_enabled}ms"
                    )
                await asyncio.sleep(0.5)
        except (asyncio.CancelledError, ClientDisconnectedError):
            # Cancellation / client disconnect: propagate without a snapshot,
            # matching how submit_prompt treats ClientDisconnectedError.
            raise
        except Exception as e_pw_enabled:
            self.logger.error(
                f"Timeout or error waiting for submit button enabled: {e_pw_enabled}"
            )
            await save_error_snapshot(f"submit_button_enable_timeout_{self.req_id}")
            raise

    async def _click_submit_button(self, submit_button_locator) -> bool:
        """Click the submit button: normal click, then JS click, then forced click.

        Returns:
            True when one of the click strategies completed without raising;
            False (after logging and saving a snapshot) otherwise.
        """
        try:
            self.logger.debug("[Input] Attempting to click submit button...")
            # Handle potential dialogs before submit
            await self._handle_post_upload_dialog()
            # Try to clear tooltip overlays
            await self._dismiss_tooltip_overlays()
            try:
                await submit_button_locator.click(timeout=5000)
            except asyncio.CancelledError:
                raise
            except Exception:
                # If normal click fails (possibly blocked by tooltip), use JavaScript click
                self.logger.debug(
                    "[Input] Normal click failed, attempting JavaScript click..."
                )
                js_clicked = await self._js_click_submit_button(submit_button_locator)
                if not js_clicked:
                    # Finally try force click
                    self.logger.debug(
                        "[Input] JavaScript click failed, attempting force click..."
                    )
                    await submit_button_locator.click(timeout=5000, force=True)
            self.logger.debug("[Input] Submit button click complete")
            return True
        except asyncio.CancelledError:
            raise
        except Exception as click_err:
            self.logger.error(f"Submit button click failed: {click_err}")
            await save_error_snapshot(f"submit_button_click_fail_{self.req_id}")
            return False

    async def _open_upload_menu_and_choose_file(self, files_list: List[str]) -> bool:
        """Select 'Upload' from the 'Insert assets' menu and set files.

        Args:
            files_list: Paths handed to Playwright's file input / file chooser.

        Returns:
            True when the files were set; False when the menu or the upload
            item could not be found or setting the files failed.
        """
        try:
            # If a transparent overlay from a previous menu/dialog exists, try to close it
            try:
                tb = self.page.locator(
                    "div.cdk-overlay-backdrop.cdk-overlay-transparent-backdrop.cdk-overlay-backdrop-showing"
                )
                if await tb.count() > 0 and await tb.first.is_visible(timeout=300):
                    await self.page.keyboard.press("Escape")
                    await asyncio.sleep(0.2)
            except asyncio.CancelledError:
                raise
            except Exception:
                pass

            trigger = self.page.locator(UPLOAD_BUTTON_SELECTOR).first
            await expect_async(trigger).to_be_visible(timeout=3000)
            await trigger.click()

            menu_container = self.page.locator(CDK_OVERLAY_CONTAINER_SELECTOR)
            # Wait for menu to show
            try:
                await expect_async(
                    menu_container.locator("div[role='menu']").first
                ).to_be_visible(timeout=3000)
            except asyncio.CancelledError:
                raise
            except Exception:
                # Try clicking again
                try:
                    await trigger.click()
                    await expect_async(
                        menu_container.locator("div[role='menu']").first
                    ).to_be_visible(timeout=3000)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    self.logger.warning("Failed to show upload menu panel.")
                    return False

            # Use menu item with aria-label or text match
            try:
                # Ordered by preference: aria-label (new UI, old UI), then
                # text match (new UI, old UI).
                menu_item_selectors = (
                    "div[role='menu'] button[role='menuitem'][aria-label='Upload a file']",
                    "div[role='menu'] button[role='menuitem'][aria-label='Upload File']",
                    "div[role='menu'] button[role='menuitem']:has-text('Upload a file')",
                    "div[role='menu'] button[role='menuitem']:has-text('Upload File')",
                )
                upload_btn = None
                for menu_item_selector in menu_item_selectors:
                    candidate = menu_container.locator(menu_item_selector)
                    if await candidate.count() > 0:
                        upload_btn = candidate
                        break
                if upload_btn is None:
                    self.logger.warning(
                        "Could not find 'Upload a file' or 'Upload File' menu item."
                    )
                    return False

                btn = upload_btn.first
                await expect_async(btn).to_be_visible(timeout=2000)
                # Prefer internal hidden input[type=file]
                input_loc = btn.locator('input[type="file"]')
                if await input_loc.count() > 0:
                    await input_loc.set_input_files(files_list)
                    self.logger.info(
                        f"Files successfully set via hidden input in menu item (Upload): {len(files_list)} files"
                    )
                else:
                    # Fallback to native file chooser
                    async with self.page.expect_file_chooser() as fc_info:
                        await btn.click()
                    file_chooser = await fc_info.value
                    await file_chooser.set_files(files_list)
                    self.logger.info(
                        f"Files successfully set via native file chooser: {len(files_list)} files"
                    )
            except asyncio.CancelledError:
                raise
            except Exception as e_set:
                self.logger.error(f"Failed to set files: {e_set}")
                return False

            # Close leftover menu overlay
            try:
                backdrop = self.page.locator(
                    "div.cdk-overlay-backdrop.cdk-overlay-backdrop-showing, div.cdk-overlay-backdrop.cdk-overlay-transparent-backdrop.cdk-overlay-backdrop-showing"
                )
                if await backdrop.count() > 0:
                    await self.page.keyboard.press("Escape")
                    await asyncio.sleep(0.2)
            except asyncio.CancelledError:
                raise
            except Exception:
                pass

            # Handle potential authorization popups
            await self._handle_post_upload_dialog()
            return True
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.logger.error(f"Failed to set files via upload menu: {e}")
            return False

    async def _handle_post_upload_dialog(self):
        """Handle authorization/copyright confirmation dialogs that may appear after upload.

        Best effort: every failure is swallowed (logged at debug level) so a
        missing or unexpected dialog never aborts the submit flow.
        """
        try:
            overlay_container = self.page.locator(CDK_OVERLAY_CONTAINER_SELECTOR)
            if await overlay_container.count() == 0:
                return

            # Candidate agreement button texts
            agree_texts = [
                "Agree",
                "I agree",
                "Allow",
                "Continue",
                "OK",
                "Confirm",
                "Yes",
            ]
            # Search for visible buttons within the overlay container; stop at
            # the first one that could be clicked.
            for text in agree_texts:
                try:
                    btn = overlay_container.locator(f"button:has-text('{text}')")
                    if await btn.count() > 0 and await btn.first.is_visible(
                        timeout=300
                    ):
                        await btn.first.click()
                        self.logger.info(
                            f"Post-upload dialog: Clicked button '{text}'."
                        )
                        await asyncio.sleep(0.3)
                        break
                except asyncio.CancelledError:
                    raise
                except Exception:
                    continue

            # If copyright acknowledgment button exists (via aria-label)
            try:
                acknow_btn_locator = self.page.locator(
                    'button[aria-label*="copyright" i], button[aria-label*="acknowledge" i]'
                )
                if (
                    await acknow_btn_locator.count() > 0
                    and await acknow_btn_locator.first.is_visible(timeout=300)
                ):
                    await acknow_btn_locator.first.click()
                    self.logger.info(
                        "Post-upload dialog: Clicked copyright acknowledgment button (aria-label match)."
                    )
                    await asyncio.sleep(0.3)
            except asyncio.CancelledError:
                raise
            except Exception as ack_err:
                self.logger.debug(
                    f"[Input] Copyright acknowledgment check skipped: {ack_err}"
                )

            # Wait for overlay to disappear
            try:
                overlay_backdrop = self.page.locator(
                    "div.cdk-overlay-backdrop.cdk-overlay-backdrop-showing"
                )
                if await overlay_backdrop.count() > 0:
                    try:
                        await expect_async(overlay_backdrop).to_be_hidden(timeout=3000)
                        self.logger.info("Post-upload dialog overlay hidden.")
                    except asyncio.CancelledError:
                        raise
                    except Exception:
                        self.logger.warning(
                            "Post-upload dialog overlay still exists, subsequent submit might be blocked."
                        )
            except asyncio.CancelledError:
                raise
            except Exception as backdrop_err:
                self.logger.debug(
                    f"[Input] Overlay backdrop check skipped: {backdrop_err}"
                )
        except asyncio.CancelledError:
            raise
        except Exception as dialog_err:
            self.logger.debug(
                f"[Input] Post-upload dialog handling skipped: {dialog_err}"
            )

    async def _dismiss_tooltip_overlays(self):
        """Close tooltip overlays that may block clicks - directly remove from DOM.

        Best effort: any error is logged at debug level and ignored.
        """
        try:
            # Try to move mouse to make tooltips disappear naturally
            await self.page.mouse.move(0, 0)
            await asyncio.sleep(0.1)
            # Use JavaScript to force remove potential tooltip/overlay elements.
            # The ':has()' pane selector is queried first: once the tooltip
            # itself is removed the pane would no longer match. Each selector
            # is guarded so an engine that rejects one selector (e.g. no
            # ':has()' support) does not abort the whole cleanup.
            removed_count = await self.page.evaluate("""
                () => {
                    const selectors = [
                        '.cdk-overlay-pane:has(.mdc-tooltip)',
                        '.mdc-tooltip',
                        '.mat-mdc-tooltip',
                        '.mdc-tooltip__surface',
                        '.mat-mdc-tooltip-surface',
                        '.mat-tooltip-panel',
                        '[role="tooltip"]'
                    ];
                    let count = 0;
                    for (const sel of selectors) {
                        let elements;
                        try {
                            elements = document.querySelectorAll(sel);
                        } catch (err) {
                            continue;
                        }
                        elements.forEach(el => {
                            el.remove();
                            count++;
                        });
                    }
                    return count;
                }
            """)
            if removed_count > 0:
                self.logger.debug(f"[Input] Removed {removed_count} tooltip elements")
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.logger.debug(f"[Input] Tooltip cleanup exception: {e}")

    async def _js_click_submit_button(self, submit_button_locator) -> bool:
        """Use JavaScript to trigger the submit button click event directly.

        Returns:
            True if the in-page ``el.click()`` call ran without raising,
            False otherwise.
        """
        try:
            await submit_button_locator.evaluate("el => el.click()")
            self.logger.debug("[Input] JavaScript click on submit button successful")
            return True
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.logger.debug(f"[Input] JavaScript click failed: {e}")
            return False

    async def _read_input_value(self, prompt_textarea_locator) -> str:
        """Return the textarea's current value, or ``""`` when it cannot be read."""
        try:
            return (await prompt_textarea_locator.input_value(timeout=2000)) or ""
        except asyncio.CancelledError:
            raise
        except Exception:
            return ""

    async def _verify_submission(
        self, prompt_textarea_locator, original_content: str, label: str
    ) -> bool:
        """Check whether a keyboard submission appears to have gone through.

        Three signals are tried in order: the textarea was cleared, the submit
        button is disabled, or a response container is visible.

        Args:
            prompt_textarea_locator: Locator of the prompt textarea.
            original_content: Textarea content recorded before the key press.
            label: Submission kind used in log messages ("Enter key"/"combo").

        Returns:
            True when any signal matches. Also True when the verification
            itself errors out: the original behavior is to assume success in
            that case rather than attempt another submission.
        """
        try:
            # Method 1: Check if input area is cleared
            current_content = (
                await prompt_textarea_locator.input_value(timeout=2000) or ""
            )
            if original_content and not current_content.strip():
                self.logger.info(
                    f"Verification method 1: Input cleared, {label} submission successful"
                )
                return True

            # Method 2: Check submit button status
            submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
            try:
                if await submit_button_locator.is_disabled(timeout=2000):
                    self.logger.info(
                        f"Verification method 2: Submit button disabled, {label} submission successful"
                    )
                    return True
            except asyncio.CancelledError:
                raise
            except Exception:
                pass

            # Method 3: Check for response container
            try:
                response_container = self.page.locator(RESPONSE_CONTAINER_SELECTOR)
                if await response_container.count() > 0:
                    if await response_container.last.is_visible(timeout=1000):
                        self.logger.info(
                            f"Verification method 3: Response container detected, {label} submission successful"
                        )
                        return True
            except asyncio.CancelledError:
                raise
            except Exception:
                pass
            return False
        except asyncio.CancelledError:
            raise
        except Exception as verify_err:
            self.logger.warning(
                f"Error during {label} submission verification: {verify_err}"
            )
            return True

    async def _try_enter_submit(
        self, prompt_textarea_locator, check_client_disconnected: Callable
    ) -> bool:
        """Submit using the Enter key.

        Returns:
            True when the submission was verified (or verification errored and
            success is assumed), False otherwise.

        Raises:
            ClientDisconnectedError: Propagated (presumably from
                ``_check_disconnect``) instead of being reported as a failed
                submission.
        """
        try:
            await prompt_textarea_locator.focus(timeout=5000)
            await self._check_disconnect(check_client_disconnected, "After Input Focus")
            await asyncio.sleep(0.1)

            # Record content before submit for verification
            original_content = await self._read_input_value(prompt_textarea_locator)

            # Try Enter key submission
            self.logger.info("Attempting Enter key submission")
            try:
                await self.page.keyboard.press("Enter")
            except asyncio.CancelledError:
                raise
            except Exception:
                # Page-level key press failed; retry directly on the textarea.
                try:
                    await prompt_textarea_locator.press("Enter")
                except asyncio.CancelledError:
                    raise
                except Exception:
                    pass
            await self._check_disconnect(check_client_disconnected, "After Enter Press")
            await asyncio.sleep(2.0)

            # Verify submission
            if await self._verify_submission(
                prompt_textarea_locator, original_content, "Enter key"
            ):
                self.logger.info("Enter key submission successful")
                return True
            self.logger.warning("Enter key submission verification failed")
            return False
        except (asyncio.CancelledError, ClientDisconnectedError):
            raise
        except Exception as shortcut_err:
            self.logger.warning(f"Enter key submission failed: {shortcut_err}")
            return False

    async def _is_mac_platform(self) -> bool:
        """Decide whether the Meta key (macOS) should be used for the shortcut.

        ``HOST_OS_FOR_SHORTCUT`` wins when it is "Darwin", "Windows" or
        "Linux". Otherwise the browser is asked: ``navigator.userAgentData``
        first, then the ``navigator.userAgent`` string when the former raises
        or is empty (the JS expression yields '' when ``userAgentData`` is
        not available).
        """
        import os

        host_os_from_launcher = os.environ.get("HOST_OS_FOR_SHORTCUT")
        if host_os_from_launcher == "Darwin":
            return True
        if host_os_from_launcher in ("Windows", "Linux"):
            return False

        user_agent_data_platform = ""
        try:
            user_agent_data_platform = (
                await self.page.evaluate(
                    "() => navigator.userAgentData?.platform || ''"
                )
                or ""
            )
        except asyncio.CancelledError:
            raise
        except Exception:
            user_agent_data_platform = ""

        if user_agent_data_platform:
            return "mac" in user_agent_data_platform.lower()

        user_agent_string = (
            await self.page.evaluate("() => navigator.userAgent || ''") or ""
        )
        user_agent_string_lower = user_agent_string.lower()
        return (
            "macintosh" in user_agent_string_lower
            or "mac os x" in user_agent_string_lower
        )

    async def _try_combo_submit(
        self, prompt_textarea_locator, check_client_disconnected: Callable
    ) -> bool:
        """Attempt submission using combo keys (Meta/Control + Enter).

        Returns:
            True when the submission was verified (or verification errored and
            success is assumed), False otherwise.

        Raises:
            ClientDisconnectedError: Propagated (presumably from
                ``_check_disconnect``) instead of being reported as a failed
                submission.
        """
        try:
            is_mac_determined = await self._is_mac_platform()
            shortcut_modifier = "Meta" if is_mac_determined else "Control"
            shortcut_key = "Enter"

            await prompt_textarea_locator.focus(timeout=5000)
            await self._check_disconnect(check_client_disconnected, "After Input Focus")
            await asyncio.sleep(0.1)

            # Record content before submit for verification
            original_content = await self._read_input_value(prompt_textarea_locator)

            self.logger.info(
                f"Attempting combo submission: {shortcut_modifier}+{shortcut_key}"
            )
            try:
                await self.page.keyboard.press(f"{shortcut_modifier}+{shortcut_key}")
            except asyncio.CancelledError:
                raise
            except Exception:
                # Fall back to pressing the keys individually.
                try:
                    await self.page.keyboard.down(shortcut_modifier)
                    try:
                        await asyncio.sleep(0.05)
                        await self.page.keyboard.press(shortcut_key)
                        await asyncio.sleep(0.05)
                    finally:
                        # Always release the modifier so it cannot stay held
                        # down for later keyboard input.
                        await self.page.keyboard.up(shortcut_modifier)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    pass
            await self._check_disconnect(check_client_disconnected, "After Combo Press")
            await asyncio.sleep(2.0)

            if await self._verify_submission(
                prompt_textarea_locator, original_content, "combo"
            ):
                self.logger.info("Combo submission successful")
                return True
            self.logger.warning("Combo submission verification failed")
            return False
        except (asyncio.CancelledError, ClientDisconnectedError):
            raise
        except Exception as combo_err:
            self.logger.warning(f"Combo submission failed: {combo_err}")
            return False