| |
|
|
| import argparse |
| import hashlib |
| import json |
| import os |
| import shutil |
| import subprocess |
| import sys |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from urllib.request import Request, urlopen |
| from urllib.error import HTTPError, URLError |
|
|
|
|
def env_required(name: str) -> str:
    """Return the whitespace-stripped value of environment variable *name*.

    Raises:
        SystemExit: with the message ``missing required env: <name>`` when the
            variable is unset, empty, or whitespace-only.
    """
    raw = os.environ.get(name, "")
    cleaned = raw.strip()
    if cleaned:
        return cleaned
    raise SystemExit(f"missing required env: {name}")
|
|
|
|
# Executable invoked for every object-store operation.
MC_BIN = os.getenv("MC_BIN", "mc")
# mc alias name; a blank or whitespace-only value falls back to the default.
ALIAS = (
    os.getenv("OBJECTSTORE_ALIAS", "daili-objectstore").strip()
    or "daili-objectstore"
)

# Connection settings; env_required() raises SystemExit if any is unset/blank.
ENDPOINT = env_required("OBJECTSTORE_ENDPOINT")
ACCESS_KEY = env_required("OBJECTSTORE_ACCESS_KEY")
SECRET_KEY = env_required("OBJECTSTORE_SECRET_KEY")
BUCKET = env_required("OBJECTSTORE_BUCKET")
# Local directory mirrored to/from the bucket, made absolute once at import.
ROOT = Path(env_required("OBJECTSTORE_ROOT")).resolve()

# Optional local file copied into place by restore() when the bucket has no config.
CONFIG_FALLBACK = os.getenv("OBJECTSTORE_CONFIG_FALLBACK", "").strip()

# Object keys, relative to the bucket root remotely and to ROOT locally.
REMOTE_CONFIG_KEY = "config/config.yaml"
REMOTE_AUTHS_PREFIX = "auths"
|
|
|
|
def mc_env() -> dict:
    """Return a copy of ``os.environ`` with all proxy variables removed.

    Both upper- and lower-case spellings of HTTP_PROXY, HTTPS_PROXY and
    ALL_PROXY are dropped. The result is used as the environment of every mc
    subprocess (see run_mc), presumably so mc reaches ENDPOINT directly.
    ``os.environ`` itself is left untouched.
    """
    proxy_names = {
        "HTTP_PROXY",
        "HTTPS_PROXY",
        "ALL_PROXY",
        "http_proxy",
        "https_proxy",
        "all_proxy",
    }
    return {name: val for name, val in os.environ.items() if name not in proxy_names}
|
|
|
|
# Seconds each mc subprocess may run before subprocess.TimeoutExpired is raised.
# A non-integer MC_TIMEOUT value fails at import time with ValueError.
MC_TIMEOUT = int(os.getenv("MC_TIMEOUT", "30"))
|
|
|
|
def run_mc(args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run ``MC_BIN`` with *args* and return the completed process.

    stdout and stderr are captured as text. The proxy-free environment from
    ``mc_env()`` is used, and each call is limited to ``MC_TIMEOUT`` seconds.
    ``subprocess.TimeoutExpired`` propagates regardless of *check*. With
    ``check=True`` a non-zero exit raises ``subprocess.CalledProcessError``.
    """
    command = [MC_BIN]
    command.extend(args)
    return subprocess.run(
        command,
        capture_output=True,
        text=True,
        env=mc_env(),
        timeout=MC_TIMEOUT,
        check=check,
    )
|
|
|
|
def ensure_alias() -> None:
    """Create or update mc alias ALIAS pointing at ENDPOINT with the configured keys.

    Uses S3v4 signing and path-style addressing (``--path on``).

    Raises:
        SystemExit: if ``mc alias set`` fails or times out. The message includes
            mc's own stderr but never the command line. That command line
            contains SECRET_KEY, and the default CalledProcessError /
            TimeoutExpired text would print it into logs.
    """
    # NOTE(review): SECRET_KEY is still visible in the process list while mc
    # runs; configuring the alias via mc's MC_HOST_<alias> environment variable
    # would avoid that.
    try:
        run_mc(["alias", "set", ALIAS, ENDPOINT, ACCESS_KEY, SECRET_KEY, "--api", "S3v4", "--path", "on"])
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or exc.stdout or "").strip()
        # `from None` hides the original exception, whose text embeds the secret.
        raise SystemExit(
            f"mc alias set failed for {ALIAS!r} (exit {exc.returncode}): {detail}"
        ) from None
    except subprocess.TimeoutExpired:
        raise SystemExit(f"mc alias set timed out for {ALIAS!r} after {MC_TIMEOUT}s") from None
|
|
|
|
def remote_path(key: str) -> str:
    """Return the mc target ``ALIAS/BUCKET/key`` for object *key*."""
    return "/".join((ALIAS, BUCKET, key))
|
|
|
|
def parse_ts(value: str) -> float:
    """Convert an ISO-8601 / RFC 3339 timestamp string to POSIX epoch seconds.

    A trailing ``Z``/``z`` is read as UTC. Fractional seconds of any length are
    truncated or zero-padded to microseconds before parsing. mc emits Go
    RFC3339Nano values with 1-9 fractional digits, but
    ``datetime.fromisoformat`` before Python 3.11 only accepts 3 or 6.
    Strings without an offset are interpreted by ``datetime.timestamp`` as
    local time, as before.

    Raises:
        ValueError: if *value* is not a parseable timestamp.
    """
    import re

    text = value.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    # Normalise the fraction only when it sits right before the offset or the end.
    text = re.sub(
        r"\.(\d+)(?=(?:[+-]\d{2}:?\d{2})?$)",
        lambda match: "." + match.group(1)[:6].ljust(6, "0"),
        text,
    )
    return datetime.fromisoformat(text).timestamp()
|
|
|
|
def local_path(rel: str) -> Path:
    """Return the filesystem path for *rel* inside ROOT (no existence check)."""
    return ROOT.joinpath(rel)
|
|
|
|
def touch_local(path: Path, ts: float) -> None:
    """Set both the access and modification time of *path* to epoch seconds *ts*."""
    stamp = (ts, ts)
    os.utime(path, stamp)
|
|
|
|
def prune_empty_dirs(root: Path) -> None:
    """Remove empty directories beneath *root*; *root* itself is never removed.

    Directories are processed in reverse sorted order, which places every
    directory before its parent. A parent emptied by removing its children is
    therefore removed in the same pass. Directories that cannot be removed
    (non-empty, permissions, ...) are silently skipped. A missing *root* is a
    no-op.
    """
    if not root.exists():
        return
    subdirs = [entry for entry in root.rglob("*") if entry.is_dir()]
    subdirs.sort(reverse=True)
    for directory in subdirs:
        try:
            directory.rmdir()
        except OSError:
            continue
|
|
|
|
def file_md5(path: Path) -> str:
    """Return the hex MD5 digest of the file at *path*.

    The file is streamed in 1 MiB chunks, so large files are not loaded into
    memory at once.
    """
    hasher = hashlib.md5()
    with path.open("rb") as stream:
        while block := stream.read(1 << 20):
            hasher.update(block)
    return hasher.hexdigest()
|
|
|
|
def remote_inventory() -> dict[str, dict]:
    """List the tracked objects in the bucket.

    Returns a mapping from relative key (``config/config.yaml`` or
    ``auths/<key>``) to a dict with:

    * ``"etag"``: the object's ETag as reported by mc;
    * ``"last_modified"``: epoch seconds parsed by ``parse_ts``;
    * ``"remote"``: the full mc target from ``remote_path``.

    The config object is probed with ``mc stat --json``. Auth objects come
    from ``mc ls --json --recursive`` over the auths prefix, keeping only
    ``status == "success"`` file records.

    NOTE(review): both mc calls use ``check=False``. A failing call (e.g. a
    network error) is therefore indistinguishable from "object does not exist".
    The code also assumes ``key`` in ls output is relative to the auths
    prefix; confirm against the mc version in use.
    """
    def describe(record: dict, rel: str) -> dict:
        # Build one inventory entry from an mc JSON record.
        return {
            "etag": record["etag"],
            "last_modified": parse_ts(record["lastModified"]),
            "remote": remote_path(rel),
        }

    found: dict[str, dict] = {}

    stat = run_mc(["stat", "--json", remote_path(REMOTE_CONFIG_KEY)], check=False)
    if stat.returncode == 0 and stat.stdout.strip():
        found[REMOTE_CONFIG_KEY] = describe(json.loads(stat.stdout), REMOTE_CONFIG_KEY)

    listing = run_mc(["ls", "--json", "--recursive", remote_path(REMOTE_AUTHS_PREFIX)], check=False)
    if listing.returncode != 0:
        return found

    for line in listing.stdout.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        is_file = record.get("status") == "success" and record.get("type") == "file"
        if not is_file:
            continue
        rel = f"{REMOTE_AUTHS_PREFIX}/{record['key']}"
        found[rel] = describe(record, rel)

    return found
|
|
|
|
def local_inventory() -> dict[str, dict]:
    """Describe the tracked files currently present under ROOT.

    Returns a mapping from POSIX-style path relative to ROOT to a dict with
    ``"md5"`` (hex digest from ``file_md5``) and ``"mtime"`` (``st_mtime``).
    Only ``config/config.yaml`` and regular files anywhere below the auths
    directory are included. Auth files are visited in sorted path order.
    """
    def fingerprint(path: Path) -> dict:
        # Digest first, then stat, as a single entry.
        digest = file_md5(path)
        return {"md5": digest, "mtime": path.stat().st_mtime}

    snapshot: dict[str, dict] = {}

    config_file = local_path(REMOTE_CONFIG_KEY)
    if config_file.is_file():
        snapshot[REMOTE_CONFIG_KEY] = fingerprint(config_file)

    auth_dir = local_path(REMOTE_AUTHS_PREFIX)
    if not auth_dir.is_dir():
        return snapshot

    for candidate in sorted(auth_dir.rglob("*")):
        if not candidate.is_file():
            continue
        rel = candidate.relative_to(ROOT).as_posix()
        snapshot[rel] = fingerprint(candidate)

    return snapshot
|
|
|
|
def download_file(rel: str, meta: dict) -> None:
    """Copy the object at ``meta["remote"]`` to ``local_path(rel)``.

    Parent directories are created as needed. After the copy, the file's
    atime/mtime are set to ``meta["last_modified"]``. ``mc cp`` runs with
    ``check=True``, so a failed copy raises ``subprocess.CalledProcessError``
    before any timestamp is changed.
    """
    target = local_path(rel)
    target.parent.mkdir(exist_ok=True, parents=True)
    source = meta["remote"]
    run_mc(["cp", source, str(target)])
    touch_local(target, meta["last_modified"])
|
|
|
|
def upload_file(rel: str) -> None:
    """Copy ``local_path(rel)`` to ``remote_path(rel)`` if it is a regular file.

    Missing files and non-files are skipped silently. A failed ``mc cp``
    raises ``subprocess.CalledProcessError`` (``check=True``).
    """
    source = local_path(rel)
    if source.is_file():
        run_mc(["cp", str(source), remote_path(rel)])
|
|
|
|
def delete_remote(rel: str) -> None:
    """Best-effort ``mc rm --force`` of the object for *rel*; mc's exit status is ignored."""
    target = remote_path(rel)
    run_mc(["rm", "--force", target], check=False)
|
|
|
|
# Lower-case substrings; an auth entry whose status_message contains any of
# them is reported as invalid. The message is lower-cased before matching in
# _fetch_invalid_auth_names.
INVALID_STATUS_KEYWORDS = [
    # Token lifecycle failures.
    "token_invalidated",
    "token_revoked",
    "invalidated oauth token",
    "authentication token has been invalidated",
    # Account / organization problems.
    "account has been deactivated",
    "no_organization",
    # Generic authorization failures. Matching is a plain substring test, so
    # "401" also matches any message that merely contains those digits.
    "unauthorized",
    "401",
]
|
|
|
|
def _retry_pending(next_retry: str, now: datetime) -> bool:
    """Return True if the timestamp string *next_retry* lies after *now*.

    An empty string means no retry is scheduled. Values that ``parse_ts``
    cannot convert fall back to the previous lexicographic comparison against
    ``now.isoformat()``.
    """
    if not next_retry:
        return False
    try:
        return parse_ts(next_retry) > now.timestamp()
    except (ValueError, OverflowError, OSError):
        return next_retry > now.isoformat()


def _fetch_invalid_auth_names() -> set[str]:
    """Query the local CLIProxyAPI management API for auth names with invalid status.

    The bearer key comes from MANAGEMENT_PASSWORD (falling back to API_KEY).
    The port comes from PORT (default "8317"). The function GETs
    ``http://127.0.0.1:<port>/v0/management/auth-files`` and reports an
    entry's ``name`` when any of these hold:

    * ``status`` is "error" (case-insensitive);
    * ``unavailable`` is truthy;
    * ``next_retry_after`` is a timestamp later than now;
    * ``status_message`` contains one of INVALID_STATUS_KEYWORDS
      (case-insensitive substring match).

    Returns an empty set when no key is configured, the request fails, or the
    response is not shaped like ``{"files": [...]}``. An unreachable API
    therefore flags nothing. sync() deletes auth files whose base name is in
    the returned set.
    """
    from urllib.request import ProxyHandler, build_opener

    mgmt_key = os.environ.get("MANAGEMENT_PASSWORD") or os.environ.get("API_KEY") or ""
    port = os.environ.get("PORT", "8317")
    if not mgmt_key:
        return set()

    req = Request(f"http://127.0.0.1:{port}/v0/management/auth-files")
    req.add_header("Authorization", f"Bearer {mgmt_key}")
    # urllib honours HTTP(S)_PROXY/ALL_PROXY and only skips them for hosts in
    # NO_PROXY. An empty ProxyHandler forces a direct localhost connection, so
    # the management key is never sent to a proxy.
    opener = build_opener(ProxyHandler({}))
    try:
        with opener.open(req, timeout=10) as resp:
            data = json.loads(resp.read())
    except (HTTPError, URLError, OSError, ValueError):
        return set()

    if not isinstance(data, dict):
        return set()
    files = data.get("files", [])
    if not isinstance(files, list):
        return set()

    invalid_names: set[str] = set()
    now = datetime.now(timezone.utc)
    for entry in files:
        if not isinstance(entry, dict):
            continue
        name = str(entry.get("name") or "").strip()
        if not name:
            continue
        status = str(entry.get("status") or "").strip().lower()
        status_message = str(entry.get("status_message") or "").strip().lower()
        unavailable = entry.get("unavailable", False)
        # Compare as real instants; raw string comparison breaks for non-UTC
        # offsets or differing fractional-second formats.
        in_cooldown = _retry_pending(str(entry.get("next_retry_after") or "").strip(), now)
        # NOTE(review): temporary conditions (cooldown, unavailable) also end
        # up here, and sync() deletes matching files; confirm this is intended.
        if (
            status == "error"
            or unavailable
            or in_cooldown
            or any(kw in status_message for kw in INVALID_STATUS_KEYWORDS)
        ):
            invalid_names.add(name)
    return invalid_names
|
|
|
|
def restore() -> None:
    """Rebuild the local tree under ROOT from the object store.

    Steps:

    1. Register the mc alias and make sure ROOT and the config directory exist.
    2. Wipe the local auths directory and recreate it empty.
    3. Download the remote config. If the bucket has none, copy CONFIG_FALLBACK
       into place instead, when it is set and points at a file.
    4. Download every remote object under the auths prefix, one ``mc cp`` per
       file via ``download_file``. Each restored file then carries the remote
       last-modified time as its mtime, the same state ``download_file``
       leaves during sync.

    A failed config download propagates (``download_file`` uses
    ``check=True``). Individual auth-file failures are reported on stderr
    and skipped, so one bad object does not abort the restore.
    """
    ensure_alias()
    ROOT.mkdir(parents=True, exist_ok=True)
    local_path(REMOTE_CONFIG_KEY).parent.mkdir(parents=True, exist_ok=True)
    auth_dir = local_path(REMOTE_AUTHS_PREFIX)
    shutil.rmtree(auth_dir, ignore_errors=True)
    auth_dir.mkdir(parents=True, exist_ok=True)

    remote = remote_inventory()
    if REMOTE_CONFIG_KEY in remote:
        download_file(REMOTE_CONFIG_KEY, remote[REMOTE_CONFIG_KEY])
    elif CONFIG_FALLBACK:
        fallback = Path(CONFIG_FALLBACK)
        if fallback.is_file():
            target = local_path(REMOTE_CONFIG_KEY)
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(fallback, target)

    # Per-file copies: each mc call gets its own MC_TIMEOUT budget. One
    # recursive copy would have to finish the whole tree within a single
    # timeout, and a TimeoutExpired would abort the restore.
    auth_prefix = f"{REMOTE_AUTHS_PREFIX}/"
    for rel in sorted(remote):
        if not rel.startswith(auth_prefix):
            continue
        try:
            download_file(rel, remote[rel])
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as exc:
            print(f"restore: skipped {rel}: {exc}", file=sys.stderr)
|
|
|
|
def _is_invalid_auth(rel: str, invalid_names: set[str]) -> bool:
    """Return True if *rel* is an auth file whose base name is in *invalid_names*.

    Only keys under the auths prefix are eligible, so a flagged name can never
    match the config object.
    """
    if not rel.startswith(f"{REMOTE_AUTHS_PREFIX}/"):
        return False
    return rel.rsplit("/", 1)[-1] in invalid_names


def _remove_local_file(rel: str) -> None:
    """Delete the local copy of *rel* if it exists as a regular file."""
    dest = local_path(rel)
    if dest.is_file():
        dest.unlink()


def sync() -> None:
    """Two-way reconcile the tracked files between ROOT and the bucket.

    Rules, applied per key:

    * Auth files whose base name the management API reports as invalid are
      deleted remotely (when present there) and locally.
    * Remote-only keys are downloaded; local-only keys are uploaded.
    * Keys present on both sides are left alone when the local MD5 equals the
      remote ETag, or when the local mtime equals the remote last-modified
      time (the state ``download_file`` leaves behind). Otherwise the side
      with the newer timestamp wins.

    Other deletions are not propagated: a file removed on one side is copied
    back from the other. Empty directories under the local auths directory
    are pruned at the end.
    """
    # Timestamps closer than this many seconds count as the same instant;
    # this absorbs float rounding in the os.utime/stat round-trip.
    same_instant = 1e-3

    ensure_alias()
    ROOT.mkdir(parents=True, exist_ok=True)

    invalid_names = _fetch_invalid_auth_names()
    remote = remote_inventory()
    local = local_inventory()

    for rel, meta in remote.items():
        if _is_invalid_auth(rel, invalid_names):
            delete_remote(rel)
            _remove_local_file(rel)
            continue

        local_meta = local.get(rel)
        if local_meta is None:
            download_file(rel, meta)
            continue
        if local_meta["md5"] == meta["etag"]:
            continue
        # ETags are not guaranteed to be MD5 digests (e.g. S3 multipart or
        # SSE-KMS objects). Without this check, a file just stamped by
        # download_file() would be downloaded again on every run.
        drift = local_meta["mtime"] - meta["last_modified"]
        if abs(drift) < same_instant:
            continue
        if drift > 0:
            upload_file(rel)
        else:
            download_file(rel, meta)

    for rel in sorted(set(local) - set(remote)):
        if _is_invalid_auth(rel, invalid_names):
            _remove_local_file(rel)
            continue
        upload_file(rel)

    prune_empty_dirs(local_path(REMOTE_AUTHS_PREFIX))
|
|
|
|
def main() -> int:
    """Parse the ``mode`` argument ("restore" or "sync"), run it, and return 0.

    Invalid arguments are handled by argparse, which exits with status 2.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("mode", choices=("restore", "sync"))
    mode = cli.parse_args().mode
    handler = restore if mode == "restore" else sync
    handler()
    return 0
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|
|