#!/usr/bin/env python3 """Kiro Proxy CLI - 轻量命令行工具""" import argparse import asyncio import json import sys from pathlib import Path from . import __version__ def cmd_serve(args): """启动代理服务""" from .main import run run(port=args.port) def cmd_accounts_list(args): """列出所有账号""" from .core import state accounts = state.get_accounts_status() if not accounts: print("暂无账号") return print(f"{'ID':<10} {'名称':<20} {'状态':<10} {'请求数':<8}") print("-" * 50) for acc in accounts: print(f"{acc['id']:<10} {acc['name']:<20} {acc['status']:<10} {acc['request_count']:<8}") def cmd_accounts_export(args): """导出账号配置""" from .core import state accounts_data = [] for acc in state.accounts: creds = acc.get_credentials() if creds: accounts_data.append({ "name": acc.name, "enabled": acc.enabled, "credentials": { "accessToken": creds.access_token, "refreshToken": creds.refresh_token, "expiresAt": creds.expires_at, "region": creds.region, "authMethod": creds.auth_method, } }) output = {"accounts": accounts_data, "version": "1.0"} if args.output: Path(args.output).write_text(json.dumps(output, indent=2, ensure_ascii=False)) print(f"已导出 {len(accounts_data)} 个账号到 {args.output}") else: print(json.dumps(output, indent=2, ensure_ascii=False)) def cmd_accounts_import(args): """导入账号配置""" import uuid from .core import state, Account from .auth import save_credentials_to_file data = json.loads(Path(args.file).read_text()) accounts_data = data.get("accounts", []) imported = 0 for acc_data in accounts_data: creds = acc_data.get("credentials", {}) if not creds.get("accessToken"): print(f"跳过 {acc_data.get('name', '未知')}: 缺少 accessToken") continue # 保存凭证到文件 file_path = asyncio.run(save_credentials_to_file({ "accessToken": creds.get("accessToken"), "refreshToken": creds.get("refreshToken"), "expiresAt": creds.get("expiresAt"), "region": creds.get("region", "us-east-1"), "authMethod": creds.get("authMethod", "social"), }, f"imported-{uuid.uuid4().hex[:8]}")) account = Account( id=uuid.uuid4().hex[:8], name=acc_data.get("name", "导入账号"), token_path=file_path, enabled=acc_data.get("enabled", True) ) state.accounts.append(account) account.load_credentials() imported += 1 print(f"已导入: {account.name}") state._save_accounts() print(f"\n共导入 {imported} 个账号") def cmd_accounts_add(args): """手动添加 Token""" import uuid from .core import state, Account from .auth import save_credentials_to_file print("手动添加 Kiro 账号") print("-" * 40) name = input("账号名称 [我的账号]: ").strip() or "我的账号" print("\n请粘贴 Access Token:") access_token = input().strip() if not access_token: print("错误: Access Token 不能为空") return print("\n请粘贴 Refresh Token (可选,直接回车跳过):") refresh_token = input().strip() or None # 保存凭证 file_path = asyncio.run(save_credentials_to_file({ "accessToken": access_token, "refreshToken": refresh_token, "region": "us-east-1", "authMethod": "social", }, f"manual-{uuid.uuid4().hex[:8]}")) account = Account( id=uuid.uuid4().hex[:8], name=name, token_path=file_path ) state.accounts.append(account) account.load_credentials() state._save_accounts() print(f"\n✅ 账号已添加: {name} (ID: {account.id})") def cmd_accounts_scan(args): """扫描本地 Token""" import uuid from .core import state, Account from .config import TOKEN_DIR # 扫描新目录 found = [] if TOKEN_DIR.exists(): for f in TOKEN_DIR.glob("*.json"): try: data = json.loads(f.read_text()) if "accessToken" in data: already = any(a.token_path == str(f) for a in state.accounts) found.append({"path": str(f), "name": f.stem, "already": already}) except: pass # 兼容旧目录 sso_cache = Path.home() / ".aws/sso/cache" if sso_cache.exists(): for f in sso_cache.glob("*.json"): try: data = json.loads(f.read_text()) if "accessToken" in data: already = any(a.token_path == str(f) for a in state.accounts) found.append({"path": str(f), "name": f.stem + " (旧目录)", "already": already}) except: pass if not found: print("未找到 Token 文件") print(f"Token 目录: {TOKEN_DIR}") return print(f"找到 {len(found)} 个 Token:\n") for i, t in enumerate(found): status = "[已添加]" if t["already"] else "" print(f" {i+1}. {t['name']} {status}") if args.auto: # 自动添加所有未添加的 added = 0 for t in found: if not t["already"]: account = Account( id=uuid.uuid4().hex[:8], name=t["name"], token_path=t["path"] ) state.accounts.append(account) account.load_credentials() added += 1 state._save_accounts() print(f"\n已添加 {added} 个账号") else: print("\n使用 --auto 自动添加所有未添加的账号") def cmd_login_remote(args): """生成远程登录链接""" import uuid import time session_id = uuid.uuid4().hex host = args.host or "localhost:8080" scheme = "https" if args.https else "http" print("远程登录链接") print("-" * 40) print(f"\n将以下链接发送到有浏览器的机器上完成登录:\n") print(f" {scheme}://{host}/remote-login/{session_id}") print(f"\n链接有效期 10 分钟") print("\n登录完成后,在那台机器上导出账号,然后在这里导入:") print(f" python -m kiro_proxy accounts import xxx.json") def cmd_login_social(args): """Social 登录 (Google/GitHub)""" from .auth import start_social_auth provider = args.provider print(f"启动 {provider.title()} 登录...") success, result = asyncio.run(start_social_auth(provider)) if not success: print(f"错误: {result.get('error', '未知错误')}") return print(f"\n请在浏览器中打开以下链接完成授权:\n") print(f" {result['login_url']}") print(f"\n授权完成后,将浏览器地址栏中的完整 URL 粘贴到这里:") callback_url = input().strip() if not callback_url: print("已取消") return try: from urllib.parse import urlparse, parse_qs parsed = urlparse(callback_url) params = parse_qs(parsed.query) code = params.get("code", [None])[0] oauth_state = params.get("state", [None])[0] if not code or not oauth_state: print("错误: 无效的回调 URL") return from .auth import exchange_social_auth_token success, result = asyncio.run(exchange_social_auth_token(code, oauth_state)) if success and result.get("completed"): import uuid from .core import state, Account from .auth import save_credentials_to_file credentials = result["credentials"] file_path = asyncio.run(save_credentials_to_file( credentials, f"cli-{provider}" )) account = Account( id=uuid.uuid4().hex[:8], name=f"{provider.title()} 登录", token_path=file_path ) state.accounts.append(account) account.load_credentials() state._save_accounts() print(f"\n✅ 登录成功! 账号已添加: {account.name}") else: print(f"错误: {result.get('error', '登录失败')}") except Exception as e: print(f"错误: {e}") def cmd_status(args): """查看服务状态""" from .core import state stats = state.get_stats() print("Kiro Proxy 状态") print("-" * 40) print(f"运行时间: {stats['uptime_seconds']} 秒") print(f"总请求数: {stats['total_requests']}") print(f"错误数: {stats['total_errors']}") print(f"错误率: {stats['error_rate']}") print(f"账号总数: {stats['accounts_total']}") print(f"可用账号: {stats['accounts_available']}") print(f"冷却中: {stats['accounts_cooldown']}") def main(): parser = argparse.ArgumentParser( prog="kiro-proxy", description="Kiro API Proxy CLI" ) parser.add_argument("-v", "--version", action="version", version=__version__) subparsers = parser.add_subparsers(dest="command", help="命令") # serve serve_parser = subparsers.add_parser("serve", help="启动代理服务") serve_parser.add_argument("-p", "--port", type=int, default=8080, help="端口号") serve_parser.set_defaults(func=cmd_serve) # status status_parser = subparsers.add_parser("status", help="查看状态") status_parser.set_defaults(func=cmd_status) # accounts accounts_parser = subparsers.add_parser("accounts", help="账号管理") accounts_sub = accounts_parser.add_subparsers(dest="accounts_cmd") # accounts list list_parser = accounts_sub.add_parser("list", help="列出账号") list_parser.set_defaults(func=cmd_accounts_list) # accounts export export_parser = accounts_sub.add_parser("export", help="导出账号") export_parser.add_argument("-o", "--output", help="输出文件") export_parser.set_defaults(func=cmd_accounts_export) # accounts import import_parser = accounts_sub.add_parser("import", help="导入账号") import_parser.add_argument("file", help="JSON 文件路径") import_parser.set_defaults(func=cmd_accounts_import) # accounts add add_parser = accounts_sub.add_parser("add", help="手动添加 Token") add_parser.set_defaults(func=cmd_accounts_add) # accounts scan scan_parser = accounts_sub.add_parser("scan", help="扫描本地 Token") scan_parser.add_argument("--auto", action="store_true", help="自动添加") scan_parser.set_defaults(func=cmd_accounts_scan) # login login_parser = subparsers.add_parser("login", help="登录") login_sub = login_parser.add_subparsers(dest="login_cmd") # login remote remote_parser = login_sub.add_parser("remote", help="生成远程登录链接") remote_parser.add_argument("--host", help="服务器地址 (如 example.com:8080)") remote_parser.add_argument("--https", action="store_true", help="使用 HTTPS") remote_parser.set_defaults(func=cmd_login_remote) # login google google_parser = login_sub.add_parser("google", help="Google 登录") google_parser.set_defaults(func=cmd_login_social, provider="google") # login github github_parser = login_sub.add_parser("github", help="GitHub 登录") github_parser.set_defaults(func=cmd_login_social, provider="github") args = parser.parse_args() if not args.command: parser.print_help() return if args.command == "accounts" and not args.accounts_cmd: accounts_parser.print_help() return if args.command == "login" and not args.login_cmd: login_parser.print_help() return if hasattr(args, "func"): args.func(args) if __name__ == "__main__": main()