peijun1's picture
Deploy AI Studio Proxy API to Hugging Face Spaces
a5784e9
Raw
History Blame Contribute Delete
27.7 kB
import asyncio
from typing import Callable, List
from playwright.async_api import TimeoutError
from playwright.async_api import expect as expect_async
from browser_utils.operations import save_error_snapshot
from config import (
CDK_OVERLAY_CONTAINER_SELECTOR,
PROMPT_TEXTAREA_SELECTOR,
RESPONSE_CONTAINER_SELECTOR,
SUBMIT_BUTTON_SELECTOR,
UPLOAD_BUTTON_SELECTOR,
)
from config.selector_utils import (
AUTOSIZE_WRAPPER_SELECTORS,
build_combined_selector,
)
from logging_utils import set_request_id
from models import ClientDisconnectedError
from .base import BaseController
class InputController(BaseController):
"""Handles prompt input and submission."""
async def submit_prompt(
self, prompt: str, image_list: List, check_client_disconnected: Callable
):
"""Submit prompt to the page."""
set_request_id(self.req_id)
self.logger.debug(f"[Input] Filling prompt ({len(prompt)} chars)")
prompt_textarea_locator = self.page.locator(PROMPT_TEXTAREA_SELECTOR)
# Use centralized selectors supporting new and old UI structures
autosize_wrapper_locator = self.page.locator(
build_combined_selector(
AUTOSIZE_WRAPPER_SELECTORS[:2]
) # .text-wrapper element
)
legacy_autosize_wrapper = self.page.locator(
build_combined_selector(
AUTOSIZE_WRAPPER_SELECTORS[2:]
) # ms-autosize-textarea element
)
submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
try:
await expect_async(prompt_textarea_locator).to_be_visible(timeout=5000)
await self._check_disconnect(
check_client_disconnected, "After Input Visible"
)
# Fill text using JavaScript
await prompt_textarea_locator.evaluate(
"""
(element, text) => {
element.value = text;
element.dispatchEvent(new Event('input', { bubbles: true, cancelable: true }));
element.dispatchEvent(new Event('change', { bubbles: true, cancelable: true }));
}
""",
prompt,
)
autosize_target = autosize_wrapper_locator
if await autosize_target.count() == 0:
autosize_target = legacy_autosize_wrapper
if await autosize_target.count() > 0:
try:
await autosize_target.first.evaluate(
'(element, text) => { element.setAttribute("data-value", text); }',
prompt,
)
except Exception as autosize_err:
self.logger.debug(
f"autosize wrapper update skipped: {autosize_err}"
)
await self._check_disconnect(check_client_disconnected, "After Input Fill")
# Attachment upload handled below if needed
if len(image_list) > 0:
ok = await self._open_upload_menu_and_choose_file(image_list)
if not ok:
self.logger.error(
"Error during file upload: Failed to set files via menu method"
)
# Wait for submit button to be enabled (using configurable fast-fail timeout)
from config.timeouts import SUBMIT_BUTTON_ENABLE_TIMEOUT_MS
wait_timeout_ms_submit_enabled = SUBMIT_BUTTON_ENABLE_TIMEOUT_MS
start_time = asyncio.get_event_loop().time()
self.logger.debug(
f"[Input] Waiting for submit button (max {wait_timeout_ms_submit_enabled}ms)"
)
try:
while True:
await self._check_disconnect(
check_client_disconnected, "Waiting for Submit Button Enabled"
)
try:
# Use short timeout polling to respond to interruption signals
if await submit_button_locator.is_enabled(timeout=500):
self.logger.debug("[Input] Submit button enabled")
break
except Exception:
# Ignore temporary errors (e.g. element not present yet)
pass
if (
asyncio.get_event_loop().time() - start_time
) * 1000 > wait_timeout_ms_submit_enabled:
raise TimeoutError(
f"Submit button not enabled within {wait_timeout_ms_submit_enabled}ms"
)
await asyncio.sleep(0.5)
except Exception as e_pw_enabled:
self.logger.error(
f"Timeout or error waiting for submit button enabled: {e_pw_enabled}"
)
await save_error_snapshot(f"submit_button_enable_timeout_{self.req_id}")
raise
await self._check_disconnect(
check_client_disconnected, "After Submit Button Enabled"
)
await asyncio.sleep(0.3)
# Try clicking button first, then Enter, then Combo keys
button_clicked = False
try:
self.logger.debug("[Input] Attempting to click submit button...")
# Handle potential dialogs before submit
await self._handle_post_upload_dialog()
# Try to clear tooltip overlays
await self._dismiss_tooltip_overlays()
try:
await submit_button_locator.click(timeout=5000)
except Exception:
# If normal click fails (possibly blocked by tooltip), use JavaScript click
self.logger.debug(
"[Input] Normal click failed, attempting JavaScript click..."
)
js_clicked = await self._js_click_submit_button(
submit_button_locator
)
if not js_clicked:
# Finally try force click
self.logger.debug(
"[Input] JavaScript click failed, attempting force click..."
)
await submit_button_locator.click(timeout=5000, force=True)
self.logger.debug("[Input] Submit button click complete")
button_clicked = True
except Exception as click_err:
self.logger.error(f"Submit button click failed: {click_err}")
await save_error_snapshot(f"submit_button_click_fail_{self.req_id}")
if not button_clicked:
self.logger.info(
"Button submit failed, attempting Enter key submission..."
)
submitted_successfully = await self._try_enter_submit(
prompt_textarea_locator, check_client_disconnected
)
if not submitted_successfully:
self.logger.info(
"Enter submission failed, attempting combo key submission..."
)
combo_ok = await self._try_combo_submit(
prompt_textarea_locator, check_client_disconnected
)
if not combo_ok:
self.logger.error("Combo key submission also failed.")
raise Exception(
"Submit failed: Button, Enter, and Combo key all failed"
)
await self._check_disconnect(check_client_disconnected, "After Submit")
except Exception as e_input_submit:
if isinstance(e_input_submit, asyncio.CancelledError):
raise
self.logger.error(
f"Error during input and submit process: {e_input_submit}"
)
if not isinstance(e_input_submit, ClientDisconnectedError):
await save_error_snapshot(f"input_submit_error_{self.req_id}")
raise
async def _open_upload_menu_and_choose_file(self, files_list: List[str]) -> bool:
"""Select 'Upload' from the 'Insert assets' menu and set files."""
try:
# If a transparent overlay from a previous menu/dialog exists, try to close it
try:
tb = self.page.locator(
"div.cdk-overlay-backdrop.cdk-overlay-transparent-backdrop.cdk-overlay-backdrop-showing"
)
if await tb.count() > 0 and await tb.first.is_visible(timeout=300):
await self.page.keyboard.press("Escape")
await asyncio.sleep(0.2)
except Exception:
pass
trigger = self.page.locator(UPLOAD_BUTTON_SELECTOR).first
await expect_async(trigger).to_be_visible(timeout=3000)
await trigger.click()
menu_container = self.page.locator(CDK_OVERLAY_CONTAINER_SELECTOR)
# Wait for menu to show
try:
await expect_async(
menu_container.locator("div[role='menu']").first
).to_be_visible(timeout=3000)
except Exception:
# Try clicking again
try:
await trigger.click()
await expect_async(
menu_container.locator("div[role='menu']").first
).to_be_visible(timeout=3000)
except Exception:
self.logger.warning("Failed to show upload menu panel.")
return False
# Use menu item with aria-label or text match
try:
# Prefer new UI match
upload_btn = menu_container.locator(
"div[role='menu'] button[role='menuitem'][aria-label='Upload a file']"
)
if await upload_btn.count() == 0:
# Fallback to old UI match
upload_btn = menu_container.locator(
"div[role='menu'] button[role='menuitem'][aria-label='Upload File']"
)
if await upload_btn.count() == 0:
# Fallback to text match (new UI)
upload_btn = menu_container.locator(
"div[role='menu'] button[role='menuitem']:has-text('Upload a file')"
)
if await upload_btn.count() == 0:
# Fallback to text match (old UI)
upload_btn = menu_container.locator(
"div[role='menu'] button[role='menuitem']:has-text('Upload File')"
)
if await upload_btn.count() == 0:
self.logger.warning(
"Could not find 'Upload a file' or 'Upload File' menu item."
)
return False
btn = upload_btn.first
await expect_async(btn).to_be_visible(timeout=2000)
# Prefer internal hidden input[type=file]
input_loc = btn.locator('input[type="file"]')
if await input_loc.count() > 0:
await input_loc.set_input_files(files_list)
self.logger.info(
f"Files successfully set via hidden input in menu item (Upload): {len(files_list)} files"
)
else:
# Fallback to native file chooser
async with self.page.expect_file_chooser() as fc_info:
await btn.click()
file_chooser = await fc_info.value
await file_chooser.set_files(files_list)
self.logger.info(
f"Files successfully set via native file chooser: {len(files_list)} files"
)
except Exception as e_set:
self.logger.error(f"Failed to set files: {e_set}")
return False
# Close leftover menu overlay
try:
backdrop = self.page.locator(
"div.cdk-overlay-backdrop.cdk-overlay-backdrop-showing, div.cdk-overlay-backdrop.cdk-overlay-transparent-backdrop.cdk-overlay-backdrop-showing"
)
if await backdrop.count() > 0:
await self.page.keyboard.press("Escape")
await asyncio.sleep(0.2)
except Exception:
pass
# Handle potential authorization popups
await self._handle_post_upload_dialog()
return True
except Exception as e:
if isinstance(e, asyncio.CancelledError):
raise
self.logger.error(f"Failed to set files via upload menu: {e}")
return False
async def _handle_post_upload_dialog(self):
"""Handle authorization/copyright confirmation dialogs that may appear after upload."""
try:
overlay_container = self.page.locator(CDK_OVERLAY_CONTAINER_SELECTOR)
if await overlay_container.count() == 0:
return
# Candidate agreement button texts
agree_texts = [
"Agree",
"I agree",
"Allow",
"Continue",
"OK",
"Confirm",
"Yes",
]
# Search for visible buttons within the overlay container
for text in agree_texts:
try:
btn = overlay_container.locator(f"button:has-text('{text}')")
if await btn.count() > 0 and await btn.first.is_visible(
timeout=300
):
await btn.first.click()
self.logger.info(
f"Post-upload dialog: Clicked button '{text}'."
)
await asyncio.sleep(0.3)
break
except Exception:
continue
# If copyright acknowledgment button exists (via aria-label)
try:
acknow_btn_locator = self.page.locator(
'button[aria-label*="copyright" i], button[aria-label*="acknowledge" i]'
)
if (
await acknow_btn_locator.count() > 0
and await acknow_btn_locator.first.is_visible(timeout=300)
):
await acknow_btn_locator.first.click()
self.logger.info(
"Post-upload dialog: Clicked copyright acknowledgment button (aria-label match)."
)
await asyncio.sleep(0.3)
except Exception:
pass
# Wait for overlay to disappear
try:
overlay_backdrop = self.page.locator(
"div.cdk-overlay-backdrop.cdk-overlay-backdrop-showing"
)
if await overlay_backdrop.count() > 0:
try:
await expect_async(overlay_backdrop).to_be_hidden(timeout=3000)
self.logger.info("Post-upload dialog overlay hidden.")
except Exception:
self.logger.warning(
"Post-upload dialog overlay still exists, subsequent submit might be blocked."
)
except Exception:
pass
except asyncio.CancelledError:
raise
except Exception:
pass
async def _dismiss_tooltip_overlays(self):
"""Close tooltip overlays that may block clicks - directly remove from DOM."""
try:
# Try to move mouse to make tooltips disappear naturally
await self.page.mouse.move(0, 0)
await asyncio.sleep(0.1)
# Use JavaScript to force remove potential tooltip/overlay elements
removed_count = await self.page.evaluate("""
() => {
const selectors = [
'.mdc-tooltip',
'.mat-mdc-tooltip',
'.mdc-tooltip__surface',
'.mat-mdc-tooltip-surface',
'.cdk-overlay-pane:has(.mdc-tooltip)',
'.mat-tooltip-panel',
'[role="tooltip"]'
];
let count = 0;
for (const sel of selectors) {
const elements = document.querySelectorAll(sel);
elements.forEach(el => {
el.remove();
count++;
});
}
return count;
}
""")
if removed_count > 0:
self.logger.debug(f"[Input] Removed {removed_count} tooltip elements")
await asyncio.sleep(0.1)
except asyncio.CancelledError:
raise
except Exception as e:
self.logger.debug(f"[Input] Tooltip cleanup exception: {e}")
async def _js_click_submit_button(self, submit_button_locator) -> bool:
"""Use JavaScript to trigger the submit button click event directly."""
try:
await submit_button_locator.evaluate("el => el.click()")
self.logger.debug("[Input] JavaScript click on submit button successful")
return True
except asyncio.CancelledError:
raise
except Exception as e:
self.logger.debug(f"[Input] JavaScript click failed: {e}")
return False
async def _try_enter_submit(
self, prompt_textarea_locator, check_client_disconnected: Callable
) -> bool:
"""Submit using the Enter key."""
try:
await prompt_textarea_locator.focus(timeout=5000)
await self._check_disconnect(check_client_disconnected, "After Input Focus")
await asyncio.sleep(0.1)
# Record content before submit for verification
original_content = ""
try:
original_content = (
await prompt_textarea_locator.input_value(timeout=2000) or ""
)
except Exception:
pass
# Try Enter key submission
self.logger.info("Attempting Enter key submission")
try:
await self.page.keyboard.press("Enter")
except asyncio.CancelledError:
raise
except Exception:
try:
await prompt_textarea_locator.press("Enter")
except Exception:
pass
await self._check_disconnect(check_client_disconnected, "After Enter Press")
await asyncio.sleep(2.0)
# Verify submission
submission_success = False
try:
# Method 1: Check if input area is cleared
current_content = (
await prompt_textarea_locator.input_value(timeout=2000) or ""
)
if original_content and not current_content.strip():
self.logger.info(
"Verification method 1: Input cleared, Enter key submission successful"
)
submission_success = True
# Method 2: Check submit button status
if not submission_success:
submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
try:
is_disabled = await submit_button_locator.is_disabled(
timeout=2000
)
if is_disabled:
self.logger.info(
"Verification method 2: Submit button disabled, Enter key submission successful"
)
submission_success = True
except Exception:
pass
# Method 3: Check for response container
if not submission_success:
try:
response_container = self.page.locator(
RESPONSE_CONTAINER_SELECTOR
)
container_count = await response_container.count()
if container_count > 0:
last_container = response_container.last
is_vis = await last_container.is_visible(timeout=1000)
if is_vis:
self.logger.info(
"Verification method 3: Response container detected, Enter key submission successful"
)
submission_success = True
except Exception:
pass
except Exception as verify_err:
self.logger.warning(
f"Error during Enter key submission verification: {verify_err}"
)
submission_success = True
if submission_success:
self.logger.info("Enter key submission successful")
return True
else:
self.logger.warning("Enter key submission verification failed")
return False
except asyncio.CancelledError:
raise
except Exception as shortcut_err:
self.logger.warning(f"Enter key submission failed: {shortcut_err}")
return False
async def _try_combo_submit(
self, prompt_textarea_locator, check_client_disconnected: Callable
) -> bool:
"""Attempt submission using combo keys (Meta/Control + Enter)."""
import os
try:
host_os_from_launcher = os.environ.get("HOST_OS_FOR_SHORTCUT")
is_mac_determined = False
if host_os_from_launcher == "Darwin":
is_mac_determined = True
elif host_os_from_launcher in ["Windows", "Linux"]:
is_mac_determined = False
else:
try:
user_agent_data_platform = await self.page.evaluate(
"() => navigator.userAgentData?.platform || ''"
)
except Exception:
user_agent_string = await self.page.evaluate(
"() => navigator.userAgent || ''"
)
user_agent_string_lower = user_agent_string.lower()
if (
"macintosh" in user_agent_string_lower
or "mac os x" in user_agent_string_lower
):
user_agent_data_platform = "macOS"
else:
user_agent_data_platform = "Other"
is_mac_determined = "mac" in user_agent_data_platform.lower()
shortcut_modifier = "Meta" if is_mac_determined else "Control"
shortcut_key = "Enter"
await prompt_textarea_locator.focus(timeout=5000)
await self._check_disconnect(check_client_disconnected, "After Input Focus")
await asyncio.sleep(0.1)
# Record content before submit for verification
original_content = ""
try:
original_content = (
await prompt_textarea_locator.input_value(timeout=2000) or ""
)
except Exception:
pass
self.logger.info(
f"Attempting combo submission: {shortcut_modifier}+{shortcut_key}"
)
try:
await self.page.keyboard.press(f"{shortcut_modifier}+{shortcut_key}")
except asyncio.CancelledError:
raise
except Exception:
try:
await self.page.keyboard.down(shortcut_modifier)
await asyncio.sleep(0.05)
await self.page.keyboard.press(shortcut_key)
await asyncio.sleep(0.05)
await self.page.keyboard.up(shortcut_modifier)
except Exception:
pass
await self._check_disconnect(check_client_disconnected, "After Combo Press")
await asyncio.sleep(2.0)
submission_success = False
try:
current_content = (
await prompt_textarea_locator.input_value(timeout=2000) or ""
)
if original_content and not current_content.strip():
self.logger.info(
"Verification method 1: Input cleared, combo submission successful"
)
submission_success = True
if not submission_success:
submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
try:
is_disabled = await submit_button_locator.is_disabled(
timeout=2000
)
if is_disabled:
self.logger.info(
"Verification method 2: Submit button disabled, combo submission successful"
)
submission_success = True
except Exception:
pass
if not submission_success:
try:
response_container = self.page.locator(
RESPONSE_CONTAINER_SELECTOR
)
container_count = await response_container.count()
if container_count > 0:
last_container = response_container.last
is_vis = await last_container.is_visible(timeout=1000)
if is_vis:
self.logger.info(
"Verification method 3: Response container detected, combo submission successful"
)
submission_success = True
except Exception:
pass
except Exception as verify_err:
if isinstance(verify_err, asyncio.CancelledError):
raise
self.logger.warning(
f"Error during combo submission verification: {verify_err}"
)
submission_success = True
if submission_success:
self.logger.info("Combo submission successful")
return True
else:
self.logger.warning("Combo submission verification failed")
return False
except Exception as combo_err:
if isinstance(combo_err, asyncio.CancelledError):
raise
self.logger.warning(f"Combo submission failed: {combo_err}")
return False