"""Restore Aether SQLite db from HF Dataset. Downloads `latest-backup.tar.gz` from the configured Dataset repo, unpacks it into AETHER_BACKUP_DIR, and prints a status line. Quietly succeeds if the backup file does not exist yet (first launch). """ import os import sys import tarfile from pathlib import Path from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError token = os.environ["HF_TOKEN"] repo_id = os.environ["AETHER_BACKUP_REPO"] dest_dir = Path(os.environ.get("AETHER_BACKUP_DIR", "/opt/aether/data")) filename = os.environ.get("AETHER_BACKUP_LATEST_NAME", "latest-backup.tar.gz") dest_dir.mkdir(parents=True, exist_ok=True) api = HfApi(token=token) try: local_tar = hf_hub_download( repo_id=repo_id, filename=filename, repo_type="dataset", token=token, force_download=True, ) except EntryNotFoundError: print("aether-restore: no backup yet, fresh start", file=sys.stderr) sys.exit(0) except RepositoryNotFoundError: print( f"aether-restore: backup repo {repo_id} not found, skip", file=sys.stderr, ) sys.exit(0) except Exception as exc: print(f"aether-restore: download error: {exc}", file=sys.stderr) sys.exit(1) print(f"aether-restore: downloaded {local_tar}", file=sys.stderr) with tarfile.open(local_tar, "r:gz") as tar: tar.extractall(path=dest_dir) print(f"aether-restore: restored into {dest_dir}", file=sys.stderr)