Spaces:
Sleeping
Sleeping
| """ | |
| WebView OAuth - Manual Registration via Real Browser | |
| Использует pywebview для открытия OAuth окна и перехвата callback. | |
| Аналог подхода из kiro-account-manager (Tauri). | |
| Flow: | |
| 1. InitiateLogin через KiroWebPortal API (CBOR) | |
| 2. Открываем WebView с authorize_url | |
| 3. Пользователь логинится вручную | |
| 4. Перехватываем редирект на https://app.kiro.dev/signin/oauth?code=... | |
| 5. ExchangeToken через KiroWebPortal API (CBOR) | |
| 6. Сохраняем токены | |
| """ | |
| import secrets | |
| import hashlib | |
| import base64 | |
| import logging | |
| import threading | |
| import time | |
| from typing import Optional, Dict, Any, Tuple | |
| from urllib.parse import urlparse, parse_qs | |
# --- Kiro endpoints --------------------------------------------------------
# Base URL of the Kiro web portal; the CBOR service operations live under it.
KIRO_WEB_PORTAL = "https://app.kiro.dev"
# Redirect URI sent in InitiateLogin/ExchangeToken and matched against the
# WebView's current URL to detect the OAuth callback.
KIRO_REDIRECT_URI = "https://app.kiro.dev/signin/oauth"

# Module-level logger shared by every class and function in this file.
logger = logging.getLogger(__name__)
def generate_pkce() -> Tuple[str, str]:
    """
    Build a fresh PKCE pair.

    The verifier is 32 random bytes encoded as unpadded URL-safe base64; the
    challenge is the SHA-256 digest of the verifier text, encoded the same way.

    Returns:
        (code_verifier, code_challenge)
    """

    def _b64url(raw: bytes) -> str:
        # URL-safe base64 with the trailing '=' padding stripped
        return base64.urlsafe_b64encode(raw).decode('utf-8').rstrip('=')

    verifier = _b64url(secrets.token_bytes(32))
    digest = hashlib.sha256(verifier.encode('utf-8')).digest()
    challenge = _b64url(digest)
    return verifier, challenge
class WebViewOAuth:
    """
    Interactive OAuth step performed in a native pywebview window.

    The window is opened on the authorize URL, the user logs in manually and a
    background thread polls the window's current URL until it sees either the
    callback carrying ``code`` or a URL carrying an ``error`` parameter.

    Attributes:
        code: Authorization code taken from the callback URL, or None.
        state: ``state`` value taken from the callback URL, or None.
        error: OAuth ``error`` value if one was observed, or None.
    """

    # Delay before the first poll, giving the window time to initialise.
    _STARTUP_DELAY = 2.0
    # Pause between two consecutive URL polls.
    _POLL_INTERVAL = 0.5

    def __init__(self):
        self.code: Optional[str] = None
        self.state: Optional[str] = None
        self.error: Optional[str] = None
        self._window = None
        # Set as soon as a code or an error has been captured.
        self._callback_received = threading.Event()
        # Set when the GUI loop has returned, so the polling thread can stop
        # even if the user closed the window without finishing the login.
        self._window_closed = threading.Event()

    def _close_window(self) -> None:
        """Destroy the auth window on a best-effort basis."""
        window = self._window
        if window is None:
            return
        try:
            window.destroy()
        except Exception as exc:
            # The window may already be gone; closing is best-effort only.
            logger.debug(f"[WebView] Window destroy failed: {exc}")

    def _on_navigation(self, url: str) -> bool:
        """
        Inspect a URL the WebView is currently showing.

        Args:
            url: Current URL of the window.

        Returns:
            True  - nothing of interest, keep polling
            False - a code or an error was captured and the window was closed
        """
        logger.info(f"[WebView] Navigation: {url[:100]}...")

        # Parse once and look at real query parameters. A plain substring test
        # ('error=' in url) also matches unrelated parameters such as
        # 'last_error=' and would abort the login with "Unknown error".
        params = parse_qs(urlparse(url).query)

        # Successful callback: redirect URI carrying a non-empty code
        if url.startswith(KIRO_REDIRECT_URI) and 'code' in params:
            logger.info("[WebView] Callback URL detected!")
            self.code = params['code'][0]
            self.state = params.get('state', [None])[0]
            logger.info(f"[WebView] Got code: {self.code[:20]}...")
            self._callback_received.set()
            self._close_window()
            return False

        # OAuth error reported through the query string
        if 'error' in params:
            self.error = params['error'][0]
            error_desc = params.get('error_description', [''])[0]
            logger.error(f"[WebView] OAuth error: {self.error} - {error_desc}")
            self._callback_received.set()
            self._close_window()
            return False

        return True

    def open_auth_window(self, authorize_url: str, title: str = "Login with Google") -> bool:
        """
        Open the WebView window and block until it is closed.

        Args:
            authorize_url: URL to load in the window.
            title: Window title.

        Returns:
            True if an authorization code was captured, False on error or
            when the window was closed without one.

        Raises:
            RuntimeError: pywebview is not installed.
        """
        try:
            import webview
        except ImportError as exc:
            logger.error("[WebView] pywebview not installed! Run: pip install pywebview")
            raise RuntimeError("pywebview not installed. Run: pip install pywebview") from exc

        logger.info(f"[WebView] Opening auth window: {authorize_url[:80]}...")

        self._window_closed.clear()
        self._window = webview.create_window(
            title=title,
            url=authorize_url,
            width=500,
            height=700,
            resizable=True,
            text_select=False,
            confirm_close=False,
        )

        def poll_url():
            """Poll the window URL until a callback is seen or the window closes."""
            # Event.wait doubles as an interruptible sleep: it returns True as
            # soon as the GUI loop has ended, so the thread never outlives it.
            if self._window_closed.wait(self._STARTUP_DELAY):
                return
            while not self._callback_received.is_set():
                try:
                    current_url = self._window.get_current_url() if self._window else None
                    if current_url and not self._on_navigation(current_url):
                        break
                except Exception as e:
                    # Window might be closed
                    message = str(e).lower()
                    if "window" in message or "destroyed" in message:
                        break
                if self._window_closed.wait(self._POLL_INTERVAL):
                    break

        # Start polling thread before webview.start()
        poll_thread = threading.Thread(target=poll_url, daemon=True)
        poll_thread.start()

        # webview.start() MUST run in main thread on Windows.
        # It blocks until all windows are closed.
        try:
            webview.start(debug=False)
        except Exception as e:
            logger.error(f"[WebView] Error: {e}")
        finally:
            self._window_closed.set()

        # Wait a bit for callback processing
        self._callback_received.wait(timeout=2)
        return self.code is not None
class KiroWebPortalClient:
    """
    Client for the Kiro Web Portal API (CBOR).

    Implements the InitiateLogin and ExchangeToken operations.
    """

    # Seconds to wait for the portal before giving up on a request; without
    # a timeout a stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.base_url = KIRO_WEB_PORTAL
        self._session = None

    @property
    def session(self):
        """Lazily created ``requests.Session`` carrying the CBOR RPC headers.

        Must be a property: the operations below use ``self.session.post(...)``.
        """
        if self._session is None:
            import requests
            self._session = requests.Session()
            self._session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/cbor',
                'Content-Type': 'application/cbor',
                'smithy-protocol': 'rpc-v2-cbor',
            })
        return self._session

    def _cbor_encode(self, data: dict) -> bytes:
        """Encode data to CBOR.

        Raises:
            RuntimeError: cbor2 is not installed.
        """
        try:
            import cbor2
        except ImportError as exc:
            raise RuntimeError("cbor2 not installed. Run: pip install cbor2") from exc
        return cbor2.dumps(data)

    def _cbor_decode(self, data: bytes) -> dict:
        """Decode CBOR data.

        Raises:
            RuntimeError: cbor2 is not installed.
        """
        try:
            import cbor2
        except ImportError as exc:
            raise RuntimeError("cbor2 not installed. Run: pip install cbor2") from exc
        return cbor2.loads(data)

    def _call(self, operation: str, payload: dict):
        """POST a CBOR payload to a service operation.

        Returns:
            (response, decoded_body)

        Raises:
            RuntimeError: the portal answered with a non-200 status.
        """
        url = f"{self.base_url}/service/KiroWebPortalService/operation/{operation}"
        response = self.session.post(
            url,
            data=self._cbor_encode(payload),
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            logger.error(f"[WebPortal] {operation} failed: {response.status_code}")
            raise RuntimeError(f"{operation} failed: {response.status_code}")
        return response, self._cbor_decode(response.content)

    def initiate_login(self, idp: str, code_challenge: str, state: str) -> Dict[str, Any]:
        """
        InitiateLogin - start the OAuth flow.

        POST /service/KiroWebPortalService/operation/InitiateLogin
        Body (CBOR): {idp, redirectUri, codeChallenge, codeChallengeMethod, state}

        Returns:
            The decoded response; the caller reads ``redirectUrl`` from it.

        Raises:
            RuntimeError: non-200 response or cbor2 missing.
        """
        payload = {
            'idp': idp,
            'redirectUri': KIRO_REDIRECT_URI,
            'codeChallenge': code_challenge,
            'codeChallengeMethod': 'S256',
            'state': state,
        }
        logger.info(f"[WebPortal] InitiateLogin: idp={idp}, state={state[:20]}...")
        _, result = self._call('InitiateLogin', payload)
        # 'or ""' guards the slice when the key is present but null
        redirect_preview = (result.get('redirectUrl') or '')[:80]
        logger.info(f"[WebPortal] InitiateLogin success, redirectUrl: {redirect_preview}...")
        return result

    def exchange_token(self, idp: str, code: str, code_verifier: str, state: str) -> Dict[str, Any]:
        """
        ExchangeToken - exchange the authorization code for tokens.

        POST /service/KiroWebPortalService/operation/ExchangeToken
        Body (CBOR): {idp, code, codeVerifier, redirectUri, state}

        Returns:
            The decoded response with two keys added: ``refreshToken`` (value
            of the ``RefreshToken`` response cookie, or None) and ``idp``.

        Raises:
            RuntimeError: non-200 response or cbor2 missing.
        """
        payload = {
            'idp': idp,
            'code': code,
            'codeVerifier': code_verifier,
            'redirectUri': KIRO_REDIRECT_URI,
            'state': state,
        }
        logger.info(f"[WebPortal] ExchangeToken: idp={idp}, code={code[:20]}...")
        response, result = self._call('ExchangeToken', payload)

        # The refresh token is delivered as a cookie, not in the CBOR body
        result['refreshToken'] = next(
            (cookie.value for cookie in response.cookies if cookie.name == 'RefreshToken'),
            None,
        )
        result['idp'] = idp
        token_preview = (result.get('accessToken') or '')[:20]
        logger.info(f"[WebPortal] ExchangeToken success, accessToken: {token_preview}...")
        return result
def _close_window_on_timeout(oauth: "WebViewOAuth", timeout: float) -> None:
    """Timer callback: close the auth window once the login deadline has passed."""
    if oauth.code is not None:
        return  # login already completed, nothing to abort
    if oauth.error is None:
        oauth.error = f"OAuth timeout after {timeout} seconds"
    window = getattr(oauth, '_window', None)
    if window is None:
        return
    try:
        # Destroying the window lets the blocking GUI loop return
        window.destroy()
    except Exception as exc:
        logger.debug(f"[WebView] Could not close window on timeout: {exc}")


def webview_register(email: str, name: Optional[str] = None, provider: str = 'Google', timeout: int = 300) -> Dict[str, Any]:
    """
    Register an account through the WebView OAuth flow.

    Steps: generate PKCE + state, call InitiateLogin, let the user log in
    inside a WebView window, call ExchangeToken, save the tokens through
    TokenService.

    Args:
        email: Email (shown to the user as a hint and stored with the token)
        name: Display name for the account (used as the token file name);
            defaults to the part of ``email`` before '@'
        provider: OAuth provider ('Google' or 'Github')
        timeout: Maximum number of seconds the login window may stay open;
            a value of 0 or less disables the limit

    Returns:
        Dict with ``email`` and ``success``; on success also the token fields
        and ``token_file``, on failure an ``error`` message. Exceptions are
        caught and reported through ``error`` instead of being raised.
    """
    # Use name if provided, otherwise extract from email
    account_name = name or email.split('@')[0]

    print("\n" + "="*60)
    print("🌐 WEBVIEW REGISTRATION (Low Ban Risk)")
    print("="*60)
    print(f"Provider: {provider}")
    print(f"Account name: {account_name}")
    print(f"Email hint: {email}")
    print("="*60 + "\n")

    try:
        # 1. Generate PKCE pair and state
        code_verifier, code_challenge = generate_pkce()
        state = secrets.token_urlsafe(32)
        logger.info(f"[WebView] Generated PKCE, state: {state[:20]}...")

        # 2. InitiateLogin
        client = KiroWebPortalClient()
        init_result = client.initiate_login(
            idp=provider,
            code_challenge=code_challenge,
            state=state
        )
        authorize_url = init_result.get('redirectUrl')
        if not authorize_url:
            return {
                'email': email,
                'success': False,
                'error': 'No redirectUrl in InitiateLogin response',
            }

        # 3. Open the WebView
        print("📱 Opening browser window for login...")
        print(" Please log in with your credentials.\n")
        oauth = WebViewOAuth()

        # open_auth_window() blocks until the window closes, so the timeout is
        # enforced by a watchdog timer that closes the window from outside.
        watchdog = None
        if timeout and timeout > 0:
            watchdog = threading.Timer(timeout, _close_window_on_timeout, args=(oauth, timeout))
            watchdog.daemon = True
            watchdog.start()
        try:
            success = oauth.open_auth_window(
                authorize_url=authorize_url,
                title=f"Login with {provider}"
            )
        finally:
            if watchdog is not None:
                watchdog.cancel()

        if not success or not oauth.code:
            return {
                'email': email,
                'success': False,
                'error': oauth.error or 'OAuth cancelled or timeout',
            }

        # NOTE(review): the callback state is not enforced here because it is
        # unclear from this file whether the portal echoes the value unchanged;
        # a mismatch is only logged. Confirm and make it a hard failure.
        if oauth.state and oauth.state != state:
            logger.warning("[WebView] Callback state differs from the state sent in InitiateLogin")

        # 4. ExchangeToken
        print("🔄 Exchanging code for tokens...")
        token_result = client.exchange_token(
            idp=provider,
            code=oauth.code,
            code_verifier=code_verifier,
            state=oauth.state or state
        )

        access_token = token_result.get('accessToken')
        if not access_token:
            return {
                'email': email,
                'success': False,
                'error': 'No accessToken in ExchangeToken response',
            }

        # 5. Save tokens
        print("✅ Authentication successful!")
        print(f" Access token: {access_token[:20]}...")

        # Persist through TokenService
        from ..services.token_service import TokenService
        token_service = TokenService()

        # Prepare token data
        token_data = {
            'accessToken': access_token,
            'refreshToken': token_result.get('refreshToken'),
            'expiresIn': token_result.get('expiresIn', 3600),
            'csrfToken': token_result.get('csrfToken'),
            'profileArn': token_result.get('profileArn'),
            'idp': provider,
            'email': email,
            'accountName': account_name,
        }
        token_file = token_service.save_token(data=token_data, name=account_name)

        return {
            'email': email,
            'success': True,
            'token_file': token_file,
            'access_token': access_token,
            'refresh_token': token_result.get('refreshToken'),
            'csrf_token': token_result.get('csrfToken'),
            'expires_in': token_result.get('expiresIn'),
            'profile_arn': token_result.get('profileArn'),
            'idp': provider,
            'strategy': 'webview',
            'ban_risk': 'low',
        }
    except Exception as e:
        logger.error(f"[WebView] Registration error: {e}", exc_info=True)
        return {
            'email': email,
            'success': False,
            'error': str(e),
        }
if __name__ == '__main__':
    # Manual smoke test. The provider must be passed by keyword: the second
    # positional parameter of webview_register() is `name`, so a positional
    # 'Google' would be used as the account name instead of the provider.
    logging.basicConfig(level=logging.INFO)
    result = webview_register('test@example.com', provider='Google')
    print(f"\nResult: {result}")