hindsight / scripts /backup.py
Arnwald84's picture
fix: backup guard — refuse empty dumps, increase history retention
daf0f64 verified
#!/usr/bin/env python3
"""
Hindsight Backup — pg_dump to HF Dataset.

Uses pg_dump for a consistent snapshot of the embedded PostgreSQL database
while Hindsight is running. Safe for periodic and shutdown backups.

Usage (called by entrypoint.sh):
    python3 /opt/backup/backup.py [reason]

The optional [reason] is recorded in the backup commit message and
defaults to "manual".

Env vars:
    HF_TOKEN       — HuggingFace write token (the backup is skipped when unset)
    HF_BACKUP_REPO — Dataset repo (default: Arnwald84/atum-hindsight-backup)
"""
import glob
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
# HuggingFace write token; main() skips the backup entirely when this is empty.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Dataset repo that receives the snapshots (override with HF_BACKUP_REPO).
HF_REPO = os.environ.get("HF_BACKUP_REPO", "Arnwald84/atum-hindsight-backup")
# Number of timestamped files kept under snapshots/history/ after rotation.
MAX_HISTORY = 10
MIN_DUMP_SIZE_KB = 200  # Refuse to upload if the dump is smaller (likely empty/corrupt)
# Connection settings handed to pg_dump (-U / -d / -p, password via PGPASSWORD).
# NOTE(review): credentials are hard-coded; presumably acceptable because the
# database is the local embedded instance — confirm.
PG_USER = "hindsight"
PG_PASSWORD = "hindsight"
PG_DATABASE = "hindsight"
PG_PORT = "5432"
# Free-form backup reason from the first CLI argument; appears in the start
# log line and in the commit message of the "latest" upload.
REASON = sys.argv[1] if len(sys.argv) > 1 else "manual"
def log(msg: str) -> None:
    """Print ``msg`` to stdout with the ``[BACKUP]`` prefix, flushing immediately.

    Args:
        msg: Text to emit after the prefix.
    """
    line = f"[BACKUP] {msg}"
    # Flush on every call so the line is written out even if the process
    # exits right afterwards.
    print(line, flush=True)
def find_pg_bin(name: str) -> str:
    """Find a PostgreSQL binary in the pg0 installation.

    Searches ``~/.pg0/installation/*/bin/<name>``. When several installation
    directories contain the binary, the one whose directory name carries the
    highest version number is chosen.

    Args:
        name: File name of the binary, e.g. ``"pg_dump"``.

    Returns:
        Path to the binary from the newest matching installation.

    Raises:
        FileNotFoundError: If no installation directory contains ``name``.
    """
    pattern = os.path.expanduser(f"~/.pg0/installation/*/bin/{name}")
    matches = glob.glob(pattern)
    if not matches:
        raise FileNotFoundError(f"{name} not found in ~/.pg0/installation/")

    def version_key(path: str) -> tuple:
        # Layout is <installation>/<version>/bin/<name>, so the version
        # directory sits two levels above the binary.
        version_dir = os.path.basename(os.path.dirname(os.path.dirname(path)))
        # Compare numeric components as integers: a plain string sort would
        # rank "9.6" above "16.2" and "16.9" above "16.10".
        numbers = [int(part) for part in re.findall(r"\d+", version_dir)]
        # The path itself breaks ties so the result is deterministic.
        return (numbers, path)

    return max(matches, key=version_key)
def create_dump() -> str:
    """Create a pg_dump in custom format (consistent while PG is running).

    The dump is written to ``hindsight-<UTC timestamp>.pgdump`` in the system
    temp directory. If pg_dump fails or is interrupted, any partially written
    file is removed before the error propagates, so failed runs do not pile
    up dumps in the temp directory.

    Returns:
        Path of the dump file; the caller is responsible for deleting it.

    Raises:
        FileNotFoundError: If the pg_dump binary cannot be located.
        RuntimeError: If pg_dump exits with a non-zero status.
    """
    pg_dump = find_pg_bin("pg_dump")
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    dump_file = os.path.join(tempfile.gettempdir(), f"hindsight-{timestamp}.pgdump")

    # Pass the password through the environment so it never appears on the
    # command line.
    env = os.environ.copy()
    env["PGPASSWORD"] = PG_PASSWORD

    try:
        result = subprocess.run(
            [
                pg_dump,
                "-U", PG_USER,
                "-d", PG_DATABASE,
                "-p", PG_PORT,
                "-Fc",  # custom archive format
                "--no-owner",
                "--no-acl",
                "-f", dump_file,
            ],
            capture_output=True,
            text=True,
            env=env,
        )
        if result.returncode != 0:
            raise RuntimeError(f"pg_dump failed: {result.stderr}")
    except BaseException:
        # Do not leave a truncated dump behind; re-raise the original error.
        Path(dump_file).unlink(missing_ok=True)
        raise
    return dump_file
def upload_to_hf(dump_file: str) -> None:
    """Upload pg_dump to HF Dataset as latest + timestamped history.

    Ensures the private dataset repo exists, uploads ``dump_file`` twice
    (``snapshots/latest.pgdump`` and ``snapshots/history/<timestamp>.pgdump``)
    and then deletes history snapshots beyond the newest ``MAX_HISTORY``.

    Args:
        dump_file: Local path of the dump to upload.
    """
    # Imported lazily, as in the rest of this script's design: the dependency
    # is only needed once an upload actually happens.
    from huggingface_hub import HfApi

    hub = HfApi(token=HF_TOKEN)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    repo = {"repo_id": HF_REPO, "repo_type": "dataset"}

    # Ensure dataset repo exists (private, idempotent)
    hub.create_repo(exist_ok=True, private=True, **repo)

    # (path in repo, commit message, log line) — "latest" is overwritten on
    # every run, the timestamped copy accumulates as history.
    uploads = (
        (
            "snapshots/latest.pgdump",
            f"Backup {timestamp} ({REASON})",
            f"Uploaded snapshots/latest.pgdump to {HF_REPO}",
        ),
        (
            f"snapshots/history/{timestamp}.pgdump",
            f"History snapshot {timestamp}",
            f"Uploaded snapshots/history/{timestamp}.pgdump",
        ),
    )
    for remote_path, commit_message, done_message in uploads:
        hub.upload_file(
            path_or_fileobj=dump_file,
            path_in_repo=remote_path,
            commit_message=commit_message,
            **repo,
        )
        log(done_message)

    # Rotate: the timestamp format sorts chronologically as text, so a
    # descending sort puts the newest snapshots first.
    history = [
        path
        for path in hub.list_repo_files(**repo)
        if path.startswith("snapshots/history/") and path.endswith(".pgdump")
    ]
    history.sort(reverse=True)
    for stale in history[MAX_HISTORY:]:
        hub.delete_file(
            path_in_repo=stale,
            commit_message=f"Rotate old snapshot {stale}",
            **repo,
        )
        log(f"Deleted old snapshot: {stale}")
def main() -> None:
    """Run one backup: dump the database, sanity-check its size, upload it.

    Returns without doing anything when ``HF_TOKEN`` is empty, and without
    uploading when the dump is smaller than ``MIN_DUMP_SIZE_KB``. The local
    dump file is removed whether or not the upload succeeds.
    """
    if not HF_TOKEN:
        log("HF_TOKEN not set — skipping backup")
        return

    log(f"Starting backup (reason: {REASON})...")
    dump_file = create_dump()
    dump_path = Path(dump_file)

    size_kb = dump_path.stat().st_size / 1024
    log(f"pg_dump created: {size_kb:.0f} KB")

    # Guard: refuse to overwrite good backups with empty/corrupt dumps
    if size_kb < MIN_DUMP_SIZE_KB:
        log(f"SKIPPED: dump too small ({size_kb:.0f} KB < {MIN_DUMP_SIZE_KB} KB) — likely empty database, refusing to overwrite good backup")
        dump_path.unlink(missing_ok=True)
        return

    try:
        upload_to_hf(dump_file)
    finally:
        # Always remove the local dump, even when the upload raised.
        dump_path.unlink(missing_ok=True)
    log("Backup complete")
if __name__ == "__main__":
    # Top-level boundary: report any failure through the backup log and
    # signal it to the caller with exit status 1.
    try:
        main()
    except Exception as exc:
        log(f"FAILED: {exc}")
        raise SystemExit(1)