daili / cleanup_invalid_auths.py
pjpjq's picture
fix: align _low_quota_from_wham with reg_openai.py parsing path (rate_limit.primary_window)
bf816a5
#!/usr/bin/env python3
"""Periodic cleanup of invalid/low-quota free codex auths.
Runs inside the HF Space container and calls CLIProxyAPI on 127.0.0.1 —
no HF rate-limiter in the path.
Usage:
python3 cleanup_invalid_auths.py [--dry-run]
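Environment variables:
MANAGEMENT_PASSWORD — management API key (should be set; falls back to API_KEY, then to a hard-coded default)
PORT — gateway port (default 8317)
CLEANUP_PROBE_WORKERS — concurrent probe workers (default 8)
CLEANUP_MAX_ACTIVE_PROBES — max entries to probe per run (default 500, 0=unlimited)
CLEANUP_LOW_QUOTA_PERCENT — weekly quota threshold % (default 10)
OBJECTSTORE_ENDPOINT — objectstore endpoint (for deleting synced copies)
OBJECTSTORE_BUCKET — objectstore bucket
OBJECTSTORE_ACCESS_KEY — objectstore access key
OBJECTSTORE_SECRET_KEY — objectstore secret key
(objectstore deletion is attempted only when all four OBJECTSTORE_* values above are set)
OBJECTSTORE_ALIAS — mc alias for the objectstore (default daili-objectstore)
OBJECTSTORE_LOCAL_PATH — base dir of the local objectstore mirror (default $WRITABLE_PATH, else /tmp)
MC_BIN — mc binary (default mc)
MC_CONFIG_DIR — mc config dir (default <local base>/.mc)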
"""
from __future__ import annotations

import json
import math
import os
import random
import subprocess
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import ProxyHandler, Request, build_opener, urlopen
# Management API access (always the local gateway on loopback).
# NOTE(review): the key falls back to API_KEY and then to a hard-coded "1111";
# a built-in credential is risky — consider requiring MANAGEMENT_PASSWORD.
MGMT_KEY = os.environ.get("MANAGEMENT_PASSWORD") or os.environ.get("API_KEY") or "1111"
MGMT_PORT = os.environ.get("PORT", "8317")
MGMT_BASE = f"http://127.0.0.1:{MGMT_PORT}/v0/management"
# Probe tuning: worker count, per-run probe cap (a value <= 0 disables the cap)
# and the remaining-weekly-quota threshold, in percent, below which a free auth
# is treated as low-quota and deleted.
PROBE_WORKERS = int(os.environ.get("CLEANUP_PROBE_WORKERS", "8"))
MAX_ACTIVE_PROBES = int(os.environ.get("CLEANUP_MAX_ACTIVE_PROBES", "500"))
LOW_QUOTA_PERCENT = float(os.environ.get("CLEANUP_LOW_QUOTA_PERCENT", "10"))
# Objectstore config — for deleting synced auth copies so they don't come back.
# Objectstore deletion is only attempted when endpoint, bucket, access key and
# secret key are all non-empty.
OBJECTSTORE_ENDPOINT = os.environ.get("OBJECTSTORE_ENDPOINT", "").strip()
OBJECTSTORE_BUCKET = os.environ.get("OBJECTSTORE_BUCKET", "").strip()
OBJECTSTORE_ACCESS_KEY = os.environ.get("OBJECTSTORE_ACCESS_KEY", "").strip()
OBJECTSTORE_SECRET_KEY = os.environ.get("OBJECTSTORE_SECRET_KEY", "").strip()
# mc alias prefix used to build remote paths ("<alias>/<bucket>/auths/<name>").
OBJECTSTORE_ALIAS = os.environ.get("OBJECTSTORE_ALIAS", "daili-objectstore").strip()
# Base directory of the local objectstore mirror; also the default home of mc's config.
OBJECTSTORE_LOCAL_BASE = os.environ.get("OBJECTSTORE_LOCAL_PATH", os.environ.get("WRITABLE_PATH", "/tmp")).strip()
MC_BIN = os.environ.get("MC_BIN", "mc")
MC_CONFIG_DIR = os.environ.get("MC_CONFIG_DIR", f"{OBJECTSTORE_LOCAL_BASE}/.mc")
# ChatGPT usage endpoint, probed through the management api-call proxy to read
# an auth's remaining quota.
WHAM_USAGE_URL = "https://chatgpt.com/backend-api/wham/usage"
# "$TOKEN$" is sent as a literal placeholder; presumably the api-call proxy
# substitutes the selected auth's access token — confirm against CLIProxyAPI.
WHAM_HEADERS = {
    "Authorization": "Bearer $TOKEN$",
    "Content-Type": "application/json",
    "User-Agent": "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal",
}
# status_message keywords that indicate an invalid auth (matched as substrings
# of the lower-cased status message, in this order).
MESSAGE_KEYWORDS = [
    "authentication token has been invalidated",
    "invalidated oauth token",
    "token_invalidated",
    "token_revoked",
    "account has been deactivated",
    "no_organization",
    "must be a member of an organization",
]
def _now() -> str:
return datetime.now(timezone.utc).strftime("%H:%M:%S")
def _mgmt_request(method: str, path: str, data: bytes | None = None, timeout: int = 15) -> tuple[int, Any]:
url = f"{MGMT_BASE}{path}"
req = Request(url, data=data, method=method)
req.add_header("Authorization", f"Bearer {MGMT_KEY}")
req.add_header("Content-Type", "application/json")
try:
with urlopen(req, timeout=timeout) as resp:
body = json.loads(resp.read())
return resp.status, body
except HTTPError as e:
body = ""
try:
body = e.read().decode()
except Exception:
pass
return e.code, body
except (URLError, OSError) as e:
return 0, str(e)
def list_auth_files() -> list[dict[str, Any]]:
    """Return every auth-file entry (dict) reported by the management API.

    Raises:
        RuntimeError: if the listing request does not return HTTP 200 with a
            JSON object body.
    """
    status, body = _mgmt_request("GET", "/auth-files")
    if status != 200 or not isinstance(body, dict):
        raise RuntimeError(f"list auth-files failed: status={status}")
    # "files" may be JSON null (e.g. an empty list serialised as null) or
    # otherwise not a list; treat that as "no files" instead of crashing on
    # iteration over None.
    files = body.get("files")
    if not isinstance(files, list):
        return []
    return [f for f in files if isinstance(f, dict)]
def delete_auth_file(name: str) -> bool:
    """Ask the management API to delete auth file *name*; True on any 2xx status."""
    status, _body = _mgmt_request("DELETE", "/auth-files?name=" + quote(name, safe=""))
    return status in range(200, 300)
def _objectstore_enabled() -> bool:
    """True only when endpoint, bucket, access key and secret key are all non-empty."""
    required = (
        OBJECTSTORE_ENDPOINT,
        OBJECTSTORE_BUCKET,
        OBJECTSTORE_ACCESS_KEY,
        OBJECTSTORE_SECRET_KEY,
    )
    return all(required)
def _mc_env() -> dict[str, str]:
    """Build the environment for ``mc``.

    Copies the current environment minus every proxy variable (upper- and
    lower-case forms) and points MC_CONFIG_DIR at the configured directory.
    """
    proxy_vars = {"HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"}
    env = {key: value for key, value in os.environ.items() if key not in proxy_vars}
    env["MC_CONFIG_DIR"] = MC_CONFIG_DIR
    return env
def _delete_from_objectstore(name: str) -> bool:
    """Delete auth file *name* from the objectstore so sync doesn't pull it back.

    Runs ``mc rm --force <alias>/<bucket>/auths/<name>``.

    Returns:
        True only when ``mc`` exits with status 0. False when the objectstore
        is not configured, when ``mc`` cannot be run (missing binary, 15 s
        timeout, bad argument), or when it exits non-zero. Failures are logged
        and never raised.
    """
    if not _objectstore_enabled():
        return False
    remote = f"{OBJECTSTORE_ALIAS}/{OBJECTSTORE_BUCKET}/auths/{name}"
    try:
        result = subprocess.run(
            [MC_BIN, "rm", "--force", remote],
            env=_mc_env(), capture_output=True, text=True, timeout=15,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        print(f"[{_now()}] objectstore delete error: {name}: {exc}", flush=True)
        return False
    if result.returncode != 0:
        # Previously ignored: a failed rm left the synced copy in place to be restored.
        detail = (result.stderr or result.stdout or "").strip()
        print(f"[{_now()}] objectstore delete failed: {name}: rc={result.returncode} {detail}", flush=True)
        return False
    return True
def _delete_local_auth_file(name: str) -> None:
    """Delete the local objectstore-mirror copy of auth file *name*, if present.

    The path is ``<OBJECTSTORE_LOCAL_BASE>/objectstore/auths/<name>``. Only
    regular files are removed. This is best effort: an OSError (e.g.
    permission denied) is logged and swallowed so it does not propagate to
    the caller after the auth was already removed from the management API.
    """
    local_path = Path(OBJECTSTORE_LOCAL_BASE) / "objectstore" / "auths" / name
    try:
        if local_path.is_file():
            local_path.unlink(missing_ok=True)
    except OSError as exc:
        print(f"[{_now()}] local mirror delete failed: {name}: {exc}", flush=True)
def delete_auth_fully(name: str) -> bool:
    """Remove auth *name* everywhere: management API, then objectstore and local mirror.

    The synced copies are only touched when the management-API deletion
    succeeded; the return value reflects that deletion alone.
    """
    if not delete_auth_file(name):
        return False
    _delete_from_objectstore(name)
    _delete_local_auth_file(name)
    return True
def api_call(payload: dict[str, Any], timeout: int = 10) -> dict[str, Any] | None:
    """POST *payload* to the management ``/api-call`` endpoint.

    Returns the decoded JSON object when the endpoint answers HTTP 200 with an
    object body, otherwise None.
    """
    encoded = json.dumps(payload, ensure_ascii=False).encode()
    status, body = _mgmt_request("POST", "/api-call", data=encoded, timeout=timeout)
    return body if status == 200 and isinstance(body, dict) else None
def _is_free_auth(name: str) -> bool:
return name.strip().endswith("-free.json")
def _status_reason(entry: dict[str, Any]) -> str:
    """Classify a free auth as invalid from its cached ``status_message`` (no HTTP).

    Returns a short reason string, or "" when the entry is not a free auth, has
    no status message, or nothing matches. Keyword substrings (MESSAGE_KEYWORDS,
    in order) are checked against the lower-cased message first; failing that,
    the message is parsed as JSON and its ``error`` object inspected.
    """
    if not _is_free_auth(str(entry.get("name") or "").strip()):
        return ""
    message = str(entry.get("status_message") or "").strip().lower()
    if not message:
        return ""
    keyword = next((kw for kw in MESSAGE_KEYWORDS if kw in message), None)
    if keyword is not None:
        return f"status_message={keyword}"
    # Not a known phrase: the message may be a JSON body such as {"error": {...}}.
    try:
        decoded = json.loads(message)
    except ValueError:
        return ""
    error = decoded.get("error", {}) if isinstance(decoded, dict) else None
    if not isinstance(error, dict):
        return ""
    code = str(error.get("code") or "")
    if code in ("token_invalidated", "token_revoked", "usage_limit_reached"):
        return f"status_message_json={code}"
    plan = str(error.get("plan_type") or "").lower()
    error_type = str(error.get("type") or "").lower()
    # A usage-limit error counts only for free (or unspecified) plans.
    if error_type == "usage_limit_reached" and plan in ("", "free"):
        return "status_message=usage_limit_reached_free"
    return ""
def _low_quota_reason_from_entry(entry: dict[str, Any]) -> str:
    """Flag an entry whose server-reported weekly quota is below LOW_QUOTA_PERCENT.

    The candidate fields are checked in order; missing or non-numeric values
    are skipped, and the first field under the threshold names the returned
    reason. Returns "" when no field qualifies.
    """
    candidate_fields = (
        "weekly_remaining_percent",
        "weekly_limit_remaining_percent",
        "weekly_quota_remaining_percent",
        "weekly_percent_remaining",
    )
    for field in candidate_fields:
        raw = entry.get(field)
        if raw is None:
            continue
        try:
            remaining = float(raw)
        except (TypeError, ValueError):
            continue
        if remaining < LOW_QUOTA_PERCENT:
            return f"{field}<{LOW_QUOTA_PERCENT:g}%"
    return ""
def _probe_wham_usage(entry: dict[str, Any]) -> str:
    """Probe wham/usage for one auth through the local api-call proxy.

    Returns a deletion reason, or "" when the probe is inconclusive (no
    auth_index, proxy failure, unparsable status/body, unexpected status) or
    the auth looks healthy. HTTP 200 defers to the quota check; HTTP 401 maps
    known error phrases to specific reasons, else "probe=401".
    """
    auth_index = str(entry.get("auth_index") or "").strip()
    if not auth_index:
        return ""
    # Prefer the account id from the decoded id_token, then the top-level field.
    token_info = entry.get("id_token")
    account_id = str(token_info.get("chatgpt_account_id") or "").strip() if isinstance(token_info, dict) else ""
    account_id = account_id or str(entry.get("account_id") or "").strip()
    headers = {**WHAM_HEADERS}
    if account_id:
        headers["Chatgpt-Account-Id"] = account_id
    response = api_call({
        "auth_index": auth_index,
        "method": "GET",
        "url": WHAM_USAGE_URL,
        "header": headers,
    })
    if not isinstance(response, dict):
        return ""
    try:
        status_code = int(response.get("status_code", 0) or 0)
    except (TypeError, ValueError):
        return ""
    if status_code == 200:
        try:
            payload = json.loads(response.get("body") or "{}")
        except ValueError:
            return ""
        return _low_quota_from_wham(payload) if isinstance(payload, dict) else ""
    if status_code != 401:
        return ""
    lowered = str(response.get("body") or "").lower()
    for needle, reason in (
        ("token has been invalidated", "probe=token_invalidated"),
        ("invalidated oauth token", "probe=token_revoked"),
        ("account has been deactivated", "probe=account_deactivated"),
    ):
        if needle in lowered:
            return reason
    return "probe=401"
def _parse_percent_value(val: Any) -> float | None:
if val is None:
return None
try:
return float(val)
except (TypeError, ValueError):
return None
def _remaining_percent_from_window(window: Any) -> float | None:
    """Derive the remaining-quota percentage from one rate-limit window object.

    Explicit remaining-style fields win, checked in order; otherwise the value
    is 100 - used_percent, floored at 0. Returns None when *window* is not a
    dict or carries no usable field.
    """
    if not isinstance(window, dict):
        return None
    # Lazy generator keeps the original short-circuit: stop at the first usable field.
    direct = (
        _parse_percent_value(window.get(key))
        for key in ("remaining_percent", "left_percent", "available_percent")
    )
    found = next((pct for pct in direct if pct is not None), None)
    if found is not None:
        return found
    used = _parse_percent_value(window.get("used_percent"))
    return None if used is None else max(0.0, 100.0 - used)
def _low_quota_from_wham(payload: dict[str, Any]) -> str:
    """Check a wham/usage response for low weekly quota — mirrors reg_openai.py logic.

    Args:
        payload: Decoded JSON object returned by wham/usage.

    Returns:
        ``"probe_wham_weekly_remaining<N%"`` (N = LOW_QUOTA_PERCENT) when the
        remaining percentage of ``rate_limit.primary_window`` is below the
        threshold, otherwise "". A non-empty ``plan_type`` other than "free",
        a missing/non-dict ``rate_limit`` or an unreadable window all yield "".
    """
    plan_type = str(payload.get("plan_type") or "").strip().lower()
    if plan_type and plan_type != "free":
        return ""
    rate_limit = payload.get("rate_limit")
    if not isinstance(rate_limit, dict):
        return ""
    # NOTE(review): primary_window is treated as the weekly window (see the
    # reason string) — confirm against the wham/usage schema for free plans.
    remaining = _remaining_percent_from_window(rate_limit.get("primary_window"))
    if remaining is None:
        return ""
    if remaining < LOW_QUOTA_PERCENT:
        return f"probe_wham_weekly_remaining<{LOW_QUOTA_PERCENT:g}%"
    return ""
def _classify_entries(files: list[dict[str, Any]]) -> tuple[dict[str, str], list[dict[str, Any]]]:
    """Phase 1 (no HTTP): split free auths into instant hits (name -> reason) and probe candidates."""
    fast_hits: dict[str, str] = {}
    probe_candidates: list[dict[str, Any]] = []
    for entry in files:
        name = str(entry.get("name") or "").strip()
        if not name or not _is_free_auth(name):
            continue
        reason = _status_reason(entry) or _low_quota_reason_from_entry(entry)
        if reason:
            fast_hits[name] = reason
        else:
            probe_candidates.append(entry)
    return fast_hits, probe_candidates


def _probe_entries(candidates: list[dict[str, Any]]) -> dict[str, str]:
    """Phase 2: probe wham/usage for the candidates concurrently; return name -> reason for hits."""
    hits: dict[str, str] = {}
    if not candidates:
        return hits
    if MAX_ACTIVE_PROBES > 0 and len(candidates) > MAX_ACTIVE_PROBES:
        print(f"[{_now()}] truncating probe candidates from {len(candidates)} to {MAX_ACTIVE_PROBES}", flush=True)
        # Sample instead of taking a prefix: with a stable listing order a fixed
        # prefix would re-probe the same entries every run and never reach the tail.
        candidates = random.sample(candidates, MAX_ACTIVE_PROBES)
    # ThreadPoolExecutor rejects max_workers <= 0, so always use at least one worker.
    workers = max(1, min(PROBE_WORKERS, len(candidates)))
    print(f"[{_now()}] probing {len(candidates)} entries with {workers} workers", flush=True)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {
            pool.submit(_probe_wham_usage, entry): str(entry.get("name") or "").strip()
            for entry in candidates
        }
        total = len(futures)
        for done, future in enumerate(as_completed(futures), start=1):
            name = futures[future]
            try:
                reason = future.result()
            except Exception as exc:
                # One failed probe is logged and counted as "no hit"; progress is still reported.
                print(f"[{_now()}] probe error: {name}: {exc}", flush=True)
                reason = ""
            if reason:
                hits[name] = reason
            if done % 20 == 0 or done == total:
                print(f"[{_now()}] probe progress: {done}/{total}, hits={len(hits)}", flush=True)
    return hits


def _delete_hits(all_hits: dict[str, str], dry_run: bool) -> tuple[int, list[str]]:
    """Phase 3: delete (or, in dry-run, only report) every hit; return (deleted count, failed names)."""
    deleted = 0
    failures: list[str] = []
    for name, reason in all_hits.items():
        if dry_run:
            print(f"[{_now()}] [dry-run] would delete: {name} ({reason})", flush=True)
            deleted += 1
            continue
        try:
            ok = delete_auth_fully(name)
        except Exception as exc:
            # One failing deletion must not abort the remaining ones.
            print(f"[{_now()}] delete failed: {name}: {exc}", flush=True)
            failures.append(name)
            continue
        if ok:
            print(f"[{_now()}] deleted: {name} ({reason})", flush=True)
            deleted += 1
        else:
            print(f"[{_now()}] delete failed: {name}", flush=True)
            failures.append(name)
    return deleted, failures


def run_cleanup(dry_run: bool = False) -> dict[str, Any]:
    """Run one cleanup pass over the free codex auths and return a summary report.

    Phases: status/quota fields (no HTTP), then active wham/usage probes, then
    deletion of every hit (only reported when *dry_run* is True; in that mode
    "deleted" counts would-be deletions). The report is also printed as JSON.

    Raises:
        RuntimeError: propagated from list_auth_files when the listing fails.
    """
    started = time.monotonic()
    print(f"[{_now()}] cleanup start", flush=True)
    files = list_auth_files()
    total = len(files)
    print(f"[{_now()}] auth-files total={total}", flush=True)

    fast_hits, probe_candidates = _classify_entries(files)
    print(f"[{_now()}] fast_hits={len(fast_hits)}, probe_candidates={len(probe_candidates)}", flush=True)

    probe_hits = _probe_entries(probe_candidates)

    all_hits = {**fast_hits, **probe_hits}
    print(f"[{_now()}] total hits to delete: {len(all_hits)}", flush=True)
    deleted, failures = _delete_hits(all_hits, dry_run)

    elapsed = time.monotonic() - started
    report = {
        "scanned": total,
        "fast_hits": len(fast_hits),
        "probe_hits": len(probe_hits),
        "deleted": deleted,
        "failures": len(failures),
        "elapsed_seconds": round(elapsed, 1),
        "dry_run": dry_run,
    }
    print(f"[{_now()}] cleanup done: {json.dumps(report)}", flush=True)
    return report
if __name__ == "__main__":
    # Only "--dry-run" is recognised; any other arguments are ignored.
    dry_run = "--dry-run" in sys.argv
    try:
        run_cleanup(dry_run=dry_run)
    except Exception as exc:
        print(f"[{_now()}] cleanup failed: {exc}", file=sys.stderr, flush=True)
        # str(exc) alone rarely says where the failure happened; keep the
        # traceback on stderr for diagnosis.
        traceback.print_exc()
        sys.exit(1)