Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| Sync Manager - Automatically backs up PostgreSQL to HuggingFace Xet Dataset | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import subprocess | |
| import tempfile | |
| import signal | |
| from datetime import datetime | |
| from pathlib import Path | |
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Values that can be overridden through the environment.
SYNC_INTERVAL = int(os.getenv('SYNC_INTERVAL', 300))  # seconds; default is 5 minutes
HF_TOKEN = os.getenv('HF_TOKEN', '')  # an empty token makes the upload step skip itself
XET_DATASET = os.getenv('XET_DATASET', 'your-username/postgres-backup')
# NOTE(review): the fallback password is hard-coded; confirm it is only meant
# for local/dev use.
PGPASSWORD = os.getenv('POSTGRES_PASSWORD', 'postgres123')

# Fixed connection settings for the PostgreSQL server being backed up.
POSTGRES_HOST = 'localhost'
POSTGRES_PORT = 5432
POSTGRES_USER = 'postgres'
POSTGRES_DB = 'appdb'

# Global flag for graceful shutdown: the main loop keeps running while this is
# True, and the signal handler flips it to False.
running = True
def log(message: str):
    """Write *message* to stdout prefixed with the current local time.

    The line has the form ``[YYYY-MM-DD HH:MM:SS] message`` and stdout is
    flushed on every call.
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{stamp}] {message}", flush=True)
def wait_for_postgres(max_retries: int = 30) -> bool:
    """Poll PostgreSQL until a connection succeeds or the retries run out.

    Each attempt connects to the ``postgres`` maintenance database using the
    module-level host, port, user and password settings.

    Args:
        max_retries: Maximum number of connection attempts.

    Returns:
        True as soon as one connection succeeds, False if every attempt
        raised ``psycopg2.OperationalError``.
    """
    import psycopg2

    for attempt in range(1, max_retries + 1):
        try:
            conn = psycopg2.connect(
                host=POSTGRES_HOST,
                port=POSTGRES_PORT,
                user=POSTGRES_USER,
                password=PGPASSWORD,
                database='postgres',
                # Bound each attempt so that a connection which never
                # completes cannot stall the retry loop indefinitely.
                connect_timeout=5,
            )
        except psycopg2.OperationalError:
            log(f"β³ Waiting for PostgreSQL... ({attempt}/{max_retries})")
            # No point in pausing after the final attempt; report failure
            # straight away instead.
            if attempt < max_retries:
                time.sleep(2)
        else:
            conn.close()
            log("β PostgreSQL is ready")
            return True

    log("β PostgreSQL did not become ready")
    return False
def create_backup() -> str | None:
    """Dump the application database to ``/data/backup/pg_backup.sql``.

    ``pg_dump`` is run with ``--clean --if-exists`` and the password is
    passed through the ``PGPASSWORD`` environment variable of the child
    process.

    The dump is written to a uniquely named temporary file in the backup
    directory and only moved over ``pg_backup.sql`` once ``pg_dump`` has
    exited successfully. A failed or interrupted dump therefore never
    replaces the previous good backup, and two overlapping dumps (for
    example one started from the shutdown signal handler while another is
    still running) never write to the same file.

    Returns:
        The path of the finished backup file as a string, or None when the
        dump failed for any reason (the reason is logged).
    """
    backup_dir = Path('/data/backup')
    backup_file = backup_dir / 'pg_backup.sql'
    tmp_path = None
    try:
        backup_dir.mkdir(parents=True, exist_ok=True)

        # The temp file lives in the same directory as the target so that
        # os.replace() is a same-filesystem rename. mkstemp creates it
        # owner-only, so the published backup ends up with those permissions.
        fd, tmp_name = tempfile.mkstemp(
            dir=backup_dir, prefix='pg_backup.', suffix='.sql.tmp'
        )
        tmp_path = Path(tmp_name)
        os.close(fd)  # pg_dump reopens the file by name

        env = os.environ.copy()
        env['PGPASSWORD'] = PGPASSWORD

        result = subprocess.run(
            [
                'pg_dump',
                '-h', POSTGRES_HOST,
                '-p', str(POSTGRES_PORT),
                '-U', POSTGRES_USER,
                '-d', POSTGRES_DB,
                '--clean',
                '--if-exists',
                '-f', str(tmp_path),
            ],
            env=env,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            log(f"β Backup failed: {result.stderr}")
            return None

        # Publish the finished dump in a single step.
        os.replace(tmp_path, backup_file)
        size = backup_file.stat().st_size
        log(f"β Backup created: {backup_file} ({size} bytes)")
        return str(backup_file)
    except Exception as e:
        log(f"β Backup error: {e}")
        return None
    finally:
        # Remove whatever is left of an unsuccessful dump. After a successful
        # rename the temp name no longer exists and this is a no-op.
        if tmp_path is not None:
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                # Cleanup is best-effort; a stale temp file must not turn a
                # finished backup into a failure.
                pass
def upload_to_xet(backup_file: str) -> bool:
    """Push a backup file to the configured HuggingFace dataset repository.

    The dataset repository is created as private if needed (a failure of
    that step is only logged), then the file is uploaded to
    ``backup/pg_backup.sql`` with a timestamped commit message.

    Args:
        backup_file: Path of the local dump to upload.

    Returns:
        True when the upload succeeded, False when no ``HF_TOKEN`` is
        configured or the upload raised an exception.
    """
    if not HF_TOKEN:
        log("β οΈ No HF_TOKEN set, skipping upload")
        return False

    # Arguments shared by the repo-creation call and the upload call.
    target = {
        'repo_id': XET_DATASET,
        'repo_type': "dataset",
        'token': HF_TOKEN,
    }

    try:
        from huggingface_hub import HfApi, create_repo

        client = HfApi(token=HF_TOKEN)

        # Create dataset repo if it doesn't exist; problems here are reported
        # but do not stop the upload attempt.
        try:
            create_repo(private=True, exist_ok=True, **target)
        except Exception as repo_err:
            log(f"βΉοΈ Repo creation note: {repo_err}")

        # Upload backup file
        client.upload_file(
            path_or_fileobj=backup_file,
            path_in_repo="backup/pg_backup.sql",
            commit_message=f"PostgreSQL backup - {datetime.now().isoformat()}",
            **target,
        )
        log(f"β Backup uploaded to {XET_DATASET}")
        return True
    except Exception as upload_err:
        log(f"β Upload failed: {upload_err}")
        return False
def sync_once():
    """Run a single backup-and-upload pass.

    A dump is created first; the upload step only runs when the dump
    produced a file. The completion message is logged either way.
    """
    log("π Starting sync cycle...")
    if dump_path := create_backup():
        upload_to_xet(dump_path)
    log("β Sync cycle complete")
def signal_handler(signum, frame):
    """Handle a shutdown signal: run one final sync, then exit with status 0.

    The handler is re-entrancy safe. If another signal arrives while the
    final sync is still in progress, the repeated invocation returns
    immediately instead of starting a second, nested backup and upload.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).
    """
    global running
    if not running:
        # Shutdown is already under way further up the stack; let that
        # final backup finish undisturbed.
        return
    # Clear the flag before doing any work so a signal arriving from here on
    # takes the early return above.
    running = False
    log("π Shutdown signal received, performing final backup...")
    sync_once()
    log("π Sync manager shutting down")
    sys.exit(0)
def main():
    """Run the sync manager: wait for PostgreSQL, then back up periodically.

    Installs the SIGTERM/SIGINT handler, waits for the database to accept
    connections (returning early if it never does), pauses 30 seconds, and
    then repeats sync cycles separated by ``SYNC_INTERVAL`` seconds until
    the ``running`` flag is cleared.
    """
    log("π Sync Manager starting...")
    log(f"π¦ Target dataset: {XET_DATASET}")
    log(f"β° Sync interval: {SYNC_INTERVAL} seconds")

    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Wait for PostgreSQL
    if not wait_for_postgres():
        log("β Cannot start without PostgreSQL")
        return

    # Initial delay
    time.sleep(30)

    # A zero or negative interval would make range() empty and turn the loop
    # below into back-to-back dumps and uploads with no pause at all.
    interval = max(1, SYNC_INTERVAL)
    if interval != SYNC_INTERVAL:
        log(f"Sync interval {SYNC_INTERVAL} is not positive, using {interval} second instead")

    # Main loop
    while running:
        try:
            sync_once()
        except Exception as e:
            log(f"β Sync error: {e}")

        # Sleep in one-second steps so a cleared `running` flag is noticed
        # promptly instead of after a full interval.
        for _ in range(interval):
            if not running:
                break
            time.sleep(1)
# Start the sync loop only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()