#!/usr/bin/env python3 """ Sync Manager - Automatically backs up PostgreSQL to HuggingFace Xet Dataset """ import os import sys import time import subprocess import tempfile import signal from datetime import datetime from pathlib import Path # Configuration SYNC_INTERVAL = int(os.environ.get('SYNC_INTERVAL', 300)) # 5 minutes HF_TOKEN = os.environ.get('HF_TOKEN', '') XET_DATASET = os.environ.get('XET_DATASET', 'your-username/postgres-backup') POSTGRES_HOST = 'localhost' POSTGRES_PORT = 5432 POSTGRES_USER = 'postgres' POSTGRES_DB = 'appdb' PGPASSWORD = os.environ.get('POSTGRES_PASSWORD', 'postgres123') # Global flag for graceful shutdown running = True def log(message: str): """Print timestamped log message.""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] {message}", flush=True) def wait_for_postgres(max_retries: int = 30) -> bool: """Wait for PostgreSQL to be ready.""" import psycopg2 for i in range(max_retries): try: conn = psycopg2.connect( host=POSTGRES_HOST, port=POSTGRES_PORT, user=POSTGRES_USER, password=PGPASSWORD, database='postgres' ) conn.close() log("✅ PostgreSQL is ready") return True except psycopg2.OperationalError: log(f"⏳ Waiting for PostgreSQL... ({i+1}/{max_retries})") time.sleep(2) log("❌ PostgreSQL did not become ready") return False def create_backup() -> str | None: """Create a PostgreSQL backup using pg_dump.""" try: backup_dir = Path('/data/backup') backup_dir.mkdir(parents=True, exist_ok=True) backup_file = backup_dir / 'pg_backup.sql' env = os.environ.copy() env['PGPASSWORD'] = PGPASSWORD # Create backup result = subprocess.run( [ 'pg_dump', '-h', POSTGRES_HOST, '-p', str(POSTGRES_PORT), '-U', POSTGRES_USER, '-d', POSTGRES_DB, '--clean', '--if-exists', '-f', str(backup_file) ], env=env, capture_output=True, text=True ) if result.returncode == 0: size = backup_file.stat().st_size log(f"✅ Backup created: {backup_file} ({size} bytes)") return str(backup_file) else: log(f"❌ Backup failed: {result.stderr}") return None except Exception as e: log(f"❌ Backup error: {e}") return None def upload_to_xet(backup_file: str) -> bool: """Upload backup to HuggingFace Xet dataset.""" if not HF_TOKEN: log("⚠️ No HF_TOKEN set, skipping upload") return False try: from huggingface_hub import HfApi, create_repo api = HfApi(token=HF_TOKEN) # Create dataset repo if it doesn't exist try: create_repo( repo_id=XET_DATASET, repo_type="dataset", private=True, token=HF_TOKEN, exist_ok=True ) except Exception as e: log(f"ℹ️ Repo creation note: {e}") # Upload backup file api.upload_file( path_or_fileobj=backup_file, path_in_repo="backup/pg_backup.sql", repo_id=XET_DATASET, repo_type="dataset", token=HF_TOKEN, commit_message=f"PostgreSQL backup - {datetime.now().isoformat()}" ) log(f"✅ Backup uploaded to {XET_DATASET}") return True except Exception as e: log(f"❌ Upload failed: {e}") return False def sync_once(): """Perform one sync cycle.""" log("🔄 Starting sync cycle...") backup_file = create_backup() if backup_file: upload_to_xet(backup_file) log("✅ Sync cycle complete") def signal_handler(signum, frame): """Handle shutdown signals.""" global running log("🛑 Shutdown signal received, performing final backup...") running = False sync_once() log("👋 Sync manager shutting down") sys.exit(0) def main(): """Main sync loop.""" log("🚀 Sync Manager starting...") log(f"📦 Target dataset: {XET_DATASET}") log(f"⏰ Sync interval: {SYNC_INTERVAL} seconds") # Set up signal handlers signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) # Wait for PostgreSQL if not wait_for_postgres(): log("❌ Cannot start without PostgreSQL") return # Initial delay time.sleep(30) # Main loop while running: try: sync_once() except Exception as e: log(f"❌ Sync error: {e}") # Sleep with interrupt handling for _ in range(SYNC_INTERVAL): if not running: break time.sleep(1) if __name__ == '__main__': main()