Spaces:
Sleeping
Sleeping
| """ | |
| AWS Builder ID Auto-Registration with OAuth PKCE Flow | |
| Правильный flow (как в Kiro IDE): | |
| 1. Register OIDC client + start callback server | |
| 2. Generate PKCE (code_verifier, code_challenge) | |
| 3. Build auth_url: /authorize?client_id=...&code_challenge=... | |
| 4. Open auth_url in browser → AWS redirects to signin/signup | |
| 5. Enter email → Continue → AWS redirects to profile.aws for registration | |
| 6. Enter name → Continue | |
| 7. Enter verification code → Continue | |
| 8. Enter password → Continue | |
| 9. AWS redirects to view.awsapps.com/start | |
| 10. Click "Allow access" button (CRITICAL!) | |
| 11. AWS redirects to 127.0.0.1:PORT/oauth/callback?code=... | |
| 12. Exchange code for tokens via POST /token | |
| 13. Save tokens | |
| """ | |
| import argparse | |
| import time | |
| import re | |
| import random | |
| import threading | |
| import functools | |
| import json | |
| from typing import List, Optional | |
| import sys | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| # Force unbuffered output for real-time logging | |
| print = functools.partial(print, flush=True) | |
| from core.config import get_config | |
| from core.email_generator import EmailGenerator, EmailResult | |
| from .browser import BrowserAutomation | |
| from .mail_handler import get_mail_handler, create_mail_handler_from_env, get_mail_backend | |
| from .oauth_pkce import OAuthPKCE | |
| from .oauth_device import OAuthDevice | |
| config = get_config() | |
| TIMEOUTS = { | |
| 'page_load': config.timeouts.page_load, | |
| 'element_wait': config.timeouts.element_wait, | |
| 'verification_code': config.timeouts.verification_code, | |
| 'oauth_callback': config.timeouts.oauth_callback, | |
| 'between_accounts': config.timeouts.between_accounts, | |
| } | |
| class AccountStorage: | |
| """Простое хранилище аккаунтов""" | |
| def __init__(self): | |
| from core.paths import get_paths | |
| self.paths = get_paths() | |
| self.filepath = self.paths.accounts_file | |
| self._ensure_file() | |
| def _ensure_file(self): | |
| if not self.filepath.exists(): | |
| self.filepath.write_text('[]', encoding='utf-8') | |
| def load_all(self) -> list: | |
| import json | |
| try: | |
| return json.loads(self.filepath.read_text(encoding='utf-8')) | |
| except: | |
| return [] | |
| def save(self, email: str, password: str, name: str, token_file=None) -> dict: | |
| import json | |
| accounts = self.load_all() | |
| account = { | |
| 'email': email, | |
| 'password': password, | |
| 'name': name, | |
| 'token_file': token_file, | |
| 'created_at': time.strftime('%Y-%m-%d %H:%M:%S'), | |
| 'status': 'active' | |
| } | |
| accounts.append(account) | |
| self.filepath.write_text(json.dumps(accounts, indent=2, ensure_ascii=False), encoding='utf-8') | |
| return account | |
| def count(self) -> dict: | |
| accounts = self.load_all() | |
| return { | |
| 'total': len(accounts), | |
| 'active': len([a for a in accounts if a.get('status') == 'active']), | |
| } | |
| class AWSRegistration: | |
| """Регистрация AWS Builder ID через OAuth PKCE Flow (как в Kiro IDE)""" | |
| def __init__(self, headless: bool = False, device_flow: bool = False): | |
| self.storage = AccountStorage() | |
| self.headless = headless | |
| self.device_flow = device_flow | |
| self.browser = None | |
| self.mail_handler = None | |
| self.oauth = None | |
| self.email_generator = None | |
| # Загружаем настройки задержек | |
| from core.config import get_config | |
| config = get_config() | |
| self._human_delays = config.browser.human_delays | |
| self._delay_multiplier = config.browser.delay_multiplier | |
| def _human_delay(self, min_sec: float, max_sec: float): | |
| """Человеческая задержка с учётом настроек""" | |
| if not self._human_delays: | |
| return | |
| delay = random.uniform(min_sec, max_sec) * self._delay_multiplier | |
| time.sleep(delay) | |
| def _simulate_page_arrival(self): | |
| """Симулирует поведение при загрузке новой страницы""" | |
| if not self._human_delays or not self.browser: | |
| return | |
| try: | |
| behavior = self.browser._behavior | |
| # Осматриваем страницу (движения мыши, небольшой скролл) | |
| behavior.simulate_page_reading(self.browser.page, duration=random.uniform(1.5, 3.0) * self._delay_multiplier) | |
| # Колебание перед формой | |
| behavior.simulate_form_hesitation(self.browser.page) | |
| except Exception: | |
| self._human_delay(1.0, 2.5) | |
| def _simulate_after_input(self): | |
| """Симулирует проверку введённых данных""" | |
| if not self._human_delays or not self.browser: | |
| return | |
| try: | |
| behavior = self.browser._behavior | |
| # Микро-движения (проверяем что ввели) | |
| behavior.random_micro_movements(self.browser.page, count=random.randint(2, 5)) | |
| self._human_delay(0.3, 1.0) | |
| except Exception: | |
| self._human_delay(0.5, 1.5) | |
| def _simulate_checking_email(self): | |
| """Симулирует переключение на почту и обратно""" | |
| if not self._human_delays: | |
| self._human_delay(1.0, 2.0) | |
| return | |
| # Человек переключается на почту, ищет письмо, копирует код | |
| # Это занимает 3-8 секунд | |
| delay = random.uniform(3.0, 8.0) * self._delay_multiplier | |
| time.sleep(delay) | |
| # После возврата в браузер - небольшая пауза | |
| if self.browser: | |
| try: | |
| self.browser._behavior.random_micro_movements(self.browser.page, count=2) | |
| except: | |
| pass | |
| def _simulate_distraction(self): | |
| """Симулирует случайное отвлечение (с низкой вероятностью)""" | |
| if not self._human_delays or not self.browser: | |
| return | |
| try: | |
| # 15% шанс отвлечься | |
| self.browser._behavior.simulate_distraction(self.browser.page, probability=0.15) | |
| except Exception: | |
| pass | |
| def _init_mail(self, email_domain: str = None): | |
| """Initialize mail handler from environment settings""" | |
| if not self.mail_handler: | |
| self.mail_handler = create_mail_handler_from_env() | |
| return self.mail_handler | |
| def _init_email_generator(self) -> EmailGenerator: | |
| """Initialize email generator from environment settings""" | |
| if not self.email_generator: | |
| self.email_generator = EmailGenerator.from_env() | |
| return self.email_generator | |
| def _save_account_info(self, result: dict): | |
| email = result.get('email') | |
| if not email: | |
| return | |
| try: | |
| safe_email = re.sub(r'[^A-Za-z0-9@._+-]', '_', email) | |
| output_dir = Path.cwd() / "accounts" | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| output_path = output_dir / f"{safe_email}_info.json" | |
| token_raw = None | |
| token_data = None | |
| token_file = result.get('token_file') | |
| if token_file: | |
| token_path = Path(token_file) | |
| if not token_path.is_absolute(): | |
| token_path = self.storage.paths.tokens_dir / token_file | |
| if token_path.exists(): | |
| token_raw = token_path.read_text(encoding='utf-8', errors='ignore') | |
| try: | |
| token_data = json.loads(token_raw) | |
| except json.JSONDecodeError: | |
| token_data = None | |
| if token_data: | |
| result = result.copy() | |
| result['refreshToken'] = token_data.get('refreshToken') | |
| result['clientId'] = token_data.get('_clientId') | |
| result['clientSecret'] = token_data.get('_clientSecret') | |
| payload = { | |
| "result": result, | |
| "token_raw": token_raw, | |
| } | |
| output_path.write_text(json.dumps(payload, indent=2), encoding='utf-8') | |
| except Exception as e: | |
| print(f"[!] Failed to write account info: {e}") | |
| def _finalize_result(self, result: dict) -> dict: | |
| self._save_account_info(result) | |
| return result | |
| def register_auto(self, password: Optional[str] = None) -> dict: | |
| """ | |
| Автоматическая регистрация с использованием email стратегии. | |
| Email и имя генерируются автоматически на основе настроенной стратегии: | |
| - single: использует IMAP email напрямую | |
| - plus_alias: генерирует user+random@domain | |
| - catch_all: генерирует random@custom-domain | |
| - pool: берёт следующий email:password из списка (поддерживает разные IMAP аккаунты) | |
| Returns: | |
| dict с результатом регистрации | |
| """ | |
| # Инициализируем генератор email | |
| generator = self._init_email_generator() | |
| try: | |
| # Генерируем email по стратегии | |
| email_result = generator.generate() | |
| print(f"[EMAIL] Strategy: {generator.config.strategy}") | |
| print(f"[EMAIL] Registration: {email_result.registration_email}") | |
| print(f"[EMAIL] IMAP lookup: {email_result.imap_lookup_email}") | |
| print(f"[EMAIL] Name: {email_result.display_name}") | |
| # Для pool стратегии с разными паролями - переподключаем IMAP | |
| backend = get_mail_backend() | |
| if email_result.imap_password and backend == "imap": | |
| print(f"[EMAIL] Pool mode: switching IMAP credentials") | |
| from .mail_handler import IMAPMailHandler, get_imap_settings | |
| settings = get_imap_settings() | |
| # Try to connect with pool credentials | |
| max_retries = len(generator.config.email_pool) | |
| connected = False | |
| for attempt in range(max_retries): | |
| try: | |
| if self.mail_handler: | |
| self.mail_handler.disconnect() | |
| self.mail_handler = IMAPMailHandler( | |
| imap_host=settings['host'], | |
| imap_email=email_result.imap_lookup_email, | |
| imap_password=email_result.imap_password | |
| ) | |
| if self.mail_handler.connect(): | |
| connected = True | |
| break | |
| except Exception as e: | |
| print(f"[!] IMAP auth failed for {email_result.imap_lookup_email}: {e}") | |
| # Try next email from pool | |
| if attempt < max_retries - 1: | |
| print(f"[EMAIL] Trying next email from pool...") | |
| email_result = generator.generate() | |
| print(f"[EMAIL] Switched to: {email_result.registration_email}") | |
| if not connected: | |
| raise ValueError("All pool emails failed IMAP authentication") | |
| elif email_result.imap_password and backend != "imap": | |
| print(f"[EMAIL] Pool mode: mail backend '{backend}' ignores IMAP credentials") | |
| # Вызываем основной метод регистрации | |
| result = self.register_single( | |
| email=email_result.registration_email, | |
| name=email_result.display_name, | |
| password=password, | |
| imap_lookup_email=email_result.imap_lookup_email | |
| ) | |
| # Добавляем информацию о стратегии | |
| result['strategy'] = generator.config.strategy | |
| result['imap_lookup_email'] = email_result.imap_lookup_email | |
| return result | |
| except ValueError as e: | |
| return { | |
| 'success': False, | |
| 'error': str(e), | |
| 'strategy': generator.config.strategy | |
| } | |
| def register_single(self, email: str, name: Optional[str] = None, | |
| password: Optional[str] = None, | |
| imap_lookup_email: Optional[str] = None) -> dict: | |
| """ | |
| Регистрация одного аккаунта через OAuth PKCE Flow | |
| Args: | |
| email: Email для регистрации в AWS | |
| name: Имя пользователя (генерируется если не указано) | |
| password: Пароль (генерируется если не указан) | |
| imap_lookup_email: Email для поиска в IMAP (для plus_alias стратегии) | |
| Если не указан, используется email | |
| Flow: | |
| 1. Start OAuth (callback server + PKCE + client registration) | |
| 2. Get auth_url from OAuth | |
| 3. Open auth_url in browser → AWS redirects to login/signup | |
| 4. Enter email → Continue → redirects to profile.aws for registration | |
| 5. Enter name → Continue | |
| 6. Enter verification code → Continue | |
| 7. Enter password → Continue | |
| 8. AWS redirects to view.awsapps.com/start | |
| 9. Click "Allow access" button (CRITICAL!) | |
| 10. AWS redirects to callback → OAuth exchanges code for tokens | |
| """ | |
| # Email для поиска в IMAP (может отличаться от registration email) | |
| lookup_email = imap_lookup_email or email | |
| # Генерируем имя из email если не указано | |
| if name is None: | |
| username = email.split('@')[0] | |
| name_part = re.sub(r'\d+$', '', username) | |
| name = re.sub(r'([a-z])([A-Z])', r'\1 \2', name_part) | |
| # Генерируем пароль если не указан | |
| if password is None: | |
| password = BrowserAutomation.generate_password() | |
| # Инициализируем почту (использует настройки из env) | |
| mail_handler = self._init_mail() | |
| if not mail_handler: | |
| return self._finalize_result({'email': email, 'success': False, 'error': 'Mail handler not available. Check IMAP settings.'}) | |
| try: | |
| # ШАГ 1: Запускаем OAuth flow | |
| flow_type = "Device" if self.device_flow else "PKCE" | |
| print(f"\n[1/8] Starting OAuth {flow_type} flow...") | |
| if self.oauth: | |
| self.oauth.close() | |
| self.oauth = OAuthDevice() if self.device_flow else OAuthPKCE() | |
| # Получаем auth_url (это также запускает callback server и регистрирует client) | |
| auth_url = self.oauth.start(account_name=email.split('@')[0]) | |
| if not auth_url: | |
| return self._finalize_result({'email': email, 'success': False, 'error': 'Failed to start OAuth flow'}) | |
| if self.device_flow: | |
| print(f" [OK] Device flow started") | |
| else: | |
| print(f" [OK] OAuth started, callback server on port {self.oauth.port}") | |
| print(f" Auth URL: {auth_url[:80]}...") | |
| # ШАГ 2: Открываем браузер с auth_url | |
| print(f"\n[2/8] Opening browser with OAuth authorize URL...") | |
| if self.browser: | |
| self.browser.close() | |
| self.browser = BrowserAutomation(headless=self.headless, email=email) | |
| # Спуфинг уже применён в BrowserAutomation.__init__ | |
| # через apply_pre_navigation_spoofing | |
| # Прогрев браузера - создаём реальную историю | |
| if self._human_delays: | |
| self.browser.prewarm() | |
| # Открываем OAuth authorize URL (НЕ profile.aws напрямую!) | |
| print(f" Opening: {auth_url[:60]}...") | |
| self.browser.navigate(auth_url) | |
| # Проверяем на ошибку AWS | |
| if self.browser.check_aws_error(): | |
| return self._finalize_result({'email': email, 'success': False, 'error': 'AWS temporary error'}) | |
| # ШАГ 3: Вводим email | |
| print(f"[3/8] Entering email: {email}") | |
| # Человек осматривает страницу перед вводом | |
| self._simulate_page_arrival() | |
| self.browser.enter_email(email) | |
| # Проверяет что ввёл | |
| self._simulate_after_input() | |
| self.browser.click_continue() | |
| # Пауза между шагами | |
| self._human_delay(1.5, 3.0) | |
| # ШАГ 4: Вводим имя | |
| print(f"[4/8] Entering name: {name}") | |
| # Осматриваем новую страницу | |
| self._simulate_page_arrival() | |
| # Иногда отвлекаемся | |
| self._simulate_distraction() | |
| self.browser.enter_name(name) | |
| # Пауза между шагами | |
| self._human_delay(2.0, 4.0) | |
| # ШАГ 5: Получаем и вводим код верификации | |
| print(f"[5/8] Waiting for verification code (lookup: {lookup_email})...") | |
| code = mail_handler.get_verification_code(lookup_email, timeout=TIMEOUTS['verification_code']) | |
| if not code: | |
| return self._finalize_result({'email': email, 'success': False, 'error': 'Verification code not received'}) | |
| print(f"[5/8] Entering code: {code}") | |
| # Человек переключается на почту и обратно | |
| self._simulate_checking_email() | |
| self.browser.enter_verification_code(code) | |
| # Пауза между шагами | |
| self._human_delay(2.0, 4.0) | |
| # ШАГ 6: Вводим пароль (с retry при captcha/password error) | |
| print(f"[6/8] Setting password...") | |
| # Осматриваем страницу пароля | |
| self._simulate_page_arrival() | |
| max_password_retries = 3 | |
| for pwd_attempt in range(max_password_retries): | |
| if self.browser.enter_password(password): | |
| break | |
| else: | |
| if pwd_attempt < max_password_retries - 1: | |
| print(f" [R] Password attempt {pwd_attempt + 1} failed, retrying with new password...") | |
| password = self.browser.generate_password(18) | |
| time.sleep(1) | |
| else: | |
| raise Exception("Failed to set password after multiple attempts (captcha or validation error)") | |
| # ШАГ 7: Ждём редирект на view.awsapps.com и кликаем "Allow access" | |
| # Таймаут настраивается в config.timeouts.allow_access_wait | |
| allow_access_timeout = config.timeouts.allow_access_wait if hasattr(config.timeouts, 'allow_access_wait') else 90 | |
| print(f"[7/8] Waiting for Allow access page (timeout: {allow_access_timeout}s)...") | |
| # ОПТИМИЗИРОВАНО: быстрый polling с минимальными задержками | |
| start_time = time.time() | |
| allow_clicked = False | |
| last_url = "" | |
| while time.time() - start_time < allow_access_timeout: | |
| current_url = self.browser.current_url | |
| # Логируем изменение URL | |
| if current_url != last_url: | |
| print(f" [URL] {current_url[:70]}...") | |
| last_url = current_url | |
| if '127.0.0.1' in current_url and 'oauth/callback' in current_url: | |
| print(f" [OK] Already redirected to callback!") | |
| break | |
| # Кнопка Allow access на view.awsapps.com (старый flow) | |
| if 'view.awsapps.com' in current_url: | |
| elapsed = time.time() - start_time | |
| print(f" [OK] Redirected to view.awsapps.com in {elapsed:.2f}s") | |
| # Принимаем cookie если есть | |
| self.browser.close_cookie_dialog(force=True) | |
| time.sleep(0.5) | |
| if not allow_clicked: | |
| # Кликаем Allow access | |
| if self.browser.click_allow_access(): | |
| allow_clicked = True | |
| else: | |
| print(f" [!] Failed to click Allow access, retrying...") | |
| self.browser.screenshot("error_allow_access_click") | |
| time.sleep(1) | |
| # Новый flow: awsapps.com/start (без view.) | |
| elif 'awsapps.com/start' in current_url and not allow_clicked: | |
| elapsed = time.time() - start_time | |
| print(f" [OK] Redirected to awsapps.com/start in {elapsed:.2f}s") | |
| self.browser.close_cookie_dialog(force=True) | |
| time.sleep(0.5) | |
| if self.browser.click_allow_access(): | |
| allow_clicked = True | |
| # Если на signin.aws - это новый flow AWS (декабрь 2024+) | |
| elif 'signin.aws' in current_url: | |
| elapsed = time.time() - start_time | |
| # Принимаем cookie | |
| self.browser.close_cookie_dialog(force=True) | |
| if '/login' in current_url and elapsed > 10: | |
| print(f" [!] Stuck on login page, trying to login...") | |
| if self.browser.login_with_credentials(email, password): | |
| print(f" [OK] Logged in successfully") | |
| elif '/signup' in current_url or '/platform/' in current_url: | |
| # Новый flow: signin.aws/platform/.../signup содержит Allow access | |
| if not allow_clicked: | |
| if elapsed > 3: # Даём странице загрузиться | |
| if int(elapsed) % 5 == 0: | |
| print(f" [...] Looking for Allow access on signin.aws ({elapsed:.0f}s)...") | |
| # Пробуем кликнуть Allow access | |
| if self.browser.click_allow_access(): | |
| allow_clicked = True | |
| print(f" [OK] Clicked Allow access on signin.aws") | |
| else: | |
| # Пробуем Continue как fallback | |
| self.browser._click_if_exists(['text=Continue', '@data-testid=test-primary-button'], timeout=0.3) | |
| # Делаем скриншот один раз для диагностики | |
| if elapsed > 30 and elapsed < 32: | |
| self.browser.screenshot("debug_stuck_signup") | |
| # На profile.aws - ждём редирект | |
| elif 'profile.aws' in current_url: | |
| # Принимаем cookie | |
| self.browser.close_cookie_dialog(force=True) | |
| elapsed = time.time() - start_time | |
| if elapsed > 10 and int(elapsed) % 5 == 0: | |
| print(f" [...] Waiting on profile.aws ({elapsed:.0f}s)...") | |
| time.sleep(0.2) | |
| # ШАГ 8: Ждём callback и обмениваем code на токены | |
| print(f"[8/8] Waiting for OAuth callback...") | |
| # Ждём callback (OAuth сервер обработает его автоматически) | |
| success = self.oauth.wait_for_callback(timeout=TIMEOUTS['oauth_callback']) | |
| if success: | |
| token_file = self.oauth.get_token_filename() | |
| # Сохраняем аккаунт С токеном | |
| self.storage.save(email, password, name, token_file) | |
| print(f"\n[OK] SUCCESS: {email}") | |
| print(f" Password: {password}") | |
| print(f" Token: {token_file}") | |
| return self._finalize_result({ | |
| 'email': email, | |
| 'password': password, | |
| 'name': name, | |
| 'token_file': token_file, | |
| 'provider': 'Google', # Automated всегда через Google | |
| 'auth_method': 'social', | |
| 'idp': 'Google', # ВАЖНО: для Web Portal API! | |
| 'success': True | |
| }) | |
| else: | |
| # OAuth callback не получен, но регистрация могла пройти | |
| print(f" [!] OAuth callback not received") | |
| # Проверяем текущий URL | |
| current_url = self.browser.current_url | |
| print(f" Current URL: {current_url[:60]}...") | |
| # Если мы на callback URL, пробуем обработать вручную | |
| if '127.0.0.1' in current_url and 'code=' in current_url: | |
| print(f" Found code in URL, but callback wasn't processed") | |
| # Сохраняем аккаунт без токена (для device flow) | |
| self.storage.save(email, password, name, None) | |
| # БЕЗ токена - это НЕ успех для автоматической регистрации! | |
| return self._finalize_result({ | |
| 'email': email, | |
| 'password': password, | |
| 'name': name, | |
| 'token_file': None, | |
| 'success': False, | |
| 'error': 'Registration complete but token not obtained. OAuth callback failed.' | |
| }) | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| return self._finalize_result({'email': email, 'success': False, 'error': str(e)}) | |
| def register_batch(self, emails: List[str], names: List[str] = None) -> List[dict]: | |
| """Пакетная регистрация""" | |
| if names is None: | |
| names = [None] * len(emails) | |
| results = [] | |
| for i, (email, name) in enumerate(zip(emails, names)): | |
| print(f"\n{'='*60}") | |
| print(f"Account {i+1}/{len(emails)}: {email}") | |
| print('='*60) | |
| result = self.register_single(email, name) | |
| results.append(result) | |
| if i < len(emails) - 1: | |
| print(f"\n[...] Pause {TIMEOUTS['between_accounts']}s...") | |
| time.sleep(TIMEOUTS['between_accounts']) | |
| return results | |
| def print_summary(self, results: List[dict]): | |
| """Итоги регистрации""" | |
| print("\n" + "="*60) | |
| print("[STATS] SUMMARY") | |
| print("="*60) | |
| success = [r for r in results if r.get('success')] | |
| failed = [r for r in results if not r.get('success')] | |
| print(f"[OK] Success: {len(success)}") | |
| print(f"[X] Failed: {len(failed)}") | |
| if success: | |
| print("\nSuccessful:") | |
| for r in success: | |
| token_info = f" (token: {r.get('token_file', 'none')})" if r.get('token_file') else " (no token)" | |
| print(f" {r['email']} : {r['password']}{token_info}") | |
| if failed: | |
| print("\nFailed:") | |
| for r in failed: | |
| error_msg = str(r.get('error', 'Unknown error')) | |
| # Sanitize non-ASCII characters for Windows console | |
| error_msg = error_msg.encode('ascii', 'replace').decode('ascii') | |
| print(f" {r['email']} - {error_msg}") | |
| def close(self): | |
| if self.mail_handler: | |
| self.mail_handler.disconnect() | |
| if self.browser: | |
| self.browser.close() | |
| if self.oauth: | |
| self.oauth.close() | |
| def generate_emails(count: int, domain: str = '') -> List[tuple]: | |
| """Генерация email адресов""" | |
| import random | |
| first_names = ['James', 'John', 'Robert', 'Michael', 'David', 'Mary', 'Jennifer', 'Linda', 'Alex', 'Sam'] | |
| last_names = ['Smith', 'Johnson', 'Williams', 'Brown', 'Jones', 'Garcia', 'Miller', 'Davis'] | |
| results = [] | |
| used = set() | |
| for _ in range(count): | |
| first = random.choice(first_names) | |
| last = random.choice(last_names) | |
| for _ in range(100): | |
| num = random.randint(100, 9999) | |
| email = f"{first}{last}{num}@{domain}" | |
| if email.lower() not in used: | |
| used.add(email.lower()) | |
| results.append((email, f"{first} {last}")) | |
| break | |
| return results | |
| def main(): | |
| parser = argparse.ArgumentParser(description='AWS Builder ID Auto-Registration') | |
| parser.add_argument('--email', '-e', help='Email для регистрации') | |
| parser.add_argument('--count', '-c', type=int, help='Количество аккаунтов') | |
| parser.add_argument('--headless', action='store_true', help='Без GUI') | |
| parser.add_argument('--yes', '-y', action='store_true', help='Автоматическое подтверждение (без prompt)') | |
| args = parser.parse_args() | |
| emails = [] | |
| names = None | |
| if args.email: | |
| emails = [args.email] | |
| elif args.count: | |
| generated = generate_emails(args.count) | |
| emails = [e for e, _ in generated] | |
| names = [n for _, n in generated] | |
| print(f"Generated {len(emails)} accounts") | |
| else: | |
| # Если запущено без --yes, спрашиваем email | |
| if not args.yes: | |
| email = input("Email: ").strip() | |
| if email: | |
| emails = [email] | |
| if not emails: | |
| print("No emails") | |
| return | |
| print(f"\nWill register: {len(emails)} accounts") | |
| reg = AWSRegistration(headless=args.headless) | |
| try: | |
| results = reg.register_batch(emails, names) | |
| reg.print_summary(results) | |
| finally: | |
| reg.close() | |
| if __name__ == '__main__': | |
| main() | |