File size: 15,075 Bytes
35205e8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 | from __future__ import annotations
import errno
import argparse
import json
import os
import sys
import webbrowser
from datetime import datetime
from .app import create_app
from .config import CLIENT_ID_DEFAULT
from .limits import RateLimitWindow, compute_reset_at, load_rate_limit_snapshot
from .oauth import OAuthHTTPServer, OAuthHandler, REQUIRED_PORT, URL_BASE
from .utils import eprint, get_home_dir, load_chatgpt_tokens, parse_jwt_claims, read_auth_file
# Number of character cells drawn between the brackets of the usage bar.
_STATUS_LIMIT_BAR_SEGMENTS = 30
# Glyph for a cell that counts as fully used.
_STATUS_LIMIT_BAR_FILLED = "█"
# Glyph for a cell that is not used.
_STATUS_LIMIT_BAR_EMPTY = "░"
# Glyph for the last used cell when it is only partly consumed.
_STATUS_LIMIT_BAR_PARTIAL = "▓"
def _clamp_percent(value: float) -> float:
    """Coerce *value* to a float confined to the closed range 0.0-100.0.

    Anything that ``float()`` rejects, as well as NaN, is reported as 0.0.
    """
    try:
        number = float(value)
    except Exception:
        return 0.0
    # NaN is the only float that is not equal to itself.
    not_a_number = number != number
    if not_a_number or number < 0.0:
        return 0.0
    return 100.0 if number > 100.0 else number
def _render_progress_bar(percent_used: float) -> str:
    """Draw *percent_used* as a bracketed bar of ``_STATUS_LIMIT_BAR_SEGMENTS`` cells.

    The percentage is limited to 0-100 before scaling. When the fractional
    part of the scaled value exceeds one half, one extra cell is counted as
    used and drawn with the partial glyph; smaller fractions are dropped.
    """
    total_cells = _STATUS_LIMIT_BAR_SEGMENTS
    fraction = max(0.0, min(1.0, percent_used / 100.0))
    exact_cells = fraction * total_cells
    whole_cells = int(exact_cells)
    show_partial = (exact_cells - whole_cells) > 0.5

    used_cells = whole_cells + 1 if show_partial else whole_cells
    used_cells = max(0, min(total_cells, used_cells))
    unused_cells = total_cells - used_cells

    if show_partial and used_cells > 0:
        # The last used cell is the partly consumed one.
        pieces = [
            _STATUS_LIMIT_BAR_FILLED * (used_cells - 1),
            _STATUS_LIMIT_BAR_PARTIAL,
            _STATUS_LIMIT_BAR_EMPTY * unused_cells,
        ]
    else:
        pieces = [
            _STATUS_LIMIT_BAR_FILLED * used_cells,
            _STATUS_LIMIT_BAR_EMPTY * unused_cells,
        ]
    return "[" + "".join(pieces) + "]"
def _get_usage_color(percent_used: float) -> str:
    """Pick the ANSI colour escape that matches how much of a limit is used.

    Thresholds are checked from highest to lowest: 90 and above, 75 and
    above, 50 and above; anything lower gets the final fallback code.
    """
    thresholds = (
        (90, "\033[91m"),
        (75, "\033[93m"),
        (50, "\033[94m"),
    )
    for floor, escape in thresholds:
        if percent_used >= floor:
            return escape
    return "\033[92m"
def _reset_color() -> str:
    """Return the ANSI escape sequence that resets terminal text attributes."""
    reset_sequence = "\033[0m"
    return reset_sequence
def _format_window_duration(minutes: int | None) -> str | None:
    """Render a window length given in minutes as human-readable text.

    The value is split into weeks, days, hours and minutes; zero-valued
    units are omitted and each unit is pluralised independently, for
    example ``1501`` becomes ``"1 day 1 hour 1 minute"``.

    Args:
        minutes: Window length in minutes, or ``None``.

    Returns:
        The formatted text, or ``None`` when *minutes* is ``None``, cannot
        be converted with ``int()``, or is not positive.
    """
    if minutes is None:
        return None
    try:
        total = int(minutes)
    except (TypeError, ValueError, OverflowError):
        # These are the errors int() raises for unsupported types,
        # malformed strings / NaN, and infinities respectively.
        return None
    if total <= 0:
        return None

    weeks, rest = divmod(total, 7 * 24 * 60)
    days, rest = divmod(rest, 24 * 60)
    hours, rest = divmod(rest, 60)

    units = ((weeks, "week"), (days, "day"), (hours, "hour"), (rest, "minute"))
    # total > 0 guarantees at least one non-zero unit, so no empty-result
    # fallback is needed here.
    return " ".join(
        f"{count} {label}{'' if count == 1 else 's'}"
        for count, label in units
        if count
    )
def _format_reset_duration(seconds: int | None) -> str | None:
    """Turn a countdown in seconds into a compact ``"1d 2h 3m"`` style label.

    Negative inputs are treated as zero. Leftover seconds are never shown:
    a value under one minute becomes ``"under 1m"`` and zero becomes ``"0m"``.
    Returns ``None`` when *seconds* is ``None`` or ``int()`` rejects it.
    """
    if seconds is None:
        return None
    try:
        remaining = int(seconds)
    except Exception:
        return None
    remaining = max(0, remaining)

    days, remaining = divmod(remaining, 86400)
    hours, remaining = divmod(remaining, 3600)
    minutes, leftover_seconds = divmod(remaining, 60)

    labelled = ((days, "d"), (hours, "h"), (minutes, "m"))
    pieces: list[str] = [f"{amount}{suffix}" for amount, suffix in labelled if amount]
    if pieces:
        return " ".join(pieces)
    return "under 1m" if leftover_seconds else "0m"
def _format_local_datetime(dt: datetime) -> str:
    """Format *dt* in the local timezone as e.g. ``"Jan 05, 2025 14:30 <tz>"``.

    The timezone name comes from ``tzname()``; the literal ``"local"`` is
    used when no name is available.
    """
    localized = dt.astimezone()
    stamp = localized.strftime("%b %d, %Y %H:%M")
    zone_label = localized.tzname()
    if not zone_label:
        zone_label = "local"
    return stamp + " " + zone_label
def _print_usage_limits_block() -> None:
    """Print the stored rate-limit snapshot as a short terminal report.

    Loads the snapshot with ``load_rate_limit_snapshot()``. When nothing is
    stored, or the snapshot has neither a primary nor a secondary window, a
    one-line explanation is printed instead. Otherwise each available
    window gets a title line, a coloured usage bar with used/left
    percentages, and a reset line when reset information can be derived.
    """
    record = load_rate_limit_snapshot()
    print("📊 Usage Limits")
    if record is None:
        print(" No usage data available yet. Send a request through ChatMock first.")
        print()
        return

    print(f"Last updated: {_format_local_datetime(record.captured_at)}")
    print()

    limits = record.snapshot
    candidates = (
        ("⚡", "5 hour limit", limits.primary),
        ("📅", "Weekly limit", limits.secondary),
    )
    windows: list[tuple[str, str, RateLimitWindow]] = [
        entry for entry in candidates if entry[2] is not None
    ]
    if not windows:
        print(" Usage data was captured but no limit windows were provided.")
        print()
        return

    ansi_reset = _reset_color()
    for position, (icon_label, desc, window) in enumerate(windows):
        if position:
            # Blank separator line between consecutive windows.
            print()

        used = _clamp_percent(window.used_percent)
        left = max(0.0, 100.0 - used)
        ansi_color = _get_usage_color(used)
        bar = _render_progress_bar(used)

        print(f"{icon_label} {desc}")
        print(f"{ansi_color}{bar}{ansi_reset} {ansi_color}{used:5.1f}% used{ansi_reset} | {left:5.1f}% left")

        countdown = _format_reset_duration(window.resets_in_seconds)
        reset_moment = compute_reset_at(record.captured_at, window)
        when = _format_local_datetime(reset_moment) if reset_moment else None
        if countdown:
            suffix = f" at {when}" if reset_moment else ""
            print(f" ⏳ Resets in: {countdown}{suffix}")
        elif reset_moment:
            print(f" ⏳ Resets at: {when}")
    print()
def cmd_login(no_browser: bool, verbose: bool) -> int:
    """Run the interactive OAuth login flow and return a process exit code.

    Creates an ``OAuthHTTPServer`` bound to ``REQUIRED_PORT``, optionally
    opens the authorization URL in a browser, and serves requests until the
    server is shut down or the user presses Ctrl+C. A daemon thread also
    reads one line from stdin so the redirect URL can be pasted by hand.

    Args:
        no_browser: When true, the browser is not opened automatically.
        verbose: Forwarded to ``OAuthHTTPServer``.

    Returns:
        1 when no client id is configured or the server cannot be created,
        13 when the address is already in use (``EADDRINUSE``), otherwise
        the server's ``exit_code`` attribute.
    """
    home_dir = get_home_dir()
    client_id = CLIENT_ID_DEFAULT
    if not client_id:
        eprint("ERROR: No OAuth client id configured. Set CHATGPT_LOCAL_CLIENT_ID.")
        return 1
    try:
        # The bind address can be overridden through the environment.
        bind_host = os.getenv("CHATGPT_LOCAL_LOGIN_BIND", "127.0.0.1")
        httpd = OAuthHTTPServer((bind_host, REQUIRED_PORT), OAuthHandler, home_dir=home_dir, client_id=client_id, verbose=verbose)
    except OSError as e:
        eprint(f"ERROR: {e}")
        # A dedicated exit code distinguishes "port already in use" from other failures.
        if e.errno == errno.EADDRINUSE:
            return 13
        return 1
    auth_url = httpd.auth_url()
    with httpd:
        eprint(f"Starting local login server on {URL_BASE}")
        if not no_browser:
            try:
                webbrowser.open(auth_url, new=1, autoraise=True)
            except Exception as e:
                eprint(f"Failed to open browser: {e}")
        # Always print the URL so it can be opened manually.
        eprint(f"If your browser did not open, navigate to:\n{auth_url}")

        def _stdin_paste_worker() -> None:
            """Read one pasted redirect URL from stdin and finish the login with it."""
            try:
                eprint(
                    "If the browser can't reach this machine, paste the full redirect URL here and press Enter (or leave blank to keep waiting):"
                )
                line = sys.stdin.readline().strip()
                if not line:
                    # Blank input (or EOF): keep waiting for the browser callback.
                    return
                try:
                    from urllib.parse import urlparse, parse_qs
                    parsed = urlparse(line)
                    params = parse_qs(parsed.query)
                    code = (params.get("code") or [None])[0]
                    state = (params.get("state") or [None])[0]
                    if not code:
                        eprint("Input did not contain an auth code. Ignoring.")
                        return
                    # NOTE(review): a pasted URL without a `state` parameter skips
                    # this comparison entirely - confirm that is intended.
                    if state and state != httpd.state:
                        eprint("State mismatch. Ignoring pasted URL for safety.")
                        return
                    eprint("Received redirect URL. Completing login without callback…")
                    bundle, _ = httpd.exchange_code(code)
                    if httpd.persist_auth(bundle):
                        httpd.exit_code = 0
                        eprint("Login successful. Tokens saved.")
                    else:
                        eprint("ERROR: Unable to persist auth file.")
                    # Stop serve_forever() in the main thread once the paste was handled.
                    httpd.shutdown()
                except Exception as exc:
                    eprint(f"Failed to process pasted redirect URL: {exc}")
            except Exception:
                # NOTE(review): failures reading stdin are dropped silently.
                pass

        try:
            import threading
            # Daemon thread: it must not keep the process alive after the server stops.
            threading.Thread(target=_stdin_paste_worker, daemon=True).start()
        except Exception:
            # NOTE(review): if the thread cannot start, only the browser callback path remains.
            pass
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            eprint("\nKeyboard interrupt received, exiting.")
    return httpd.exit_code
def cmd_serve(
    host: str,
    port: int,
    verbose: bool,
    verbose_obfuscation: bool,
    reasoning_effort: str,
    reasoning_summary: str,
    reasoning_compat: str,
    fast_mode: bool,
    debug_model: str | None,
    expose_reasoning_models: bool,
    default_web_search: bool,
) -> int:
    """Build the application with ``create_app`` and run it on *host*:*port*.

    Every option except *host* and *port* is passed to ``create_app`` under
    the same keyword. The app is run threaded with the reloader disabled.
    Returns 0 once ``run()`` returns.
    """
    app_settings = dict(
        verbose=verbose,
        verbose_obfuscation=verbose_obfuscation,
        reasoning_effort=reasoning_effort,
        reasoning_summary=reasoning_summary,
        reasoning_compat=reasoning_compat,
        fast_mode=fast_mode,
        debug_model=debug_model,
        expose_reasoning_models=expose_reasoning_models,
        default_web_search=default_web_search,
    )
    application = create_app(**app_settings)
    application.run(host=host, use_reloader=False, port=port, threaded=True)
    return 0
def _env_flag(name: str) -> bool:
    """Return True when environment variable *name* is 1/true/yes/on (case-insensitive)."""
    return (os.getenv(name) or "").strip().lower() in ("1", "true", "yes", "on")


def _build_parser() -> argparse.ArgumentParser:
    """Create the CLI parser with the ``login``, ``serve`` and ``info`` sub-commands."""
    parser = argparse.ArgumentParser(description="ChatMock: login & OpenAI-compatible proxy")
    sub = parser.add_subparsers(dest="command", required=True)

    p_login = sub.add_parser("login", help="Authorize with ChatGPT and store tokens")
    p_login.add_argument("--no-browser", action="store_true", help="Do not open the browser automatically")
    p_login.add_argument("--verbose", action="store_true", help="Enable verbose logging")

    p_serve = sub.add_parser("serve", help="Run local OpenAI-compatible server")
    p_serve.add_argument("--host", default="127.0.0.1")
    p_serve.add_argument("--port", type=int, default=8000)
    p_serve.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    p_serve.add_argument(
        "--verbose-obfuscation",
        action="store_true",
        help="Also dump raw SSE/obfuscation events (in addition to --verbose request/response logs).",
    )
    p_serve.add_argument(
        "--debug-model",
        dest="debug_model",
        default=os.getenv("CHATGPT_LOCAL_DEBUG_MODEL"),
        help="Forcibly override requested 'model' with this value",
    )
    p_serve.add_argument(
        "--fast-mode",
        action=argparse.BooleanOptionalAction,
        default=_env_flag("CHATGPT_LOCAL_FAST_MODE"),
        help="Enable GPT fast mode by default for supported models; request-level overrides still take precedence.",
    )
    p_serve.add_argument(
        "--reasoning-effort",
        choices=["none", "minimal", "low", "medium", "high", "xhigh"],
        default=os.getenv("CHATGPT_LOCAL_REASONING_EFFORT", "medium").lower(),
        help="Reasoning effort level for Responses API (default: medium)",
    )
    p_serve.add_argument(
        "--reasoning-summary",
        choices=["auto", "concise", "detailed", "none"],
        default=os.getenv("CHATGPT_LOCAL_REASONING_SUMMARY", "auto").lower(),
        help="Reasoning summary verbosity (default: auto)",
    )
    p_serve.add_argument(
        "--reasoning-compat",
        choices=["legacy", "o3", "think-tags", "current"],
        default=os.getenv("CHATGPT_LOCAL_REASONING_COMPAT", "think-tags").lower(),
        help=(
            "Compatibility mode for exposing reasoning to clients (legacy|o3|think-tags). "
            "'current' is accepted as an alias for 'legacy'"
        ),
    )
    # BooleanOptionalAction (like --fast-mode and --enable-web-search) also
    # provides --no-expose-reasoning-models, so a default enabled through the
    # environment can be switched off on the command line.
    p_serve.add_argument(
        "--expose-reasoning-models",
        action=argparse.BooleanOptionalAction,
        default=_env_flag("CHATGPT_LOCAL_EXPOSE_REASONING_MODELS"),
        help=(
            "Expose GPT-5 family reasoning effort variants (none|minimal|low|medium|high|xhigh where supported) "
            "as separate models from /v1/models. This allows choosing effort via model selection in compatible UIs."
        ),
    )
    p_serve.add_argument(
        "--enable-web-search",
        action=argparse.BooleanOptionalAction,
        default=_env_flag("CHATGPT_LOCAL_ENABLE_WEB_SEARCH"),
        help=(
            "Enable default web_search tool when a request omits responses_tools (off by default). "
            "Also configurable via CHATGPT_LOCAL_ENABLE_WEB_SEARCH."
        ),
    )

    p_info = sub.add_parser("info", help="Print current stored tokens and derived account id")
    p_info.add_argument("--json", action="store_true", help="Output raw auth.json contents")
    return parser


def _run_info(as_json: bool) -> int:
    """Print the stored account details and usage limits; return the exit code 0.

    With *as_json* the auth file contents are dumped as JSON and nothing
    else is printed.
    """
    auth = read_auth_file()
    if as_json:
        print(json.dumps(auth or {}, indent=2))
        return 0

    access_token, account_id, id_token = load_chatgpt_tokens()
    if not access_token or not id_token:
        print("👤 Account")
        print(" • Not signed in")
        print(" • Run: python3 chatmock.py login")
        print("")
        _print_usage_limits_block()
        return 0

    id_claims = parse_jwt_claims(id_token) or {}
    access_claims = parse_jwt_claims(access_token) or {}
    email = id_claims.get("email") or id_claims.get("preferred_username") or "<unknown>"
    plan_raw = (access_claims.get("https://api.openai.com/auth") or {}).get("chatgpt_plan_type") or "unknown"
    plan_map = {
        "plus": "Plus",
        "pro": "Pro",
        "free": "Free",
        "team": "Team",
        "enterprise": "Enterprise",
    }
    # Plan types missing from the map are shown title-cased when they are strings.
    plan = plan_map.get(str(plan_raw).lower(), str(plan_raw).title() if isinstance(plan_raw, str) else "Unknown")
    print("👤 Account")
    print(" • Signed in with ChatGPT")
    print(f" • Login: {email}")
    print(f" • Plan: {plan}")
    if account_id:
        print(f" • Account ID: {account_id}")
    print("")
    _print_usage_limits_block()
    return 0


def main() -> None:
    """Parse the command line and dispatch to the selected sub-command.

    Every known sub-command ends the process through ``sys.exit`` with the
    code returned by its handler.
    """
    parser = _build_parser()
    args = parser.parse_args()
    if args.command == "login":
        sys.exit(cmd_login(no_browser=args.no_browser, verbose=args.verbose))
    elif args.command == "serve":
        sys.exit(
            cmd_serve(
                host=args.host,
                port=args.port,
                verbose=args.verbose,
                verbose_obfuscation=args.verbose_obfuscation,
                reasoning_effort=args.reasoning_effort,
                reasoning_summary=args.reasoning_summary,
                reasoning_compat=args.reasoning_compat,
                fast_mode=args.fast_mode,
                debug_model=args.debug_model,
                expose_reasoning_models=args.expose_reasoning_models,
                default_web_search=args.enable_web_search,
            )
        )
    elif args.command == "info":
        sys.exit(_run_info(as_json=getattr(args, "json", False)))
    else:
        # Defensive: the sub-parser is required, so this should not be reached.
        parser.error("Unknown command")
# Run the CLI only when this module is the program entry point, not on import.
if __name__ == "__main__":
    main()
|