from __future__ import annotations import errno import argparse import json import os import sys import webbrowser from datetime import datetime from .app import create_app from .config import CLIENT_ID_DEFAULT from .limits import RateLimitWindow, compute_reset_at, load_rate_limit_snapshot from .oauth import OAuthHTTPServer, OAuthHandler, REQUIRED_PORT, URL_BASE from .utils import eprint, get_home_dir, load_chatgpt_tokens, parse_jwt_claims, read_auth_file _STATUS_LIMIT_BAR_SEGMENTS = 30 _STATUS_LIMIT_BAR_FILLED = "β–ˆ" _STATUS_LIMIT_BAR_EMPTY = "β–‘" _STATUS_LIMIT_BAR_PARTIAL = "β–“" def _clamp_percent(value: float) -> float: try: percent = float(value) except Exception: return 0.0 if percent != percent: return 0.0 if percent < 0.0: return 0.0 if percent > 100.0: return 100.0 return percent def _render_progress_bar(percent_used: float) -> str: ratio = max(0.0, min(1.0, percent_used / 100.0)) filled_exact = ratio * _STATUS_LIMIT_BAR_SEGMENTS filled = int(filled_exact) partial = filled_exact - filled has_partial = partial > 0.5 if has_partial: filled += 1 filled = max(0, min(_STATUS_LIMIT_BAR_SEGMENTS, filled)) empty = _STATUS_LIMIT_BAR_SEGMENTS - filled if has_partial and filled > 0: bar = _STATUS_LIMIT_BAR_FILLED * (filled - 1) + _STATUS_LIMIT_BAR_PARTIAL + _STATUS_LIMIT_BAR_EMPTY * empty else: bar = _STATUS_LIMIT_BAR_FILLED * filled + _STATUS_LIMIT_BAR_EMPTY * empty return f"[{bar}]" def _get_usage_color(percent_used: float) -> str: if percent_used >= 90: return "\033[91m" elif percent_used >= 75: return "\033[93m" elif percent_used >= 50: return "\033[94m" else: return "\033[92m" def _reset_color() -> str: """ANSI reset color code""" return "\033[0m" def _format_window_duration(minutes: int | None) -> str | None: if minutes is None: return None try: total = int(minutes) except Exception: return None if total <= 0: return None minutes = total weeks, remainder = divmod(minutes, 7 * 24 * 60) days, remainder = divmod(remainder, 24 * 60) hours, remainder = divmod(remainder, 60) parts = [] if weeks: parts.append(f"{weeks} week" + ("s" if weeks != 1 else "")) if days: parts.append(f"{days} day" + ("s" if days != 1 else "")) if hours: parts.append(f"{hours} hour" + ("s" if hours != 1 else "")) if remainder: parts.append(f"{remainder} minute" + ("s" if remainder != 1 else "")) if not parts: parts.append(f"{minutes} minute" + ("s" if minutes != 1 else "")) return " ".join(parts) def _format_reset_duration(seconds: int | None) -> str | None: if seconds is None: return None try: value = int(seconds) except Exception: return None if value < 0: value = 0 days, remainder = divmod(value, 86400) hours, remainder = divmod(remainder, 3600) minutes, remainder = divmod(remainder, 60) parts: list[str] = [] if days: parts.append(f"{days}d") if hours: parts.append(f"{hours}h") if minutes: parts.append(f"{minutes}m") if not parts and remainder: parts.append("under 1m") if not parts: parts.append("0m") return " ".join(parts) def _format_local_datetime(dt: datetime) -> str: local = dt.astimezone() tz_name = local.tzname() or "local" return f"{local.strftime('%b %d, %Y %H:%M')} {tz_name}" def _print_usage_limits_block() -> None: stored = load_rate_limit_snapshot() print("πŸ“Š Usage Limits") if stored is None: print(" No usage data available yet. Send a request through ChatMock first.") print() return update_time = _format_local_datetime(stored.captured_at) print(f"Last updated: {update_time}") print() windows: list[tuple[str, str, RateLimitWindow]] = [] if stored.snapshot.primary is not None: windows.append(("⚑", "5 hour limit", stored.snapshot.primary)) if stored.snapshot.secondary is not None: windows.append(("πŸ“…", "Weekly limit", stored.snapshot.secondary)) if not windows: print(" Usage data was captured but no limit windows were provided.") print() return for i, (icon_label, desc, window) in enumerate(windows): if i > 0: print() percent_used = _clamp_percent(window.used_percent) remaining = max(0.0, 100.0 - percent_used) color = _get_usage_color(percent_used) reset = _reset_color() progress = _render_progress_bar(percent_used) usage_text = f"{percent_used:5.1f}% used" remaining_text = f"{remaining:5.1f}% left" print(f"{icon_label} {desc}") print(f"{color}{progress}{reset} {color}{usage_text}{reset} | {remaining_text}") reset_in = _format_reset_duration(window.resets_in_seconds) reset_at = compute_reset_at(stored.captured_at, window) if reset_in and reset_at: reset_at_str = _format_local_datetime(reset_at) print(f" ⏳ Resets in: {reset_in} at {reset_at_str}") elif reset_in: print(f" ⏳ Resets in: {reset_in}") elif reset_at: reset_at_str = _format_local_datetime(reset_at) print(f" ⏳ Resets at: {reset_at_str}") print() def cmd_login(no_browser: bool, verbose: bool) -> int: home_dir = get_home_dir() client_id = CLIENT_ID_DEFAULT if not client_id: eprint("ERROR: No OAuth client id configured. Set CHATGPT_LOCAL_CLIENT_ID.") return 1 try: bind_host = os.getenv("CHATGPT_LOCAL_LOGIN_BIND", "127.0.0.1") httpd = OAuthHTTPServer((bind_host, REQUIRED_PORT), OAuthHandler, home_dir=home_dir, client_id=client_id, verbose=verbose) except OSError as e: eprint(f"ERROR: {e}") if e.errno == errno.EADDRINUSE: return 13 return 1 auth_url = httpd.auth_url() with httpd: eprint(f"Starting local login server on {URL_BASE}") if not no_browser: try: webbrowser.open(auth_url, new=1, autoraise=True) except Exception as e: eprint(f"Failed to open browser: {e}") eprint(f"If your browser did not open, navigate to:\n{auth_url}") def _stdin_paste_worker() -> None: try: eprint( "If the browser can't reach this machine, paste the full redirect URL here and press Enter (or leave blank to keep waiting):" ) line = sys.stdin.readline().strip() if not line: return try: from urllib.parse import urlparse, parse_qs parsed = urlparse(line) params = parse_qs(parsed.query) code = (params.get("code") or [None])[0] state = (params.get("state") or [None])[0] if not code: eprint("Input did not contain an auth code. Ignoring.") return if state and state != httpd.state: eprint("State mismatch. Ignoring pasted URL for safety.") return eprint("Received redirect URL. Completing login without callback…") bundle, _ = httpd.exchange_code(code) if httpd.persist_auth(bundle): httpd.exit_code = 0 eprint("Login successful. Tokens saved.") else: eprint("ERROR: Unable to persist auth file.") httpd.shutdown() except Exception as exc: eprint(f"Failed to process pasted redirect URL: {exc}") except Exception: pass try: import threading threading.Thread(target=_stdin_paste_worker, daemon=True).start() except Exception: pass try: httpd.serve_forever() except KeyboardInterrupt: eprint("\nKeyboard interrupt received, exiting.") return httpd.exit_code def cmd_serve( host: str, port: int, verbose: bool, verbose_obfuscation: bool, reasoning_effort: str, reasoning_summary: str, reasoning_compat: str, fast_mode: bool, debug_model: str | None, expose_reasoning_models: bool, default_web_search: bool, ) -> int: app = create_app( verbose=verbose, verbose_obfuscation=verbose_obfuscation, reasoning_effort=reasoning_effort, reasoning_summary=reasoning_summary, reasoning_compat=reasoning_compat, fast_mode=fast_mode, debug_model=debug_model, expose_reasoning_models=expose_reasoning_models, default_web_search=default_web_search, ) app.run(host=host, use_reloader=False, port=port, threaded=True) return 0 def main() -> None: parser = argparse.ArgumentParser(description="ChatMock: login & OpenAI-compatible proxy") sub = parser.add_subparsers(dest="command", required=True) p_login = sub.add_parser("login", help="Authorize with ChatGPT and store tokens") p_login.add_argument("--no-browser", action="store_true", help="Do not open the browser automatically") p_login.add_argument("--verbose", action="store_true", help="Enable verbose logging") p_serve = sub.add_parser("serve", help="Run local OpenAI-compatible server") p_serve.add_argument("--host", default="127.0.0.1") p_serve.add_argument("--port", type=int, default=8000) p_serve.add_argument("--verbose", action="store_true", help="Enable verbose logging") p_serve.add_argument( "--verbose-obfuscation", action="store_true", help="Also dump raw SSE/obfuscation events (in addition to --verbose request/response logs).", ) p_serve.add_argument( "--debug-model", dest="debug_model", default=os.getenv("CHATGPT_LOCAL_DEBUG_MODEL"), help="Forcibly override requested 'model' with this value", ) p_serve.add_argument( "--fast-mode", action=argparse.BooleanOptionalAction, default=(os.getenv("CHATGPT_LOCAL_FAST_MODE") or "").strip().lower() in ("1", "true", "yes", "on"), help="Enable GPT fast mode by default for supported models; request-level overrides still take precedence.", ) p_serve.add_argument( "--reasoning-effort", choices=["none", "minimal", "low", "medium", "high", "xhigh"], default=os.getenv("CHATGPT_LOCAL_REASONING_EFFORT", "medium").lower(), help="Reasoning effort level for Responses API (default: medium)", ) p_serve.add_argument( "--reasoning-summary", choices=["auto", "concise", "detailed", "none"], default=os.getenv("CHATGPT_LOCAL_REASONING_SUMMARY", "auto").lower(), help="Reasoning summary verbosity (default: auto)", ) p_serve.add_argument( "--reasoning-compat", choices=["legacy", "o3", "think-tags", "current"], default=os.getenv("CHATGPT_LOCAL_REASONING_COMPAT", "think-tags").lower(), help=( "Compatibility mode for exposing reasoning to clients (legacy|o3|think-tags). " "'current' is accepted as an alias for 'legacy'" ), ) p_serve.add_argument( "--expose-reasoning-models", action="store_true", default=(os.getenv("CHATGPT_LOCAL_EXPOSE_REASONING_MODELS") or "").strip().lower() in ("1", "true", "yes", "on"), help=( "Expose GPT-5 family reasoning effort variants (none|minimal|low|medium|high|xhigh where supported) " "as separate models from /v1/models. This allows choosing effort via model selection in compatible UIs." ), ) p_serve.add_argument( "--enable-web-search", action=argparse.BooleanOptionalAction, default=(os.getenv("CHATGPT_LOCAL_ENABLE_WEB_SEARCH") or "").strip().lower() in ("1", "true", "yes", "on"), help=( "Enable default web_search tool when a request omits responses_tools (off by default). " "Also configurable via CHATGPT_LOCAL_ENABLE_WEB_SEARCH." ), ) p_info = sub.add_parser("info", help="Print current stored tokens and derived account id") p_info.add_argument("--json", action="store_true", help="Output raw auth.json contents") args = parser.parse_args() if args.command == "login": sys.exit(cmd_login(no_browser=args.no_browser, verbose=args.verbose)) elif args.command == "serve": sys.exit( cmd_serve( host=args.host, port=args.port, verbose=args.verbose, verbose_obfuscation=args.verbose_obfuscation, reasoning_effort=args.reasoning_effort, reasoning_summary=args.reasoning_summary, reasoning_compat=args.reasoning_compat, fast_mode=args.fast_mode, debug_model=args.debug_model, expose_reasoning_models=args.expose_reasoning_models, default_web_search=args.enable_web_search, ) ) elif args.command == "info": auth = read_auth_file() if getattr(args, "json", False): print(json.dumps(auth or {}, indent=2)) sys.exit(0) access_token, account_id, id_token = load_chatgpt_tokens() if not access_token or not id_token: print("πŸ‘€ Account") print(" β€’ Not signed in") print(" β€’ Run: python3 chatmock.py login") print("") _print_usage_limits_block() sys.exit(0) id_claims = parse_jwt_claims(id_token) or {} access_claims = parse_jwt_claims(access_token) or {} email = id_claims.get("email") or id_claims.get("preferred_username") or "" plan_raw = (access_claims.get("https://api.openai.com/auth") or {}).get("chatgpt_plan_type") or "unknown" plan_map = { "plus": "Plus", "pro": "Pro", "free": "Free", "team": "Team", "enterprise": "Enterprise", } plan = plan_map.get(str(plan_raw).lower(), str(plan_raw).title() if isinstance(plan_raw, str) else "Unknown") print("πŸ‘€ Account") print(" β€’ Signed in with ChatGPT") print(f" β€’ Login: {email}") print(f" β€’ Plan: {plan}") if account_id: print(f" β€’ Account ID: {account_id}") print("") _print_usage_limits_block() sys.exit(0) else: parser.error("Unknown command") if __name__ == "__main__": main()