"""
Session sync with Hugging Face Dataset for persistent storage.

CRITICAL: Dataset MUST be private to protect user tokens!
"""

import argparse
import logging
import os
import sys
import tarfile
import tempfile
from pathlib import Path
from typing import List, Optional

from huggingface_hub import HfApi, hf_hub_download, upload_file, create_repo

logger = logging.getLogger(__name__)

# Configuration
DATASET_REPO_ID = os.environ.get("HF_SESSION_DATASET", "glutamatt/neural-runner-sessions")
SESSION_DIR = Path("/home/user/app/data")
BACKUP_FILENAME = "sessions.tar.gz"


class SessionSyncError(Exception):
    """Raised when session sync fails safety checks."""


def _is_not_found(exc: BaseException) -> bool:
    """Return True when the exception text says the resource was not found."""
    return "not found" in str(exc).lower()


def _lookup_dataset_privacy(api: HfApi, repo_id: str) -> Optional[bool]:
    """
    Query the Hub for the Dataset's visibility.

    Args:
        api: HfApi instance
        repo_id: Dataset repository ID

    Returns:
        True if the Dataset is private, False if it is not,
        None if the Hub reports that the Dataset was not found.

    Raises:
        SessionSyncError: If the lookup fails for any reason other than
            "not found".
    """
    try:
        info = api.repo_info(repo_id=repo_id, repo_type="dataset")
    except Exception as e:
        if _is_not_found(e):
            return None
        raise SessionSyncError(f"Failed to check Dataset privacy: {e}") from e
    # A missing/None flag is treated as "not private" (fail closed).
    return bool(info.private)


def _require_private(repo_id: str, is_private: bool) -> None:
    """Raise SessionSyncError unless *is_private* is true; log success otherwise."""
    if not is_private:
        raise SessionSyncError(
            f"🚨 SECURITY ERROR: Dataset '{repo_id}' is PUBLIC!\n"
            f"Sessions contain user credentials and MUST be private.\n"
            f"Fix: Go to https://huggingface.co/datasets/{repo_id}/settings\n"
            f"     and set visibility to 'Private'"
        )
    logger.info(f"✓ Dataset '{repo_id}' is private (secure)")


def check_dataset_is_private(api: HfApi, repo_id: str) -> None:
    """
    Verify Dataset is private before any operations.

    SECURITY: Sessions contain user credentials and tokens.
    They MUST NEVER be in a public Dataset.

    A Dataset that does not exist yet is not an error here: the function
    logs that fact and returns, so callers can create it as private.

    Args:
        api: HfApi instance
        repo_id: Dataset repository ID

    Raises:
        SessionSyncError: If the Dataset is public, or if its visibility
            could not be determined.
    """
    is_private = _lookup_dataset_privacy(api, repo_id)
    if is_private is None:
        # Dataset doesn't exist - we'll create it as private
        logger.info(f"Dataset '{repo_id}' doesn't exist yet, will create as private")
        return
    # Raised outside any try/except so the security message is not re-wrapped.
    _require_private(repo_id, is_private)


def create_private_dataset(api: HfApi, repo_id: str) -> None:
    """
    Create Dataset as private if it doesn't exist.

    Args:
        api: HfApi instance (accepted for interface symmetry; creation goes
            through the module-level ``create_repo`` helper)
        repo_id: Dataset repository ID

    Raises:
        Exception: Whatever ``create_repo`` raises, after logging it.
    """
    try:
        create_repo(
            repo_id=repo_id,
            repo_type="dataset",
            private=True,  # CRITICAL: Always create as private!
            exist_ok=True,
        )
        logger.info(f"✓ Created private Dataset: {repo_id}")
    except Exception as e:
        logger.error(f"Failed to create Dataset: {e}")
        raise


def compress_sessions(source_dir: Path, output_file: Path) -> None:
    """
    Compress session directory to tar.gz.

    Only ``*.json`` files (searched recursively) are archived, stored under
    their path relative to *source_dir*.

    Args:
        source_dir: Directory containing session files
        output_file: Output tar.gz file path
    """
    with tarfile.open(output_file, "w:gz") as tar:
        for session_file in source_dir.rglob("*.json"):
            tar.add(session_file, arcname=session_file.relative_to(source_dir).as_posix())

    logger.info(f"Compressed {output_file} ({output_file.stat().st_size} bytes)")


def _validated_members(tar: tarfile.TarFile, dest_dir: Path) -> List[tarfile.TarInfo]:
    """
    Return the archive members after checking each one is safe to extract.

    Fallback for Python versions without tarfile extraction filters.
    Rejects anything that is not a regular file or directory, and any
    member whose path would land outside *dest_dir*.

    Raises:
        SessionSyncError: If any member is unsafe. Raised before anything
            is extracted.
    """
    root = dest_dir.resolve()
    members = tar.getmembers()
    for member in members:
        if not (member.isfile() or member.isdir()):
            raise SessionSyncError(f"Unsafe member in sessions archive: {member.name}")
        target = (root / member.name).resolve()
        if target != root and root not in target.parents:
            raise SessionSyncError(f"Unsafe path in sessions archive: {member.name}")
    return members


def extract_sessions(archive_file: Path, dest_dir: Path) -> None:
    """
    Extract session tar.gz to directory.

    SECURITY: The archive is downloaded content, so members are filtered to
    prevent path traversal (absolute paths, ``..``, links pointing outside
    *dest_dir*).

    Args:
        archive_file: Input tar.gz file
        dest_dir: Destination directory

    Raises:
        tarfile.TarError: If the archive is invalid or a member is rejected
            by the extraction filter.
        SessionSyncError: If a member is rejected by the fallback validation.
    """
    with tarfile.open(archive_file, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Stdlib extraction filter: blocks traversal and special files.
            tar.extractall(dest_dir, filter="data")
        else:
            tar.extractall(dest_dir, members=_validated_members(tar, dest_dir))

    logger.info(f"Extracted sessions to {dest_dir}")


def restore_sessions_from_dataset() -> None:
    """
    Download and restore sessions from HF Dataset at startup.

    SECURITY: Verifies Dataset is private before downloading.

    Download/extraction failures are logged and swallowed so that startup
    can continue with empty sessions.

    Raises:
        SessionSyncError: If Dataset is public
    """
    api = HfApi()

    # CRITICAL: Check Dataset is private first!
    check_dataset_is_private(api, DATASET_REPO_ID)

    try:
        # Download sessions archive
        logger.info(f"Downloading sessions from Dataset: {DATASET_REPO_ID}")

        # Stage in a private temp dir so the credential-bearing archive
        # does not linger at a predictable path after restore.
        with tempfile.TemporaryDirectory() as staging_dir:
            archive_path = hf_hub_download(
                repo_id=DATASET_REPO_ID,
                filename=BACKUP_FILENAME,
                repo_type="dataset",
                local_dir=staging_dir,
            )

            # Extract to session directory
            extract_sessions(Path(archive_path), SESSION_DIR)

        logger.info("✓ Restored sessions from Dataset")

    except Exception as e:
        if _is_not_found(e):
            logger.info("No existing sessions in Dataset (first boot)")
        else:
            logger.error(f"Failed to restore sessions: {e}")
            # Don't fail startup - just start with empty sessions


def backup_sessions_to_dataset() -> None:
    """
    Compress and upload sessions to HF Dataset.

    SECURITY: Verifies Dataset is private before uploading.

    Compression/upload failures are logged and swallowed (best effort).

    Raises:
        SessionSyncError: If Dataset is public
    """
    api = HfApi()

    # CRITICAL: Check Dataset is private first!
    check_dataset_is_private(api, DATASET_REPO_ID)

    try:
        # Check if there are any sessions to backup
        session_files = list(SESSION_DIR.rglob("*.json"))
        if not session_files:
            logger.info("No sessions to backup")
            return

        # Stage in a private temp dir so the credential-bearing archive
        # is removed once the upload finishes (or fails).
        with tempfile.TemporaryDirectory() as staging_dir:
            # Compress sessions
            archive_path = Path(staging_dir) / BACKUP_FILENAME
            compress_sessions(SESSION_DIR, archive_path)

            # Upload to Dataset
            logger.info(f"Uploading sessions to Dataset: {DATASET_REPO_ID}")
            upload_file(
                path_or_fileobj=str(archive_path),
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                path_in_repo=BACKUP_FILENAME,
                commit_message=f"Backup sessions ({len(session_files)} files)",
            )

        logger.info(f"✓ Backed up {len(session_files)} session files")

    except Exception as e:
        logger.error(f"Failed to backup sessions: {e}")
        # Don't crash - just log the error


def ensure_dataset_exists() -> None:
    """
    Ensure private Dataset exists before any operations.

    Call this at startup to initialize the Dataset if needed. If the
    Dataset is missing it is created as private and then re-checked; if it
    already exists its visibility is enforced.

    Raises:
        SessionSyncError: If the Dataset is public or its visibility could
            not be determined.
        Exception: If creating the Dataset fails.
    """
    api = HfApi()

    is_private = _lookup_dataset_privacy(api, DATASET_REPO_ID)
    if is_private is None:
        # Dataset doesn't exist - create it as private
        logger.info(f"Creating private Dataset: {DATASET_REPO_ID}")
        create_private_dataset(api, DATASET_REPO_ID)
        check_dataset_is_private(api, DATASET_REPO_ID)  # Verify
    else:
        # Dataset exists - a public one is a critical error
        _require_private(DATASET_REPO_ID, is_private)


def main():
    """CLI entry point for session sync operations."""
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s: %(message)s'
    )

    parser = argparse.ArgumentParser(description="Session sync with HF Dataset")
    parser.add_argument(
        "command",
        choices=["restore", "backup"],
        help="restore: Download sessions from Dataset | backup: Upload sessions to Dataset",
    )
    args = parser.parse_args()

    try:
        if args.command == "restore":
            logger.info("=" * 60)
            logger.info("Session Restore - Starting")
            logger.info("=" * 60)

            # Without a token the private Dataset cannot be read; skip quietly.
            if not os.environ.get("HF_TOKEN"):
                logger.warning("HF_TOKEN not set - skipping session restore")
                logger.info("✓ No sessions to restore")
                sys.exit(0)

            ensure_dataset_exists()
            restore_sessions_from_dataset()

            logger.info("=" * 60)
            logger.info("✓ Session restore complete")
            logger.info("=" * 60)

        elif args.command == "backup":
            logger.info("Starting session backup...")
            ensure_dataset_exists()
            backup_sessions_to_dataset()
            logger.info("✓ Session backup complete")

    except SessionSyncError as e:
        logger.error(f"🚨 SECURITY ERROR: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Session sync failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()