""" OAuth клиент - Device Authorization Flow Альтернативный метод авторизации без локального сервера. Flow: 1. Register client → получаем clientId, clientSecret 2. POST /device_authorization → получаем device_code, user_code, verification_uri 3. Открываем браузер с verification_uri 4. Пользователь вводит user_code и авторизуется 5. Polling POST /token пока не получим токены """ import json import hashlib import requests import time import webbrowser from datetime import datetime, timedelta from pathlib import Path from typing import Optional, Dict, Tuple import sys sys.path.insert(0, str(Path(__file__).parent.parent)) from core.paths import get_paths from core.kiro_config import ( get_kiro_user_agent, get_kiro_scopes, get_client_id_hash, ) _paths = get_paths() TOKENS_DIR = _paths.tokens_dir # AWS SSO OIDC Configuration OIDC_REGION = "us-east-1" OIDC_BASE = f"https://oidc.{OIDC_REGION}.amazonaws.com" START_URL = "https://view.awsapps.com/start" KIRO_SCOPES = get_kiro_scopes() class OAuthDevice: """OAuth клиент с Device Authorization Flow""" def __init__(self): self.tokens_dir = TOKENS_DIR self.output_lines = [] self.token_filename = None self.account_name = None # Client registration self.client_id = None self.client_secret = None # Device auth self.device_code = None self.user_code = None self.verification_uri = None self.interval = 5 # Auth URL (verification URI) self.auth_url = None def _register_client(self) -> Tuple[str, str]: """Register OIDC client for device flow""" print("[OAuth-Device] Registering OIDC client...") headers = { "Content-Type": "application/json", "User-Agent": get_kiro_user_agent(), "Accept": "application/json", } resp = requests.post( f"{OIDC_BASE}/client/register", json={ "clientName": "Kiro Account Switcher", "clientType": "public", "scopes": KIRO_SCOPES, "grantTypes": ["urn:ietf:params:oauth:grant-type:device_code", "refresh_token"], "issuerUrl": START_URL }, headers=headers, timeout=30 ) if resp.status_code != 200: raise Exception(f"Client registration failed: {resp.text}") data = resp.json() print(f"[OAuth-Device] ✓ Client registered: {data['clientId'][:20]}...") return data["clientId"], data.get("clientSecret", "") def _start_device_auth(self) -> Dict: """Start device authorization flow""" print("[OAuth-Device] Starting device authorization...") headers = { "Content-Type": "application/json", "User-Agent": get_kiro_user_agent(), "Accept": "application/json", } resp = requests.post( f"{OIDC_BASE}/device_authorization", json={ "clientId": self.client_id, "clientSecret": self.client_secret, "startUrl": START_URL }, headers=headers, timeout=30 ) if resp.status_code != 200: raise Exception(f"Device authorization failed: {resp.text}") data = resp.json() self.device_code = data["deviceCode"] self.user_code = data["userCode"] self.verification_uri = data.get("verificationUriComplete") or data.get("verificationUri") self.interval = data.get("interval", 5) print(f"[OAuth-Device] ✓ Device code obtained") print(f"[OAuth-Device] User code: {self.user_code}") return data def _poll_for_token(self, timeout: int = 300) -> Dict: """Poll for token after user authorizes""" print("[OAuth-Device] Waiting for authorization...") headers = { "Content-Type": "application/json", "User-Agent": get_kiro_user_agent(), "Accept": "application/json", } start_time = time.time() while time.time() - start_time < timeout: time.sleep(self.interval) resp = requests.post( f"{OIDC_BASE}/token", json={ "clientId": self.client_id, "clientSecret": self.client_secret, "grantType": "urn:ietf:params:oauth:grant-type:device_code", "deviceCode": self.device_code }, headers=headers, timeout=30 ) if resp.status_code == 200: print("[OAuth-Device] ✓ Token obtained!") return resp.json() data = resp.json() error = data.get("error", "") if error == "authorization_pending": continue elif error == "slow_down": self.interval += 1 continue elif error == "expired_token": raise Exception("Device code expired") elif error == "access_denied": raise Exception("Access denied by user") else: raise Exception(f"Token error: {resp.text}") raise Exception("Authorization timeout") def _save_token(self, token_data: Dict, account_name: str) -> str: """Save token to file""" import re from core.kiro_config import get_machine_id timestamp = int(datetime.now().timestamp() * 1000) safe_name = re.sub(r'[^a-zA-Z0-9_-]', '_', account_name) filename = f"token-BuilderId-IdC-{safe_name}-{timestamp}.json" filepath = self.tokens_dir / filename expires_in = token_data.get('expiresIn', token_data.get('expires_in', 3600)) expires_at = (datetime.utcnow() + timedelta(seconds=expires_in)).isoformat() + 'Z' client_id_hash = get_client_id_hash(START_URL) # ВАЖНО: Сохраняем machine ID который использовался при регистрации! current_machine_id = get_machine_id() token_file = { "accessToken": token_data.get('accessToken', token_data.get('access_token')), "refreshToken": token_data.get('refreshToken', token_data.get('refresh_token')), "expiresAt": expires_at, "tokenType": token_data.get('tokenType', token_data.get('token_type', 'Bearer')), "clientIdHash": client_id_hash, "accountName": account_name, "provider": "BuilderId", "authMethod": "DeviceFlow", "region": OIDC_REGION, "createdAt": datetime.now().isoformat(), "_clientId": self.client_id, "_clientSecret": self.client_secret, "_machineId": current_machine_id # ANTI-BAN: сохраняем machine ID! } filepath.write_text(json.dumps(token_file, indent=2)) print(f"[OAuth-Device] ✓ Token saved to: {filepath}") self.output_lines.append(f"Token saved to: {filepath}") return filename def start(self, account_name: str = 'auto') -> Optional[str]: """ Start OAuth device flow and return verification URL Returns: Verification URL for user to open in browser """ try: self.account_name = account_name # 1. Register client self.client_id, self.client_secret = self._register_client() # 2. Start device authorization self._start_device_auth() # 3. Build verification URL self.auth_url = self.verification_uri or f"{START_URL}?user_code={self.user_code}" print(f"[OAuth-Device] Verification URL: {self.auth_url}") self.output_lines.append(f"Verification URL:\n{self.auth_url}") self.output_lines.append(f"User Code: {self.user_code}") return self.auth_url except Exception as e: print(f"[OAuth-Device] Error: {e}") self.output_lines.append(f"Error: {e}") return None def wait_for_callback(self, timeout: int = 300) -> bool: """ Poll for token after user authorizes in browser Returns: True if successful, False on error or timeout """ try: # Poll for token token_data = self._poll_for_token(timeout) # Save token self.token_filename = self._save_token(token_data, self.account_name) self.output_lines.append("Authentication successful!") return True except Exception as e: print(f"[OAuth-Device] Error: {e}") self.output_lines.append(f"Error: {e}") return False def get_auth_url(self) -> Optional[str]: """Get the verification URL""" return self.auth_url def get_user_code(self) -> Optional[str]: """Get the user code to display""" return self.user_code def get_token_filename(self) -> Optional[str]: """Get saved token filename""" return self.token_filename def close(self): """Cleanup (nothing to do for device flow)""" pass