| """Token 刷新器""" |
| import httpx |
| from datetime import datetime, timezone, timedelta |
| from typing import Tuple |
|
|
| from .types import KiroCredentials |
| from .fingerprint import generate_machine_id, get_kiro_version |
|
|
|
|
| |
| KIRO_AUTH_ENDPOINT = "https://prod.us-east-1.auth.desktop.kiro.dev" |
|
|
|
|
| class TokenRefresher: |
| """Token 刷新器""" |
| |
| def __init__(self, credentials: KiroCredentials): |
| self.credentials = credentials |
| |
| def get_refresh_url(self) -> str: |
| """获取刷新 URL""" |
| region = self.credentials.region or "us-east-1" |
| auth_method = (self.credentials.auth_method or "social").lower() |
| |
| if auth_method == "idc": |
| |
| return f"https://oidc.{region}.amazonaws.com/token" |
| else: |
| |
| return f"{KIRO_AUTH_ENDPOINT}/refreshToken" |
| |
| def validate_refresh_token(self) -> Tuple[bool, str]: |
| """验证 refresh_token 有效性""" |
| refresh_token = self.credentials.refresh_token |
| |
| if not refresh_token: |
| return False, "缺少 refresh_token" |
| |
| if len(refresh_token.strip()) == 0: |
| return False, "refresh_token 为空" |
| |
| if len(refresh_token) < 100 or refresh_token.endswith("..."): |
| return False, f"refresh_token 已被截断(长度: {len(refresh_token)})" |
| |
| return True, "" |
| |
| def _get_machine_id(self) -> str: |
| """获取 Machine ID""" |
| return generate_machine_id( |
| self.credentials.profile_arn, |
| self.credentials.client_id |
| ) |
| |
| async def refresh_social_token(self) -> Tuple[bool, str]: |
| """ |
| 刷新 Social Token (Google/GitHub) |
| |
| 参考 Kiro-account-manager 实现: |
| - 端点: https://prod.us-east-1.auth.desktop.kiro.dev/refreshToken |
| - 请求体: {"refreshToken": refresh_token} |
| - 响应: {accessToken, refreshToken, expiresIn} |
| """ |
| refresh_url = f"{KIRO_AUTH_ENDPOINT}/refreshToken" |
| |
| body = {"refreshToken": self.credentials.refresh_token} |
| headers = { |
| "Content-Type": "application/json", |
| "User-Agent": "kiro-proxy/1.0.0", |
| "Accept": "application/json", |
| } |
| |
| try: |
| async with httpx.AsyncClient(verify=False, timeout=30) as client: |
| resp = await client.post(refresh_url, json=body, headers=headers) |
| |
| if resp.status_code != 200: |
| error_text = resp.text |
| if resp.status_code == 401: |
| return False, "凭证已过期或无效,需要重新登录" |
| elif resp.status_code == 429: |
| return False, "请求过于频繁,请稍后重试" |
| else: |
| return False, f"刷新失败: {resp.status_code} - {error_text[:200]}" |
| |
| data = resp.json() |
| |
| new_token = data.get("accessToken") |
| if not new_token: |
| return False, "响应中没有 accessToken" |
| |
| |
| self.credentials.access_token = new_token |
| |
| |
| if rt := data.get("refreshToken"): |
| self.credentials.refresh_token = rt |
| |
| |
| if expires_in := data.get("expiresIn"): |
| expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in) |
| self.credentials.expires_at = expires_at.isoformat() |
| |
| self.credentials.last_refresh = datetime.now(timezone.utc).isoformat() |
| |
| print(f"[TokenRefresher] Social token 刷新成功,过期时间: {expires_in}s") |
| return True, new_token |
| |
| except Exception as e: |
| return False, f"刷新异常: {str(e)}" |
| |
| async def refresh_idc_token(self) -> Tuple[bool, str]: |
| """ |
| 刷新 IDC Token (AWS Builder ID) |
| |
| 使用 AWS OIDC 端点刷新 |
| """ |
| region = self.credentials.region or "us-east-1" |
| refresh_url = f"https://oidc.{region}.amazonaws.com/token" |
| |
| if not self.credentials.client_id or not self.credentials.client_secret: |
| return False, "IdC 认证缺少 client_id 或 client_secret" |
| |
| machine_id = self._get_machine_id() |
| kiro_version = get_kiro_version() |
| |
| body = { |
| "refreshToken": self.credentials.refresh_token, |
| "clientId": self.credentials.client_id, |
| "clientSecret": self.credentials.client_secret, |
| "grantType": "refresh_token" |
| } |
| headers = { |
| "Content-Type": "application/json", |
| "x-amz-user-agent": f"aws-sdk-js/3.738.0 KiroIDE-{kiro_version}-{machine_id}", |
| "User-Agent": "node", |
| } |
| |
| try: |
| async with httpx.AsyncClient(verify=False, timeout=30) as client: |
| resp = await client.post(refresh_url, json=body, headers=headers) |
| |
| if resp.status_code != 200: |
| error_text = resp.text |
| if resp.status_code == 401: |
| return False, "凭证已过期或无效,需要重新登录" |
| elif resp.status_code == 429: |
| return False, "请求过于频繁,请稍后重试" |
| else: |
| return False, f"刷新失败: {resp.status_code} - {error_text[:200]}" |
| |
| data = resp.json() |
| |
| new_token = data.get("accessToken") or data.get("access_token") |
| if not new_token: |
| return False, "响应中没有 access_token" |
| |
| |
| self.credentials.access_token = new_token |
| |
| if rt := data.get("refreshToken") or data.get("refresh_token"): |
| self.credentials.refresh_token = rt |
| |
| if arn := data.get("profileArn"): |
| self.credentials.profile_arn = arn |
| |
| if expires_in := data.get("expiresIn") or data.get("expires_in"): |
| expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in) |
| self.credentials.expires_at = expires_at.isoformat() |
| |
| self.credentials.last_refresh = datetime.now(timezone.utc).isoformat() |
| |
| print(f"[TokenRefresher] IDC token 刷新成功") |
| return True, new_token |
| |
| except Exception as e: |
| return False, f"刷新异常: {str(e)}" |
| |
| async def refresh(self) -> Tuple[bool, str]: |
| """ |
| 刷新 token,根据 authMethod 分发到正确的刷新方法 |
| |
| Returns: |
| (success, new_token_or_error) |
| """ |
| is_valid, error = self.validate_refresh_token() |
| if not is_valid: |
| return False, error |
| |
| auth_method = (self.credentials.auth_method or "social").lower() |
| |
| if auth_method == "idc": |
| return await self.refresh_idc_token() |
| else: |
| |
| return await self.refresh_social_token() |
|
|