# daili/objectstore_sync.py
# fix: also treat auths in cooldown (next_retry_after) as invalid during sync
# commit: 7c3abff (author: pjpjq)
#!/usr/bin/env python3
import argparse
import hashlib
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
def env_required(name: str) -> str:
    """Return the stripped value of environment variable *name*.

    Exits the process (SystemExit) when the variable is unset or blank,
    so misconfiguration fails fast at import time.
    """
    candidate = os.environ.get(name, "").strip()
    if candidate:
        return candidate
    raise SystemExit(f"missing required env: {name}")
# mc binary to invoke; defaults to "mc" resolved on PATH.
MC_BIN = os.environ.get("MC_BIN", "mc")
# mc alias name for the object store; a blank env value falls back to the default.
ALIAS = os.environ.get("OBJECTSTORE_ALIAS", "daili-objectstore").strip() or "daili-objectstore"
# Required connection settings — env_required raises SystemExit at import time if missing.
ENDPOINT = env_required("OBJECTSTORE_ENDPOINT")
ACCESS_KEY = env_required("OBJECTSTORE_ACCESS_KEY")
SECRET_KEY = env_required("OBJECTSTORE_SECRET_KEY")
BUCKET = env_required("OBJECTSTORE_BUCKET")
# Local mirror root under which remote keys are materialized.
ROOT = Path(env_required("OBJECTSTORE_ROOT")).resolve()
# Optional local config copied into place when the remote bucket has no config yet.
CONFIG_FALLBACK = os.environ.get("OBJECTSTORE_CONFIG_FALLBACK", "").strip()
# Remote key of the config file and prefix under which auth files live.
REMOTE_CONFIG_KEY = "config/config.yaml"
REMOTE_AUTHS_PREFIX = "auths"
def mc_env() -> dict:
    """Copy of the current process environment with proxy variables removed.

    mc talks to the object-store endpoint directly; inherited proxy settings
    would otherwise reroute (and typically break) those connections.
    """
    scrubbed = dict(os.environ)
    for name in ("HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"):
        scrubbed.pop(name, None)
        scrubbed.pop(name.lower(), None)
    return scrubbed
# Per-invocation timeout (seconds) applied to every mc subprocess call.
MC_TIMEOUT = int(os.environ.get("MC_TIMEOUT", "30"))
def run_mc(args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run the mc binary with *args*, capturing text output.

    The proxy-scrubbed environment and MC_TIMEOUT are always applied.
    With check=True (the default) a non-zero exit raises CalledProcessError.
    """
    command = [MC_BIN]
    command.extend(args)
    return subprocess.run(
        command,
        check=check,
        capture_output=True,
        text=True,
        env=mc_env(),
        timeout=MC_TIMEOUT,
    )
def ensure_alias() -> None:
    """Register (or refresh) the mc alias pointing at the configured endpoint/credentials."""
    run_mc(["alias", "set", ALIAS, ENDPOINT, ACCESS_KEY, SECRET_KEY, "--api", "S3v4", "--path", "on"])
def remote_path(key: str) -> str:
    """Build the full mc target path alias/bucket/key for remote object *key*."""
    return "/".join((ALIAS, BUCKET, key))
def parse_ts(value: str) -> float:
    """Parse an ISO-8601 timestamp into POSIX epoch seconds.

    A trailing "Z" is normalized to "+00:00" because datetime.fromisoformat
    on older Pythons does not accept the Zulu suffix.
    """
    normalized = value.replace("Z", "+00:00")
    return datetime.fromisoformat(normalized).timestamp()
def local_path(rel: str) -> Path:
    """Absolute local path under ROOT for the remote-relative key *rel*."""
    return ROOT / rel
def touch_local(path: Path, ts: float) -> None:
    """Set both access and modification time of *path* to *ts* (epoch seconds)."""
    stamp = (ts, ts)
    os.utime(path, stamp)
def prune_empty_dirs(root: Path) -> None:
    """Remove empty directories beneath *root*; *root* itself is never removed.

    Directories are attempted deepest-first (reverse sorted paths), so a parent
    emptied by its child's removal is pruned in the same pass. Non-empty
    directories simply fail rmdir with OSError and are kept.
    """
    if not root.exists():
        return
    subdirs = [entry for entry in root.rglob("*") if entry.is_dir()]
    subdirs.sort(reverse=True)
    for directory in subdirs:
        try:
            directory.rmdir()
        except OSError:
            continue
def file_md5(path: Path) -> str:
    """Return the hex MD5 digest of the file at *path*, streamed in 1 MiB chunks."""
    hasher = hashlib.md5()
    with path.open("rb") as stream:
        while block := stream.read(1024 * 1024):
            hasher.update(block)
    return hasher.hexdigest()
def remote_inventory() -> dict[str, dict]:
    """List the remote objects of interest: the config file plus everything under auths/.

    Returns a mapping of remote-relative key -> {"etag", "last_modified" (epoch
    float), "remote" (full mc path)}. Both mc calls are best-effort
    (check=False): a failed stat/ls simply yields fewer entries.
    """
    inventory: dict[str, dict] = {}
    config_proc = run_mc(["stat", "--json", remote_path(REMOTE_CONFIG_KEY)], check=False)
    if config_proc.returncode == 0 and config_proc.stdout.strip():
        raw = json.loads(config_proc.stdout)
        inventory[REMOTE_CONFIG_KEY] = {
            # NOTE(review): sync() compares this etag against a local MD5;
            # that only holds for single-part uploads — confirm multipart
            # uploads never occur for these small files.
            "etag": raw["etag"],
            "last_modified": parse_ts(raw["lastModified"]),
            "remote": remote_path(REMOTE_CONFIG_KEY),
        }
    auths_proc = run_mc(["ls", "--json", "--recursive", remote_path(REMOTE_AUTHS_PREFIX)], check=False)
    if auths_proc.returncode == 0:
        # mc ls --json emits one JSON object per line.
        for line in auths_proc.stdout.splitlines():
            if not line.strip():
                continue
            raw = json.loads(line)
            if raw.get("status") != "success" or raw.get("type") != "file":
                continue
            # raw["key"] is relative to the listed prefix, so re-prefix it.
            rel = f"{REMOTE_AUTHS_PREFIX}/{raw['key']}"
            inventory[rel] = {
                "etag": raw["etag"],
                "last_modified": parse_ts(raw["lastModified"]),
                "remote": remote_path(rel),
            }
    return inventory
def local_inventory() -> dict[str, dict]:
    """Scan the local mirror for the config file and all auth files.

    Returns a mapping of remote-relative key -> {"md5", "mtime"} for every
    regular file found.
    """
    entries: dict[str, dict] = {}

    def record(path: Path, rel: str) -> None:
        # One entry per regular file, keyed by its remote-relative path.
        entries[rel] = {"md5": file_md5(path), "mtime": path.stat().st_mtime}

    config_file = local_path(REMOTE_CONFIG_KEY)
    if config_file.is_file():
        record(config_file, REMOTE_CONFIG_KEY)
    auths_dir = local_path(REMOTE_AUTHS_PREFIX)
    if auths_dir.is_dir():
        for candidate in sorted(auths_dir.rglob("*")):
            if candidate.is_file():
                record(candidate, candidate.relative_to(ROOT).as_posix())
    return entries
def download_file(rel: str, meta: dict) -> None:
    """Copy remote object *rel* into the local mirror and stamp its remote mtime."""
    dest = local_path(rel)
    dest.parent.mkdir(parents=True, exist_ok=True)
    run_mc(["cp", meta["remote"], str(dest)])
    # Mirror the remote last_modified locally so later mtime comparisons in
    # sync() are meaningful.
    touch_local(dest, meta["last_modified"])
def upload_file(rel: str) -> None:
    """Copy local file *rel* to the remote; no-op if the local file has vanished."""
    src = local_path(rel)
    if not src.is_file():
        return
    run_mc(["cp", str(src), remote_path(rel)])
def delete_remote(rel: str) -> None:
    """Best-effort removal of remote object *rel* (check=False: failures ignored)."""
    run_mc(["rm", "--force", remote_path(rel)], check=False)
# Lowercased substrings in an auth entry's status_message that mark the auth
# as invalid; matched in _fetch_invalid_auth_names().
INVALID_STATUS_KEYWORDS = [
    "token_invalidated",
    "token_revoked",
    "invalidated oauth token",
    "authentication token has been invalidated",
    "account has been deactivated",
    "no_organization",
    "unauthorized",
    "401",
]
def _fetch_invalid_auth_names() -> set[str]:
"""Query local CLIProxyAPI management API for auth names with invalid status."""
mgmt_key = os.environ.get("MANAGEMENT_PASSWORD") or os.environ.get("API_KEY") or ""
port = os.environ.get("PORT", "8317")
if not mgmt_key:
return set()
url = f"http://127.0.0.1:{port}/v0/management/auth-files"
req = Request(url)
req.add_header("Authorization", f"Bearer {mgmt_key}")
try:
with urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
except (HTTPError, URLError, OSError, ValueError):
return set()
if not isinstance(data, dict):
return set()
files = data.get("files", [])
if not isinstance(files, list):
return set()
invalid_names: set[str] = set()
now = datetime.now(timezone.utc).isoformat()
for entry in files:
if not isinstance(entry, dict):
continue
name = str(entry.get("name") or "").strip()
if not name:
continue
status = str(entry.get("status") or "").strip().lower()
status_message = str(entry.get("status_message") or "").strip().lower()
unavailable = entry.get("unavailable", False)
# Check next_retry_after β€” if still in cooldown, treat as invalid
next_retry = str(entry.get("next_retry_after") or "").strip()
in_cooldown = bool(next_retry and next_retry > now)
if status == "error" or unavailable or in_cooldown:
invalid_names.add(name)
continue
if status_message:
for kw in INVALID_STATUS_KEYWORDS:
if kw in status_message:
invalid_names.add(name)
break
return invalid_names
def restore() -> None:
    """One-way pull: rebuild the local mirror from the remote bucket.

    Wipes the local auths/ directory, downloads the remote config (or copies
    CONFIG_FALLBACK into place when the remote has none), then bulk-copies all
    remote auth files.
    """
    ensure_alias()
    ROOT.mkdir(parents=True, exist_ok=True)
    local_path("config").mkdir(parents=True, exist_ok=True)
    # Start from a clean auths dir so auths deleted remotely don't linger locally.
    shutil.rmtree(local_path(REMOTE_AUTHS_PREFIX), ignore_errors=True)
    local_path(REMOTE_AUTHS_PREFIX).mkdir(parents=True, exist_ok=True)
    remote = remote_inventory()
    if REMOTE_CONFIG_KEY in remote:
        download_file(REMOTE_CONFIG_KEY, remote[REMOTE_CONFIG_KEY])
    elif CONFIG_FALLBACK:
        fallback = Path(CONFIG_FALLBACK)
        if fallback.is_file():
            target = local_path(REMOTE_CONFIG_KEY)
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(fallback, target)
    # Bulk copy is best-effort (check=False): an empty/absent prefix is not fatal.
    run_mc(["cp", "--recursive", remote_path(f"{REMOTE_AUTHS_PREFIX}/"), str(local_path(REMOTE_AUTHS_PREFIX))], check=False)
def sync() -> None:
    """Two-way reconcile between the local mirror and the remote bucket.

    Auth files reported invalid by the local management API are deleted on both
    sides. Otherwise: remote-only files are downloaded, local-only files are
    uploaded, and files differing on both sides go to whichever has the newer
    mtime. Empty auth directories are pruned at the end.
    """
    ensure_alias()
    ROOT.mkdir(parents=True, exist_ok=True)
    invalid_names = _fetch_invalid_auth_names()
    remote = remote_inventory()
    local = local_inventory()
    # Remote has, local doesn't -> download (unless invalid)
    # Both have, md5 differs -> compare mtime, newer wins (unless invalid)
    for rel, meta in remote.items():
        # Extract auth file name from rel path (e.g. "auths/codex-xxx-free.json" -> "codex-xxx-free.json")
        file_name = rel.rsplit("/", 1)[-1] if "/" in rel else rel
        if file_name in invalid_names:
            # Invalid auth: purge it from both the bucket and the local mirror.
            delete_remote(rel)
            dest = local_path(rel)
            if dest.is_file():
                dest.unlink()
            continue
        local_meta = local.get(rel)
        if local_meta is None:
            download_file(rel, meta)
            continue
        # NOTE(review): equating local MD5 with the remote etag assumes
        # single-part uploads (multipart etags are not plain MD5s) — confirm.
        if local_meta["md5"] == meta["etag"]:
            continue
        if local_meta["mtime"] > meta["last_modified"]:
            upload_file(rel)
        else:
            download_file(rel, meta)
    # Local has, remote doesn't -> upload to remote (new file from management UI)
    for rel in sorted(set(local) - set(remote)):
        file_name = rel.rsplit("/", 1)[-1] if "/" in rel else rel
        if file_name in invalid_names:
            dest = local_path(rel)
            if dest.is_file():
                dest.unlink()
            continue
        upload_file(rel)
    prune_empty_dirs(local_path(REMOTE_AUTHS_PREFIX))
def main() -> int:
    """CLI entry point: dispatch to restore() or sync() per the mode argument."""
    parser = argparse.ArgumentParser()
    parser.add_argument("mode", choices=("restore", "sync"))
    mode = parser.parse_args().mode
    handler = restore if mode == "restore" else sync
    handler()
    return 0


if __name__ == "__main__":
    sys.exit(main())