#!/usr/bin/env python3
"""Sync PostgreSQL backups to HuggingFace Xet Dataset"""

import os
import sys
import time
import subprocess
import signal
from datetime import datetime
from pathlib import Path

# Seconds to wait between two backup/upload cycles.
SYNC_INTERVAL = int(os.environ.get('SYNC_INTERVAL', 300))
# HuggingFace access token and target dataset repo id; uploads are skipped
# when either one is empty.
HF_TOKEN = os.environ.get('HF_TOKEN', '')
XET_DATASET = os.environ.get('XET_DATASET', '')

# Cleared by signal_handler() to ask the main loop to stop.
running = True


def log(msg: str) -> None:
    """Print *msg* prefixed with the current wall-clock time (HH:MM:SS)."""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True)


def _sleep_while_running(seconds: int) -> None:
    """Sleep up to *seconds* seconds, returning early once `running` is cleared."""
    for _ in range(seconds):
        if not running:
            break
        time.sleep(1)


def wait_for_postgres(retries: int = 30) -> bool:
    """Poll `pg_isready` on localhost:5432 until the server accepts connections.

    Args:
        retries: Maximum number of probes; consecutive probes are 2 s apart.

    Returns:
        True as soon as a probe succeeds. False when every probe failed, when
        the `pg_isready` executable cannot be found, or when shutdown was
        requested while waiting.
    """
    for attempt in range(retries):
        if not running:
            return False
        try:
            result = subprocess.run(
                ['pg_isready', '-h', 'localhost', '-p', '5432'],
                capture_output=True,
            )
        except FileNotFoundError:
            # Retrying cannot help when the probe binary itself is missing.
            log("❌ pg_isready not found on PATH")
            return False
        if result.returncode == 0:
            log("✅ PostgreSQL ready")
            return True
        # No point in sleeping after the final failed attempt.
        if attempt < retries - 1:
            time.sleep(2)
    return False


def backup_and_upload() -> None:
    """Dump the `appdb` database with pg_dump and upload it to the dataset.

    The dump is written to /data/backup/pg_backup.sql and uploaded as
    backup/pg_backup.sql in the XET_DATASET dataset repo. Does nothing (apart
    from logging a warning) when HF_TOKEN or XET_DATASET is unset. Every
    failure is logged rather than raised, so a single bad cycle never stops
    the sync loop.
    """
    if not HF_TOKEN or not XET_DATASET:
        log("⚠️ HF_TOKEN or XET_DATASET not set")
        return

    try:
        backup_dir = Path('/data/backup')
        backup_dir.mkdir(parents=True, exist_ok=True)
        backup_file = backup_dir / 'pg_backup.sql'

        # Create backup
        result = subprocess.run(
            ['pg_dump', '-h', 'localhost', '-U', 'postgres', '-d', 'appdb',
             '--clean', '--if-exists', '-f', str(backup_file)],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            log(f"❌ Backup failed: {result.stderr}")
            return

        log(f"✅ Backup created ({backup_file.stat().st_size} bytes)")

        # Upload to HuggingFace. Imported lazily so the module still loads
        # (and the failure is logged below) when huggingface_hub is missing.
        from huggingface_hub import HfApi, create_repo

        api = HfApi(token=HF_TOKEN)

        try:
            create_repo(XET_DATASET, repo_type="dataset", private=True,
                        token=HF_TOKEN, exist_ok=True)
        except Exception as e:
            # Best effort: the repo may already exist or the token may lack
            # create rights; the upload below reports the real failure.
            # `Exception` (not a bare except) lets SystemExit and
            # KeyboardInterrupt propagate.
            log(f"⚠️ Could not create/verify repo: {e}")

        api.upload_file(
            path_or_fileobj=str(backup_file),
            path_in_repo="backup/pg_backup.sql",
            repo_id=XET_DATASET,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Backup {datetime.now().isoformat()}",
        )
        log(f"✅ Uploaded to {XET_DATASET}")

    except Exception as e:
        # Boundary of one sync cycle: report and let the loop carry on.
        log(f"❌ Error: {e}")


def signal_handler(signum, frame):
    """Request a graceful shutdown on SIGTERM/SIGINT.

    Only clears the `running` flag. The final backup is performed by main()
    once the current cycle has finished, so a dump is never started from
    inside the handler while another dump may already be in progress.
    """
    global running
    running = False


def main() -> None:
    """Run the periodic backup loop until a termination signal arrives."""
    log("🚀 Sync Manager started")
    log(f"📦 Dataset: {XET_DATASET or 'NOT SET'}")
    log(f"⏰ Interval: {SYNC_INTERVAL}s")

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    if not wait_for_postgres():
        # Stay quiet when the wait was cut short by a shutdown request;
        # a final dump could not succeed without a reachable server anyway.
        if running:
            log("❌ PostgreSQL not available")
        return

    _sleep_while_running(30)  # Initial delay

    while running:
        backup_and_upload()
        _sleep_while_running(SYNC_INTERVAL)

    # The loop only ends after signal_handler() cleared `running`.
    log("🛑 Shutting down, final backup...")
    backup_and_upload()


if __name__ == '__main__':
    main()