"""Periodic cleanup of invalid/low-quota free codex auths.

Runs inside the HF Space container and calls CLIProxyAPI on 127.0.0.1 —
no HF rate-limiter in the path.

Usage:
    python3 cleanup_invalid_auths.py [--dry-run]

Environment variables:
    MANAGEMENT_PASSWORD        — management API key (falls back to API_KEY,
                                 then to a built-in default)
    PORT                       — gateway port (default 8317)
    CLEANUP_PROBE_WORKERS      — concurrent probe workers (default 8)
    CLEANUP_MAX_ACTIVE_PROBES  — max entries to probe (default 500, 0=unlimited)
    CLEANUP_LOW_QUOTA_PERCENT  — weekly quota threshold % (default 10)
    OBJECTSTORE_ENDPOINT       — objectstore endpoint (for deleting synced copies)
    OBJECTSTORE_BUCKET         — objectstore bucket
    OBJECTSTORE_ACCESS_KEY     — objectstore access key
    OBJECTSTORE_SECRET_KEY     — objectstore secret key
    OBJECTSTORE_ALIAS          — mc alias for the objectstore (default daili-objectstore)
    OBJECTSTORE_LOCAL_PATH     — base dir of the local mirror (default $WRITABLE_PATH, else /tmp)
    MC_BIN, MC_CONFIG_DIR      — mc binary (default mc) and its config dir
                                 (default <local base>/.mc)
"""
|
|
| from __future__ import annotations |
|
|
| import json |
| import os |
| import subprocess |
| import sys |
| import time |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any |
| from urllib.parse import quote |
| from urllib.request import Request, urlopen |
| from urllib.error import HTTPError, URLError |
|
|
|
|
# Management API of the local CLIProxyAPI gateway.
# The key is the first non-empty value of MANAGEMENT_PASSWORD, then API_KEY.
# NOTE(review): with neither set it falls back to the hard-coded "1111" —
# presumably the gateway's own default; confirm this is intended.
MGMT_KEY = next(
    (value for value in (os.environ.get("MANAGEMENT_PASSWORD"), os.environ.get("API_KEY")) if value),
    "1111",
)
MGMT_PORT = os.environ.get("PORT", "8317")
MGMT_BASE = "http://127.0.0.1:" + MGMT_PORT + "/v0/management"


# Probe tuning knobs.
PROBE_WORKERS = int(os.environ.get("CLEANUP_PROBE_WORKERS", "8"))  # thread-pool size for wham probes
MAX_ACTIVE_PROBES = int(os.environ.get("CLEANUP_MAX_ACTIVE_PROBES", "500"))  # default 500; <= 0 disables the cap
LOW_QUOTA_PERCENT = float(os.environ.get("CLEANUP_LOW_QUOTA_PERCENT", "10"))  # remaining-% threshold
|
|
| |
# Optional objectstore holding synced copies of the auth files, driven via the
# `mc` CLI. All four of these must be non-empty for it to be used.
(
    OBJECTSTORE_ENDPOINT,
    OBJECTSTORE_BUCKET,
    OBJECTSTORE_ACCESS_KEY,
    OBJECTSTORE_SECRET_KEY,
) = (
    os.environ.get(var_name, "").strip()
    for var_name in (
        "OBJECTSTORE_ENDPOINT",
        "OBJECTSTORE_BUCKET",
        "OBJECTSTORE_ACCESS_KEY",
        "OBJECTSTORE_SECRET_KEY",
    )
)
OBJECTSTORE_ALIAS = os.environ.get("OBJECTSTORE_ALIAS", "daili-objectstore").strip()
# Base directory of the local mirror: OBJECTSTORE_LOCAL_PATH when set,
# otherwise WRITABLE_PATH, otherwise /tmp.
OBJECTSTORE_LOCAL_BASE = os.environ.get(
    "OBJECTSTORE_LOCAL_PATH",
    os.environ.get("WRITABLE_PATH", "/tmp"),
).strip()
MC_BIN = os.environ.get("MC_BIN", "mc")
MC_CONFIG_DIR = os.environ.get("MC_CONFIG_DIR", OBJECTSTORE_LOCAL_BASE + "/.mc")
|
|
# ChatGPT backend endpoint whose response carries the plan type and
# rate-limit windows; it is requested through the gateway's /api-call proxy.
WHAM_USAGE_URL = "https://chatgpt.com/backend-api/wham/usage"
# Probe request headers. "$TOKEN$" is sent literally — presumably the gateway
# substitutes the selected auth's access token; confirm.
WHAM_HEADERS = dict(
    [
        ("Authorization", "Bearer $TOKEN$"),
        ("Content-Type", "application/json"),
        ("User-Agent", "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal"),
    ]
)


# Lower-case substrings that, when found in an auth's (lower-cased)
# status_message, mark the auth for deletion without any probing.
MESSAGE_KEYWORDS = [
    # invalidated / revoked OAuth tokens
    "authentication token has been invalidated",
    "invalidated oauth token",
    "token_invalidated",
    "token_revoked",
    # account or organization no longer usable
    "account has been deactivated",
    "no_organization",
    "must be a member of an organization",
]
|
|
|
|
| def _now() -> str: |
| return datetime.now(timezone.utc).strftime("%H:%M:%S") |
|
|
|
|
def _mgmt_request(method: str, path: str, data: bytes | None = None, timeout: int = 15) -> tuple[int, Any]:
    """Send one request to the local CLIProxyAPI management API.

    Args:
        method: HTTP verb (``GET``, ``POST``, ``DELETE`` ...).
        path: Path appended to ``MGMT_BASE``, including any query string.
        data: Optional raw request body (sent with a JSON content type).
        timeout: Socket timeout in seconds.

    Returns:
        ``(status, body)``:
        * success — ``body`` is the decoded JSON value, or the raw text when
          the response body is empty or not valid JSON;
        * HTTP error status — ``body`` is the error body as text (``""`` if it
          cannot be read);
        * transport failure — ``status`` is 0 and ``body`` is the error text.
    """
    req = Request(f"{MGMT_BASE}{path}", data=data, method=method)
    req.add_header("Authorization", f"Bearer {MGMT_KEY}")
    req.add_header("Content-Type", "application/json")
    try:
        with urlopen(req, timeout=timeout) as resp:
            status = resp.status
            raw = resp.read()
    except HTTPError as e:
        # HTTPError subclasses URLError/OSError, so it must be handled first.
        body = ""
        try:
            body = e.read().decode("utf-8", errors="replace")
        except Exception:
            # Best effort only: callers act on the status code.
            pass
        return e.code, body
    except (URLError, OSError) as e:
        return 0, str(e)

    # An empty (e.g. a DELETE answered with no content) or non-JSON success
    # body used to raise out of this helper and abort the whole cleanup run;
    # hand back the text so callers can still act on the status code.
    try:
        return status, json.loads(raw)
    except ValueError:
        return status, raw.decode("utf-8", errors="replace")
|
|
|
|
def list_auth_files() -> list[dict[str, Any]]:
    """Return every auth-file entry reported by the management API.

    Non-dict items in the response's ``files`` array are dropped; a missing
    or null ``files`` value yields an empty list.

    Raises:
        RuntimeError: if the request does not return HTTP 200 with a JSON
            object, or ``files`` is present but is not a list.
    """
    status, body = _mgmt_request("GET", "/auth-files")
    if status != 200 or not isinstance(body, dict):
        raise RuntimeError(f"list auth-files failed: status={status}")
    # `"files": null` previously crashed with a bare TypeError on iteration.
    files = body.get("files") or []
    if not isinstance(files, list):
        raise RuntimeError(f"list auth-files failed: unexpected files type {type(files).__name__}")
    return [f for f in files if isinstance(f, dict)]
|
|
|
|
def delete_auth_file(name: str) -> bool:
    """Ask the management API to delete auth file *name*; True on any 2xx reply."""
    status, _body = _mgmt_request(
        "DELETE",
        "/auth-files?name=" + quote(name, safe=""),
    )
    return status in range(200, 300)
|
|
|
|
def _objectstore_enabled() -> bool:
    """Return True only when endpoint, bucket, access key and secret key are all non-empty."""
    return all((OBJECTSTORE_ENDPOINT, OBJECTSTORE_BUCKET, OBJECTSTORE_ACCESS_KEY, OBJECTSTORE_SECRET_KEY))
|
|
|
|
def _mc_env() -> dict[str, str]:
    """Build the environment used for ``mc`` invocations.

    Copies the current process environment without any HTTP/HTTPS/ALL proxy
    variable (upper- or lower-case) and sets ``MC_CONFIG_DIR`` to the
    configured directory.
    """
    # Presumably dropped so mc talks to the objectstore directly — confirm.
    proxy_vars = {"HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"}
    env = {key: value for key, value in os.environ.items() if key not in proxy_vars}
    env["MC_CONFIG_DIR"] = MC_CONFIG_DIR
    return env
|
|
|
|
def _delete_from_objectstore(name: str) -> bool:
    """Delete auth file *name* from the objectstore so sync doesn't pull it back.

    Runs ``mc rm --force <alias>/<bucket>/auths/<name>`` with a 15 s timeout.

    Returns:
        True only if objectstore support is configured and ``mc`` exited with
        status 0. False when it is disabled, *name* is not a plain file name,
        ``mc`` cannot be run or times out, or ``mc`` reports an error.
    """
    if not _objectstore_enabled():
        return False
    # A name with path separators or dot segments would address a different
    # object than auths/<name>.
    if not name or Path(name).name != name or name in (".", ".."):
        print(f"[{_now()}] skipping objectstore delete for unsafe name: {name!r}", flush=True)
        return False
    remote = f"{OBJECTSTORE_ALIAS}/{OBJECTSTORE_BUCKET}/auths/{name}"
    try:
        result = subprocess.run(
            [MC_BIN, "rm", "--force", remote],
            env=_mc_env(), capture_output=True, text=True, timeout=15,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        # mc missing/not executable, undecodable output, or the timeout expired.
        print(f"[{_now()}] objectstore delete failed: {name}: {exc}", flush=True)
        return False
    # Previously True was returned regardless of mc's exit status, so a failed
    # remote delete was silently reported as success.
    if result.returncode != 0:
        detail = (result.stderr or result.stdout or "").strip()
        print(f"[{_now()}] objectstore delete failed: {name}: {detail}", flush=True)
        return False
    return True
|
|
|
|
def _delete_local_auth_file(name: str) -> None:
    """Best-effort removal of the local objectstore mirror copy of *name*.

    The copy lives at ``<OBJECTSTORE_LOCAL_BASE>/objectstore/auths/<name>``.
    Names that are not a single plain path component are skipped. Filesystem
    errors are reported and swallowed so a mirror problem cannot abort the
    caller's remaining work.
    """
    # An absolute name would replace the base directory in the join below and
    # "../" segments would escape it, deleting files outside the mirror.
    if not name or Path(name).name != name or name in (".", ".."):
        print(f"[{_now()}] skipping local mirror delete for unsafe name: {name!r}", flush=True)
        return
    local_path = Path(OBJECTSTORE_LOCAL_BASE) / "objectstore" / "auths" / name
    try:
        if local_path.is_file():
            local_path.unlink(missing_ok=True)
    except OSError as exc:
        print(f"[{_now()}] local mirror delete failed: {name}: {exc}", flush=True)
|
|
|
|
def delete_auth_fully(name: str) -> bool:
    """Remove auth *name* via the management API, then from objectstore and local mirror.

    The objectstore and local-mirror copies are only touched after the
    management API deletion succeeded; their outcome does not affect the
    result.

    Returns:
        Whether the management API deletion succeeded.
    """
    if not delete_auth_file(name):
        return False
    _delete_from_objectstore(name)
    _delete_local_auth_file(name)
    return True
|
|
|
|
def api_call(payload: dict[str, Any], timeout: int = 10) -> dict[str, Any] | None:
    """POST *payload* as JSON to the gateway's ``/api-call`` endpoint.

    Returns:
        The decoded JSON object on HTTP 200; None on any other status, on a
        transport failure, or when the body is not a JSON object.
    """
    status, body = _mgmt_request(
        "POST",
        "/api-call",
        data=json.dumps(payload, ensure_ascii=False).encode(),
        timeout=timeout,
    )
    return body if status == 200 and isinstance(body, dict) else None
|
|
|
|
| def _is_free_auth(name: str) -> bool: |
| return name.strip().endswith("-free.json") |
|
|
|
|
def _status_reason(entry: dict[str, Any]) -> str:
    """Classify a free auth as invalid from its ``status_message`` alone (no HTTP).

    Non ``-free.json`` entries are never flagged. The lower-cased status
    message is first scanned for any of ``MESSAGE_KEYWORDS``. Failing that, it
    is parsed as JSON and its ``error`` object is inspected for invalidation or
    usage-limit codes.

    Returns:
        A short reason tag, or ``""`` when nothing marks the entry as invalid.
    """
    if not _is_free_auth(str(entry.get("name") or "").strip()):
        return ""
    message = str(entry.get("status_message") or "").strip().lower()
    if not message:
        return ""

    matched = next((kw for kw in MESSAGE_KEYWORDS if kw in message), None)
    if matched is not None:
        return f"status_message={matched}"

    try:
        parsed = json.loads(message)
    except ValueError:
        return ""
    error = parsed.get("error", {}) if isinstance(parsed, dict) else None
    if not isinstance(error, dict):
        return ""

    code = str(error.get("code") or "")
    # NOTE: token_invalidated / token_revoked also appear in MESSAGE_KEYWORDS,
    # so the substring scan above already catches them; in practice only
    # usage_limit_reached reaches this check.
    if code in ("token_invalidated", "token_revoked", "usage_limit_reached"):
        return f"status_message_json={code}"

    plan = str(error.get("plan_type") or "").lower()
    kind = str(error.get("type") or "").lower()
    if kind == "usage_limit_reached" and plan in ("", "free"):
        return "status_message=usage_limit_reached_free"
    return ""
|
|
|
|
def _low_quota_reason_from_entry(entry: dict[str, Any]) -> str:
    """Flag an entry whose server-reported weekly quota is below the threshold.

    Candidate field names are checked in order. The first one whose value
    parses as a number below ``LOW_QUOTA_PERCENT`` produces the reason.
    Missing or unparseable values, and values at or above the threshold, are
    skipped.

    Returns:
        e.g. ``"weekly_remaining_percent<10%"``, or ``""`` if no field qualifies.
    """
    candidate_fields = (
        "weekly_remaining_percent",
        "weekly_limit_remaining_percent",
        "weekly_quota_remaining_percent",
        "weekly_percent_remaining",
    )
    for field in candidate_fields:
        raw = entry.get(field)
        if raw is None:
            continue
        try:
            remaining = float(raw)
        except (TypeError, ValueError):
            continue
        if remaining < LOW_QUOTA_PERCENT:
            return f"{field}<{LOW_QUOTA_PERCENT:g}%"
    return ""
|
|
|
|
def _probe_wham_usage(entry: dict[str, Any]) -> str:
    """Probe ``wham/usage`` for *entry* through the gateway's ``/api-call`` proxy.

    The proxied request carries the entry's ``auth_index``. Presumably this
    makes the gateway use that auth's token (cf. the ``$TOKEN$`` placeholder
    in WHAM_HEADERS). ``Chatgpt-Account-Id`` is added when an account id is
    known: ``id_token.chatgpt_account_id``, else ``account_id``.

    Returns:
        A reason tag when the auth should be deleted:

        * HTTP 200 — the verdict of ``_low_quota_from_wham``;
        * HTTP 401 — ``probe=token_invalidated`` / ``probe=token_revoked`` /
          ``probe=account_deactivated`` depending on the body, else
          ``probe=401``.

        ``""`` in every other case: missing ``auth_index``, failed proxy call,
        unparseable status or body, or another status code.
    """
    auth_index = str(entry.get("auth_index") or "").strip()
    if not auth_index:
        return ""

    account_id = ""
    id_token = entry.get("id_token")
    if isinstance(id_token, dict):
        account_id = str(id_token.get("chatgpt_account_id") or "").strip()
    if not account_id:
        account_id = str(entry.get("account_id") or "").strip()

    headers = dict(WHAM_HEADERS)
    if account_id:
        headers["Chatgpt-Account-Id"] = account_id

    response = api_call({
        "auth_index": auth_index,
        "method": "GET",
        "url": WHAM_USAGE_URL,
        "header": headers,
    })
    if not isinstance(response, dict):
        return ""

    try:
        status_code = int(response.get("status_code", 0) or 0)
    except (TypeError, ValueError):
        return ""

    raw_body = response.get("body")

    if status_code == 200:
        # The upstream body is presumably forwarded as text, but accept an
        # already-decoded object too: json.loads() on a dict raised TypeError,
        # which escaped this function instead of yielding "".
        if isinstance(raw_body, dict):
            payload = raw_body
        else:
            try:
                payload = json.loads(raw_body or "{}")
            except (TypeError, ValueError):
                return ""
        if not isinstance(payload, dict):
            return ""
        return _low_quota_from_wham(payload)

    if status_code == 401:
        body = str(raw_body or "").lower()
        if "token has been invalidated" in body:
            return "probe=token_invalidated"
        if "invalidated oauth token" in body:
            return "probe=token_revoked"
        if "account has been deactivated" in body:
            return "probe=account_deactivated"
        return "probe=401"

    return ""
|
|
|
|
| def _parse_percent_value(val: Any) -> float | None: |
| if val is None: |
| return None |
| try: |
| return float(val) |
| except (TypeError, ValueError): |
| return None |
|
|
|
|
def _remaining_percent_from_window(window: Any) -> float | None:
    """Return the remaining-quota percentage described by a rate-limit *window*.

    A direct field (``remaining_percent``, ``left_percent``,
    ``available_percent``, tried in that order) wins. Otherwise the value is
    derived as ``100 - used_percent``, clamped at 0. Returns None when
    *window* is not a dict or none of these fields holds a number.
    """
    if not isinstance(window, dict):
        return None
    direct_values = (
        _parse_percent_value(window.get(key))
        for key in ("remaining_percent", "left_percent", "available_percent")
    )
    found = next((pct for pct in direct_values if pct is not None), None)
    if found is not None:
        return found
    used = _parse_percent_value(window.get("used_percent"))
    return None if used is None else max(0.0, 100.0 - used)
|
|
|
|
def _low_quota_from_wham(payload: dict[str, Any]) -> str:
    """Decide from a wham/usage response whether a free auth is low on quota.

    Mirrors reg_openai.py logic. A non-empty ``plan_type`` other than
    ``free`` is never flagged. Otherwise the remaining percentage of
    ``rate_limit.primary_window`` is compared with ``LOW_QUOTA_PERCENT``. The
    reason tag calls it "weekly", so presumably primary_window is the weekly
    window for free plans — confirm.

    Returns:
        ``"probe_wham_weekly_remaining<N%"`` or ``""``.
    """
    plan = str(payload.get("plan_type") or "").strip().lower()
    if plan not in ("", "free"):
        return ""
    limits = payload.get("rate_limit")
    remaining = (
        _remaining_percent_from_window(limits.get("primary_window"))
        if isinstance(limits, dict)
        else None
    )
    if remaining is not None and remaining < LOW_QUOTA_PERCENT:
        return f"probe_wham_weekly_remaining<{LOW_QUOTA_PERCENT:g}%"
    return ""
|
|
|
|
def _collect_fast_hits(files: list[dict[str, Any]]) -> tuple[dict[str, str], list[dict[str, Any]]]:
    """Split free auths into offline hits (name -> reason) and entries that still need a probe."""
    fast_hits: dict[str, str] = {}
    probe_candidates: list[dict[str, Any]] = []
    for entry in files:
        name = str(entry.get("name") or "").strip()
        if not name or not _is_free_auth(name):
            continue
        reason = _status_reason(entry) or _low_quota_reason_from_entry(entry)
        if reason:
            fast_hits[name] = reason
        else:
            probe_candidates.append(entry)
    return fast_hits, probe_candidates


def _probe_for_hits(candidates: list[dict[str, Any]]) -> dict[str, str]:
    """Probe candidates concurrently via wham/usage; return name -> reason for auths to delete."""
    probe_hits: dict[str, str] = {}
    if not candidates:
        return probe_hits

    if MAX_ACTIVE_PROBES > 0 and len(candidates) > MAX_ACTIVE_PROBES:
        # NOTE(review): always keeps the first N in listing order, so with a
        # persistently large backlog the later entries may never be probed.
        print(f"[{_now()}] truncating probe candidates from {len(candidates)} to {MAX_ACTIVE_PROBES}", flush=True)
        candidates = candidates[:MAX_ACTIVE_PROBES]

    # ThreadPoolExecutor rejects max_workers <= 0, so CLEANUP_PROBE_WORKERS=0
    # (or negative) used to crash the run; always use at least one worker.
    workers = max(1, min(PROBE_WORKERS, len(candidates)))
    print(f"[{_now()}] probing {len(candidates)} entries with {workers} workers", flush=True)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {
            pool.submit(_probe_wham_usage, entry): str(entry.get("name") or "").strip()
            for entry in candidates
        }
        total = len(futures)
        for done, future in enumerate(as_completed(futures), start=1):
            name = futures[future]
            try:
                reason = future.result()
            except Exception as exc:
                print(f"[{_now()}] probe error: {name}: {exc}", flush=True)
                continue
            if reason:
                probe_hits[name] = reason
            if done % 20 == 0 or done == total:
                print(f"[{_now()}] probe progress: {done}/{total}, hits={len(probe_hits)}", flush=True)
    return probe_hits


def _apply_deletions(all_hits: dict[str, str], *, dry_run: bool) -> tuple[int, list[str]]:
    """Delete (or in dry-run only report) every hit; return (deleted count, failed names)."""
    deleted = 0
    failures: list[str] = []
    for name, reason in all_hits.items():
        if dry_run:
            print(f"[{_now()}] [dry-run] would delete: {name} ({reason})", flush=True)
            deleted += 1
            continue
        try:
            ok = delete_auth_fully(name)
        except Exception as exc:
            # One misbehaving entry must not abort the remaining deletions.
            print(f"[{_now()}] delete failed: {name}: {exc}", flush=True)
            failures.append(name)
            continue
        if ok:
            print(f"[{_now()}] deleted: {name} ({reason})", flush=True)
            deleted += 1
        else:
            print(f"[{_now()}] delete failed: {name}", flush=True)
            failures.append(name)
    return deleted, failures


def run_cleanup(dry_run: bool = False) -> dict[str, Any]:
    """Run one cleanup pass over all free codex auths.

    1. List auth files and flag free auths whose status message or reported
       quota already marks them invalid (no probing needed).
    2. Probe the remaining free auths via wham/usage, capped by
       ``MAX_ACTIVE_PROBES`` and using up to ``PROBE_WORKERS`` threads.
    3. Delete every flagged auth, or only print it when *dry_run* is set.

    Returns:
        A report dict with ``scanned``, ``fast_hits``, ``probe_hits``,
        ``deleted`` (in dry-run: the would-delete count), ``failures``,
        ``elapsed_seconds`` and ``dry_run``.

    Raises:
        RuntimeError: propagated from list_auth_files when the listing fails.
    """
    started = time.monotonic()
    print(f"[{_now()}] cleanup start", flush=True)

    files = list_auth_files()
    total = len(files)
    print(f"[{_now()}] auth-files total={total}", flush=True)

    fast_hits, probe_candidates = _collect_fast_hits(files)
    print(f"[{_now()}] fast_hits={len(fast_hits)}, probe_candidates={len(probe_candidates)}", flush=True)

    probe_hits = _probe_for_hits(probe_candidates)

    # A name in both maps (only possible with duplicate names in the listing)
    # keeps its probe reason.
    all_hits = {**fast_hits, **probe_hits}
    print(f"[{_now()}] total hits to delete: {len(all_hits)}", flush=True)

    deleted, failures = _apply_deletions(all_hits, dry_run=dry_run)

    report = {
        "scanned": total,
        "fast_hits": len(fast_hits),
        "probe_hits": len(probe_hits),
        "deleted": deleted,
        "failures": len(failures),
        "elapsed_seconds": round(time.monotonic() - started, 1),
        # Lets consumers tell a real run from a dry-run whose "deleted" is hypothetical.
        "dry_run": dry_run,
    }
    print(f"[{_now()}] cleanup done: {json.dumps(report)}", flush=True)
    return report
|
|
|
|
if __name__ == "__main__":
    # Only "--dry-run" is recognised; any other arguments are ignored.
    dry_run = "--dry-run" in sys.argv
    try:
        run_cleanup(dry_run=dry_run)
    except Exception as exc:  # top-level boundary: report and exit non-zero
        sys.stderr.write(f"[{_now()}] cleanup failed: {exc}\n")
        sys.stderr.flush()
        raise SystemExit(1)
|
|