Spaces:
Sleeping
Sleeping
| """ | |
| Браузерная автоматизация для регистрации AWS Builder ID | |
| """ | |
| import os | |
| import time | |
| import random | |
| import platform | |
| import functools | |
| from typing import Optional, Callable | |
| from DrissionPage import ChromiumPage, ChromiumOptions | |
| import sys | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| # Force unbuffered output for real-time logging | |
| print = functools.partial(print, flush=True) | |
| # Импортируем спуфинг | |
| from spoof import apply_pre_navigation_spoofing | |
| from spoofers.behavior import BehaviorSpoofModule | |
| from spoofers.profile_storage import ProfileStorage | |
| # Debug recorder | |
| from core.debug_recorder import get_recorder, record, init_recorder | |
| # Script collector for analysis (disabled by default, enable for debugging) | |
| # from debugger.collectors.script_collector import ScriptCollector, collect_scripts, save_collected_scripts | |
| COLLECT_SCRIPTS = False # Set to True to enable script collection for analysis | |
| def find_chrome_path() -> Optional[str]: | |
| """Find Chrome/Chromium executable path on different platforms""" | |
| system = platform.system() | |
| if system == 'Windows': | |
| # Common Chrome paths on Windows | |
| possible_paths = [ | |
| os.path.expandvars(r'%ProgramFiles%\Google\Chrome\Application\chrome.exe'), | |
| os.path.expandvars(r'%ProgramFiles(x86)%\Google\Chrome\Application\chrome.exe'), | |
| os.path.expandvars(r'%LocalAppData%\Google\Chrome\Application\chrome.exe'), | |
| os.path.expandvars(r'%ProgramFiles%\Chromium\Application\chrome.exe'), | |
| os.path.expandvars(r'%LocalAppData%\Chromium\Application\chrome.exe'), | |
| # Edge as fallback | |
| os.path.expandvars(r'%ProgramFiles(x86)%\Microsoft\Edge\Application\msedge.exe'), | |
| os.path.expandvars(r'%ProgramFiles%\Microsoft\Edge\Application\msedge.exe'), | |
| ] | |
| elif system == 'Darwin': # macOS | |
| possible_paths = [ | |
| '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome', | |
| '/Applications/Chromium.app/Contents/MacOS/Chromium', | |
| os.path.expanduser('~/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'), | |
| ] | |
| else: # Linux | |
| possible_paths = [ | |
| '/usr/bin/google-chrome', | |
| '/usr/bin/google-chrome-stable', | |
| '/usr/bin/chromium', | |
| '/usr/bin/chromium-browser', | |
| '/snap/bin/chromium', | |
| ] | |
| for path in possible_paths: | |
| if os.path.exists(path): | |
| return path | |
| return None | |
| from core.config import get_config | |
| from core.paths import get_paths | |
| # Селекторы на основе анализа Playwright (декабрь 2025) | |
| # AWS использует data-testid для стабильных селекторов | |
| SELECTORS = { | |
| 'cookie_accept': [ | |
| 'text=Accept', # Английский | |
| 'text=Принять', # Русский | |
| 'text=Akzeptieren', # German | |
| 'text=同意する', # Japanese | |
| 'xpath://button[contains(text(), "Accept")]', | |
| 'xpath://button[contains(text(), "Принять")]', | |
| '[data-id="awsccc-cb-btn-accept"]', | |
| ], | |
| 'email_input': [ | |
| '@placeholder=username@example.com', | |
| 'aria:Email', | |
| '@type=email', | |
| '@data-testid=test-input', | |
| ], | |
| 'continue_btn': [ | |
| '@data-testid=test-primary-button', # Основная кнопка Continue | |
| '@data-testid=signup-next-button', # Continue на странице имени | |
| '@data-testid=email-verification-verify-button', # Continue на странице кода | |
| 'text=Continue', | |
| 'text=Weiter', # German | |
| 'text=続行', # Japanese | |
| ], | |
| 'name_input': [ | |
| '@placeholder=Maria José Silva', # Актуальный placeholder | |
| 'aria:Name', | |
| '@data-testid=name-input', | |
| ], | |
| 'code_input': [ | |
| '@placeholder=6-digit', # English placeholder | |
| '@placeholder=6-stellig', # German placeholder | |
| '@placeholder=6桁', # Japanese placeholder | |
| 'aria:Verification code', | |
| 'aria:Verifizierungscode', # German | |
| 'aria:確認コード', # Japanese | |
| 'css:input[maxlength="6"]', # Generic 6-digit input | |
| ], | |
| 'password_input': [ | |
| '@placeholder=Enter password', # English placeholder | |
| '@placeholder=Passwort eingeben', # German placeholder | |
| '@placeholder=パスワードを入力', # Japanese placeholder | |
| 'aria:Password', | |
| 'aria:Passwort', # German | |
| 'aria:パスワード', # Japanese | |
| 'css:input[type="password"]', # Generic password input | |
| ], | |
| 'confirm_password': [ | |
| '@placeholder=Re-enter password', # English placeholder | |
| '@placeholder=Passwort erneut eingeben', # German placeholder | |
| '@placeholder=パスワードを再入力', # Japanese placeholder | |
| 'aria:Confirm password', | |
| 'aria:Passwort bestätigen', # German | |
| 'aria:パスワードを確認', # Japanese | |
| 'css:input[type="password"]:nth-of-type(2)', # Second password field | |
| ], | |
| 'allow_access': [ | |
| 'text=Allow access', | |
| 'text=Zugriff erlauben', # German | |
| 'text=アクセスを許可', # Japanese | |
| '@data-testid=allow-access-button', | |
| ], | |
| } | |
| # Контексты страниц - заголовки для определения текущего шага | |
| # Поддерживаемые языки: English, German (de-DE), Japanese (ja-JP) | |
| PAGE_CONTEXTS = { | |
| 'email': ['Get started', 'Sign in', 'Erste Schritte', 'サインイン', '開始する'], | |
| 'name': ['Enter your name', 'Geben Sie Ihren Namen ein', '名前を入力してください'], | |
| 'verification': ['Verify your email', 'E-Mail-Adresse bestätigen', 'メールアドレスを確認'], | |
| 'password': ['Create your password', 'Ihr Passwort erstellen', 'パスワードを作成'], | |
| 'allow_access': ['Allow access', 'Authorization', 'Zugriff erlauben', 'アクセスを許可', '認可'], | |
| } | |
| BROWSER_ARGS = [ | |
| # '--disable-blink-features=AutomationControlled', # AWS детектит этот флаг! | |
| '--disable-dev-shm-usage', | |
| ] | |
| PASSWORD_LENGTH = 16 | |
| PASSWORD_CHARS = { | |
| 'upper': 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', | |
| 'lower': 'abcdefghijklmnopqrstuvwxyz', | |
| 'digits': '0123456789', | |
| 'special': '!@#$%^&*', # Расширен набор спецсимволов | |
| } | |
| # Таймауты по умолчанию (оптимизированы) | |
| DEFAULT_TIMEOUTS = { | |
| 'page_load': 5, # Уменьшено с 10 | |
| 'element_wait': 1, # Уменьшено с 3 | |
| 'page_transition': 3, # Уменьшено с 5 | |
| 'poll_interval': 0.1, # Интервал проверки элементов | |
| } | |
| def load_settings(): | |
| return get_config().to_dict() | |
| def get_setting(path, default=None): | |
| return get_config().get(path, default) | |
| BASE_DIR = get_paths().autoreg_dir | |
| class BrowserAutomation: | |
| """Автоматизация браузера для регистрации""" | |
| def __init__(self, headless: bool = None, email: str = None): | |
| """ | |
| Args: | |
| headless: Запуск без GUI (по умолчанию из настроек) | |
| email: Email аккаунта (для сохранения профиля спуфинга) | |
| """ | |
| self._email = email | |
| settings = load_settings() | |
| browser_settings = settings.get('browser', {}) | |
| # headless можно переопределить параметром | |
| if headless is None: | |
| headless = browser_settings.get('headless', False) | |
| # Force headless when no display is available (e.g., HF Spaces) | |
| if not os.environ.get('DISPLAY') and not os.environ.get('WAYLAND_DISPLAY'): | |
| headless = True | |
| self.settings = settings | |
| self.headless = headless | |
| self.verbose = settings.get('debug', {}).get('verbose', False) | |
| self.screenshots_on_error = browser_settings.get('screenshots_on_error', True) | |
| # Настройка браузера | |
| co = ChromiumOptions() | |
| # КРИТИЧНО: Уникальная user data directory для каждого экземпляра | |
| # Это предотвращает конфликт с уже открытым Chrome | |
| import tempfile | |
| import uuid | |
| temp_profile = os.path.join(tempfile.gettempdir(), f'kiro_chrome_{uuid.uuid4().hex[:8]}') | |
| co.set_user_data_path(temp_profile) | |
| co.auto_port() # Автоматически найти свободный порт | |
| print(f"[Browser] Using temp profile: {temp_profile}") | |
| # Найти и установить путь к Chrome (критично для Windows) | |
| chrome_path = find_chrome_path() | |
| if chrome_path: | |
| co.set_browser_path(chrome_path) | |
| print(f"[Browser] Using: {chrome_path}") | |
| else: | |
| print("[Browser] Warning: Chrome not found, using system default") | |
| if headless: | |
| co.headless() | |
| # Explicit headless flag for Chromium in headless Linux | |
| co.set_argument('--headless=new') | |
| # Дополнительные аргументы для стабильного headless | |
| co.set_argument('--disable-gpu') | |
| co.set_argument('--no-sandbox') | |
| co.set_argument('--disable-dev-shm-usage') | |
| # Скрываем automation infobars | |
| co.set_argument('--disable-infobars') | |
| co.set_argument('--no-first-run') | |
| co.set_argument('--no-default-browser-check') | |
| # Уменьшаем логи Chrome | |
| co.set_argument('--disable-logging') | |
| co.set_argument('--log-level=3') # Только fatal errors | |
| # НЕ используем --silent-launch - он ломает запуск Chrome! | |
| # Force English language to avoid Chinese error messages | |
| co.set_argument('--lang=en-US') | |
| co.set_argument('--accept-lang=en-US,en') | |
| # Set DrissionPage language to English (avoid Chinese error messages) | |
| os.environ['DRISSIONPAGE_LANG'] = 'en' | |
| # НЕ используем --disable-blink-features=AutomationControlled | |
| # Он показывает предупреждение которое палит автоматизацию! | |
| # Размер окна - большое для корректного отображения AWS UI | |
| co.set_argument('--window-size=1920,1080') | |
| co.set_argument('--start-maximized') | |
| if browser_settings.get('incognito', True): | |
| co.set_argument('--incognito') | |
| if browser_settings.get('devtools', False): | |
| co.set_argument('--auto-open-devtools-for-tabs') | |
| # Proxy support (from environment variables) | |
| proxy_url = os.environ.get('HTTPS_PROXY') or os.environ.get('HTTP_PROXY') | |
| if proxy_url: | |
| # Remove http:// prefix if present for Chrome proxy format | |
| proxy_server = proxy_url.replace('http://', '').replace('https://', '') | |
| co.set_argument(f'--proxy-server={proxy_server}') | |
| # Ignore SSL errors when using proxy (mitmproxy intercepts HTTPS) | |
| co.set_argument('--ignore-certificate-errors') | |
| co.set_argument('--ignore-ssl-errors') | |
| print(f"[Browser] Using proxy: {proxy_server}") | |
| for arg in BROWSER_ARGS: | |
| co.set_argument(arg) | |
| print(f"[Browser] Initializing ChromiumPage (headless={headless})...") | |
| try: | |
| self.page = ChromiumPage(co) | |
| print("[Browser] ChromiumPage initialized successfully") | |
| # КРИТИЧНО: Максимизируем окно для корректного отображения AWS UI | |
| if not headless: | |
| try: | |
| self.page.set.window.max() | |
| print(" [W] Window maximized") | |
| except: | |
| # Fallback: устанавливаем большой размер вручную | |
| try: | |
| self.page.set.window.size(1920, 1080) | |
| print(" [W] Window resized to 1920x1080") | |
| except: | |
| pass | |
| except Exception as e: | |
| error_msg = str(e).encode('ascii', 'replace').decode('ascii') | |
| print(f"[Browser] ERROR: Failed to initialize browser: {error_msg}") | |
| raise | |
| self._cookie_closed = False # Флаг чтобы не закрывать cookie много раз | |
| self._network_logs = [] # Логи сетевых запросов | |
| # ВАЖНО: Очищаем cookies и storage для чистой сессии | |
| try: | |
| # Очищаем cookies для всех AWS доменов | |
| self.page.run_cdp('Network.clearBrowserCookies') | |
| self.page.run_cdp('Network.clearBrowserCache') | |
| # Очищаем storage для AWS доменов | |
| aws_origins = [ | |
| 'https://profile.aws.amazon.com', | |
| 'https://signin.aws.amazon.com', | |
| 'https://us-east-1.signin.aws', | |
| 'https://oidc.us-east-1.amazonaws.com', | |
| 'https://view.awsapps.com', | |
| ] | |
| for origin in aws_origins: | |
| try: | |
| self.page.run_cdp('Storage.clearDataForOrigin', origin=origin, storageTypes='all') | |
| except: | |
| pass | |
| print(" [C] Cleared browser cookies, cache and storage") | |
| except Exception as e: | |
| print(f" [!] Failed to clear cookies: {e}") | |
| # КРИТИЧНО: Применяем спуфинг ДО навигации на страницу | |
| # Это гарантирует что AWS FWCIM получит подменённые данные | |
| # Проверяем env переменную SPOOFING_ENABLED (по умолчанию включено) | |
| spoofing_enabled = os.environ.get('SPOOFING_ENABLED', '1') == '1' | |
| if spoofing_enabled: | |
| try: | |
| # Используем ProfileStorage для консистентного fingerprint | |
| profile = None | |
| if self._email: | |
| from core.paths import get_paths | |
| storage = ProfileStorage(get_paths().tokens_dir) | |
| profile = storage.get_or_create(self._email) | |
| self._profile_storage = storage | |
| self._spoofer = apply_pre_navigation_spoofing(self.page, profile) | |
| print(" [S] Anti-fingerprint spoofing applied") | |
| except Exception as e: | |
| print(f" [!] Spoofing failed: {e}") | |
| self._spoofer = None | |
| else: | |
| print(" [S] Spoofing disabled by settings") | |
| self._spoofer = None | |
| # Инициализируем модуль человеческого поведения | |
| self._behavior = BehaviorSpoofModule() | |
| # Инициализируем debug recorder если включен | |
| if os.environ.get('DEBUG_RECORDING', '0') == '1': | |
| session_name = f"reg_{email.split('@')[0] if email else 'unknown'}_{int(time.time())}" | |
| self._recorder = init_recorder(session_name) | |
| else: | |
| self._recorder = None | |
| # Настройка реалистичного ввода (по умолчанию включено для обхода FWCIM) | |
| self._realistic_typing = browser_settings.get('realistic_typing', True) | |
| if self._realistic_typing: | |
| # Реалистичные задержки для обхода поведенческого анализа | |
| print(" [B] Realistic typing enabled (slower but safer)") | |
| else: | |
| # Быстрые задержки для скорости | |
| self._behavior.typing_delay_range = (0.03, 0.08) | |
| self._behavior.action_delay_range = (0.1, 0.3) | |
| print(" [B] Fast typing mode (faster but may be detected)") | |
| # Включаем перехват сетевых запросов | |
| self._setup_network_logging() | |
| self._log("Browser initialized", f"headless={headless}") | |
| def _setup_network_logging(self): | |
| """Настройка перехвата сетевых запросов через CDP""" | |
| try: | |
| # Включаем Network domain | |
| self.page.run_cdp('Network.enable') | |
| # Слушаем события запросов | |
| def on_request(params): | |
| url = params.get('request', {}).get('url', '') | |
| if 'send-otp' in url or 'api/' in url: | |
| self._network_logs.append({ | |
| 'type': 'request', | |
| 'url': url, | |
| 'method': params.get('request', {}).get('method'), | |
| 'headers': params.get('request', {}).get('headers'), | |
| 'postData': params.get('request', {}).get('postData'), | |
| 'requestId': params.get('requestId'), | |
| }) | |
| print(f" [W] API Request: {params.get('request', {}).get('method')} {url}") | |
| def on_response(params): | |
| url = params.get('response', {}).get('url', '') | |
| if 'send-otp' in url or 'api/' in url: | |
| status = params.get('response', {}).get('status') | |
| self._network_logs.append({ | |
| 'type': 'response', | |
| 'url': url, | |
| 'status': status, | |
| 'headers': params.get('response', {}).get('headers'), | |
| 'requestId': params.get('requestId'), | |
| }) | |
| print(f" [W] API Response: {status} {url}") | |
| # DrissionPage не поддерживает CDP events напрямую, используем альтернативу | |
| print(" [N] Network logging enabled (will capture via Performance API)") | |
| except Exception as e: | |
| print(f" [!] Network logging setup failed: {e}") | |
| def save_network_logs(self, filename: str = "network_logs.json"): | |
| """Сохраняет логи сетевых запросов в файл""" | |
| import json | |
| filepath = BASE_DIR / filename | |
| # Получаем логи через Performance API | |
| try: | |
| perf_logs = self.page.run_js(''' | |
| return performance.getEntriesByType('resource') | |
| .filter(e => e.name.includes('api/') || e.name.includes('send-otp')) | |
| .map(e => ({ | |
| url: e.name, | |
| duration: e.duration, | |
| startTime: e.startTime, | |
| transferSize: e.transferSize, | |
| type: e.initiatorType | |
| })); | |
| ''') | |
| self._network_logs.extend(perf_logs or []) | |
| except: | |
| pass | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| json.dump(self._network_logs, f, indent=2, ensure_ascii=False) | |
| print(f" [F] Network logs saved: {filepath}") | |
| return filepath | |
| def _log(self, message: str, detail: str = ""): | |
| """Логирование с учётом verbose режима""" | |
| if self.verbose or not detail: | |
| print(f"[*] {message}" + (f" ({detail})" if detail else "")) | |
| def _find_element(self, selectors: list, timeout: int = None): | |
| """Ищет элемент по списку селекторов""" | |
| timeout = timeout or self.settings.get('timeouts', {}).get('element_wait', 3) | |
| for selector in selectors: | |
| try: | |
| elem = self.page.ele(selector, timeout=timeout) | |
| if elem: | |
| return elem | |
| except Exception: | |
| pass | |
| return None | |
| def _click_if_exists(self, selectors: list, timeout: int = 1) -> bool: | |
| """Кликает по элементу если он существует""" | |
| elem = self._find_element(selectors, timeout) | |
| if elem: | |
| self.human_click(elem) | |
| return True | |
| return False | |
| def wait_for_page_context(self, context_key: str, timeout: int = None) -> bool: | |
| """ | |
| Ждёт появления контекста страницы (заголовка). | |
| Оптимизировано: использует быстрый polling вместо медленных циклов. | |
| Args: | |
| context_key: Ключ из PAGE_CONTEXTS ('email', 'name', 'verification', 'password') | |
| timeout: Таймаут в секундах | |
| Returns: | |
| True если контекст найден | |
| """ | |
| timeout = timeout or DEFAULT_TIMEOUTS['page_transition'] | |
| poll_interval = DEFAULT_TIMEOUTS['poll_interval'] | |
| contexts = PAGE_CONTEXTS.get(context_key, []) | |
| if not contexts: | |
| return True | |
| print(f" [...] Waiting for page: {context_key}...") | |
| # Строим комбинированный селектор для всех контекстов | |
| combined_selectors = [f'text={ctx}' for ctx in contexts] | |
| start_time = time.time() | |
| while time.time() - start_time < timeout: | |
| for selector in combined_selectors: | |
| try: | |
| if self.page.ele(selector, timeout=poll_interval): | |
| elapsed = time.time() - start_time | |
| print(f" [OK] Page context found in {elapsed:.2f}s") | |
| time.sleep(0.1) # Минимальная пауза для React | |
| return True | |
| except: | |
| pass | |
| print(f" [!] Page context not found: {context_key}") | |
| return False | |
| def wait_for_url_change(self, old_url: str, timeout: int = None) -> bool: | |
| """ | |
| Ждёт изменения URL после действия. | |
| Оптимизировано: быстрый polling с минимальными задержками. | |
| Args: | |
| old_url: Предыдущий URL | |
| timeout: Таймаут в секундах | |
| Returns: | |
| True если URL изменился | |
| """ | |
| timeout = timeout or DEFAULT_TIMEOUTS['page_transition'] | |
| poll_interval = DEFAULT_TIMEOUTS['poll_interval'] | |
| start_time = time.time() | |
| while time.time() - start_time < timeout: | |
| if self.page.url != old_url: | |
| time.sleep(0.15) # Минимальная пауза для загрузки | |
| return True | |
| time.sleep(poll_interval) | |
| return False | |
| def wait_for_element(self, selectors: list, timeout: int = None) -> Optional[object]: | |
| """ | |
| Умное ожидание элемента с быстрым polling. | |
| Args: | |
| selectors: Список селекторов для поиска | |
| timeout: Таймаут в секундах | |
| Returns: | |
| Найденный элемент или None | |
| """ | |
| timeout = timeout or DEFAULT_TIMEOUTS['element_wait'] | |
| poll_interval = DEFAULT_TIMEOUTS['poll_interval'] | |
| start_time = time.time() | |
| while time.time() - start_time < timeout: | |
| for selector in selectors: | |
| try: | |
| elem = self.page.ele(selector, timeout=poll_interval) | |
| if elem: | |
| return elem | |
| except: | |
| pass | |
| return None | |
| # ======================================================================== | |
| # HUMAN-LIKE INPUT (Обход поведенческого анализа AWS FWCIM) | |
| # ======================================================================== | |
| def human_type(self, element, text: str, click_first: bool = True, fast: bool = None, field_type: str = 'default'): | |
| """ | |
| Вводит текст с человеческими задержками. | |
| Args: | |
| element: Элемент для ввода | |
| text: Текст для ввода | |
| click_first: Кликнуть на элемент перед вводом | |
| fast: Быстрый режим (None = использовать настройку realistic_typing) | |
| field_type: Тип поля для BehaviorSpoofModule ('email', 'password', 'name', 'code') | |
| """ | |
| # Определяем режим: если fast не указан, используем настройку | |
| use_fast = fast if fast is not None else not self._realistic_typing | |
| # ВСЕГДА используем fwcim_type для генерации правильных событий клавиатуры | |
| # FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует! | |
| # Параметр fast передаётся в fwcim_type для уменьшения задержек | |
| self._behavior.fwcim_type(self.page, element, text, field_type=field_type, fast=use_fast) | |
| def human_click(self, element, with_delay: bool = True): | |
| """ | |
| Кликает по элементу с человеческой задержкой. | |
| Args: | |
| element: Элемент для клика | |
| with_delay: Добавить задержку до/после клика | |
| """ | |
| if with_delay: | |
| self._behavior.human_delay(0.15, 0.4) | |
| # FWCIM-совместимый клик с mousedown/mouseup событиями | |
| self._behavior.fwcim_click(self.page, element) | |
| def simulate_human_activity(self): | |
| """ | |
| Симулирует активность реального пользователя. | |
| Вызывать периодически для обхода поведенческого анализа. | |
| """ | |
| # Случайные движения мыши | |
| self._behavior.random_mouse_movement(self, count=random.randint(1, 3)) | |
| # Иногда скроллим страницу | |
| if random.random() < 0.3: | |
| direction = random.choice(['up', 'down']) | |
| self._behavior.scroll_page(self, direction=direction) | |
| def generate_password(length: int = PASSWORD_LENGTH) -> str: | |
| """ | |
| Генерация криптографически безопасного пароля. | |
| Использует secrets для избежания предсказуемости. | |
| AWS проверяет пароли на утечки - нужна высокая энтропия. | |
| """ | |
| import secrets | |
| chars = ''.join(PASSWORD_CHARS.values()) | |
| # Гарантируем наличие всех типов символов | |
| password = [ | |
| secrets.choice(PASSWORD_CHARS['upper']), | |
| secrets.choice(PASSWORD_CHARS['lower']), | |
| secrets.choice(PASSWORD_CHARS['digits']), | |
| secrets.choice(PASSWORD_CHARS['special']), | |
| ] | |
| # Добавляем случайные символы | |
| password += [secrets.choice(chars) for _ in range(length - 4)] | |
| # Перемешиваем (secrets.SystemRandom для криптографической случайности) | |
| secrets.SystemRandom().shuffle(password) | |
| return ''.join(password) | |
| def _accept_cookie_banner(self): | |
| """Принимает cookie banner кликом на Accept/Принять.""" | |
| # Метод 1: DrissionPage - ищем кнопку Accept напрямую | |
| try: | |
| # Ищем оранжевую кнопку Accept на profile.aws.amazon.com | |
| accept_btn = self.page.ele('text=Accept', timeout=0.5) | |
| if accept_btn and accept_btn.tag == 'button': | |
| accept_btn.click() | |
| print(" [C] Cookie banner accepted (DrissionPage)") | |
| return True | |
| except: | |
| pass | |
| # Метод 2: JavaScript для AWS signin cookie banner | |
| try: | |
| result = self.page.run_js(''' | |
| // AWS signin cookie banner | |
| const acceptButtons = [ | |
| '[data-id="awsccc-cb-btn-accept"]', | |
| 'button[data-id*="accept"]', | |
| '#awsccc-cb-btn-accept', | |
| ]; | |
| for (const sel of acceptButtons) { | |
| try { | |
| const btn = document.querySelector(sel); | |
| if (btn && btn.offsetParent !== null) { | |
| btn.click(); | |
| return 'clicked'; | |
| } | |
| } catch(e) {} | |
| } | |
| // Fallback: ищем кнопку по тексту | |
| const allButtons = document.querySelectorAll('button'); | |
| for (const btn of allButtons) { | |
| const text = btn.textContent.trim(); | |
| if ((text === 'Accept' || text === 'Принять') && btn.offsetParent !== null) { | |
| btn.click(); | |
| return 'clicked'; | |
| } | |
| } | |
| return 'not_found'; | |
| ''') | |
| if result == 'clicked': | |
| print(" [C] Cookie banner accepted (JS)") | |
| return True | |
| except: | |
| pass | |
| return False | |
| def _hide_cookie_banner(self): | |
| """Скрывает cookie banner через CSS (fallback если клик не сработал).""" | |
| try: | |
| self.page.run_js(''' | |
| // Скрываем все возможные cookie элементы через CSS | |
| const selectors = [ | |
| '#awsccc-cb-content', | |
| '#awsccc-cb', | |
| '.awsccc-cs-overlay', | |
| '.awscc-cookie-banner', | |
| '.awsccc-cb-container' | |
| ]; | |
| selectors.forEach(sel => { | |
| const el = document.querySelector(sel); | |
| if (el) el.style.display = 'none'; | |
| }); | |
| // Также удаляем overlay который блокирует клики | |
| document.querySelectorAll('.awsccc-cs-overlay, .modal-backdrop').forEach(el => el.remove()); | |
| ''') | |
| except: | |
| pass | |
| def _collect_scripts(self): | |
| """Collect all JS scripts from current page for analysis. | |
| Disabled by default. Set COLLECT_SCRIPTS = True at top of file to enable. | |
| """ | |
| if not COLLECT_SCRIPTS: | |
| return | |
| try: | |
| # Import only when needed | |
| from debugger.collectors.script_collector import collect_scripts | |
| collect_scripts(self.page) | |
| except Exception as e: | |
| # Don't fail registration if script collection fails | |
| if self.verbose: | |
| print(f"[ScriptCollector] Error: {e}") | |
| def close_cookie_dialog(self, force: bool = False): | |
| """Принимает или скрывает диалог cookie.""" | |
| if self._cookie_closed and not force: | |
| return False | |
| # Сначала пробуем кликнуть Accept | |
| if self._accept_cookie_banner(): | |
| self._cookie_closed = True | |
| return True | |
| # Fallback: скрываем через CSS | |
| self._hide_cookie_banner() | |
| self._cookie_closed = True | |
| return True | |
| def enter_device_code(self, user_code: str, email: str = None, password: str = None) -> bool: | |
| """ | |
| Вводит код устройства на странице Device Authorization. | |
| Страница: view.awsapps.com/start/#/device | |
| Если показывается форма логина (после регистрации), сначала логинится. | |
| Args: | |
| user_code: Код устройства (например SQHH-RXJR) | |
| email: Email для логина (если нужен) | |
| password: Пароль для логина (если нужен) | |
| """ | |
| print(f"[KEY] Entering device code: {user_code}") | |
| # Ждём загрузки страницы device authorization | |
| print(" [...] Waiting for device authorization page...") | |
| for _ in range(15): | |
| try: | |
| # Проверяем что мы на странице device | |
| if self.page.ele('text=Authorization requested', timeout=0.5): | |
| print(" [OK] Device authorization page loaded") | |
| break | |
| if self.page.ele('text=Enter the code', timeout=0.5): | |
| print(" [OK] Device authorization page loaded") | |
| break | |
| # Форма логина на странице device | |
| if self.page.ele('text=Sign in', timeout=0.5): | |
| print(" [OK] Login form on device page") | |
| break | |
| except: | |
| pass | |
| time.sleep(0.5) | |
| # Закрываем cookie если есть | |
| self.close_cookie_dialog(force=True) | |
| time.sleep(1) | |
| # Проверяем - это форма логина или форма device code? | |
| # Если есть password поле но нет текстового поля - это логин | |
| all_inputs = self.page.eles('tag:input') | |
| has_password_field = any((inp.attr('type') or '').lower() == 'password' for inp in all_inputs) | |
| has_text_field = any((inp.attr('type') or '').lower() in ('', 'text') | |
| and (inp.attr('type') or '').lower() not in ('checkbox', 'hidden', 'submit', 'button') | |
| for inp in all_inputs) | |
| # Если есть password но нет text - это форма логина | |
| if has_password_field and not has_text_field and password: | |
| print(" [K] Login form detected, logging in first...") | |
| if not self._login_on_device_page(email, password): | |
| print(" [!] Login failed") | |
| return False | |
| time.sleep(2) | |
| # После логина перезагружаем inputs | |
| all_inputs = self.page.eles('tag:input') | |
| # Ждём появления поля ввода кода | |
| # На странице device authorization поле может быть без type или с type="" | |
| code_input = None | |
| for attempt in range(15): | |
| try: | |
| # Способ 1: Ищем все input'ы и фильтруем | |
| all_inputs = self.page.eles('tag:input') | |
| print(f" [S] Found {len(all_inputs)} inputs on attempt {attempt + 1}") | |
| for inp in all_inputs: | |
| inp_type = (inp.attr('type') or '').lower() | |
| inp_id = inp.attr('id') or '' | |
| inp_name = inp.attr('name') or '' | |
| inp_aria = inp.attr('aria-label') or '' | |
| print(f" Input: type='{inp_type}', id='{inp_id}', name='{inp_name}', aria='{inp_aria}'") | |
| # Пропускаем password, checkbox, hidden | |
| if inp_type in ('password', 'checkbox', 'hidden', 'submit', 'button'): | |
| continue | |
| # Это наше поле! | |
| code_input = inp | |
| print(f" [OK] Found code input field") | |
| break | |
| if code_input: | |
| break | |
| except Exception as e: | |
| print(f" [!] Error searching inputs: {e}") | |
| time.sleep(0.5) | |
| if not code_input: | |
| print(" [!] Device code input not found") | |
| self._debug_inputs() | |
| self.screenshot("error_device_code") | |
| return False | |
| # Вводим код | |
| code_input.clear() | |
| self.human_type(code_input, user_code) | |
| time.sleep(0.5) | |
| # Кликаем Confirm and continue | |
| confirm_btn = self._find_element([ | |
| 'text=Confirm and continue', | |
| 'xpath://button[contains(text(), "Confirm")]', | |
| '@data-testid=confirm-button', | |
| ], timeout=3) | |
| if confirm_btn: | |
| print(" [->] Clicking Confirm and continue...") | |
| self.human_click(confirm_btn) | |
| time.sleep(2) | |
| return True | |
| print(" [!] Confirm button not found") | |
| return False | |
| def _login_on_device_page(self, email: str, password: str) -> bool: | |
| """ | |
| Логинится на странице device (когда показывается форма логина после регистрации). | |
| AWS показывает только поле пароля, т.к. email уже известен. | |
| """ | |
| print(f" [K] Logging in on device page...") | |
| # ВАЖНО: Сначала закрываем cookie диалог - он перекрывает кнопки! | |
| self.close_cookie_dialog(force=True) | |
| time.sleep(0.5) | |
| # Ищем поле пароля | |
| pwd_field = None | |
| for selector in ['tag:input@@type=password', 'input[type="password"]']: | |
| try: | |
| pwd_field = self.page.ele(selector, timeout=2) | |
| if pwd_field: | |
| break | |
| except: | |
| pass | |
| if not pwd_field: | |
| print(" [!] Password field not found on device page") | |
| return False | |
| # Вводим пароль | |
| print(f" Entering password...") | |
| self._behavior.human_click(pwd_field) | |
| self.page.run_js('arguments[0].focus()', pwd_field) | |
| self._behavior.human_delay(0.1, 0.2) | |
| # Вводим через CDP с человеческими задержками | |
| for char in password: | |
| self.page.run_cdp('Input.insertText', text=char) | |
| self._behavior.human_delay(0.03, 0.1) | |
| time.sleep(0.5) | |
| # Ещё раз закрываем cookie если появился снова | |
| self.close_cookie_dialog(force=True) | |
| time.sleep(0.3) | |
| # Debug: показываем все кнопки на странице | |
| try: | |
| buttons = self.page.eles('tag:button') | |
| print(f" [S] Found {len(buttons)} buttons:") | |
| for i, btn in enumerate(buttons[:5]): | |
| btn_text = btn.text or '' | |
| btn_type = btn.attr('type') or '' | |
| btn_testid = btn.attr('data-testid') or '' | |
| print(f" Button {i}: text='{btn_text[:30]}', type='{btn_type}', testid='{btn_testid}'") | |
| except Exception as e: | |
| print(f" [!] Error listing buttons: {e}") | |
| # Кликаем Sign in / Continue - расширенный список селекторов | |
| sign_in_btn = self._find_element([ | |
| 'text=Sign in', | |
| 'text=Continue', | |
| 'text=Submit', | |
| 'text=Next', | |
| 'xpath://button[contains(text(), "Sign in")]', | |
| 'xpath://button[contains(text(), "Continue")]', | |
| 'xpath://button[contains(text(), "Submit")]', | |
| 'xpath://button[@type="submit"]', | |
| '@data-testid=test-primary-button', | |
| '@data-testid=signin-button', | |
| '@data-testid=submit-button', | |
| 'tag:button@@type=submit', | |
| ], timeout=3) | |
| if sign_in_btn: | |
| print(f" [->] Clicking Sign in button...") | |
| self.human_click(sign_in_btn) | |
| time.sleep(3) | |
| print(" [OK] Logged in") | |
| return True | |
| # Fallback: кликаем первую кнопку submit | |
| try: | |
| submit_btn = self.page.ele('tag:button@@type=submit', timeout=2) | |
| if submit_btn: | |
| print(f" [->] Clicking submit button (fallback)...") | |
| self.human_click(submit_btn) | |
| time.sleep(3) | |
| print(" [OK] Logged in (fallback)") | |
| return True | |
| except: | |
| pass | |
| print(" [!] Sign in button not found") | |
| self.screenshot("error_login_no_button") | |
| return False | |
| def enter_email(self, email: str) -> bool: | |
| """Вводит email. Оптимизировано для скорости.""" | |
| print(f"[M] Entering email: {email}") | |
| record('enter_email', {'email': email}, self.page, screenshot=False) | |
| # Закрываем cookie один раз | |
| self.close_cookie_dialog(force=True) | |
| # Минимальная пауза перед вводом (для React) | |
| time.sleep(0.15) | |
| # Быстрые селекторы в порядке приоритета | |
| selectors = [ | |
| '@placeholder=username@example.com', | |
| '@type=email', | |
| 'xpath://input[@data-testid="test-input"]', | |
| ] | |
| email_input = None | |
| for selector in selectors: | |
| try: | |
| email_input = self.page.ele(selector, timeout=0.5) | |
| if email_input: | |
| self._log(f"Found email field", selector) | |
| break | |
| except: | |
| pass | |
| if not email_input: | |
| self._debug_inputs() | |
| self.screenshot("error_no_email") | |
| raise Exception("Email field not found") | |
| # КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры | |
| # FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует! | |
| # fast=True для скорости, но всё ещё генерирует события | |
| self._behavior.fwcim_type(self.page, email_input, email, field_type='email', fast=True) | |
| time.sleep(0.1) | |
| # Проверяем что ввелось правильно | |
| entered = email_input.attr('value') or '' | |
| if entered != email: | |
| print(f" [!] Email mismatch: expected '{email}', got '{entered}'") | |
| # Повторная попытка | |
| email_input.clear() | |
| email_input.input(email) | |
| return True | |
| def _debug_inputs(self): | |
| """Выводит отладочную информацию о input элементах""" | |
| print(" [S] Debug: searching for input elements...") | |
| try: | |
| inputs = self.page.eles('tag:input') | |
| for i, inp in enumerate(inputs[:5]): | |
| print(f" Input {i}: type={inp.attr('type')}, placeholder={inp.attr('placeholder')}") | |
| except Exception as e: | |
| print(f" Error: {e}") | |
| def click_continue(self) -> bool: | |
| """Нажимает кнопку Continue после email и ждёт страницу имени""" | |
| print("[->] Clicking Continue...") | |
| # Запоминаем URL до клика | |
| url_before = self.page.url | |
| # Быстрый клик Continue | |
| if not self._click_if_exists(SELECTORS['continue_btn'], timeout=1): | |
| raise Exception("Continue button not found") | |
| # ВАЖНО: Ждём загрузку страницы после клика | |
| try: | |
| self.page.wait.doc_loaded(timeout=10) | |
| except: | |
| pass | |
| # Ждём изменения URL или появления нового контента | |
| time.sleep(1) | |
| url_after = self.page.url | |
| print(f" [URL] Before: {url_before[:60]}...") | |
| print(f" [URL] After: {url_after[:60]}...") | |
| # Ждём пока страница profile.aws.amazon.com загрузится | |
| # Cookie popup появляется после загрузки | |
| if 'profile.aws.amazon.com' in self.page.url: | |
| print(" [...] Waiting for profile.aws.amazon.com to load...") | |
| for _ in range(10): | |
| time.sleep(0.5) | |
| # Ищем кнопку Accept которая появится после загрузки | |
| try: | |
| accept_btn = self.page.ele('text=Accept', timeout=0.3) | |
| if accept_btn: | |
| print(" [C] Found Accept button, clicking...") | |
| accept_btn.click() | |
| time.sleep(0.5) | |
| break | |
| except: | |
| pass | |
| # КРИТИЧНО: Принимаем cookie диалог - он появляется после клика Continue | |
| # и перекрывает страницу имени! Пробуем несколько раз с паузой | |
| for attempt in range(3): | |
| self._cookie_closed = False # Сбрасываем флаг чтобы попробовать снова | |
| if self.close_cookie_dialog(force=True): | |
| time.sleep(0.5) | |
| break | |
| time.sleep(0.5) | |
| # Ожидание страницы имени - ищем ТЕКСТ "Enter your name" или placeholder с "Silva" | |
| print(" [...] Waiting for name page...") | |
| name_page_selectors = [ | |
| 'text=Enter your name', # Заголовок страницы | |
| '@placeholder=Maria José Silva', # Placeholder поля имени | |
| 'xpath://input[contains(@placeholder, "Silva")]', | |
| 'xpath://input[@type="text"]', # Любое текстовое поле | |
| ] | |
| start_time = time.time() | |
| timeout = 20 # Увеличено для медленных соединений | |
| cookie_retry = 0 | |
| last_debug = 0 | |
| while time.time() - start_time < timeout: | |
| # Периодически пробуем принять cookie (может появиться с задержкой) | |
| if cookie_retry < 5 and time.time() - start_time > cookie_retry * 2: | |
| self.close_cookie_dialog(force=True) | |
| cookie_retry += 1 | |
| # Debug каждые 5 секунд | |
| elapsed = time.time() - start_time | |
| if elapsed - last_debug > 5: | |
| print(f" [DEBUG] {elapsed:.0f}s - URL: {self.page.url[:50]}...") | |
| # Показываем что есть на странице | |
| try: | |
| title = self.page.title | |
| print(f" [DEBUG] Title: {title}") | |
| # Ищем любые заголовки | |
| h1 = self.page.ele('tag:h1', timeout=0.2) | |
| if h1: | |
| print(f" [DEBUG] H1: {h1.text[:50] if h1.text else 'empty'}") | |
| except: | |
| pass | |
| last_debug = elapsed | |
| for selector in name_page_selectors: | |
| try: | |
| if self.page.ele(selector, timeout=0.3): | |
| elapsed = time.time() - start_time | |
| print(f" [OK] Name page loaded in {elapsed:.2f}s") | |
| # Collect scripts after page transition | |
| self._collect_scripts() | |
| return True | |
| except: | |
| pass | |
| # Финальная отладка перед ошибкой | |
| print(" [X] FAILED: Name page did not load!") | |
| print(f" [DEBUG] Final URL: {self.page.url}") | |
| try: | |
| print(f" [DEBUG] Final Title: {self.page.title}") | |
| body_text = self.page.ele('tag:body').text[:200] if self.page.ele('tag:body') else 'no body' | |
| print(f" [DEBUG] Body text: {body_text}") | |
| except Exception as e: | |
| print(f" [DEBUG] Error getting page info: {e}") | |
| self.screenshot("error_no_name_page") | |
| raise Exception("Name page did not load after email") | |
| def enter_name(self, name: str) -> bool: | |
| """Вводит имя. Оптимизировано для скорости.""" | |
| print(f"[N] Entering name: {name}") | |
| record('enter_name', {'name': name}, self.page, screenshot=False) | |
| # КРИТИЧНО: Закрываем cookie диалог ПЕРЕД поиском поля | |
| self._hide_cookie_banner() | |
| # ВАЖНО: Проверяем что мы на странице имени, а не email! | |
| # Ищем текст "Enter your name" или placeholder с "Silva" | |
| on_name_page = False | |
| for _ in range(10): | |
| try: | |
| if self.page.ele('text=Enter your name', timeout=0.3): | |
| on_name_page = True | |
| break | |
| if self.page.ele('@placeholder=Maria José Silva', timeout=0.3): | |
| on_name_page = True | |
| break | |
| except: | |
| pass | |
| time.sleep(0.3) | |
| if not on_name_page: | |
| print(" [!] WARNING: Not on name page! Current URL:", self.page.url[:60]) | |
| # Попробуем ещё раз кликнуть Continue и подождать | |
| self._click_if_exists(SELECTORS['continue_btn'], timeout=1) | |
| time.sleep(2) | |
| # Проверяем ещё раз | |
| for _ in range(5): | |
| try: | |
| if self.page.ele('@placeholder=Maria José Silva', timeout=0.5): | |
| on_name_page = True | |
| print(" [OK] Now on name page after retry") | |
| break | |
| except: | |
| pass | |
| time.sleep(0.5) | |
| if not on_name_page: | |
| print(" [X] Still not on name page, aborting!") | |
| self.screenshot("error_not_on_name_page") | |
| raise Exception("Failed to navigate to name page") | |
| name_input = None | |
| start_time = time.time() | |
| # ПРИОРИТЕТ 1: Ищем по placeholder "Maria José Silva" - это точно поле имени | |
| try: | |
| name_input = self.page.ele('@placeholder=Maria José Silva', timeout=0.5) | |
| if name_input: | |
| print(f" Found name field via placeholder (Maria José Silva)") | |
| except: | |
| pass | |
| # Fallback 1: placeholder с "Silva" (уникальный для страницы имени) | |
| if not name_input: | |
| try: | |
| name_input = self.page.ele('xpath://input[contains(@placeholder, "Silva")]', timeout=0.3) | |
| if name_input: | |
| print(f" Found name field via placeholder: Silva") | |
| except: | |
| pass | |
| # Fallback 2: data-testid | |
| if not name_input: | |
| try: | |
| name_input = self.page.ele('@data-testid=name-input', timeout=0.2) | |
| if name_input: | |
| print(f" Found name field via data-testid") | |
| except: | |
| pass | |
| # Fallback 3: CSS селектор (ТОЛЬКО если на странице имени!) | |
| if not name_input and on_name_page: | |
| try: | |
| name_input = self.page.ele('css:input[type="text"]:not([hidden])', timeout=0.3) | |
| if name_input: | |
| print(f" Found name field via CSS (fast)") | |
| except: | |
| pass | |
| elapsed = time.time() - start_time | |
| if elapsed > 0.5: | |
| print(f" [!] Name field search took {elapsed:.2f}s") | |
| if not name_input: | |
| print(" [X] Name field not found!") | |
| return False | |
| # КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры | |
| # FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует! | |
| self._behavior.fwcim_type(self.page, name_input, name, field_type='name', fast=True) | |
| # Blur event после ввода | |
| self.page.run_js('arguments[0].dispatchEvent(new Event("blur", { bubbles: true }));', name_input) | |
| time.sleep(0.1) | |
| # Кликаем Continue | |
| print(" [->] Clicking Continue...") | |
| self._click_if_exists(SELECTORS['continue_btn'], timeout=1) | |
| # ВАЖНО: Ждём загрузку страницы после клика | |
| try: | |
| self.page.wait.doc_loaded(timeout=10) | |
| except: | |
| pass | |
| # Принимаем cookie если появился | |
| time.sleep(0.5) | |
| self.close_cookie_dialog(force=True) | |
| # Ожидание страницы верификации | |
| print(" [...] Waiting for verification page...") | |
| verification_selectors = [ | |
| 'text=Verify your email', | |
| 'text=Verification code', | |
| '@placeholder=6-digit', | |
| # German | |
| 'text=E-Mail-Adresse bestätigen', | |
| 'text=Verifizierungscode', | |
| '@placeholder=6-stellig', | |
| # Japanese | |
| 'text=メールアドレスを確認', | |
| 'text=確認コード', | |
| '@placeholder=6桁', | |
| # Spanish | |
| 'text=Verificar tu correo', | |
| 'text=Código de verificación', | |
| # French | |
| 'text=Vérifier votre e-mail', | |
| 'text=Code de vérification', | |
| # Generic - input field for 6-digit code | |
| 'css:input[maxlength="6"]', | |
| 'css:input[type="text"][placeholder*="6"]', | |
| ] | |
| start_time = time.time() | |
| timeout = 20 | |
| retry_count = 0 | |
| max_retries = 2 | |
| cookie_retry = 0 | |
| while time.time() - start_time < timeout: | |
| # Периодически принимаем cookie | |
| if cookie_retry < 5 and time.time() - start_time > cookie_retry * 2: | |
| self.close_cookie_dialog(force=True) | |
| cookie_retry += 1 | |
| # Проверяем на ошибку AWS | |
| if self._check_aws_error(): | |
| self._close_error_modal() | |
| if retry_count < max_retries: | |
| retry_count += 1 | |
| print(f" [R] Retry {retry_count}/{max_retries}") | |
| # Retry - ищем поле имени заново | |
| retry_input = self.page.ele('css:input[type="text"]:not([hidden])', timeout=1) | |
| if retry_input: | |
| # Используем fwcim_type для генерации правильных событий | |
| self._behavior.fwcim_type(self.page, retry_input, name, field_type='name', fast=True) | |
| self._click_if_exists(SELECTORS['continue_btn'], timeout=1) | |
| continue | |
| for selector in verification_selectors: | |
| try: | |
| if self.page.ele(selector, timeout=0.3): | |
| elapsed = time.time() - start_time | |
| print(f" [OK] Verification page loaded in {elapsed:.2f}s") | |
| # Collect scripts after page transition | |
| self._collect_scripts() | |
| return True | |
| except: | |
| pass | |
| # Отладка перед ошибкой | |
| print(" [X] FAILED: Verification page did not load!") | |
| print(f" [DEBUG] URL: {self.page.url}") | |
| try: | |
| print(f" [DEBUG] Title: {self.page.title}") | |
| except: | |
| pass | |
| self.screenshot("error_no_verification_page") | |
| raise Exception("Verification page did not load after entering name") | |
| def enter_verification_code(self, code: str) -> bool: | |
| """Вводит код верификации. Оптимизировано для скорости.""" | |
| print(f"[K] Entering code: {code}") | |
| record('enter_code', {'code': code}, self.page) | |
| # Закрываем cookie один раз | |
| self.close_cookie_dialog(force=True) | |
| code_input = self._find_element(SELECTORS['code_input'], timeout=10) | |
| if not code_input: | |
| raise Exception("Verification code field not found") | |
| # КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры | |
| # FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует! | |
| # Код верификации - критичное поле, используем fast=True для скорости | |
| self._behavior.fwcim_type(self.page, code_input, code, field_type='code', fast=True) | |
| time.sleep(0.1) | |
| print(f" Entered code: '{code_input.attr('value') or ''}'") | |
| # Кликаем Continue/Verify с retry | |
| verify_selectors = [ | |
| '@data-testid=email-verification-verify-button', | |
| 'text=Verify', | |
| 'text=Continue', | |
| '@data-testid=test-primary-button', | |
| ] | |
| print(" [->] Clicking Verify/Continue...") | |
| clicked = False | |
| for attempt in range(3): | |
| for selector in verify_selectors: | |
| try: | |
| btn = self.page.ele(selector, timeout=0.5) | |
| if btn: | |
| self._behavior.human_js_click(self.page, btn, pre_delay=False) | |
| clicked = True | |
| print(f" [OK] Clicked: {selector}") | |
| break | |
| except: | |
| pass | |
| if clicked: | |
| break | |
| self._behavior.human_delay(0.15, 0.3) | |
| if not clicked: | |
| print(" [!] Verify button not found, trying Enter key...") | |
| try: | |
| self.page.run_cdp('Input.dispatchKeyEvent', type='keyDown', key='Enter', code='Enter', windowsVirtualKeyCode=13) | |
| self.page.run_cdp('Input.dispatchKeyEvent', type='keyUp', key='Enter', code='Enter', windowsVirtualKeyCode=13) | |
| except: | |
| pass | |
| # Оптимизированное ожидание страницы пароля | |
| print(" [...] Waiting for password page...") | |
| password_selectors = [ | |
| 'text=Create your password', | |
| 'text=Set your password', | |
| '@placeholder=Enter password', | |
| # German | |
| 'text=Ihr Passwort erstellen', | |
| '@placeholder=Passwort eingeben', | |
| # Japanese | |
| 'text=パスワードを作成', | |
| '@placeholder=パスワードを入力', | |
| # Spanish | |
| 'text=Crea tu contraseña', | |
| # French | |
| 'text=Créer votre mot de passe', | |
| # Generic | |
| 'css:input[type="password"]', | |
| ] | |
| start_time = time.time() | |
| timeout = 15 # Увеличено для надёжности | |
| while time.time() - start_time < timeout: | |
| for selector in password_selectors: | |
| try: | |
| if self.page.ele(selector, timeout=0.1): | |
| elapsed = time.time() - start_time | |
| print(f" [OK] Password page loaded in {elapsed:.2f}s") | |
| # Collect scripts after page transition | |
| self._collect_scripts() | |
| return True | |
| except: | |
| pass | |
| # Проверяем на ошибку AWS | |
| if self._check_aws_error(): | |
| print(" [!] AWS error detected, closing modal...") | |
| self._close_error_modal() | |
| time.sleep(0.3) | |
| print(" [X] FAILED: Password page did not load!") | |
| self.screenshot("error_no_password_page") | |
| raise Exception("Password page did not load after verification code") | |
| def enter_password(self, password: str) -> bool: | |
| """Вводит и подтверждает пароль""" | |
| print("[KEY] Entering password...") | |
| record('enter_password', {'length': len(password)}, self.page, screenshot=False) # Скриншот отключен - таймаутится | |
| step_start = time.time() | |
| # Быстрое ожидание контекста | |
| self.wait_for_page_context('password', timeout=5) | |
| time.sleep(0.15) # Минимальная пауза для React | |
| # Ищем password поля | |
| pwd_fields = self.page.eles('tag:input@@type=password', timeout=3) | |
| print(f" Found {len(pwd_fields)} password fields ({time.time() - step_start:.1f}s)") | |
| pwd1, pwd2 = None, None | |
| # Быстрая стратегия определения полей | |
| for field in pwd_fields: | |
| ph = (field.attr('placeholder') or '').lower() | |
| if 're-enter' in ph or 'confirm' in ph: | |
| pwd2 = field | |
| elif not pwd1: | |
| pwd1 = field | |
| if not pwd1 and pwd_fields: | |
| pwd1 = pwd_fields[0] | |
| if not pwd2 and len(pwd_fields) >= 2: | |
| pwd2 = pwd_fields[1] | |
| if not pwd1: | |
| print(" [!] No password fields found!") | |
| self.screenshot("error_no_password") | |
| return False | |
| # Ввод пароля с человеческим поведением (медленно - вспоминаем пароль) | |
| print(" Entering password...") | |
| typing_start = time.time() | |
| self.human_type(pwd1, password, field_type='password') | |
| print(f" Password entered ({time.time() - typing_start:.1f}s)") | |
| if pwd2: | |
| self._behavior.human_delay(0.2, 0.4) # Пауза перед повторным вводом | |
| print(" Confirming password...") | |
| confirm_start = time.time() | |
| self.human_type(pwd2, password, field_type='password') | |
| # Проверяем что оба пароля одинаковые через JavaScript | |
| time.sleep(0.3) # Даём время React обновить состояние | |
| pwd1_value = self.page.run_js('return arguments[0].value || ""', pwd1) | |
| pwd2_value = self.page.run_js('return arguments[0].value || ""', pwd2) | |
| # Также проверяем наличие ошибки на странице | |
| password_error = self._check_password_mismatch_error() | |
| if pwd1_value != pwd2_value or password_error: | |
| print(f" [!] Password mismatch detected! Clearing and re-entering...") | |
| # Очищаем оба поля через JavaScript | |
| self.page.run_js('arguments[0].value = ""; arguments[0].dispatchEvent(new Event("input", {bubbles: true}))', pwd1) | |
| self.page.run_js('arguments[0].value = ""; arguments[0].dispatchEvent(new Event("input", {bubbles: true}))', pwd2) | |
| time.sleep(0.2) | |
| # Вводим заново - используем быстрый режим для надёжности | |
| self.human_type(pwd1, password, field_type='password', fast=True) | |
| self._behavior.human_delay(0.3, 0.5) | |
| self.human_type(pwd2, password, field_type='password', fast=True) | |
| # Проверяем ещё раз | |
| time.sleep(0.3) | |
| pwd1_value = self.page.run_js('return arguments[0].value || ""', pwd1) | |
| pwd2_value = self.page.run_js('return arguments[0].value || ""', pwd2) | |
| password_error = self._check_password_mismatch_error() | |
| if pwd1_value != pwd2_value or password_error: | |
| print(f" [!] Password still mismatched after retry!") | |
| self.screenshot("error_password_mismatch") | |
| else: | |
| print(f" [OK] Passwords match after retry") | |
| print(f" Password confirmed ({time.time() - confirm_start:.1f}s)") | |
| time.sleep(0.15) | |
| # Проверяем наличие CAPTCHA перед кликом Continue | |
| if self._check_captcha(): | |
| print(" [!] CAPTCHA detected on password page!") | |
| self.screenshot("captcha_detected") | |
| # Ждём ручного решения капчи | |
| if not self._handle_captcha(timeout=120): | |
| print(" [!] CAPTCHA solving failed or timeout") | |
| return False | |
| print(" [OK] CAPTCHA solved, continuing...") | |
| print(f"[->] Clicking Continue... (total input time: {time.time() - step_start:.1f}s)") | |
| old_url = self.page.url | |
| click_time = time.time() | |
| self._click_if_exists(SELECTORS['continue_btn'], timeout=1) | |
| # Ждём немного и проверяем результат | |
| time.sleep(1) | |
| # Проверяем появилась ли капча после клика Continue | |
| if self._check_captcha(): | |
| print(" [!] CAPTCHA appeared after Continue click!") | |
| if not self._handle_captcha(timeout=120): | |
| print(" [!] CAPTCHA solving failed") | |
| return False | |
| # После решения капчи кликаем Continue снова | |
| print("[->] Clicking Continue after captcha...") | |
| self._click_if_exists(SELECTORS['continue_btn'], timeout=1) | |
| time.sleep(1) | |
| # Проверяем на ошибку "Invalid captcha" | |
| if self._check_captcha_error(): | |
| print(" [!] Invalid captcha error - captcha was rejected by server") | |
| self.screenshot("captcha_invalid") | |
| # Пробуем ещё раз с новой капчей | |
| print(" [R] Retrying with new captcha...") | |
| time.sleep(1) | |
| if self._check_captcha(): | |
| if not self._handle_captcha(timeout=120): | |
| return False | |
| self._click_if_exists(SELECTORS['continue_btn'], timeout=1) | |
| time.sleep(1) | |
| if self._check_captcha_error(): | |
| print(" [!] Captcha rejected again") | |
| return False | |
| if self._check_password_error(): | |
| print(" [!] Password rejected, generating new one...") | |
| return self.enter_password(self.generate_password(18)) | |
| # Ждём перехода - AWS может долго редиректить (показывает "Redirecting...") | |
| # Таймаут настраивается в config.timeouts.password_redirect | |
| password_redirect_timeout = self.settings.get('timeouts', {}).get('password_redirect', 60) | |
| print(f" [...] Waiting for redirect after password (timeout: {password_redirect_timeout}s)...") | |
| # Логируем прогресс ожидания | |
| # Ждём редирект на view.awsapps.com или callback URL | |
| redirect_start = time.time() | |
| last_log = 0 | |
| last_url = old_url | |
| workflow_success_detected = False | |
| while time.time() - redirect_start < password_redirect_timeout: | |
| current_url = self.page.url | |
| # Логируем изменение URL | |
| if current_url != last_url: | |
| elapsed = time.time() - redirect_start | |
| print(f" [{elapsed:.1f}s] URL: {current_url[:60]}...") | |
| last_url = current_url | |
| # Успешный редирект на Allow access или callback | |
| if 'view.awsapps.com' in current_url or 'awsapps.com/start' in current_url: | |
| elapsed = time.time() - redirect_start | |
| print(f" [OK] Redirected to awsapps.com after {elapsed:.1f}s") | |
| # Collect scripts after redirect | |
| self._collect_scripts() | |
| break | |
| if '127.0.0.1' in current_url and 'oauth/callback' in current_url: | |
| elapsed = time.time() - redirect_start | |
| print(f" [OK] Redirected to callback after {elapsed:.1f}s") | |
| # Collect scripts after redirect | |
| self._collect_scripts() | |
| break | |
| # КРИТИЧНО: Проверяем cookie workflow-step-id | |
| # AWS устанавливает его в "end-of-workflow-success" когда регистрация завершена | |
| # но JS на странице может не сработать из-за спуфинга | |
| if not workflow_success_detected: | |
| try: | |
| workflow_cookie = self.page.run_js(''' | |
| const cookies = document.cookie.split(';'); | |
| for (const c of cookies) { | |
| const [name, value] = c.trim().split('='); | |
| if (name === 'workflow-step-id') return decodeURIComponent(value); | |
| } | |
| return null; | |
| ''') | |
| if workflow_cookie == 'end-of-workflow-success': | |
| elapsed = time.time() - redirect_start | |
| print(f" [!] Cookie 'workflow-step-id=end-of-workflow-success' detected at {elapsed:.1f}s") | |
| print(f" [!] AWS registration complete but page stuck - forcing navigation...") | |
| workflow_success_detected = True | |
| # Пробуем найти redirect URL в странице или cookies | |
| redirect_url = self.page.run_js(r''' | |
| // Ищем redirect URL в meta refresh | |
| const meta = document.querySelector('meta[http-equiv="refresh"]'); | |
| if (meta) { | |
| const content = meta.getAttribute('content'); | |
| const match = content.match(/url=(.+)/i); | |
| if (match) return match[1]; | |
| } | |
| // Ищем в скриптах | |
| const scripts = document.querySelectorAll('script'); | |
| for (const s of scripts) { | |
| const text = s.textContent; | |
| const match = text.match(/window\.location\s*=\s*["']([^"']+)["']/); | |
| if (match) return match[1]; | |
| } | |
| // Ищем ссылку на awsapps | |
| const links = document.querySelectorAll('a[href*="awsapps"]'); | |
| if (links.length > 0) return links[0].href; | |
| return null; | |
| ''') | |
| if redirect_url: | |
| print(f" [>] Found redirect URL: {redirect_url[:60]}...") | |
| self.page.get(redirect_url) | |
| time.sleep(1) | |
| continue | |
| # Если не нашли URL - пробуем стандартный путь | |
| # Получаем workflowResultHandle из URL | |
| import re | |
| result_match = re.search(r'workflowResultHandle=([^&]+)', current_url) | |
| if result_match: | |
| result_handle = result_match.group(1) | |
| # Пробуем перейти на view.awsapps.com напрямую | |
| awsapps_url = f"https://view.awsapps.com/start/#/?workflowResultHandle={result_handle}" | |
| print(f" [>] Trying direct navigation to awsapps...") | |
| self.page.get(awsapps_url) | |
| time.sleep(2) | |
| continue | |
| except Exception as e: | |
| pass | |
| # Логируем каждые 10 секунд | |
| elapsed = int(time.time() - redirect_start) | |
| if elapsed > 0 and elapsed % 10 == 0 and elapsed != last_log: | |
| print(f" [...] Still waiting... ({elapsed}s)") | |
| last_log = elapsed | |
| # Проверяем текст на странице | |
| try: | |
| page_text = self.page.run_js('document.body.innerText.substring(0, 200)') | |
| if 'Redirecting' in page_text: | |
| print(f" [i] AWS shows 'Redirecting...' - page may be stuck") | |
| # Если страница застряла на Redirecting больше 15 секунд - пробуем refresh | |
| if elapsed > 15 and not workflow_success_detected: | |
| print(f" [!] Page stuck on Redirecting, trying page refresh...") | |
| self.page.refresh() | |
| time.sleep(2) | |
| except: | |
| pass | |
| time.sleep(0.2) | |
| total_time = time.time() - step_start | |
| self._log(f"URL after password ({total_time:.1f}s total)", self.page.url[:60]) | |
| return True | |
| def _check_captcha(self) -> bool: | |
| """Проверяет наличие CAPTCHA на странице""" | |
| captcha_indicators = [ | |
| 'text=Security check', | |
| 'text=Security Verification', | |
| 'text=Please click verify', | |
| 'text=solve the challenge', # AWS puzzle captcha | |
| 'text=Verify you are human', | |
| '#captcha', | |
| '[data-testid="ams-captcha"]', | |
| '.wOjKx2rsiwhq8wvEYj3p', # AWS captcha container class | |
| 'canvas', # Puzzle captcha uses canvas | |
| 'iframe[src*="captcha"]', | |
| ] | |
| for selector in captcha_indicators: | |
| try: | |
| if self.page.ele(selector, timeout=0.3): | |
| return True | |
| except: | |
| pass | |
| return False | |
| def _check_captcha_error(self) -> bool: | |
| """Проверяет наличие ошибки Invalid captcha""" | |
| try: | |
| if self.page.ele('text=Invalid captcha', timeout=0.5): | |
| return True | |
| except: | |
| pass | |
| return False | |
| def _handle_captcha(self, timeout: int = 120) -> bool: | |
| """ | |
| Обрабатывает CAPTCHA на странице. | |
| AWS использует визуальную капчу (puzzle) которую нужно решить вручную. | |
| Args: | |
| timeout: Время ожидания решения в секундах (по умолчанию 120) | |
| Returns: | |
| True если капча решена, False если не удалось | |
| """ | |
| print(" [CAPTCHA] ========================================") | |
| print(" [CAPTCHA] CAPTCHA DETECTED - MANUAL SOLUTION REQUIRED") | |
| print(" [CAPTCHA] ========================================") | |
| # Делаем скриншот капчи для анализа | |
| self.screenshot("captcha_puzzle") | |
| # Если headless - капчу не решить | |
| if self.headless: | |
| print(" [!] Cannot solve CAPTCHA in headless mode!") | |
| print(" [!] Restart with --no-headless flag") | |
| return False | |
| # Выводим инструкции | |
| print(f" [CAPTCHA] Please solve the captcha in the browser window") | |
| print(f" [CAPTCHA] Waiting up to {timeout} seconds...") | |
| print(" [CAPTCHA] ----------------------------------------") | |
| # Ждём пока пользователь решит капчу | |
| start_time = time.time() | |
| last_status = "" | |
| while time.time() - start_time < timeout: | |
| elapsed = int(time.time() - start_time) | |
| # Проверяем Success | |
| try: | |
| success_elem = self.page.ele('text=Success', timeout=0.3) | |
| if success_elem: | |
| print(" [CAPTCHA] [OK] Success detected!") | |
| time.sleep(0.5) | |
| return True | |
| except: | |
| pass | |
| # Проверяем что капча исчезла (страница перешла дальше) | |
| if not self._check_captcha(): | |
| # Дополнительная проверка - нет ли ошибки Invalid captcha | |
| if not self._check_captcha_error(): | |
| print(" [CAPTCHA] [OK] Captcha solved!") | |
| return True | |
| # Проверяем ошибку Invalid captcha | |
| if self._check_captcha_error(): | |
| if last_status != "invalid": | |
| print(" [CAPTCHA] [!] Invalid captcha - please try again") | |
| last_status = "invalid" | |
| # Выводим статус каждые 10 секунд | |
| if elapsed > 0 and elapsed % 10 == 0 and last_status != str(elapsed): | |
| remaining = timeout - elapsed | |
| print(f" [CAPTCHA] Waiting... {remaining}s remaining") | |
| last_status = str(elapsed) | |
| time.sleep(0.5) | |
| print(" [CAPTCHA] [!] Timeout - captcha not solved") | |
| return False | |
| def _check_password_error(self) -> bool: | |
| """Проверяет наличие ошибки о слабом/утёкшем пароле""" | |
| error_texts = [ | |
| 'publicly known', | |
| 'leaked', | |
| 'data set', | |
| 'try a different password', | |
| 'password is too weak', | |
| ] | |
| for text in error_texts: | |
| try: | |
| if self.page.ele(f'text={text}', timeout=0.5): | |
| return True | |
| except: | |
| pass | |
| return False | |
| def _check_password_mismatch_error(self) -> bool: | |
| """Проверяет наличие ошибки о несовпадении паролей""" | |
| error_texts = [ | |
| 'Passwords must match', | |
| 'Passwörter müssen übereinstimmen', # German | |
| 'パスワードが一致しません', # Japanese | |
| 'Las contraseñas deben coincidir', # Spanish | |
| 'Les mots de passe doivent correspondre', # French | |
| 'passwords do not match', | |
| 'password mismatch', | |
| ] | |
| for text in error_texts: | |
| try: | |
| if self.page.ele(f'text={text}', timeout=0.3): | |
| return True | |
| except: | |
| pass | |
| return False | |
| def has_login_form(self) -> bool: | |
| """Проверяет есть ли форма логина на странице""" | |
| try: | |
| # Ищем поле email или password на странице логина | |
| selectors = [ | |
| 'input[type="email"]', | |
| 'input[placeholder*="email" i]', | |
| 'input[placeholder*="username" i]', | |
| ] | |
| for selector in selectors: | |
| elements = self.page.query_selector_all(selector) | |
| if elements: | |
| return True | |
| return False | |
| except: | |
| return False | |
| def login_with_credentials(self, email: str, password: str) -> bool: | |
| """Логинится с email и паролем""" | |
| print(f"[K] Logging in as {email}...") | |
| try: | |
| self.close_cookie_dialog() | |
| # Вводим email | |
| email_selectors = [ | |
| 'input[type="email"]', | |
| 'input[placeholder*="email" i]', | |
| 'input[placeholder*="username" i]', | |
| 'input[name="email"]', | |
| ] | |
| email_field = None | |
| for selector in email_selectors: | |
| email_field = self.page.query_selector(selector) | |
| if email_field: | |
| break | |
| if email_field: | |
| self._behavior.human_click(email_field) | |
| email_field.fill(email) | |
| print(f" [OK] Email entered") | |
| # Нажимаем Continue если есть | |
| continue_btn = self.page.query_selector('button:has-text("Continue")') | |
| if continue_btn: | |
| self._behavior.human_click(continue_btn) | |
| self._behavior.human_delay(1.5, 2.5) | |
| # Вводим пароль | |
| password_selectors = [ | |
| 'input[type="password"]', | |
| 'input[placeholder*="password" i]', | |
| ] | |
| password_field = None | |
| for selector in password_selectors: | |
| password_field = self.page.query_selector(selector) | |
| if password_field: | |
| break | |
| if password_field: | |
| self._behavior.human_click(password_field) | |
| password_field.fill(password) | |
| print(f" [OK] Password entered") | |
| # Нажимаем Sign in / Continue | |
| sign_in_btn = self.page.query_selector('button:has-text("Sign in"), button:has-text("Continue")') | |
| if sign_in_btn: | |
| self._behavior.human_click(sign_in_btn) | |
| self._behavior.human_delay(2.0, 3.5) | |
| print(f" [OK] Logged in") | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f" [X] Login error: {e}") | |
| return False | |
| def click_confirm_and_continue(self) -> bool: | |
| """Нажимает Confirm and continue на странице device authorization""" | |
| print("[S] Looking for Confirm and continue button...") | |
| selectors = [ | |
| 'text=Confirm and continue', | |
| '@data-testid=confirm-button', | |
| 'xpath://button[contains(text(), "Confirm")]', | |
| 'tag:button@@text()=Confirm and continue', | |
| ] | |
| for attempt in range(5): | |
| for selector in selectors: | |
| try: | |
| btn = self.page.ele(selector, timeout=1) | |
| if btn: | |
| print(f" [OK] Found Confirm button (attempt {attempt + 1})") | |
| self._behavior.human_js_click(self.page, btn) | |
| return True | |
| except: | |
| pass | |
| self._behavior.human_delay(0.4, 0.7) | |
| print(" [!] Confirm and continue button not found") | |
| return False | |
| def click_allow_access(self) -> bool: | |
| """Нажимает Allow access. ОПТИМИЗИРОВАНО: быстрый поиск и клик.""" | |
| print("[OK] Looking for Allow access button...") | |
| print(f" Current URL: {self.page.url[:80]}...") | |
| record('click_allow_access', {'url': self.page.url}, self.page) | |
| # Скрываем cookie через CSS (мгновенно, без проверок) | |
| self._hide_cookie_banner() | |
| # ОПТИМИЗАЦИЯ: Ждём кнопку напрямую, без отдельного ожидания страницы | |
| # Используем waitUntil с коротким polling | |
| btn = None | |
| btn_selector = None # Сохраняем селектор для повторного поиска | |
| start_time = time.time() | |
| max_wait = 5 # Уменьшено с 10 итераций по 0.3с | |
| # Приоритетные селекторы - ТОЛЬКО специфичные для Allow access | |
| # НЕ используем общие селекторы типа button[type="submit"] - они находят cookie кнопки! | |
| fast_selectors = [ | |
| '@data-testid=allow-access-button', | |
| 'css:button[data-testid="allow-access-button"]', | |
| 'text=Allow access', # Точный текст | |
| ] | |
| fallback_selectors = [ | |
| # AWS UI специфичные | |
| 'css:button.awsui_button_vjswe_146je_157', # AWS UI button class | |
| 'xpath://button[contains(@class, "awsui") and contains(text(), "Allow")]', | |
| # Текстовые селекторы | |
| 'text=Allow', | |
| 'text=Authorize', | |
| 'xpath://button[contains(text(), "Allow access")]', | |
| 'xpath://button[contains(text(), "Allow")]', | |
| 'xpath://button[contains(text(), "Authorize")]', | |
| # Fallback для нового flow | |
| '@data-testid=confirm-button', | |
| '@data-testid=accept-button', | |
| # Общие селекторы ТОЛЬКО в конце | |
| 'css:button.awsui-button-variant-primary', | |
| 'css:button[class*="variant-primary"]', | |
| ] | |
| # Фаза 1: Быстрый поиск по data-testid (0.5 сек макс) | |
| for selector in fast_selectors: | |
| try: | |
| btn = self.page.ele(selector, timeout=0.25) | |
| if btn: | |
| btn_selector = selector | |
| print(f" [OK] Found button via {selector[:30]}") | |
| break | |
| except: | |
| pass | |
| # Фаза 2: Fallback селекторы если нужно | |
| if not btn: | |
| for _ in range(20): # ~5 сек максимум | |
| for selector in fallback_selectors: | |
| try: | |
| btn = self.page.ele(selector, timeout=0.15) | |
| if btn: | |
| btn_selector = selector | |
| print(f" [OK] Found button via fallback") | |
| break | |
| except: | |
| pass | |
| if btn: | |
| break | |
| time.sleep(0.1) | |
| if not btn: | |
| print(" [!] Allow access button not found") | |
| self.screenshot("error_no_allow_button") | |
| return False | |
| # Логируем что нашли | |
| try: | |
| btn_text = btn.text or btn.attr('value') or 'no text' | |
| btn_class = btn.attr('class') or 'no class' | |
| print(f" [DEBUG] Button text: '{btn_text}', class: '{btn_class[:50]}'") | |
| except: | |
| pass | |
| # Ждём пока страница стабилизируется | |
| time.sleep(1) | |
| # Клик с разными методами | |
| for attempt in range(3): | |
| # Проверяем disabled (с защитой от NoneElement) | |
| try: | |
| if btn and btn.attr('disabled'): | |
| print(f" [!] Button is disabled, waiting...") | |
| time.sleep(1) | |
| # Перезапрашиваем элемент | |
| if btn_selector: | |
| btn = self.page.ele(btn_selector, timeout=0.5) | |
| continue | |
| except: | |
| pass | |
| print(f"[UNLOCK] Clicking Allow access (attempt {attempt + 1})...") | |
| # Метод 1: Прямой клик через DrissionPage | |
| try: | |
| btn.click() | |
| time.sleep(0.5) | |
| if '127.0.0.1' in self.page.url: | |
| print(" [OK] Redirected to callback!") | |
| # Collect scripts after redirect | |
| self._collect_scripts() | |
| return True | |
| except Exception as e: | |
| print(f" [!] Direct click failed: {e}") | |
| # Метод 2: JS click | |
| try: | |
| self.page.run_js('arguments[0].click()', btn) | |
| time.sleep(0.5) | |
| if '127.0.0.1' in self.page.url: | |
| print(" [OK] Redirected to callback!") | |
| # Collect scripts after redirect | |
| self._collect_scripts() | |
| return True | |
| except Exception as e: | |
| print(f" [!] JS click failed: {e}") | |
| # Метод 3: Behavior click | |
| try: | |
| self._behavior.human_js_click(self.page, btn) | |
| time.sleep(0.5) | |
| if '127.0.0.1' in self.page.url: | |
| print(" [OK] Redirected to callback!") | |
| # Collect scripts after redirect | |
| self._collect_scripts() | |
| return True | |
| except: | |
| pass | |
| # Перезапрашиваем элемент (избегаем stale) | |
| if btn_selector: | |
| try: | |
| btn = self.page.ele(btn_selector, timeout=0.5) | |
| except: | |
| pass | |
| time.sleep(1) | |
| # Последняя проверка | |
| if '127.0.0.1' in self.page.url: | |
| return True | |
| print(" [!] Allow access button didn't work") | |
| self.screenshot("error_allow_access") | |
| return False | |
| def wait_for_callback(self, timeout: int = None) -> bool: | |
| """Ждёт редиректа на callback""" | |
| timeout = timeout or self.settings.get('timeouts', {}).get('oauth_callback', 60) | |
| print(f"[...] Waiting for callback redirect ({timeout}s)...") | |
| for _ in range(timeout): | |
| current_url = self.page.url | |
| if '127.0.0.1' in current_url and 'oauth/callback' in current_url: | |
| return True | |
| time.sleep(1) | |
| return False | |
| def current_url(self) -> str: | |
| return self.page.url | |
| def prewarm(self): | |
| """ | |
| Прогрев браузера - посещаем несколько сайтов перед AWS. | |
| Создаёт реальную историю и выглядит естественнее. | |
| """ | |
| print("[W] Warming up browser...") | |
| # Сайты для прогрева (быстрые, популярные) | |
| warmup_sites = [ | |
| 'https://www.google.com', | |
| 'https://github.com', | |
| ] | |
| # Выбираем 1-2 случайных сайта | |
| sites_to_visit = random.sample(warmup_sites, k=random.randint(1, 2)) | |
| for site in sites_to_visit: | |
| try: | |
| print(f" Visiting {site}...") | |
| self.page.get(site) | |
| # Ждём загрузку | |
| try: | |
| self.page.wait.doc_loaded(timeout=5) | |
| except: | |
| pass | |
| # Имитируем просмотр (1-3 сек) | |
| self._behavior.simulate_page_reading(self.page, duration=random.uniform(1.0, 3.0)) | |
| # Случайный скролл | |
| if random.random() < 0.5: | |
| self._behavior.scroll_page(self, direction='down', amount=random.randint(100, 300)) | |
| except Exception as e: | |
| print(f" [!] Warmup failed for {site}: {e}") | |
| print(" [OK] Browser warmed up") | |
| def navigate(self, url: str): | |
| """Переход по URL.""" | |
| print(f"[>] Opening page...") | |
| record('navigate', {'url': url}, self.page) | |
| self.page.get(url) | |
| # Ждём загрузку документа | |
| print(" [...] Waiting for page load...") | |
| try: | |
| self.page.wait.doc_loaded(timeout=8) | |
| except: | |
| pass | |
| # Ждём появления элементов страницы (email input или заголовок) | |
| page_ready = False | |
| for _ in range(20): # 2 секунды максимум | |
| try: | |
| if self.page.ele('@placeholder=username@example.com', timeout=0.1): | |
| page_ready = True | |
| break | |
| if self.page.ele('text=Get started', timeout=0.05): | |
| page_ready = True | |
| break | |
| except: | |
| pass | |
| if page_ready: | |
| print(f" [OK] Page elements loaded") | |
| else: | |
| print(f" [!] Page elements not detected, continuing anyway") | |
| # Collect JS scripts from this page | |
| self._collect_scripts() | |
| # Скрываем cookie | |
| self._hide_cookie_banner() | |
| def check_aws_error(self) -> bool: | |
| """Проверяет наличие ошибки AWS""" | |
| try: | |
| error_text = self.page.ele("text=It's not you, it's us", timeout=1) | |
| if error_text: | |
| print("[!] AWS temporary error, need to wait and retry") | |
| return True | |
| except: | |
| pass | |
| return False | |
| def _check_aws_error(self) -> bool: | |
| """Проверяет наличие ошибки AWS на странице (модальное окно)""" | |
| error_texts = [ | |
| 'error processing your request', | |
| 'Please try again', | |
| "It's not you, it's us", | |
| 'Something went wrong', | |
| ] | |
| for text in error_texts: | |
| try: | |
| if self.page.ele(f'text={text}', timeout=0.5): | |
| return True | |
| except: | |
| pass | |
| return False | |
| def _close_error_modal(self) -> bool: | |
| """Закрывает модальное окно с ошибкой AWS""" | |
| print(" [!] Attempting to close error modal...") | |
| # AWS Cloudscape UI - специфичные селекторы из HTML | |
| close_selectors = [ | |
| # Точный селектор из HTML - кнопка Close alert | |
| 'xpath://button[@aria-label="Close alert"]', | |
| 'xpath://button[contains(@class, "awsui_dismiss-button")]', | |
| 'xpath://button[contains(@class, "dismiss-button")]', | |
| # AWS Cloudscape/Polaris UI | |
| 'xpath://button[contains(@class, "awsui_dismiss")]', | |
| 'xpath://*[contains(@class, "awsui_alert")]//button', | |
| # Fallback селекторы | |
| 'xpath://button[@aria-label="Close"]', | |
| 'xpath://button[@aria-label="Dismiss"]', | |
| 'aria:Close alert', | |
| 'aria:Close', | |
| 'aria:Dismiss', | |
| ] | |
| for selector in close_selectors: | |
| try: | |
| btn = self.page.ele(selector, timeout=0.3) | |
| if btn: | |
| print(f" [!] Found close button: {selector}") | |
| self._behavior.human_js_click(self.page, btn) | |
| # Проверяем что модалка закрылась | |
| if not self._check_aws_error(): | |
| print(" [OK] Error modal closed") | |
| return True | |
| except: | |
| pass | |
| # Fallback: нажимаем Escape | |
| try: | |
| print(" [!] Trying Escape key...") | |
| self._behavior.human_delay(0.1, 0.3) | |
| self.page.run_cdp('Input.dispatchKeyEvent', type='keyDown', key='Escape', code='Escape', windowsVirtualKeyCode=27) | |
| self.page.run_cdp('Input.dispatchKeyEvent', type='keyUp', key='Escape', code='Escape', windowsVirtualKeyCode=27) | |
| self._behavior.human_delay(0.3, 0.6) | |
| if not self._check_aws_error(): | |
| print(" [OK] Error modal closed via Escape") | |
| return True | |
| except: | |
| pass | |
| # Последний fallback: кликаем вне модалки | |
| try: | |
| print(" [!] Trying click outside modal...") | |
| self._behavior.human_delay(0.1, 0.2) | |
| self.page.run_js('document.body.click()') | |
| self._behavior.human_delay(0.2, 0.4) | |
| except: | |
| pass | |
| return False | |
| def screenshot(self, name: str = "debug") -> Optional[str]: | |
| """Сохраняет скриншот для отладки""" | |
| if not self.screenshots_on_error: | |
| return None | |
| try: | |
| filename = str(BASE_DIR / f"{name}_{int(time.time())}.png") | |
| self.page.get_screenshot(path=filename) | |
| print(f"[SCREENSHOT] Screenshot: {filename}") | |
| # Записываем в debug recorder если это ошибка | |
| if 'error' in name.lower(): | |
| recorder = get_recorder() | |
| if recorder: | |
| recorder.record_error(name, self.page) | |
| return filename | |
| except Exception as e: | |
| print(f"[!] Screenshot failed: {e}") | |
| return None | |
| def pause_for_debug(self, message: str = "Paused for debugging"): | |
| """Пауза для ручной отладки""" | |
| if self.settings.get('debug', {}).get('pause_on_error', False): | |
| print(f"\n⏸️ {message}") | |
| print(" Press Enter to continue...") | |
| input() | |
| def close(self): | |
| """Закрытие браузера""" | |
| # Save collected scripts (only if enabled) | |
| if COLLECT_SCRIPTS: | |
| try: | |
| from debugger.collectors.script_collector import save_collected_scripts | |
| save_collected_scripts() | |
| except Exception as e: | |
| if self.verbose: | |
| print(f"[ScriptCollector] Error saving: {e}") | |
| # Завершаем debug recording | |
| recorder = get_recorder() | |
| if recorder: | |
| recorder.finish() | |
| try: | |
| self.page.quit() | |
| except Exception: | |
| pass | |