#!/usr/bin/env python3
"""Two-way sync of CLIProxyAPI config/auth files with an S3 bucket via `mc`.

Modes:
  restore -- pull remote state down to the local ROOT (remote wins; local
             auths dir is wiped first).
  sync    -- reconcile local and remote: new files are copied to the other
             side, conflicting files are resolved by newer mtime, and auth
             files the local management API reports as invalid are deleted
             from both sides.

All object-store access goes through the MinIO client binary (MC_BIN);
required connection parameters come from OBJECTSTORE_* env vars.
"""
import argparse
import hashlib
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen


def env_required(name: str) -> str:
    """Return the stripped value of env var *name*, or exit if missing/blank."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise SystemExit(f"missing required env: {name}")
    return value


MC_BIN = os.environ.get("MC_BIN", "mc")
ALIAS = os.environ.get("OBJECTSTORE_ALIAS", "daili-objectstore").strip() or "daili-objectstore"
ENDPOINT = env_required("OBJECTSTORE_ENDPOINT")
ACCESS_KEY = env_required("OBJECTSTORE_ACCESS_KEY")
SECRET_KEY = env_required("OBJECTSTORE_SECRET_KEY")
BUCKET = env_required("OBJECTSTORE_BUCKET")
ROOT = Path(env_required("OBJECTSTORE_ROOT")).resolve()
CONFIG_FALLBACK = os.environ.get("OBJECTSTORE_CONFIG_FALLBACK", "").strip()
# Remote object keys (relative to BUCKET); mirrored 1:1 under ROOT locally.
REMOTE_CONFIG_KEY = "config/config.yaml"
REMOTE_AUTHS_PREFIX = "auths"
MC_TIMEOUT = int(os.environ.get("MC_TIMEOUT", "30"))


def mc_env() -> dict:
    """Environment for mc subprocesses: a copy with all proxy vars removed.

    mc must reach the object-store endpoint directly; an inherited proxy
    would intercept (and typically break) the S3 calls.
    """
    env = os.environ.copy()
    for key in ("HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"):
        env.pop(key, None)
    return env


def run_mc(args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run `mc *args*` with captured text output and an MC_TIMEOUT deadline.

    Raises CalledProcessError when *check* is true and mc exits non-zero,
    and TimeoutExpired if the command exceeds MC_TIMEOUT seconds.
    """
    return subprocess.run(
        [MC_BIN, *args],
        check=check,
        env=mc_env(),
        capture_output=True,
        text=True,
        timeout=MC_TIMEOUT,
    )


def ensure_alias() -> None:
    """Register (or refresh) the mc alias pointing at the configured endpoint."""
    run_mc(["alias", "set", ALIAS, ENDPOINT, ACCESS_KEY, SECRET_KEY, "--api", "S3v4", "--path", "on"])


def remote_path(key: str) -> str:
    """Full mc path for object *key*: ALIAS/BUCKET/key."""
    return f"{ALIAS}/{BUCKET}/{key}"


def parse_ts(value: str) -> float:
    """Parse an ISO-8601 timestamp (tolerating a trailing 'Z') to POSIX seconds."""
    return datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp()


def local_path(rel: str) -> Path:
    """Local filesystem path mirroring remote key *rel* under ROOT."""
    return ROOT / rel


def touch_local(path: Path, ts: float) -> None:
    """Set *path*'s atime/mtime to *ts* so local mtime tracks remote lastModified."""
    os.utime(path, (ts, ts))


def prune_empty_dirs(root: Path) -> None:
    """Remove empty directories under *root*, deepest first; non-empty ones survive."""
    if not root.exists():
        return
    # reverse-sorted so children are attempted before their parents
    for path in sorted((p for p in root.rglob("*") if p.is_dir()), reverse=True):
        try:
            path.rmdir()
        except OSError:
            pass  # not empty (or perms) -- intentionally left in place


def file_md5(path: Path) -> str:
    """Hex MD5 of *path*, streamed in 1 MiB chunks.

    NOTE(review): this is compared against the S3 ETag. For multipart
    uploads the ETag is not a plain MD5, so such objects would always
    compare unequal and fall through to the mtime tiebreak -- confirm
    that mc uploads of these small files stay single-part.
    """
    digest = hashlib.md5()
    with path.open("rb") as handle:
        while chunk := handle.read(1024 * 1024):
            digest.update(chunk)
    return digest.hexdigest()


def remote_inventory() -> dict[str, dict]:
    """Map remote key -> {etag, last_modified, remote} for config + auth files.

    Missing objects are tolerated (mc invoked with check=False); an empty
    dict means nothing exists remotely.
    """
    inventory: dict[str, dict] = {}
    config_proc = run_mc(["stat", "--json", remote_path(REMOTE_CONFIG_KEY)], check=False)
    if config_proc.returncode == 0 and config_proc.stdout.strip():
        raw = json.loads(config_proc.stdout)
        inventory[REMOTE_CONFIG_KEY] = {
            "etag": raw["etag"],
            "last_modified": parse_ts(raw["lastModified"]),
            "remote": remote_path(REMOTE_CONFIG_KEY),
        }
    auths_proc = run_mc(["ls", "--json", "--recursive", remote_path(REMOTE_AUTHS_PREFIX)], check=False)
    if auths_proc.returncode == 0:
        # mc ls --json emits one JSON object per line
        for line in auths_proc.stdout.splitlines():
            if not line.strip():
                continue
            raw = json.loads(line)
            if raw.get("status") != "success" or raw.get("type") != "file":
                continue
            # assumes mc reports 'key' relative to the listed prefix -- TODO confirm
            rel = f"{REMOTE_AUTHS_PREFIX}/{raw['key']}"
            inventory[rel] = {
                "etag": raw["etag"],
                "last_modified": parse_ts(raw["lastModified"]),
                "remote": remote_path(rel),
            }
    return inventory


def local_inventory() -> dict[str, dict]:
    """Map local relative key -> {md5, mtime} for the config file + auth files."""
    inventory: dict[str, dict] = {}
    config_path = local_path(REMOTE_CONFIG_KEY)
    if config_path.is_file():
        inventory[REMOTE_CONFIG_KEY] = {
            "md5": file_md5(config_path),
            "mtime": config_path.stat().st_mtime,
        }
    auth_root = local_path(REMOTE_AUTHS_PREFIX)
    if auth_root.is_dir():
        for path in sorted(auth_root.rglob("*")):
            if not path.is_file():
                continue
            rel = path.relative_to(ROOT).as_posix()
            inventory[rel] = {
                "md5": file_md5(path),
                "mtime": path.stat().st_mtime,
            }
    return inventory


def download_file(rel: str, meta: dict) -> None:
    """Copy remote object *rel* to its local mirror and stamp the remote mtime."""
    dest = local_path(rel)
    dest.parent.mkdir(parents=True, exist_ok=True)
    run_mc(["cp", meta["remote"], str(dest)])
    touch_local(dest, meta["last_modified"])


def upload_file(rel: str) -> None:
    """Copy local file *rel* up to the remote key; no-op if it vanished."""
    src = local_path(rel)
    if not src.is_file():
        return
    run_mc(["cp", str(src), remote_path(rel)])


def delete_remote(rel: str) -> None:
    """Best-effort delete of remote object *rel* (errors ignored)."""
    run_mc(["rm", "--force", remote_path(rel)], check=False)


# Substrings (lowercased) in an auth file's status_message that mark it invalid.
INVALID_STATUS_KEYWORDS = [
    "token_invalidated",
    "token_revoked",
    "invalidated oauth token",
    "authentication token has been invalidated",
    "account has been deactivated",
    "no_organization",
    "unauthorized",
    "401",
]


def _fetch_invalid_auth_names() -> set[str]:
    """Query local CLIProxyAPI management API for auth names with invalid status.

    Returns an empty set on any failure (no credentials, API unreachable,
    malformed response) so sync degrades to a plain two-way sync.
    """
    mgmt_key = os.environ.get("MANAGEMENT_PASSWORD") or os.environ.get("API_KEY") or ""
    port = os.environ.get("PORT", "8317")
    if not mgmt_key:
        return set()
    url = f"http://127.0.0.1:{port}/v0/management/auth-files"
    req = Request(url)
    req.add_header("Authorization", f"Bearer {mgmt_key}")
    try:
        with urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())
    except (HTTPError, URLError, OSError, ValueError):
        return set()
    if not isinstance(data, dict):
        return set()
    files = data.get("files", [])
    if not isinstance(files, list):
        return set()
    invalid_names: set[str] = set()
    now_ts = datetime.now(timezone.utc).timestamp()
    for entry in files:
        if not isinstance(entry, dict):
            continue
        name = str(entry.get("name") or "").strip()
        if not name:
            continue
        status = str(entry.get("status") or "").strip().lower()
        status_message = str(entry.get("status_message") or "").strip().lower()
        unavailable = entry.get("unavailable", False)
        # Check next_retry_after -- if still in cooldown, treat as invalid.
        # BUG FIX: the original compared ISO-8601 strings lexically against
        # datetime.now().isoformat(); that is wrong whenever the server uses
        # a different representation (trailing 'Z' vs '+00:00', naive vs
        # aware, differing precision). Parse both sides to POSIX timestamps.
        next_retry = str(entry.get("next_retry_after") or "").strip()
        in_cooldown = False
        if next_retry:
            try:
                in_cooldown = parse_ts(next_retry) > now_ts
            except ValueError:
                in_cooldown = False  # unparseable -> don't condemn the auth
        if status == "error" or unavailable or in_cooldown:
            invalid_names.add(name)
            continue
        if status_message and any(kw in status_message for kw in INVALID_STATUS_KEYWORDS):
            invalid_names.add(name)
    return invalid_names


def restore() -> None:
    """Pull remote state down to ROOT; remote wins, local auths are replaced.

    Falls back to CONFIG_FALLBACK for the config file when the remote copy
    does not exist.
    """
    ensure_alias()
    ROOT.mkdir(parents=True, exist_ok=True)
    local_path("config").mkdir(parents=True, exist_ok=True)
    # Auths are fully replaced by the remote set.
    shutil.rmtree(local_path(REMOTE_AUTHS_PREFIX), ignore_errors=True)
    local_path(REMOTE_AUTHS_PREFIX).mkdir(parents=True, exist_ok=True)
    remote = remote_inventory()
    if REMOTE_CONFIG_KEY in remote:
        download_file(REMOTE_CONFIG_KEY, remote[REMOTE_CONFIG_KEY])
    elif CONFIG_FALLBACK:
        fallback = Path(CONFIG_FALLBACK)
        if fallback.is_file():
            target = local_path(REMOTE_CONFIG_KEY)
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(fallback, target)
    # Best-effort bulk copy of the auths tree (check=False: prefix may be empty).
    run_mc(["cp", "--recursive", remote_path(f"{REMOTE_AUTHS_PREFIX}/"), str(local_path(REMOTE_AUTHS_PREFIX))], check=False)


def sync() -> None:
    """Reconcile local and remote state.

    Rules, per file:
      * invalid auth (per management API) -> deleted on both sides;
      * remote-only -> download; local-only -> upload;
      * both present, content differs (md5 vs etag) -> newer mtime wins.
    """
    ensure_alias()
    ROOT.mkdir(parents=True, exist_ok=True)
    invalid_names = _fetch_invalid_auth_names()
    remote = remote_inventory()
    local = local_inventory()
    # Remote has, local doesn't -> download (unless invalid)
    # Both have, md5 differs -> compare mtime, newer wins (unless invalid)
    for rel, meta in remote.items():
        # Auth file name is the last path component
        # (e.g. "auths/codex-xxx-free.json" -> "codex-xxx-free.json").
        file_name = rel.rsplit("/", 1)[-1]
        if file_name in invalid_names:
            delete_remote(rel)
            dest = local_path(rel)
            if dest.is_file():
                dest.unlink()
            continue
        local_meta = local.get(rel)
        if local_meta is None:
            download_file(rel, meta)
            continue
        if local_meta["md5"] == meta["etag"]:
            continue  # identical content; nothing to do
        if local_meta["mtime"] > meta["last_modified"]:
            upload_file(rel)
        else:
            download_file(rel, meta)
    # Local has, remote doesn't -> upload to remote (new file from management UI)
    for rel in sorted(set(local) - set(remote)):
        file_name = rel.rsplit("/", 1)[-1]
        if file_name in invalid_names:
            dest = local_path(rel)
            if dest.is_file():
                dest.unlink()
            continue
        upload_file(rel)
    prune_empty_dirs(local_path(REMOTE_AUTHS_PREFIX))


def main() -> int:
    """CLI entry point: dispatch to restore or sync based on the mode argument."""
    parser = argparse.ArgumentParser()
    parser.add_argument("mode", choices=("restore", "sync"))
    args = parser.parse_args()
    if args.mode == "restore":
        restore()
    else:
        sync()
    return 0


if __name__ == "__main__":
    sys.exit(main())