| |
| """ |
| Hindsight Backup — pg_dump to HF Dataset. |
| |
| Uses pg_dump for a consistent snapshot of the embedded PostgreSQL database |
| while Hindsight is running. Safe for periodic and shutdown backups. |
| |
| Usage (called by entrypoint.sh): |
| python3 /opt/backup/backup.py [reason] |
| |
| Env vars: |
| HF_TOKEN — HuggingFace write token |
| HF_BACKUP_REPO — Dataset repo (default: Arnwald84/atum-hindsight-backup) |
| """ |
|
|
| import glob |
| import os |
| import subprocess |
| import sys |
| import tempfile |
| from datetime import datetime, timezone |
| from pathlib import Path |
|
|
# HuggingFace credentials / destination; backup is skipped when HF_TOKEN is empty.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
HF_REPO = os.environ.get("HF_BACKUP_REPO", "Arnwald84/atum-hindsight-backup")
# Keep at most this many timestamped files under snapshots/history/ in the repo.
MAX_HISTORY = 10
# Dumps smaller than this are treated as an empty database and not uploaded.
MIN_DUMP_SIZE_KB = 200


# Connection settings for the embedded PostgreSQL instance.
# NOTE(review): credentials are hardcoded — presumably fine for a local embedded
# DB, but confirm it is not reachable from outside the container.
PG_USER = "hindsight"
PG_PASSWORD = "hindsight"
PG_DATABASE = "hindsight"
PG_PORT = "5432"


# Free-form reason recorded in the commit message (argv[1], default "manual").
REASON = sys.argv[1] if len(sys.argv) > 1 else "manual"
|
|
|
|
def log(msg: str) -> None:
    """Write one tagged status line to stdout, flushed immediately.

    Flushing per line keeps messages visible in container logs even when
    stdout is block-buffered.
    """
    sys.stdout.write(f"[BACKUP] {msg}\n")
    sys.stdout.flush()
|
|
|
|
def find_pg_bin(name: str) -> str:
    """Return the path of PostgreSQL binary *name* from the pg0 installation.

    Raises FileNotFoundError when no installation provides the binary.
    """
    candidates = glob.glob(os.path.expanduser(f"~/.pg0/installation/*/bin/{name}"))
    if not candidates:
        raise FileNotFoundError(f"{name} not found in ~/.pg0/installation/")
    # max() of the path strings == sorted(candidates)[-1]: the lexicographically
    # last installation directory wins.
    return max(candidates)
|
|
|
|
def create_dump() -> str:
    """Run pg_dump in custom format and return the path of the dump file.

    pg_dump takes a consistent snapshot, so this is safe while Hindsight is
    still serving traffic. Raises RuntimeError when pg_dump exits non-zero.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    target = os.path.join(tempfile.gettempdir(), f"hindsight-{stamp}.pgdump")

    cmd = [
        find_pg_bin("pg_dump"),
        "-U", PG_USER,
        "-d", PG_DATABASE,
        "-p", PG_PORT,
        "-Fc",          # custom format: compressed, restorable via pg_restore
        "--no-owner",
        "--no-acl",
        "-f", target,
    ]

    # Pass the password via the child's environment, never on the command line.
    child_env = dict(os.environ, PGPASSWORD=PG_PASSWORD)
    proc = subprocess.run(cmd, capture_output=True, text=True, env=child_env)
    if proc.returncode != 0:
        raise RuntimeError(f"pg_dump failed: {proc.stderr}")

    return target
|
|
|
|
def upload_to_hf(dump_file: str) -> None:
    """Push *dump_file* to the HF dataset as latest + a timestamped history copy.

    Also rotates the history folder down to MAX_HISTORY snapshots.
    """
    from huggingface_hub import HfApi

    api = HfApi(token=HF_TOKEN)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    # Make sure the private dataset repo exists before the first push.
    api.create_repo(repo_id=HF_REPO, repo_type="dataset", exist_ok=True, private=True)

    # Each backup is pushed twice: once to the fixed "latest" path and once as
    # an immutable, timestamped history entry.
    targets = [
        (
            "snapshots/latest.pgdump",
            f"Backup {timestamp} ({REASON})",
            f"Uploaded snapshots/latest.pgdump to {HF_REPO}",
        ),
        (
            f"snapshots/history/{timestamp}.pgdump",
            f"History snapshot {timestamp}",
            f"Uploaded snapshots/history/{timestamp}.pgdump",
        ),
    ]
    for repo_path, commit_msg, success_msg in targets:
        api.upload_file(
            path_or_fileobj=dump_file,
            path_in_repo=repo_path,
            repo_id=HF_REPO,
            repo_type="dataset",
            commit_message=commit_msg,
        )
        log(success_msg)

    # Rotate: drop everything past the newest MAX_HISTORY entries. Timestamps
    # are fixed-width, so lexicographic (reverse) order == newest first.
    snapshots = sorted(
        (
            path
            for path in api.list_repo_files(repo_id=HF_REPO, repo_type="dataset")
            if path.startswith("snapshots/history/") and path.endswith(".pgdump")
        ),
        reverse=True,
    )
    for stale in snapshots[MAX_HISTORY:]:
        api.delete_file(
            path_in_repo=stale,
            repo_id=HF_REPO,
            repo_type="dataset",
            commit_message=f"Rotate old snapshot {stale}",
        )
        log(f"Deleted old snapshot: {stale}")
|
|
|
|
def main() -> None:
    """Run one backup cycle: dump, sanity-check the size, upload, clean up."""
    if not HF_TOKEN:
        log("HF_TOKEN not set — skipping backup")
        return

    log(f"Starting backup (reason: {REASON})...")

    dump_path = Path(create_dump())
    size_kb = dump_path.stat().st_size / 1024
    log(f"pg_dump created: {size_kb:.0f} KB")

    # A suspiciously small dump probably means an empty/uninitialized database;
    # refuse to replace a known-good remote backup with it.
    if size_kb < MIN_DUMP_SIZE_KB:
        log(f"SKIPPED: dump too small ({size_kb:.0f} KB < {MIN_DUMP_SIZE_KB} KB) — likely empty database, refusing to overwrite good backup")
        dump_path.unlink(missing_ok=True)
        return

    try:
        upload_to_hf(str(dump_path))
    finally:
        # Always remove the local temp dump, even when the upload fails.
        dump_path.unlink(missing_ok=True)

    log("Backup complete")
|
|
|
|
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # Surface the failure in the logs and exit non-zero so the caller
        # (entrypoint.sh) can detect that the backup did not complete.
        log(f"FAILED: {e}")
        sys.exit(1)
|
|