Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| HuggingClip Database Sync - PostgreSQL Backup/Restore to Hugging Face Dataset | |
| Syncs Paperclip's PostgreSQL database to HF Dataset for persistence across restarts. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import tarfile | |
| import tempfile | |
| import subprocess | |
| import logging | |
| import warnings | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| # Suppress huggingface_hub deprecation noise about local_dir_use_symlinks | |
| warnings.filterwarnings('ignore', category=UserWarning, module='huggingface_hub') | |
| from huggingface_hub import HfApi | |
| from huggingface_hub.utils import RepositoryNotFoundError, EntryNotFoundError | |
| import huggingface_hub | |
| # Disable all HF Hub progress bars (they clutter container logs) | |
| huggingface_hub.utils.disable_progress_bars() | |
| # ============================================================================ | |
| # Configuration | |
| # ============================================================================ | |
| # Logging: WARNING level keeps errors visible, suppresses routine INFO chatter | |
| logging.basicConfig( | |
| level=logging.WARNING, | |
| format='[sync] %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Set our own logger to INFO so backup/restore start+finish lines still print | |
| logger.setLevel(logging.INFO) | |
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logging.getLogger("huggingface_hub").setLevel(logging.WARNING) | |
| # Environment variables | |
| HF_TOKEN = os.environ.get('HF_TOKEN') | |
| HF_USERNAME = os.environ.get('HF_USERNAME') | |
| DATABASE_URL = os.environ.get('DATABASE_URL', 'postgres://postgres:paperclip@localhost:5432/paperclip') | |
| BACKUP_DATASET_NAME = os.environ.get('BACKUP_DATASET_NAME', 'huggingclip-backup') | |
| SYNC_MAX_FILE_BYTES = int(os.environ.get('SYNC_MAX_FILE_BYTES', '52428800')) # 50MB | |
| PAPERCLIP_HOME = os.environ.get('PAPERCLIP_HOME', '/paperclip') | |
| # Status file for dashboard | |
| STATUS_FILE = Path('/tmp/sync-status.json') | |
| STATE_FILE = Path('/tmp/huggingclip-sync-state.json') | |
| # ============================================================================ | |
| # Helper Functions | |
| # ============================================================================ | |
| def parse_db_url(db_url: str) -> dict: | |
| """Parse PostgreSQL connection URL""" | |
| # Format: postgres://user:password@host:port/database | |
| try: | |
| # Remove protocol | |
| connection_str = db_url.replace('postgres://', '').replace('postgresql://', '') | |
| # Parse credentials | |
| if '@' in connection_str: | |
| creds, host_db = connection_str.split('@') | |
| if ':' in creds: | |
| user, password = creds.split(':', 1) | |
| else: | |
| user = creds | |
| password = '' | |
| else: | |
| user = 'postgres' | |
| password = '' | |
| host_db = connection_str | |
| # Parse host and database | |
| if ':' in host_db and '/' in host_db: | |
| host_port, database = host_db.rsplit('/', 1) | |
| if ':' in host_port: | |
| host, port = host_port.rsplit(':', 1) | |
| else: | |
| host = host_port | |
| port = '5432' | |
| else: | |
| host = host_db.split('/')[0] | |
| port = '5432' | |
| database = host_db.split('/')[-1] if '/' in host_db else 'paperclip' | |
| return { | |
| 'user': user, | |
| 'password': password, | |
| 'host': host, | |
| 'port': port, | |
| 'database': database | |
| } | |
| except Exception as e: | |
| logger.error(f'Failed to parse DATABASE_URL: {e}') | |
| return None | |
| def write_status(status: dict): | |
| """Write sync status to file for dashboard""" | |
| try: | |
| # Ensure compatibility with dashboard fields: `status` and `message` | |
| try: | |
| if 'db_status' in status and 'status' not in status: | |
| status['status'] = status['db_status'] | |
| if 'last_error' in status and 'message' not in status: | |
| status['message'] = status['last_error'] | |
| except Exception: | |
| pass | |
| STATUS_FILE.write_text(json.dumps(status, indent=2)) | |
| except Exception as e: | |
| logger.error(f'Failed to write status file: {e}') | |
| def read_status() -> dict: | |
| """Read current sync status""" | |
| if STATUS_FILE.exists(): | |
| try: | |
| return json.loads(STATUS_FILE.read_text()) | |
| except Exception as e: | |
| logger.error(f'Failed to read status file: {e}') | |
| return { | |
| 'db_status': 'unknown', | |
| 'last_sync_time': None, | |
| 'last_error': None, | |
| 'sync_count': 0 | |
| } | |
| def backup_database() -> tuple[str, bool]: | |
| """ | |
| Backup PostgreSQL database to SQL dump. | |
| Returns (filepath, success) | |
| """ | |
| logger.debug('Starting database backup...') | |
| db_config = parse_db_url(DATABASE_URL) | |
| if not db_config: | |
| return None, False | |
| temp_dir = tempfile.mkdtemp() | |
| dump_file = Path(temp_dir) / 'paperclip.sql' | |
| try: | |
| # Build pg_dump command | |
| env = os.environ.copy() | |
| if db_config['password']: | |
| env['PGPASSWORD'] = db_config['password'] | |
| cmd = [ | |
| 'pg_dump', | |
| f'--host={db_config["host"]}', | |
| f'--port={db_config["port"]}', | |
| f'--username={db_config["user"]}', | |
| '--format=plain', | |
| '--verbose', | |
| db_config['database'] | |
| ] | |
| # Execute pg_dump | |
| with open(dump_file, 'w') as f: | |
| result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, env=env, timeout=300) | |
| if result.returncode != 0: | |
| error_msg = result.stderr.decode('utf-8', errors='ignore') | |
| logger.error(f'pg_dump failed: {error_msg}') | |
| return None, False | |
| dump_size = dump_file.stat().st_size | |
| logger.debug(f'Database dumped ({dump_size / 1024 / 1024:.2f} MB)') | |
| return str(dump_file), True | |
| except subprocess.TimeoutExpired: | |
| logger.error('Database backup timed out (>300s)') | |
| return None, False | |
| except Exception as e: | |
| logger.error(f'Database backup error: {e}') | |
| return None, False | |
| def create_backup_tarball(dump_file: str) -> tuple[str, bool]: | |
| """ | |
| Create tarball with database dump and Paperclip data files. | |
| Returns (tarball_path, success) | |
| """ | |
| logger.debug('Creating backup tarball...') | |
| temp_dir = tempfile.mkdtemp() | |
| tarball_file = Path(temp_dir) / 'paperclip-backup.tar.gz' | |
| try: | |
| with tarfile.open(tarball_file, 'w:gz') as tar: | |
| # Add SQL dump | |
| tar.add(dump_file, arcname='paperclip.sql') | |
| # Add Paperclip home directory if it exists | |
| if Path(PAPERCLIP_HOME).exists(): | |
| tar.add(PAPERCLIP_HOME, arcname='paperclip-data') | |
| else: | |
| logger.warning(f'PAPERCLIP_HOME not found: {PAPERCLIP_HOME}') | |
| tarball_size = tarball_file.stat().st_size | |
| logger.debug(f'Tarball created ({tarball_size / 1024 / 1024:.2f} MB)') | |
| # Check size limit | |
| if tarball_size > SYNC_MAX_FILE_BYTES: | |
| logger.error(f'Backup too large ({tarball_size / SYNC_MAX_FILE_BYTES * 100:.0f}% of limit)') | |
| return None, False | |
| return str(tarball_file), True | |
| except Exception as e: | |
| logger.error(f'Failed to create tarball: {e}') | |
| return None, False | |
| def sync_to_hf(backup_file: str) -> bool: | |
| """Upload backup tarball to Hugging Face Dataset.""" | |
| if not HF_TOKEN: | |
| logger.warning('HF_TOKEN not set - skipping backup upload') | |
| return False | |
| logger.debug('Uploading backup to Hugging Face...') | |
| try: | |
| api = HfApi(token=HF_TOKEN) | |
| username = HF_USERNAME | |
| if not username: | |
| try: | |
| user_info = api.whoami() | |
| username = user_info['name'] | |
| except Exception: | |
| logger.error('Failed to get HF username') | |
| return False | |
| dataset_id = f'{username}/{BACKUP_DATASET_NAME}' | |
| api.create_repo(repo_id=dataset_id, repo_type='dataset', private=True, exist_ok=True) | |
| api.upload_file( | |
| path_or_fileobj=backup_file, | |
| path_in_repo='snapshots/latest.tar.gz', | |
| repo_id=dataset_id, | |
| repo_type='dataset' | |
| ) | |
| logger.debug(f'Backup uploaded to {dataset_id}') | |
| return True | |
| except Exception as e: | |
| logger.error(f'Failed to upload to HF: {e}') | |
| return False | |
| def restore_database(restore_file: str) -> bool: | |
| """ | |
| Restore PostgreSQL database from SQL dump. | |
| """ | |
| logger.debug('Restoring database from backup...') | |
| db_config = parse_db_url(DATABASE_URL) | |
| if not db_config: | |
| return False | |
| try: | |
| # Ensure database exists | |
| admin_cmd = [ | |
| 'psql', | |
| f'--host={db_config["host"]}', | |
| f'--port={db_config["port"]}', | |
| f'--username={db_config["user"]}', | |
| '--no-password', | |
| '-c', | |
| f'CREATE DATABASE IF NOT EXISTS {db_config["database"]};' | |
| ] | |
| env = os.environ.copy() | |
| if db_config['password']: | |
| env['PGPASSWORD'] = db_config['password'] | |
| subprocess.run(admin_cmd, env=env, capture_output=True) | |
| # Restore from dump | |
| restore_cmd = [ | |
| 'psql', | |
| f'--host={db_config["host"]}', | |
| f'--port={db_config["port"]}', | |
| f'--username={db_config["user"]}', | |
| '--no-password', | |
| db_config['database'] | |
| ] | |
| with open(restore_file, 'r') as f: | |
| result = subprocess.run( | |
| restore_cmd, | |
| stdin=f, | |
| stdout=subprocess.DEVNULL, # suppress CREATE TABLE / ALTER TABLE / COPY noise | |
| stderr=subprocess.PIPE, | |
| env=env, | |
| timeout=300 | |
| ) | |
| if result.returncode != 0: | |
| error_msg = result.stderr.decode('utf-8', errors='ignore') | |
| logger.error(f'Restore failed: {error_msg}') | |
| return False | |
| logger.debug('Database restored') | |
| return True | |
| except subprocess.TimeoutExpired: | |
| logger.error('Database restore timed out (>300s)') | |
| return False | |
| except Exception as e: | |
| logger.error(f'Database restore error: {e}') | |
| return False | |
| def sync_from_hf() -> bool: | |
| """ | |
| Download backup from Hugging Face and restore database. | |
| """ | |
| if not HF_TOKEN: | |
| logger.warning('HF_TOKEN not set - skipping restore') | |
| return False | |
| logger.debug('Downloading backup from Hugging Face...') | |
| try: | |
| api = HfApi(token=HF_TOKEN) | |
| # Get username | |
| username = HF_USERNAME | |
| if not username: | |
| try: | |
| user_info = api.whoami() | |
| username = user_info['name'] | |
| except Exception: | |
| logger.warning('Failed to get HF username') | |
| return False | |
| dataset_id = f'{username}/{BACKUP_DATASET_NAME}' | |
| temp_dir = tempfile.mkdtemp() | |
| try: | |
| snapshot_path = api.hf_hub_download( | |
| repo_id=dataset_id, | |
| repo_type='dataset', | |
| filename='snapshots/latest.tar.gz', | |
| local_dir=temp_dir, | |
| local_dir_use_symlinks=False | |
| ) | |
| except (RepositoryNotFoundError, EntryNotFoundError): | |
| logger.info(f'No backup found in {dataset_id} β fresh instance') | |
| return None # not an error β just no backup yet | |
| logger.debug(f'Downloaded backup from {dataset_id}') | |
| # Extract tarball | |
| logger.debug('Extracting backup...') | |
| with tarfile.open(snapshot_path, 'r:gz') as tar: | |
| tar.extractall(temp_dir, filter='data') | |
| dump_file = Path(temp_dir) / 'paperclip.sql' | |
| if not dump_file.exists(): | |
| logger.error('SQL dump not found in backup') | |
| return False | |
| # Restore database | |
| success = restore_database(str(dump_file)) | |
| # Restore Paperclip data files if present | |
| paperclip_data_dir = Path(temp_dir) / 'paperclip-data' | |
| if paperclip_data_dir.exists(): | |
| logger.debug('Restoring Paperclip data files...') | |
| import shutil | |
| try: | |
| for item in paperclip_data_dir.iterdir(): | |
| target = Path(PAPERCLIP_HOME) / item.name | |
| if target.exists(): | |
| shutil.rmtree(target) if target.is_dir() else target.unlink() | |
| shutil.copytree(item, target) if item.is_dir() else shutil.copy2(item, target) | |
| logger.debug('Data files restored') | |
| except Exception as e: | |
| logger.error(f'Failed to restore data files: {e}') | |
| return success | |
| except Exception as e: | |
| logger.error(f'Failed to restore from HF: {e}') | |
| return False | |
| # ============================================================================ | |
| # Change detection helpers | |
| # ============================================================================ | |
| def _get_db_marker() -> int: | |
| """Return cumulative DB activity count from pg_stat_database. -1 on error.""" | |
| db = parse_db_url(DATABASE_URL) | |
| if not db: | |
| return -1 | |
| try: | |
| env = os.environ.copy() | |
| if db['password']: | |
| env['PGPASSWORD'] = db['password'] | |
| db_name = db['database'] | |
| result = subprocess.run( | |
| [ | |
| 'psql', | |
| f'--host={db["host"]}', | |
| f'--port={db["port"]}', | |
| f'--username={db["user"]}', | |
| '--no-password', '--tuples-only', '--no-align', | |
| '-c', | |
| f"SELECT xact_commit + xact_rollback + tup_inserted + tup_updated + tup_deleted " | |
| f"FROM pg_stat_database WHERE datname = '{db_name}'", | |
| ], | |
| env=env, capture_output=True, text=True, timeout=10, | |
| ) | |
| if result.returncode == 0 and result.stdout.strip(): | |
| return int(result.stdout.strip()) | |
| except Exception: | |
| pass | |
| return -1 | |
| def _fs_marker(root: str) -> tuple[int, int, int]: | |
| p = Path(root) | |
| if not p.exists(): | |
| return (0, 0, 0) | |
| fc = ts = nm = 0 | |
| for path in p.rglob("*"): | |
| if not path.is_file(): | |
| continue | |
| try: | |
| st = path.stat() | |
| fc += 1 | |
| ts += int(st.st_size) | |
| nm = max(nm, int(st.st_mtime_ns)) | |
| except OSError: | |
| continue | |
| return (fc, ts, nm) | |
| def _current_marker() -> tuple: | |
| return (_get_db_marker(),) + _fs_marker(PAPERCLIP_HOME) | |
| def _load_sync_state(): | |
| try: | |
| if STATE_FILE.exists(): | |
| d = json.loads(STATE_FILE.read_text()) | |
| m = d.get('marker') | |
| if m and len(m) == 4: | |
| return tuple(m) | |
| except Exception: | |
| pass | |
| return None | |
| def _save_sync_state(marker: tuple) -> None: | |
| try: | |
| STATE_FILE.write_text(json.dumps({'marker': list(marker)})) | |
| except Exception as e: | |
| logger.debug(f'Could not save sync state: {e}') | |
| # ============================================================================ | |
| # Main Sync Operations | |
| # ============================================================================ | |
| def sync_to_backup() -> bool: | |
| """Full backup operation: dump DB β create tarball β upload to HF""" | |
| logger.info('Syncing backup to HF Dataset...') | |
| last_marker = _load_sync_state() | |
| current_marker = _current_marker() | |
| if last_marker is not None and current_marker == last_marker: | |
| status = read_status() | |
| status['status'] = 'synced' | |
| status['message'] = 'No state changes detected.' | |
| write_status(status) | |
| logger.info('No state changes detected β skipping backup.') | |
| return True | |
| status = read_status() | |
| try: | |
| # Step 1: Backup database | |
| dump_file, success = backup_database() | |
| if not success or not dump_file: | |
| status.update({'last_error': 'Database backup failed', 'db_status': 'error', | |
| 'status': 'error', 'message': 'Backup failed: pg_dump error'}) | |
| write_status(status) | |
| return False | |
| # Step 2: Create tarball | |
| tarball_file, success = create_backup_tarball(dump_file) | |
| if not success or not tarball_file: | |
| status.update({'last_error': 'Tarball creation failed', 'db_status': 'error', | |
| 'status': 'error', 'message': 'Backup failed: tarball too large or I/O error'}) | |
| write_status(status) | |
| return False | |
| # Step 3: Upload to HF | |
| success = sync_to_hf(tarball_file) | |
| # Update status | |
| ts = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') | |
| status['last_sync_time'] = ts | |
| status['db_status'] = 'connected' if success else 'error' | |
| status['last_error'] = None if success else 'Upload failed' | |
| status['sync_count'] = status.get('sync_count', 0) + 1 | |
| status['status'] = 'success' if success else 'error' | |
| status['message'] = 'Backup uploaded to HF Dataset' if success else 'Upload to HF Dataset failed' | |
| write_status(status) | |
| if success: | |
| logger.info('Backup synced OK') | |
| _save_sync_state(current_marker) | |
| else: | |
| logger.warning('Backup sync failed') | |
| return success | |
| except Exception as e: | |
| logger.error(f'Backup operation failed: {e}') | |
| status['last_error'] = str(e) | |
| status['db_status'] = 'error' | |
| status['status'] = 'error' | |
| status['message'] = f'Backup error: {e}' | |
| write_status(status) | |
| return False | |
| def sync_from_backup() -> bool: | |
| """Full restore operation: download from HF β extract β restore DB""" | |
| logger.info('Restoring from HF Dataset...') | |
| status = read_status() | |
| try: | |
| success = sync_from_hf() | |
| if success is None: | |
| # No backup exists yet (first boot) β not an error | |
| status['db_status'] = 'connected' | |
| status['last_error'] = None | |
| status['status'] = 'configured' | |
| status['message'] = 'Fresh instance β no prior backup' | |
| write_status(status) | |
| logger.info('No prior backup β fresh instance') | |
| return True | |
| elif success: | |
| status['db_status'] = 'connected' | |
| status['last_error'] = None | |
| status['status'] = 'restored' | |
| status['message'] = 'Restored from HF Dataset' | |
| write_status(status) | |
| logger.info('Restore OK') | |
| return True | |
| else: | |
| status['db_status'] = 'error' | |
| status['last_error'] = 'Restore failed' | |
| status['status'] = 'error' | |
| status['message'] = 'Restore from HF Dataset failed' | |
| write_status(status) | |
| logger.warning('Restore operation failed') | |
| return False | |
| except Exception as e: | |
| logger.error(f'Restore operation failed: {e}') | |
| status['last_error'] = str(e) | |
| status['db_status'] = 'error' | |
| status['status'] = 'error' | |
| status['message'] = f'Restore error: {e}' | |
| write_status(status) | |
| return False | |
| # ============================================================================ | |
| # CLI | |
| # ============================================================================ | |
| def main(): | |
| if len(sys.argv) < 2: | |
| print('Usage: python3 paperclip-sync.py <command>') | |
| print('Commands:') | |
| print(' sync - Backup database to HF Dataset') | |
| print(' restore - Restore database from HF Dataset backup') | |
| sys.exit(1) | |
| command = sys.argv[1] | |
| if command == 'sync': | |
| success = sync_to_backup() | |
| sys.exit(0 if success else 1) | |
| elif command == 'restore': | |
| success = sync_from_backup() | |
| sys.exit(0 if success else 1) | |
| else: | |
| print(f'Unknown command: {command}') | |
| sys.exit(1) | |
| if __name__ == '__main__': | |
| main() | |