""" Браузерная автоматизация для регистрации AWS Builder ID """ import os import time import random import platform import functools from typing import Optional, Callable from DrissionPage import ChromiumPage, ChromiumOptions import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) # Force unbuffered output for real-time logging print = functools.partial(print, flush=True) # Импортируем спуфинг from spoof import apply_pre_navigation_spoofing from spoofers.behavior import BehaviorSpoofModule from spoofers.profile_storage import ProfileStorage # Debug recorder from core.debug_recorder import get_recorder, record, init_recorder # Script collector for analysis (disabled by default, enable for debugging) # from debugger.collectors.script_collector import ScriptCollector, collect_scripts, save_collected_scripts COLLECT_SCRIPTS = False # Set to True to enable script collection for analysis def find_chrome_path() -> Optional[str]: """Find Chrome/Chromium executable path on different platforms""" system = platform.system() if system == 'Windows': # Common Chrome paths on Windows possible_paths = [ os.path.expandvars(r'%ProgramFiles%\Google\Chrome\Application\chrome.exe'), os.path.expandvars(r'%ProgramFiles(x86)%\Google\Chrome\Application\chrome.exe'), os.path.expandvars(r'%LocalAppData%\Google\Chrome\Application\chrome.exe'), os.path.expandvars(r'%ProgramFiles%\Chromium\Application\chrome.exe'), os.path.expandvars(r'%LocalAppData%\Chromium\Application\chrome.exe'), # Edge as fallback os.path.expandvars(r'%ProgramFiles(x86)%\Microsoft\Edge\Application\msedge.exe'), os.path.expandvars(r'%ProgramFiles%\Microsoft\Edge\Application\msedge.exe'), ] elif system == 'Darwin': # macOS possible_paths = [ '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome', '/Applications/Chromium.app/Contents/MacOS/Chromium', os.path.expanduser('~/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'), ] else: # Linux possible_paths = [ '/usr/bin/google-chrome', '/usr/bin/google-chrome-stable', '/usr/bin/chromium', '/usr/bin/chromium-browser', '/snap/bin/chromium', ] for path in possible_paths: if os.path.exists(path): return path return None from core.config import get_config from core.paths import get_paths # Селекторы на основе анализа Playwright (декабрь 2025) # AWS использует data-testid для стабильных селекторов SELECTORS = { 'cookie_accept': [ 'text=Accept', # Английский 'text=Принять', # Русский 'text=Akzeptieren', # German 'text=同意する', # Japanese 'xpath://button[contains(text(), "Accept")]', 'xpath://button[contains(text(), "Принять")]', '[data-id="awsccc-cb-btn-accept"]', ], 'email_input': [ '@placeholder=username@example.com', 'aria:Email', '@type=email', '@data-testid=test-input', ], 'continue_btn': [ '@data-testid=test-primary-button', # Основная кнопка Continue '@data-testid=signup-next-button', # Continue на странице имени '@data-testid=email-verification-verify-button', # Continue на странице кода 'text=Continue', 'text=Weiter', # German 'text=続行', # Japanese ], 'name_input': [ '@placeholder=Maria José Silva', # Актуальный placeholder 'aria:Name', '@data-testid=name-input', ], 'code_input': [ '@placeholder=6-digit', # English placeholder '@placeholder=6-stellig', # German placeholder '@placeholder=6桁', # Japanese placeholder 'aria:Verification code', 'aria:Verifizierungscode', # German 'aria:確認コード', # Japanese 'css:input[maxlength="6"]', # Generic 6-digit input ], 'password_input': [ '@placeholder=Enter password', # English placeholder '@placeholder=Passwort eingeben', # German placeholder '@placeholder=パスワードを入力', # Japanese placeholder 'aria:Password', 'aria:Passwort', # German 'aria:パスワード', # Japanese 'css:input[type="password"]', # Generic password input ], 'confirm_password': [ '@placeholder=Re-enter password', # English placeholder '@placeholder=Passwort erneut eingeben', # German placeholder '@placeholder=パスワードを再入力', # Japanese placeholder 'aria:Confirm password', 'aria:Passwort bestätigen', # German 'aria:パスワードを確認', # Japanese 'css:input[type="password"]:nth-of-type(2)', # Second password field ], 'allow_access': [ 'text=Allow access', 'text=Zugriff erlauben', # German 'text=アクセスを許可', # Japanese '@data-testid=allow-access-button', ], } # Контексты страниц - заголовки для определения текущего шага # Поддерживаемые языки: English, German (de-DE), Japanese (ja-JP) PAGE_CONTEXTS = { 'email': ['Get started', 'Sign in', 'Erste Schritte', 'サインイン', '開始する'], 'name': ['Enter your name', 'Geben Sie Ihren Namen ein', '名前を入力してください'], 'verification': ['Verify your email', 'E-Mail-Adresse bestätigen', 'メールアドレスを確認'], 'password': ['Create your password', 'Ihr Passwort erstellen', 'パスワードを作成'], 'allow_access': ['Allow access', 'Authorization', 'Zugriff erlauben', 'アクセスを許可', '認可'], } BROWSER_ARGS = [ # '--disable-blink-features=AutomationControlled', # AWS детектит этот флаг! '--disable-dev-shm-usage', ] PASSWORD_LENGTH = 16 PASSWORD_CHARS = { 'upper': 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'lower': 'abcdefghijklmnopqrstuvwxyz', 'digits': '0123456789', 'special': '!@#$%^&*', # Расширен набор спецсимволов } # Таймауты по умолчанию (оптимизированы) DEFAULT_TIMEOUTS = { 'page_load': 5, # Уменьшено с 10 'element_wait': 1, # Уменьшено с 3 'page_transition': 3, # Уменьшено с 5 'poll_interval': 0.1, # Интервал проверки элементов } def load_settings(): return get_config().to_dict() def get_setting(path, default=None): return get_config().get(path, default) BASE_DIR = get_paths().autoreg_dir class BrowserAutomation: """Автоматизация браузера для регистрации""" def __init__(self, headless: bool = None, email: str = None): """ Args: headless: Запуск без GUI (по умолчанию из настроек) email: Email аккаунта (для сохранения профиля спуфинга) """ self._email = email settings = load_settings() browser_settings = settings.get('browser', {}) # headless можно переопределить параметром if headless is None: headless = browser_settings.get('headless', False) # Force headless when no display is available (e.g., HF Spaces) if not os.environ.get('DISPLAY') and not os.environ.get('WAYLAND_DISPLAY'): headless = True self.settings = settings self.headless = headless self.verbose = settings.get('debug', {}).get('verbose', False) self.screenshots_on_error = browser_settings.get('screenshots_on_error', True) # Настройка браузера co = ChromiumOptions() # КРИТИЧНО: Уникальная user data directory для каждого экземпляра # Это предотвращает конфликт с уже открытым Chrome import tempfile import uuid temp_profile = os.path.join(tempfile.gettempdir(), f'kiro_chrome_{uuid.uuid4().hex[:8]}') co.set_user_data_path(temp_profile) co.auto_port() # Автоматически найти свободный порт print(f"[Browser] Using temp profile: {temp_profile}") # Найти и установить путь к Chrome (критично для Windows) chrome_path = find_chrome_path() if chrome_path: co.set_browser_path(chrome_path) print(f"[Browser] Using: {chrome_path}") else: print("[Browser] Warning: Chrome not found, using system default") if headless: co.headless() # Explicit headless flag for Chromium in headless Linux co.set_argument('--headless=new') # Дополнительные аргументы для стабильного headless co.set_argument('--disable-gpu') co.set_argument('--no-sandbox') co.set_argument('--disable-dev-shm-usage') # Скрываем automation infobars co.set_argument('--disable-infobars') co.set_argument('--no-first-run') co.set_argument('--no-default-browser-check') # Уменьшаем логи Chrome co.set_argument('--disable-logging') co.set_argument('--log-level=3') # Только fatal errors # НЕ используем --silent-launch - он ломает запуск Chrome! # Force English language to avoid Chinese error messages co.set_argument('--lang=en-US') co.set_argument('--accept-lang=en-US,en') # Set DrissionPage language to English (avoid Chinese error messages) os.environ['DRISSIONPAGE_LANG'] = 'en' # НЕ используем --disable-blink-features=AutomationControlled # Он показывает предупреждение которое палит автоматизацию! # Размер окна - большое для корректного отображения AWS UI co.set_argument('--window-size=1920,1080') co.set_argument('--start-maximized') if browser_settings.get('incognito', True): co.set_argument('--incognito') if browser_settings.get('devtools', False): co.set_argument('--auto-open-devtools-for-tabs') # Proxy support (from environment variables) proxy_url = os.environ.get('HTTPS_PROXY') or os.environ.get('HTTP_PROXY') if proxy_url: # Remove http:// prefix if present for Chrome proxy format proxy_server = proxy_url.replace('http://', '').replace('https://', '') co.set_argument(f'--proxy-server={proxy_server}') # Ignore SSL errors when using proxy (mitmproxy intercepts HTTPS) co.set_argument('--ignore-certificate-errors') co.set_argument('--ignore-ssl-errors') print(f"[Browser] Using proxy: {proxy_server}") for arg in BROWSER_ARGS: co.set_argument(arg) print(f"[Browser] Initializing ChromiumPage (headless={headless})...") try: self.page = ChromiumPage(co) print("[Browser] ChromiumPage initialized successfully") # КРИТИЧНО: Максимизируем окно для корректного отображения AWS UI if not headless: try: self.page.set.window.max() print(" [W] Window maximized") except: # Fallback: устанавливаем большой размер вручную try: self.page.set.window.size(1920, 1080) print(" [W] Window resized to 1920x1080") except: pass except Exception as e: error_msg = str(e).encode('ascii', 'replace').decode('ascii') print(f"[Browser] ERROR: Failed to initialize browser: {error_msg}") raise self._cookie_closed = False # Флаг чтобы не закрывать cookie много раз self._network_logs = [] # Логи сетевых запросов # ВАЖНО: Очищаем cookies и storage для чистой сессии try: # Очищаем cookies для всех AWS доменов self.page.run_cdp('Network.clearBrowserCookies') self.page.run_cdp('Network.clearBrowserCache') # Очищаем storage для AWS доменов aws_origins = [ 'https://profile.aws.amazon.com', 'https://signin.aws.amazon.com', 'https://us-east-1.signin.aws', 'https://oidc.us-east-1.amazonaws.com', 'https://view.awsapps.com', ] for origin in aws_origins: try: self.page.run_cdp('Storage.clearDataForOrigin', origin=origin, storageTypes='all') except: pass print(" [C] Cleared browser cookies, cache and storage") except Exception as e: print(f" [!] Failed to clear cookies: {e}") # КРИТИЧНО: Применяем спуфинг ДО навигации на страницу # Это гарантирует что AWS FWCIM получит подменённые данные # Проверяем env переменную SPOOFING_ENABLED (по умолчанию включено) spoofing_enabled = os.environ.get('SPOOFING_ENABLED', '1') == '1' if spoofing_enabled: try: # Используем ProfileStorage для консистентного fingerprint profile = None if self._email: from core.paths import get_paths storage = ProfileStorage(get_paths().tokens_dir) profile = storage.get_or_create(self._email) self._profile_storage = storage self._spoofer = apply_pre_navigation_spoofing(self.page, profile) print(" [S] Anti-fingerprint spoofing applied") except Exception as e: print(f" [!] Spoofing failed: {e}") self._spoofer = None else: print(" [S] Spoofing disabled by settings") self._spoofer = None # Инициализируем модуль человеческого поведения self._behavior = BehaviorSpoofModule() # Инициализируем debug recorder если включен if os.environ.get('DEBUG_RECORDING', '0') == '1': session_name = f"reg_{email.split('@')[0] if email else 'unknown'}_{int(time.time())}" self._recorder = init_recorder(session_name) else: self._recorder = None # Настройка реалистичного ввода (по умолчанию включено для обхода FWCIM) self._realistic_typing = browser_settings.get('realistic_typing', True) if self._realistic_typing: # Реалистичные задержки для обхода поведенческого анализа print(" [B] Realistic typing enabled (slower but safer)") else: # Быстрые задержки для скорости self._behavior.typing_delay_range = (0.03, 0.08) self._behavior.action_delay_range = (0.1, 0.3) print(" [B] Fast typing mode (faster but may be detected)") # Включаем перехват сетевых запросов self._setup_network_logging() self._log("Browser initialized", f"headless={headless}") def _setup_network_logging(self): """Настройка перехвата сетевых запросов через CDP""" try: # Включаем Network domain self.page.run_cdp('Network.enable') # Слушаем события запросов def on_request(params): url = params.get('request', {}).get('url', '') if 'send-otp' in url or 'api/' in url: self._network_logs.append({ 'type': 'request', 'url': url, 'method': params.get('request', {}).get('method'), 'headers': params.get('request', {}).get('headers'), 'postData': params.get('request', {}).get('postData'), 'requestId': params.get('requestId'), }) print(f" [W] API Request: {params.get('request', {}).get('method')} {url}") def on_response(params): url = params.get('response', {}).get('url', '') if 'send-otp' in url or 'api/' in url: status = params.get('response', {}).get('status') self._network_logs.append({ 'type': 'response', 'url': url, 'status': status, 'headers': params.get('response', {}).get('headers'), 'requestId': params.get('requestId'), }) print(f" [W] API Response: {status} {url}") # DrissionPage не поддерживает CDP events напрямую, используем альтернативу print(" [N] Network logging enabled (will capture via Performance API)") except Exception as e: print(f" [!] Network logging setup failed: {e}") def save_network_logs(self, filename: str = "network_logs.json"): """Сохраняет логи сетевых запросов в файл""" import json filepath = BASE_DIR / filename # Получаем логи через Performance API try: perf_logs = self.page.run_js(''' return performance.getEntriesByType('resource') .filter(e => e.name.includes('api/') || e.name.includes('send-otp')) .map(e => ({ url: e.name, duration: e.duration, startTime: e.startTime, transferSize: e.transferSize, type: e.initiatorType })); ''') self._network_logs.extend(perf_logs or []) except: pass with open(filepath, 'w', encoding='utf-8') as f: json.dump(self._network_logs, f, indent=2, ensure_ascii=False) print(f" [F] Network logs saved: {filepath}") return filepath def _log(self, message: str, detail: str = ""): """Логирование с учётом verbose режима""" if self.verbose or not detail: print(f"[*] {message}" + (f" ({detail})" if detail else "")) def _find_element(self, selectors: list, timeout: int = None): """Ищет элемент по списку селекторов""" timeout = timeout or self.settings.get('timeouts', {}).get('element_wait', 3) for selector in selectors: try: elem = self.page.ele(selector, timeout=timeout) if elem: return elem except Exception: pass return None def _click_if_exists(self, selectors: list, timeout: int = 1) -> bool: """Кликает по элементу если он существует""" elem = self._find_element(selectors, timeout) if elem: self.human_click(elem) return True return False def wait_for_page_context(self, context_key: str, timeout: int = None) -> bool: """ Ждёт появления контекста страницы (заголовка). Оптимизировано: использует быстрый polling вместо медленных циклов. Args: context_key: Ключ из PAGE_CONTEXTS ('email', 'name', 'verification', 'password') timeout: Таймаут в секундах Returns: True если контекст найден """ timeout = timeout or DEFAULT_TIMEOUTS['page_transition'] poll_interval = DEFAULT_TIMEOUTS['poll_interval'] contexts = PAGE_CONTEXTS.get(context_key, []) if not contexts: return True print(f" [...] Waiting for page: {context_key}...") # Строим комбинированный селектор для всех контекстов combined_selectors = [f'text={ctx}' for ctx in contexts] start_time = time.time() while time.time() - start_time < timeout: for selector in combined_selectors: try: if self.page.ele(selector, timeout=poll_interval): elapsed = time.time() - start_time print(f" [OK] Page context found in {elapsed:.2f}s") time.sleep(0.1) # Минимальная пауза для React return True except: pass print(f" [!] Page context not found: {context_key}") return False def wait_for_url_change(self, old_url: str, timeout: int = None) -> bool: """ Ждёт изменения URL после действия. Оптимизировано: быстрый polling с минимальными задержками. Args: old_url: Предыдущий URL timeout: Таймаут в секундах Returns: True если URL изменился """ timeout = timeout or DEFAULT_TIMEOUTS['page_transition'] poll_interval = DEFAULT_TIMEOUTS['poll_interval'] start_time = time.time() while time.time() - start_time < timeout: if self.page.url != old_url: time.sleep(0.15) # Минимальная пауза для загрузки return True time.sleep(poll_interval) return False def wait_for_element(self, selectors: list, timeout: int = None) -> Optional[object]: """ Умное ожидание элемента с быстрым polling. Args: selectors: Список селекторов для поиска timeout: Таймаут в секундах Returns: Найденный элемент или None """ timeout = timeout or DEFAULT_TIMEOUTS['element_wait'] poll_interval = DEFAULT_TIMEOUTS['poll_interval'] start_time = time.time() while time.time() - start_time < timeout: for selector in selectors: try: elem = self.page.ele(selector, timeout=poll_interval) if elem: return elem except: pass return None # ======================================================================== # HUMAN-LIKE INPUT (Обход поведенческого анализа AWS FWCIM) # ======================================================================== def human_type(self, element, text: str, click_first: bool = True, fast: bool = None, field_type: str = 'default'): """ Вводит текст с человеческими задержками. Args: element: Элемент для ввода text: Текст для ввода click_first: Кликнуть на элемент перед вводом fast: Быстрый режим (None = использовать настройку realistic_typing) field_type: Тип поля для BehaviorSpoofModule ('email', 'password', 'name', 'code') """ # Определяем режим: если fast не указан, используем настройку use_fast = fast if fast is not None else not self._realistic_typing # ВСЕГДА используем fwcim_type для генерации правильных событий клавиатуры # FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует! # Параметр fast передаётся в fwcim_type для уменьшения задержек self._behavior.fwcim_type(self.page, element, text, field_type=field_type, fast=use_fast) def human_click(self, element, with_delay: bool = True): """ Кликает по элементу с человеческой задержкой. Args: element: Элемент для клика with_delay: Добавить задержку до/после клика """ if with_delay: self._behavior.human_delay(0.15, 0.4) # FWCIM-совместимый клик с mousedown/mouseup событиями self._behavior.fwcim_click(self.page, element) def simulate_human_activity(self): """ Симулирует активность реального пользователя. Вызывать периодически для обхода поведенческого анализа. """ # Случайные движения мыши self._behavior.random_mouse_movement(self, count=random.randint(1, 3)) # Иногда скроллим страницу if random.random() < 0.3: direction = random.choice(['up', 'down']) self._behavior.scroll_page(self, direction=direction) @staticmethod def generate_password(length: int = PASSWORD_LENGTH) -> str: """ Генерация криптографически безопасного пароля. Использует secrets для избежания предсказуемости. AWS проверяет пароли на утечки - нужна высокая энтропия. """ import secrets chars = ''.join(PASSWORD_CHARS.values()) # Гарантируем наличие всех типов символов password = [ secrets.choice(PASSWORD_CHARS['upper']), secrets.choice(PASSWORD_CHARS['lower']), secrets.choice(PASSWORD_CHARS['digits']), secrets.choice(PASSWORD_CHARS['special']), ] # Добавляем случайные символы password += [secrets.choice(chars) for _ in range(length - 4)] # Перемешиваем (secrets.SystemRandom для криптографической случайности) secrets.SystemRandom().shuffle(password) return ''.join(password) def _accept_cookie_banner(self): """Принимает cookie banner кликом на Accept/Принять.""" # Метод 1: DrissionPage - ищем кнопку Accept напрямую try: # Ищем оранжевую кнопку Accept на profile.aws.amazon.com accept_btn = self.page.ele('text=Accept', timeout=0.5) if accept_btn and accept_btn.tag == 'button': accept_btn.click() print(" [C] Cookie banner accepted (DrissionPage)") return True except: pass # Метод 2: JavaScript для AWS signin cookie banner try: result = self.page.run_js(''' // AWS signin cookie banner const acceptButtons = [ '[data-id="awsccc-cb-btn-accept"]', 'button[data-id*="accept"]', '#awsccc-cb-btn-accept', ]; for (const sel of acceptButtons) { try { const btn = document.querySelector(sel); if (btn && btn.offsetParent !== null) { btn.click(); return 'clicked'; } } catch(e) {} } // Fallback: ищем кнопку по тексту const allButtons = document.querySelectorAll('button'); for (const btn of allButtons) { const text = btn.textContent.trim(); if ((text === 'Accept' || text === 'Принять') && btn.offsetParent !== null) { btn.click(); return 'clicked'; } } return 'not_found'; ''') if result == 'clicked': print(" [C] Cookie banner accepted (JS)") return True except: pass return False def _hide_cookie_banner(self): """Скрывает cookie banner через CSS (fallback если клик не сработал).""" try: self.page.run_js(''' // Скрываем все возможные cookie элементы через CSS const selectors = [ '#awsccc-cb-content', '#awsccc-cb', '.awsccc-cs-overlay', '.awscc-cookie-banner', '.awsccc-cb-container' ]; selectors.forEach(sel => { const el = document.querySelector(sel); if (el) el.style.display = 'none'; }); // Также удаляем overlay который блокирует клики document.querySelectorAll('.awsccc-cs-overlay, .modal-backdrop').forEach(el => el.remove()); ''') except: pass def _collect_scripts(self): """Collect all JS scripts from current page for analysis. Disabled by default. Set COLLECT_SCRIPTS = True at top of file to enable. """ if not COLLECT_SCRIPTS: return try: # Import only when needed from debugger.collectors.script_collector import collect_scripts collect_scripts(self.page) except Exception as e: # Don't fail registration if script collection fails if self.verbose: print(f"[ScriptCollector] Error: {e}") def close_cookie_dialog(self, force: bool = False): """Принимает или скрывает диалог cookie.""" if self._cookie_closed and not force: return False # Сначала пробуем кликнуть Accept if self._accept_cookie_banner(): self._cookie_closed = True return True # Fallback: скрываем через CSS self._hide_cookie_banner() self._cookie_closed = True return True def enter_device_code(self, user_code: str, email: str = None, password: str = None) -> bool: """ Вводит код устройства на странице Device Authorization. Страница: view.awsapps.com/start/#/device Если показывается форма логина (после регистрации), сначала логинится. Args: user_code: Код устройства (например SQHH-RXJR) email: Email для логина (если нужен) password: Пароль для логина (если нужен) """ print(f"[KEY] Entering device code: {user_code}") # Ждём загрузки страницы device authorization print(" [...] Waiting for device authorization page...") for _ in range(15): try: # Проверяем что мы на странице device if self.page.ele('text=Authorization requested', timeout=0.5): print(" [OK] Device authorization page loaded") break if self.page.ele('text=Enter the code', timeout=0.5): print(" [OK] Device authorization page loaded") break # Форма логина на странице device if self.page.ele('text=Sign in', timeout=0.5): print(" [OK] Login form on device page") break except: pass time.sleep(0.5) # Закрываем cookie если есть self.close_cookie_dialog(force=True) time.sleep(1) # Проверяем - это форма логина или форма device code? # Если есть password поле но нет текстового поля - это логин all_inputs = self.page.eles('tag:input') has_password_field = any((inp.attr('type') or '').lower() == 'password' for inp in all_inputs) has_text_field = any((inp.attr('type') or '').lower() in ('', 'text') and (inp.attr('type') or '').lower() not in ('checkbox', 'hidden', 'submit', 'button') for inp in all_inputs) # Если есть password но нет text - это форма логина if has_password_field and not has_text_field and password: print(" [K] Login form detected, logging in first...") if not self._login_on_device_page(email, password): print(" [!] Login failed") return False time.sleep(2) # После логина перезагружаем inputs all_inputs = self.page.eles('tag:input') # Ждём появления поля ввода кода # На странице device authorization поле может быть без type или с type="" code_input = None for attempt in range(15): try: # Способ 1: Ищем все input'ы и фильтруем all_inputs = self.page.eles('tag:input') print(f" [S] Found {len(all_inputs)} inputs on attempt {attempt + 1}") for inp in all_inputs: inp_type = (inp.attr('type') or '').lower() inp_id = inp.attr('id') or '' inp_name = inp.attr('name') or '' inp_aria = inp.attr('aria-label') or '' print(f" Input: type='{inp_type}', id='{inp_id}', name='{inp_name}', aria='{inp_aria}'") # Пропускаем password, checkbox, hidden if inp_type in ('password', 'checkbox', 'hidden', 'submit', 'button'): continue # Это наше поле! code_input = inp print(f" [OK] Found code input field") break if code_input: break except Exception as e: print(f" [!] Error searching inputs: {e}") time.sleep(0.5) if not code_input: print(" [!] Device code input not found") self._debug_inputs() self.screenshot("error_device_code") return False # Вводим код code_input.clear() self.human_type(code_input, user_code) time.sleep(0.5) # Кликаем Confirm and continue confirm_btn = self._find_element([ 'text=Confirm and continue', 'xpath://button[contains(text(), "Confirm")]', '@data-testid=confirm-button', ], timeout=3) if confirm_btn: print(" [->] Clicking Confirm and continue...") self.human_click(confirm_btn) time.sleep(2) return True print(" [!] Confirm button not found") return False def _login_on_device_page(self, email: str, password: str) -> bool: """ Логинится на странице device (когда показывается форма логина после регистрации). AWS показывает только поле пароля, т.к. email уже известен. """ print(f" [K] Logging in on device page...") # ВАЖНО: Сначала закрываем cookie диалог - он перекрывает кнопки! self.close_cookie_dialog(force=True) time.sleep(0.5) # Ищем поле пароля pwd_field = None for selector in ['tag:input@@type=password', 'input[type="password"]']: try: pwd_field = self.page.ele(selector, timeout=2) if pwd_field: break except: pass if not pwd_field: print(" [!] Password field not found on device page") return False # Вводим пароль print(f" Entering password...") self._behavior.human_click(pwd_field) self.page.run_js('arguments[0].focus()', pwd_field) self._behavior.human_delay(0.1, 0.2) # Вводим через CDP с человеческими задержками for char in password: self.page.run_cdp('Input.insertText', text=char) self._behavior.human_delay(0.03, 0.1) time.sleep(0.5) # Ещё раз закрываем cookie если появился снова self.close_cookie_dialog(force=True) time.sleep(0.3) # Debug: показываем все кнопки на странице try: buttons = self.page.eles('tag:button') print(f" [S] Found {len(buttons)} buttons:") for i, btn in enumerate(buttons[:5]): btn_text = btn.text or '' btn_type = btn.attr('type') or '' btn_testid = btn.attr('data-testid') or '' print(f" Button {i}: text='{btn_text[:30]}', type='{btn_type}', testid='{btn_testid}'") except Exception as e: print(f" [!] Error listing buttons: {e}") # Кликаем Sign in / Continue - расширенный список селекторов sign_in_btn = self._find_element([ 'text=Sign in', 'text=Continue', 'text=Submit', 'text=Next', 'xpath://button[contains(text(), "Sign in")]', 'xpath://button[contains(text(), "Continue")]', 'xpath://button[contains(text(), "Submit")]', 'xpath://button[@type="submit"]', '@data-testid=test-primary-button', '@data-testid=signin-button', '@data-testid=submit-button', 'tag:button@@type=submit', ], timeout=3) if sign_in_btn: print(f" [->] Clicking Sign in button...") self.human_click(sign_in_btn) time.sleep(3) print(" [OK] Logged in") return True # Fallback: кликаем первую кнопку submit try: submit_btn = self.page.ele('tag:button@@type=submit', timeout=2) if submit_btn: print(f" [->] Clicking submit button (fallback)...") self.human_click(submit_btn) time.sleep(3) print(" [OK] Logged in (fallback)") return True except: pass print(" [!] Sign in button not found") self.screenshot("error_login_no_button") return False def enter_email(self, email: str) -> bool: """Вводит email. Оптимизировано для скорости.""" print(f"[M] Entering email: {email}") record('enter_email', {'email': email}, self.page, screenshot=False) # Закрываем cookie один раз self.close_cookie_dialog(force=True) # Минимальная пауза перед вводом (для React) time.sleep(0.15) # Быстрые селекторы в порядке приоритета selectors = [ '@placeholder=username@example.com', '@type=email', 'xpath://input[@data-testid="test-input"]', ] email_input = None for selector in selectors: try: email_input = self.page.ele(selector, timeout=0.5) if email_input: self._log(f"Found email field", selector) break except: pass if not email_input: self._debug_inputs() self.screenshot("error_no_email") raise Exception("Email field not found") # КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры # FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует! # fast=True для скорости, но всё ещё генерирует события self._behavior.fwcim_type(self.page, email_input, email, field_type='email', fast=True) time.sleep(0.1) # Проверяем что ввелось правильно entered = email_input.attr('value') or '' if entered != email: print(f" [!] Email mismatch: expected '{email}', got '{entered}'") # Повторная попытка email_input.clear() email_input.input(email) return True def _debug_inputs(self): """Выводит отладочную информацию о input элементах""" print(" [S] Debug: searching for input elements...") try: inputs = self.page.eles('tag:input') for i, inp in enumerate(inputs[:5]): print(f" Input {i}: type={inp.attr('type')}, placeholder={inp.attr('placeholder')}") except Exception as e: print(f" Error: {e}") def click_continue(self) -> bool: """Нажимает кнопку Continue после email и ждёт страницу имени""" print("[->] Clicking Continue...") # Запоминаем URL до клика url_before = self.page.url # Быстрый клик Continue if not self._click_if_exists(SELECTORS['continue_btn'], timeout=1): raise Exception("Continue button not found") # ВАЖНО: Ждём загрузку страницы после клика try: self.page.wait.doc_loaded(timeout=10) except: pass # Ждём изменения URL или появления нового контента time.sleep(1) url_after = self.page.url print(f" [URL] Before: {url_before[:60]}...") print(f" [URL] After: {url_after[:60]}...") # Ждём пока страница profile.aws.amazon.com загрузится # Cookie popup появляется после загрузки if 'profile.aws.amazon.com' in self.page.url: print(" [...] Waiting for profile.aws.amazon.com to load...") for _ in range(10): time.sleep(0.5) # Ищем кнопку Accept которая появится после загрузки try: accept_btn = self.page.ele('text=Accept', timeout=0.3) if accept_btn: print(" [C] Found Accept button, clicking...") accept_btn.click() time.sleep(0.5) break except: pass # КРИТИЧНО: Принимаем cookie диалог - он появляется после клика Continue # и перекрывает страницу имени! Пробуем несколько раз с паузой for attempt in range(3): self._cookie_closed = False # Сбрасываем флаг чтобы попробовать снова if self.close_cookie_dialog(force=True): time.sleep(0.5) break time.sleep(0.5) # Ожидание страницы имени - ищем ТЕКСТ "Enter your name" или placeholder с "Silva" print(" [...] Waiting for name page...") name_page_selectors = [ 'text=Enter your name', # Заголовок страницы '@placeholder=Maria José Silva', # Placeholder поля имени 'xpath://input[contains(@placeholder, "Silva")]', 'xpath://input[@type="text"]', # Любое текстовое поле ] start_time = time.time() timeout = 20 # Увеличено для медленных соединений cookie_retry = 0 last_debug = 0 while time.time() - start_time < timeout: # Периодически пробуем принять cookie (может появиться с задержкой) if cookie_retry < 5 and time.time() - start_time > cookie_retry * 2: self.close_cookie_dialog(force=True) cookie_retry += 1 # Debug каждые 5 секунд elapsed = time.time() - start_time if elapsed - last_debug > 5: print(f" [DEBUG] {elapsed:.0f}s - URL: {self.page.url[:50]}...") # Показываем что есть на странице try: title = self.page.title print(f" [DEBUG] Title: {title}") # Ищем любые заголовки h1 = self.page.ele('tag:h1', timeout=0.2) if h1: print(f" [DEBUG] H1: {h1.text[:50] if h1.text else 'empty'}") except: pass last_debug = elapsed for selector in name_page_selectors: try: if self.page.ele(selector, timeout=0.3): elapsed = time.time() - start_time print(f" [OK] Name page loaded in {elapsed:.2f}s") # Collect scripts after page transition self._collect_scripts() return True except: pass # Финальная отладка перед ошибкой print(" [X] FAILED: Name page did not load!") print(f" [DEBUG] Final URL: {self.page.url}") try: print(f" [DEBUG] Final Title: {self.page.title}") body_text = self.page.ele('tag:body').text[:200] if self.page.ele('tag:body') else 'no body' print(f" [DEBUG] Body text: {body_text}") except Exception as e: print(f" [DEBUG] Error getting page info: {e}") self.screenshot("error_no_name_page") raise Exception("Name page did not load after email") def enter_name(self, name: str) -> bool: """Вводит имя. Оптимизировано для скорости.""" print(f"[N] Entering name: {name}") record('enter_name', {'name': name}, self.page, screenshot=False) # КРИТИЧНО: Закрываем cookie диалог ПЕРЕД поиском поля self._hide_cookie_banner() # ВАЖНО: Проверяем что мы на странице имени, а не email! # Ищем текст "Enter your name" или placeholder с "Silva" on_name_page = False for _ in range(10): try: if self.page.ele('text=Enter your name', timeout=0.3): on_name_page = True break if self.page.ele('@placeholder=Maria José Silva', timeout=0.3): on_name_page = True break except: pass time.sleep(0.3) if not on_name_page: print(" [!] WARNING: Not on name page! Current URL:", self.page.url[:60]) # Попробуем ещё раз кликнуть Continue и подождать self._click_if_exists(SELECTORS['continue_btn'], timeout=1) time.sleep(2) # Проверяем ещё раз for _ in range(5): try: if self.page.ele('@placeholder=Maria José Silva', timeout=0.5): on_name_page = True print(" [OK] Now on name page after retry") break except: pass time.sleep(0.5) if not on_name_page: print(" [X] Still not on name page, aborting!") self.screenshot("error_not_on_name_page") raise Exception("Failed to navigate to name page") name_input = None start_time = time.time() # ПРИОРИТЕТ 1: Ищем по placeholder "Maria José Silva" - это точно поле имени try: name_input = self.page.ele('@placeholder=Maria José Silva', timeout=0.5) if name_input: print(f" Found name field via placeholder (Maria José Silva)") except: pass # Fallback 1: placeholder с "Silva" (уникальный для страницы имени) if not name_input: try: name_input = self.page.ele('xpath://input[contains(@placeholder, "Silva")]', timeout=0.3) if name_input: print(f" Found name field via placeholder: Silva") except: pass # Fallback 2: data-testid if not name_input: try: name_input = self.page.ele('@data-testid=name-input', timeout=0.2) if name_input: print(f" Found name field via data-testid") except: pass # Fallback 3: CSS селектор (ТОЛЬКО если на странице имени!) if not name_input and on_name_page: try: name_input = self.page.ele('css:input[type="text"]:not([hidden])', timeout=0.3) if name_input: print(f" Found name field via CSS (fast)") except: pass elapsed = time.time() - start_time if elapsed > 0.5: print(f" [!] Name field search took {elapsed:.2f}s") if not name_input: print(" [X] Name field not found!") return False # КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры # FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует! self._behavior.fwcim_type(self.page, name_input, name, field_type='name', fast=True) # Blur event после ввода self.page.run_js('arguments[0].dispatchEvent(new Event("blur", { bubbles: true }));', name_input) time.sleep(0.1) # Кликаем Continue print(" [->] Clicking Continue...") self._click_if_exists(SELECTORS['continue_btn'], timeout=1) # ВАЖНО: Ждём загрузку страницы после клика try: self.page.wait.doc_loaded(timeout=10) except: pass # Принимаем cookie если появился time.sleep(0.5) self.close_cookie_dialog(force=True) # Ожидание страницы верификации print(" [...] Waiting for verification page...") verification_selectors = [ 'text=Verify your email', 'text=Verification code', '@placeholder=6-digit', # German 'text=E-Mail-Adresse bestätigen', 'text=Verifizierungscode', '@placeholder=6-stellig', # Japanese 'text=メールアドレスを確認', 'text=確認コード', '@placeholder=6桁', # Spanish 'text=Verificar tu correo', 'text=Código de verificación', # French 'text=Vérifier votre e-mail', 'text=Code de vérification', # Generic - input field for 6-digit code 'css:input[maxlength="6"]', 'css:input[type="text"][placeholder*="6"]', ] start_time = time.time() timeout = 20 retry_count = 0 max_retries = 2 cookie_retry = 0 while time.time() - start_time < timeout: # Периодически принимаем cookie if cookie_retry < 5 and time.time() - start_time > cookie_retry * 2: self.close_cookie_dialog(force=True) cookie_retry += 1 # Проверяем на ошибку AWS if self._check_aws_error(): self._close_error_modal() if retry_count < max_retries: retry_count += 1 print(f" [R] Retry {retry_count}/{max_retries}") # Retry - ищем поле имени заново retry_input = self.page.ele('css:input[type="text"]:not([hidden])', timeout=1) if retry_input: # Используем fwcim_type для генерации правильных событий self._behavior.fwcim_type(self.page, retry_input, name, field_type='name', fast=True) self._click_if_exists(SELECTORS['continue_btn'], timeout=1) continue for selector in verification_selectors: try: if self.page.ele(selector, timeout=0.3): elapsed = time.time() - start_time print(f" [OK] Verification page loaded in {elapsed:.2f}s") # Collect scripts after page transition self._collect_scripts() return True except: pass # Отладка перед ошибкой print(" [X] FAILED: Verification page did not load!") print(f" [DEBUG] URL: {self.page.url}") try: print(f" [DEBUG] Title: {self.page.title}") except: pass self.screenshot("error_no_verification_page") raise Exception("Verification page did not load after entering name") def enter_verification_code(self, code: str) -> bool: """Вводит код верификации. Оптимизировано для скорости.""" print(f"[K] Entering code: {code}") record('enter_code', {'code': code}, self.page) # Закрываем cookie один раз self.close_cookie_dialog(force=True) code_input = self._find_element(SELECTORS['code_input'], timeout=10) if not code_input: raise Exception("Verification code field not found") # КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры # FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует! # Код верификации - критичное поле, используем fast=True для скорости self._behavior.fwcim_type(self.page, code_input, code, field_type='code', fast=True) time.sleep(0.1) print(f" Entered code: '{code_input.attr('value') or ''}'") # Кликаем Continue/Verify с retry verify_selectors = [ '@data-testid=email-verification-verify-button', 'text=Verify', 'text=Continue', '@data-testid=test-primary-button', ] print(" [->] Clicking Verify/Continue...") clicked = False for attempt in range(3): for selector in verify_selectors: try: btn = self.page.ele(selector, timeout=0.5) if btn: self._behavior.human_js_click(self.page, btn, pre_delay=False) clicked = True print(f" [OK] Clicked: {selector}") break except: pass if clicked: break self._behavior.human_delay(0.15, 0.3) if not clicked: print(" [!] Verify button not found, trying Enter key...") try: self.page.run_cdp('Input.dispatchKeyEvent', type='keyDown', key='Enter', code='Enter', windowsVirtualKeyCode=13) self.page.run_cdp('Input.dispatchKeyEvent', type='keyUp', key='Enter', code='Enter', windowsVirtualKeyCode=13) except: pass # Оптимизированное ожидание страницы пароля print(" [...] Waiting for password page...") password_selectors = [ 'text=Create your password', 'text=Set your password', '@placeholder=Enter password', # German 'text=Ihr Passwort erstellen', '@placeholder=Passwort eingeben', # Japanese 'text=パスワードを作成', '@placeholder=パスワードを入力', # Spanish 'text=Crea tu contraseña', # French 'text=Créer votre mot de passe', # Generic 'css:input[type="password"]', ] start_time = time.time() timeout = 15 # Увеличено для надёжности while time.time() - start_time < timeout: for selector in password_selectors: try: if self.page.ele(selector, timeout=0.1): elapsed = time.time() - start_time print(f" [OK] Password page loaded in {elapsed:.2f}s") # Collect scripts after page transition self._collect_scripts() return True except: pass # Проверяем на ошибку AWS if self._check_aws_error(): print(" [!] AWS error detected, closing modal...") self._close_error_modal() time.sleep(0.3) print(" [X] FAILED: Password page did not load!") self.screenshot("error_no_password_page") raise Exception("Password page did not load after verification code") def enter_password(self, password: str) -> bool: """Вводит и подтверждает пароль""" print("[KEY] Entering password...") record('enter_password', {'length': len(password)}, self.page, screenshot=False) # Скриншот отключен - таймаутится step_start = time.time() # Быстрое ожидание контекста self.wait_for_page_context('password', timeout=5) time.sleep(0.15) # Минимальная пауза для React # Ищем password поля pwd_fields = self.page.eles('tag:input@@type=password', timeout=3) print(f" Found {len(pwd_fields)} password fields ({time.time() - step_start:.1f}s)") pwd1, pwd2 = None, None # Быстрая стратегия определения полей for field in pwd_fields: ph = (field.attr('placeholder') or '').lower() if 're-enter' in ph or 'confirm' in ph: pwd2 = field elif not pwd1: pwd1 = field if not pwd1 and pwd_fields: pwd1 = pwd_fields[0] if not pwd2 and len(pwd_fields) >= 2: pwd2 = pwd_fields[1] if not pwd1: print(" [!] No password fields found!") self.screenshot("error_no_password") return False # Ввод пароля с человеческим поведением (медленно - вспоминаем пароль) print(" Entering password...") typing_start = time.time() self.human_type(pwd1, password, field_type='password') print(f" Password entered ({time.time() - typing_start:.1f}s)") if pwd2: self._behavior.human_delay(0.2, 0.4) # Пауза перед повторным вводом print(" Confirming password...") confirm_start = time.time() self.human_type(pwd2, password, field_type='password') # Проверяем что оба пароля одинаковые через JavaScript time.sleep(0.3) # Даём время React обновить состояние pwd1_value = self.page.run_js('return arguments[0].value || ""', pwd1) pwd2_value = self.page.run_js('return arguments[0].value || ""', pwd2) # Также проверяем наличие ошибки на странице password_error = self._check_password_mismatch_error() if pwd1_value != pwd2_value or password_error: print(f" [!] Password mismatch detected! Clearing and re-entering...") # Очищаем оба поля через JavaScript self.page.run_js('arguments[0].value = ""; arguments[0].dispatchEvent(new Event("input", {bubbles: true}))', pwd1) self.page.run_js('arguments[0].value = ""; arguments[0].dispatchEvent(new Event("input", {bubbles: true}))', pwd2) time.sleep(0.2) # Вводим заново - используем быстрый режим для надёжности self.human_type(pwd1, password, field_type='password', fast=True) self._behavior.human_delay(0.3, 0.5) self.human_type(pwd2, password, field_type='password', fast=True) # Проверяем ещё раз time.sleep(0.3) pwd1_value = self.page.run_js('return arguments[0].value || ""', pwd1) pwd2_value = self.page.run_js('return arguments[0].value || ""', pwd2) password_error = self._check_password_mismatch_error() if pwd1_value != pwd2_value or password_error: print(f" [!] Password still mismatched after retry!") self.screenshot("error_password_mismatch") else: print(f" [OK] Passwords match after retry") print(f" Password confirmed ({time.time() - confirm_start:.1f}s)") time.sleep(0.15) # Проверяем наличие CAPTCHA перед кликом Continue if self._check_captcha(): print(" [!] CAPTCHA detected on password page!") self.screenshot("captcha_detected") # Ждём ручного решения капчи if not self._handle_captcha(timeout=120): print(" [!] CAPTCHA solving failed or timeout") return False print(" [OK] CAPTCHA solved, continuing...") print(f"[->] Clicking Continue... (total input time: {time.time() - step_start:.1f}s)") old_url = self.page.url click_time = time.time() self._click_if_exists(SELECTORS['continue_btn'], timeout=1) # Ждём немного и проверяем результат time.sleep(1) # Проверяем появилась ли капча после клика Continue if self._check_captcha(): print(" [!] CAPTCHA appeared after Continue click!") if not self._handle_captcha(timeout=120): print(" [!] CAPTCHA solving failed") return False # После решения капчи кликаем Continue снова print("[->] Clicking Continue after captcha...") self._click_if_exists(SELECTORS['continue_btn'], timeout=1) time.sleep(1) # Проверяем на ошибку "Invalid captcha" if self._check_captcha_error(): print(" [!] Invalid captcha error - captcha was rejected by server") self.screenshot("captcha_invalid") # Пробуем ещё раз с новой капчей print(" [R] Retrying with new captcha...") time.sleep(1) if self._check_captcha(): if not self._handle_captcha(timeout=120): return False self._click_if_exists(SELECTORS['continue_btn'], timeout=1) time.sleep(1) if self._check_captcha_error(): print(" [!] Captcha rejected again") return False if self._check_password_error(): print(" [!] Password rejected, generating new one...") return self.enter_password(self.generate_password(18)) # Ждём перехода - AWS может долго редиректить (показывает "Redirecting...") # Таймаут настраивается в config.timeouts.password_redirect password_redirect_timeout = self.settings.get('timeouts', {}).get('password_redirect', 60) print(f" [...] Waiting for redirect after password (timeout: {password_redirect_timeout}s)...") # Логируем прогресс ожидания # Ждём редирект на view.awsapps.com или callback URL redirect_start = time.time() last_log = 0 last_url = old_url workflow_success_detected = False while time.time() - redirect_start < password_redirect_timeout: current_url = self.page.url # Логируем изменение URL if current_url != last_url: elapsed = time.time() - redirect_start print(f" [{elapsed:.1f}s] URL: {current_url[:60]}...") last_url = current_url # Успешный редирект на Allow access или callback if 'view.awsapps.com' in current_url or 'awsapps.com/start' in current_url: elapsed = time.time() - redirect_start print(f" [OK] Redirected to awsapps.com after {elapsed:.1f}s") # Collect scripts after redirect self._collect_scripts() break if '127.0.0.1' in current_url and 'oauth/callback' in current_url: elapsed = time.time() - redirect_start print(f" [OK] Redirected to callback after {elapsed:.1f}s") # Collect scripts after redirect self._collect_scripts() break # КРИТИЧНО: Проверяем cookie workflow-step-id # AWS устанавливает его в "end-of-workflow-success" когда регистрация завершена # но JS на странице может не сработать из-за спуфинга if not workflow_success_detected: try: workflow_cookie = self.page.run_js(''' const cookies = document.cookie.split(';'); for (const c of cookies) { const [name, value] = c.trim().split('='); if (name === 'workflow-step-id') return decodeURIComponent(value); } return null; ''') if workflow_cookie == 'end-of-workflow-success': elapsed = time.time() - redirect_start print(f" [!] Cookie 'workflow-step-id=end-of-workflow-success' detected at {elapsed:.1f}s") print(f" [!] AWS registration complete but page stuck - forcing navigation...") workflow_success_detected = True # Пробуем найти redirect URL в странице или cookies redirect_url = self.page.run_js(r''' // Ищем redirect URL в meta refresh const meta = document.querySelector('meta[http-equiv="refresh"]'); if (meta) { const content = meta.getAttribute('content'); const match = content.match(/url=(.+)/i); if (match) return match[1]; } // Ищем в скриптах const scripts = document.querySelectorAll('script'); for (const s of scripts) { const text = s.textContent; const match = text.match(/window\.location\s*=\s*["']([^"']+)["']/); if (match) return match[1]; } // Ищем ссылку на awsapps const links = document.querySelectorAll('a[href*="awsapps"]'); if (links.length > 0) return links[0].href; return null; ''') if redirect_url: print(f" [>] Found redirect URL: {redirect_url[:60]}...") self.page.get(redirect_url) time.sleep(1) continue # Если не нашли URL - пробуем стандартный путь # Получаем workflowResultHandle из URL import re result_match = re.search(r'workflowResultHandle=([^&]+)', current_url) if result_match: result_handle = result_match.group(1) # Пробуем перейти на view.awsapps.com напрямую awsapps_url = f"https://view.awsapps.com/start/#/?workflowResultHandle={result_handle}" print(f" [>] Trying direct navigation to awsapps...") self.page.get(awsapps_url) time.sleep(2) continue except Exception as e: pass # Логируем каждые 10 секунд elapsed = int(time.time() - redirect_start) if elapsed > 0 and elapsed % 10 == 0 and elapsed != last_log: print(f" [...] Still waiting... ({elapsed}s)") last_log = elapsed # Проверяем текст на странице try: page_text = self.page.run_js('document.body.innerText.substring(0, 200)') if 'Redirecting' in page_text: print(f" [i] AWS shows 'Redirecting...' - page may be stuck") # Если страница застряла на Redirecting больше 15 секунд - пробуем refresh if elapsed > 15 and not workflow_success_detected: print(f" [!] Page stuck on Redirecting, trying page refresh...") self.page.refresh() time.sleep(2) except: pass time.sleep(0.2) total_time = time.time() - step_start self._log(f"URL after password ({total_time:.1f}s total)", self.page.url[:60]) return True def _check_captcha(self) -> bool: """Проверяет наличие CAPTCHA на странице""" captcha_indicators = [ 'text=Security check', 'text=Security Verification', 'text=Please click verify', 'text=solve the challenge', # AWS puzzle captcha 'text=Verify you are human', '#captcha', '[data-testid="ams-captcha"]', '.wOjKx2rsiwhq8wvEYj3p', # AWS captcha container class 'canvas', # Puzzle captcha uses canvas 'iframe[src*="captcha"]', ] for selector in captcha_indicators: try: if self.page.ele(selector, timeout=0.3): return True except: pass return False def _check_captcha_error(self) -> bool: """Проверяет наличие ошибки Invalid captcha""" try: if self.page.ele('text=Invalid captcha', timeout=0.5): return True except: pass return False def _handle_captcha(self, timeout: int = 120) -> bool: """ Обрабатывает CAPTCHA на странице. AWS использует визуальную капчу (puzzle) которую нужно решить вручную. Args: timeout: Время ожидания решения в секундах (по умолчанию 120) Returns: True если капча решена, False если не удалось """ print(" [CAPTCHA] ========================================") print(" [CAPTCHA] CAPTCHA DETECTED - MANUAL SOLUTION REQUIRED") print(" [CAPTCHA] ========================================") # Делаем скриншот капчи для анализа self.screenshot("captcha_puzzle") # Если headless - капчу не решить if self.headless: print(" [!] Cannot solve CAPTCHA in headless mode!") print(" [!] Restart with --no-headless flag") return False # Выводим инструкции print(f" [CAPTCHA] Please solve the captcha in the browser window") print(f" [CAPTCHA] Waiting up to {timeout} seconds...") print(" [CAPTCHA] ----------------------------------------") # Ждём пока пользователь решит капчу start_time = time.time() last_status = "" while time.time() - start_time < timeout: elapsed = int(time.time() - start_time) # Проверяем Success try: success_elem = self.page.ele('text=Success', timeout=0.3) if success_elem: print(" [CAPTCHA] [OK] Success detected!") time.sleep(0.5) return True except: pass # Проверяем что капча исчезла (страница перешла дальше) if not self._check_captcha(): # Дополнительная проверка - нет ли ошибки Invalid captcha if not self._check_captcha_error(): print(" [CAPTCHA] [OK] Captcha solved!") return True # Проверяем ошибку Invalid captcha if self._check_captcha_error(): if last_status != "invalid": print(" [CAPTCHA] [!] Invalid captcha - please try again") last_status = "invalid" # Выводим статус каждые 10 секунд if elapsed > 0 and elapsed % 10 == 0 and last_status != str(elapsed): remaining = timeout - elapsed print(f" [CAPTCHA] Waiting... {remaining}s remaining") last_status = str(elapsed) time.sleep(0.5) print(" [CAPTCHA] [!] Timeout - captcha not solved") return False def _check_password_error(self) -> bool: """Проверяет наличие ошибки о слабом/утёкшем пароле""" error_texts = [ 'publicly known', 'leaked', 'data set', 'try a different password', 'password is too weak', ] for text in error_texts: try: if self.page.ele(f'text={text}', timeout=0.5): return True except: pass return False def _check_password_mismatch_error(self) -> bool: """Проверяет наличие ошибки о несовпадении паролей""" error_texts = [ 'Passwords must match', 'Passwörter müssen übereinstimmen', # German 'パスワードが一致しません', # Japanese 'Las contraseñas deben coincidir', # Spanish 'Les mots de passe doivent correspondre', # French 'passwords do not match', 'password mismatch', ] for text in error_texts: try: if self.page.ele(f'text={text}', timeout=0.3): return True except: pass return False def has_login_form(self) -> bool: """Проверяет есть ли форма логина на странице""" try: # Ищем поле email или password на странице логина selectors = [ 'input[type="email"]', 'input[placeholder*="email" i]', 'input[placeholder*="username" i]', ] for selector in selectors: elements = self.page.query_selector_all(selector) if elements: return True return False except: return False def login_with_credentials(self, email: str, password: str) -> bool: """Логинится с email и паролем""" print(f"[K] Logging in as {email}...") try: self.close_cookie_dialog() # Вводим email email_selectors = [ 'input[type="email"]', 'input[placeholder*="email" i]', 'input[placeholder*="username" i]', 'input[name="email"]', ] email_field = None for selector in email_selectors: email_field = self.page.query_selector(selector) if email_field: break if email_field: self._behavior.human_click(email_field) email_field.fill(email) print(f" [OK] Email entered") # Нажимаем Continue если есть continue_btn = self.page.query_selector('button:has-text("Continue")') if continue_btn: self._behavior.human_click(continue_btn) self._behavior.human_delay(1.5, 2.5) # Вводим пароль password_selectors = [ 'input[type="password"]', 'input[placeholder*="password" i]', ] password_field = None for selector in password_selectors: password_field = self.page.query_selector(selector) if password_field: break if password_field: self._behavior.human_click(password_field) password_field.fill(password) print(f" [OK] Password entered") # Нажимаем Sign in / Continue sign_in_btn = self.page.query_selector('button:has-text("Sign in"), button:has-text("Continue")') if sign_in_btn: self._behavior.human_click(sign_in_btn) self._behavior.human_delay(2.0, 3.5) print(f" [OK] Logged in") return True return False except Exception as e: print(f" [X] Login error: {e}") return False def click_confirm_and_continue(self) -> bool: """Нажимает Confirm and continue на странице device authorization""" print("[S] Looking for Confirm and continue button...") selectors = [ 'text=Confirm and continue', '@data-testid=confirm-button', 'xpath://button[contains(text(), "Confirm")]', 'tag:button@@text()=Confirm and continue', ] for attempt in range(5): for selector in selectors: try: btn = self.page.ele(selector, timeout=1) if btn: print(f" [OK] Found Confirm button (attempt {attempt + 1})") self._behavior.human_js_click(self.page, btn) return True except: pass self._behavior.human_delay(0.4, 0.7) print(" [!] Confirm and continue button not found") return False def click_allow_access(self) -> bool: """Нажимает Allow access. ОПТИМИЗИРОВАНО: быстрый поиск и клик.""" print("[OK] Looking for Allow access button...") print(f" Current URL: {self.page.url[:80]}...") record('click_allow_access', {'url': self.page.url}, self.page) # Скрываем cookie через CSS (мгновенно, без проверок) self._hide_cookie_banner() # ОПТИМИЗАЦИЯ: Ждём кнопку напрямую, без отдельного ожидания страницы # Используем waitUntil с коротким polling btn = None btn_selector = None # Сохраняем селектор для повторного поиска start_time = time.time() max_wait = 5 # Уменьшено с 10 итераций по 0.3с # Приоритетные селекторы - ТОЛЬКО специфичные для Allow access # НЕ используем общие селекторы типа button[type="submit"] - они находят cookie кнопки! fast_selectors = [ '@data-testid=allow-access-button', 'css:button[data-testid="allow-access-button"]', 'text=Allow access', # Точный текст ] fallback_selectors = [ # AWS UI специфичные 'css:button.awsui_button_vjswe_146je_157', # AWS UI button class 'xpath://button[contains(@class, "awsui") and contains(text(), "Allow")]', # Текстовые селекторы 'text=Allow', 'text=Authorize', 'xpath://button[contains(text(), "Allow access")]', 'xpath://button[contains(text(), "Allow")]', 'xpath://button[contains(text(), "Authorize")]', # Fallback для нового flow '@data-testid=confirm-button', '@data-testid=accept-button', # Общие селекторы ТОЛЬКО в конце 'css:button.awsui-button-variant-primary', 'css:button[class*="variant-primary"]', ] # Фаза 1: Быстрый поиск по data-testid (0.5 сек макс) for selector in fast_selectors: try: btn = self.page.ele(selector, timeout=0.25) if btn: btn_selector = selector print(f" [OK] Found button via {selector[:30]}") break except: pass # Фаза 2: Fallback селекторы если нужно if not btn: for _ in range(20): # ~5 сек максимум for selector in fallback_selectors: try: btn = self.page.ele(selector, timeout=0.15) if btn: btn_selector = selector print(f" [OK] Found button via fallback") break except: pass if btn: break time.sleep(0.1) if not btn: print(" [!] Allow access button not found") self.screenshot("error_no_allow_button") return False # Логируем что нашли try: btn_text = btn.text or btn.attr('value') or 'no text' btn_class = btn.attr('class') or 'no class' print(f" [DEBUG] Button text: '{btn_text}', class: '{btn_class[:50]}'") except: pass # Ждём пока страница стабилизируется time.sleep(1) # Клик с разными методами for attempt in range(3): # Проверяем disabled (с защитой от NoneElement) try: if btn and btn.attr('disabled'): print(f" [!] Button is disabled, waiting...") time.sleep(1) # Перезапрашиваем элемент if btn_selector: btn = self.page.ele(btn_selector, timeout=0.5) continue except: pass print(f"[UNLOCK] Clicking Allow access (attempt {attempt + 1})...") # Метод 1: Прямой клик через DrissionPage try: btn.click() time.sleep(0.5) if '127.0.0.1' in self.page.url: print(" [OK] Redirected to callback!") # Collect scripts after redirect self._collect_scripts() return True except Exception as e: print(f" [!] Direct click failed: {e}") # Метод 2: JS click try: self.page.run_js('arguments[0].click()', btn) time.sleep(0.5) if '127.0.0.1' in self.page.url: print(" [OK] Redirected to callback!") # Collect scripts after redirect self._collect_scripts() return True except Exception as e: print(f" [!] JS click failed: {e}") # Метод 3: Behavior click try: self._behavior.human_js_click(self.page, btn) time.sleep(0.5) if '127.0.0.1' in self.page.url: print(" [OK] Redirected to callback!") # Collect scripts after redirect self._collect_scripts() return True except: pass # Перезапрашиваем элемент (избегаем stale) if btn_selector: try: btn = self.page.ele(btn_selector, timeout=0.5) except: pass time.sleep(1) # Последняя проверка if '127.0.0.1' in self.page.url: return True print(" [!] Allow access button didn't work") self.screenshot("error_allow_access") return False def wait_for_callback(self, timeout: int = None) -> bool: """Ждёт редиректа на callback""" timeout = timeout or self.settings.get('timeouts', {}).get('oauth_callback', 60) print(f"[...] Waiting for callback redirect ({timeout}s)...") for _ in range(timeout): current_url = self.page.url if '127.0.0.1' in current_url and 'oauth/callback' in current_url: return True time.sleep(1) return False @property def current_url(self) -> str: return self.page.url def prewarm(self): """ Прогрев браузера - посещаем несколько сайтов перед AWS. Создаёт реальную историю и выглядит естественнее. """ print("[W] Warming up browser...") # Сайты для прогрева (быстрые, популярные) warmup_sites = [ 'https://www.google.com', 'https://github.com', ] # Выбираем 1-2 случайных сайта sites_to_visit = random.sample(warmup_sites, k=random.randint(1, 2)) for site in sites_to_visit: try: print(f" Visiting {site}...") self.page.get(site) # Ждём загрузку try: self.page.wait.doc_loaded(timeout=5) except: pass # Имитируем просмотр (1-3 сек) self._behavior.simulate_page_reading(self.page, duration=random.uniform(1.0, 3.0)) # Случайный скролл if random.random() < 0.5: self._behavior.scroll_page(self, direction='down', amount=random.randint(100, 300)) except Exception as e: print(f" [!] Warmup failed for {site}: {e}") print(" [OK] Browser warmed up") def navigate(self, url: str): """Переход по URL.""" print(f"[>] Opening page...") record('navigate', {'url': url}, self.page) self.page.get(url) # Ждём загрузку документа print(" [...] Waiting for page load...") try: self.page.wait.doc_loaded(timeout=8) except: pass # Ждём появления элементов страницы (email input или заголовок) page_ready = False for _ in range(20): # 2 секунды максимум try: if self.page.ele('@placeholder=username@example.com', timeout=0.1): page_ready = True break if self.page.ele('text=Get started', timeout=0.05): page_ready = True break except: pass if page_ready: print(f" [OK] Page elements loaded") else: print(f" [!] Page elements not detected, continuing anyway") # Collect JS scripts from this page self._collect_scripts() # Скрываем cookie self._hide_cookie_banner() def check_aws_error(self) -> bool: """Проверяет наличие ошибки AWS""" try: error_text = self.page.ele("text=It's not you, it's us", timeout=1) if error_text: print("[!] AWS temporary error, need to wait and retry") return True except: pass return False def _check_aws_error(self) -> bool: """Проверяет наличие ошибки AWS на странице (модальное окно)""" error_texts = [ 'error processing your request', 'Please try again', "It's not you, it's us", 'Something went wrong', ] for text in error_texts: try: if self.page.ele(f'text={text}', timeout=0.5): return True except: pass return False def _close_error_modal(self) -> bool: """Закрывает модальное окно с ошибкой AWS""" print(" [!] Attempting to close error modal...") # AWS Cloudscape UI - специфичные селекторы из HTML close_selectors = [ # Точный селектор из HTML - кнопка Close alert 'xpath://button[@aria-label="Close alert"]', 'xpath://button[contains(@class, "awsui_dismiss-button")]', 'xpath://button[contains(@class, "dismiss-button")]', # AWS Cloudscape/Polaris UI 'xpath://button[contains(@class, "awsui_dismiss")]', 'xpath://*[contains(@class, "awsui_alert")]//button', # Fallback селекторы 'xpath://button[@aria-label="Close"]', 'xpath://button[@aria-label="Dismiss"]', 'aria:Close alert', 'aria:Close', 'aria:Dismiss', ] for selector in close_selectors: try: btn = self.page.ele(selector, timeout=0.3) if btn: print(f" [!] Found close button: {selector}") self._behavior.human_js_click(self.page, btn) # Проверяем что модалка закрылась if not self._check_aws_error(): print(" [OK] Error modal closed") return True except: pass # Fallback: нажимаем Escape try: print(" [!] Trying Escape key...") self._behavior.human_delay(0.1, 0.3) self.page.run_cdp('Input.dispatchKeyEvent', type='keyDown', key='Escape', code='Escape', windowsVirtualKeyCode=27) self.page.run_cdp('Input.dispatchKeyEvent', type='keyUp', key='Escape', code='Escape', windowsVirtualKeyCode=27) self._behavior.human_delay(0.3, 0.6) if not self._check_aws_error(): print(" [OK] Error modal closed via Escape") return True except: pass # Последний fallback: кликаем вне модалки try: print(" [!] Trying click outside modal...") self._behavior.human_delay(0.1, 0.2) self.page.run_js('document.body.click()') self._behavior.human_delay(0.2, 0.4) except: pass return False def screenshot(self, name: str = "debug") -> Optional[str]: """Сохраняет скриншот для отладки""" if not self.screenshots_on_error: return None try: filename = str(BASE_DIR / f"{name}_{int(time.time())}.png") self.page.get_screenshot(path=filename) print(f"[SCREENSHOT] Screenshot: {filename}") # Записываем в debug recorder если это ошибка if 'error' in name.lower(): recorder = get_recorder() if recorder: recorder.record_error(name, self.page) return filename except Exception as e: print(f"[!] Screenshot failed: {e}") return None def pause_for_debug(self, message: str = "Paused for debugging"): """Пауза для ручной отладки""" if self.settings.get('debug', {}).get('pause_on_error', False): print(f"\n⏸️ {message}") print(" Press Enter to continue...") input() def close(self): """Закрытие браузера""" # Save collected scripts (only if enabled) if COLLECT_SCRIPTS: try: from debugger.collectors.script_collector import save_collected_scripts save_collected_scripts() except Exception as e: if self.verbose: print(f"[ScriptCollector] Error saving: {e}") # Завершаем debug recording recorder = get_recorder() if recorder: recorder.finish() try: self.page.quit() except Exception: pass