# neural-runner / session_sync.py
# Vendored from the "Neural Runner" Hugging Face Space by glutamatt (HF Staff).
# Upstream commit: 89769d5 (verified) - "feat: initial Neural Runner deployment"
"""
Session sync with Hugging Face Dataset for persistent storage.
CRITICAL: Dataset MUST be private to protect user tokens!
"""
import os
import tarfile
import logging
from pathlib import Path
from huggingface_hub import HfApi, hf_hub_download, upload_file, create_repo
# Module-level logger; configured by main() when run as a script.
logger = logging.getLogger(__name__)
# Configuration
# Dataset repo holding the session backup archive; overridable via env var.
DATASET_REPO_ID = os.environ.get("HF_SESSION_DATASET", "glutamatt/neural-runner-sessions")
# Local directory containing the per-session *.json files.
SESSION_DIR = Path("/home/user/app/data")
# Name of the tar.gz archive stored in / fetched from the Dataset.
BACKUP_FILENAME = "sessions.tar.gz"
class SessionSyncError(Exception):
    """Error raised when a session sync operation fails a safety check."""
def check_dataset_is_private(api: HfApi, repo_id: str) -> None:
    """
    Verify the Dataset is private before any operations.

    SECURITY: Sessions contain user credentials and tokens.
    They MUST NEVER be in a public Dataset.

    Returns normally when the Dataset does not exist yet (callers are
    expected to create it as private afterwards).

    Args:
        api: HfApi instance used for the repo lookup
        repo_id: Dataset repository ID

    Raises:
        SessionSyncError: If Dataset is public, or if the privacy check
            itself fails for any reason other than "not found"
    """
    try:
        info = api.repo_info(repo_id=repo_id, repo_type="dataset")
    except Exception as e:
        if "not found" in str(e).lower():
            # Dataset doesn't exist - we'll create it as private
            logger.info(f"Dataset '{repo_id}' doesn't exist yet, will create as private")
            return
        # Chain the cause so the underlying hub error is not lost.
        raise SessionSyncError(f"Failed to check Dataset privacy: {e}") from e
    # BUG FIX: the public-dataset check now lives OUTSIDE the try block.
    # Previously the SessionSyncError raised here was caught by the
    # function's own `except Exception` and re-wrapped as
    # "Failed to check Dataset privacy: ...", mangling the security message.
    if not info.private:
        raise SessionSyncError(
            f"🚨 SECURITY ERROR: Dataset '{repo_id}' is PUBLIC!\n"
            f"Sessions contain user credentials and MUST be private.\n"
            f"Fix: Go to https://huggingface.co/datasets/{repo_id}/settings\n"
            f" and set visibility to 'Private'"
        )
    logger.info(f"βœ“ Dataset '{repo_id}' is private (secure)")
def create_private_dataset(api: HfApi, repo_id: str) -> None:
    """
    Create the Dataset repository as private if it does not already exist.

    Args:
        api: HfApi instance (kept for interface symmetry with the other helpers)
        repo_id: Dataset repository ID
    """
    try:
        # CRITICAL: private=True always - sessions hold user credentials.
        create_repo(
            repo_id=repo_id,
            repo_type="dataset",
            private=True,
            exist_ok=True,
        )
        logger.info(f"βœ“ Created private Dataset: {repo_id}")
    except Exception as exc:
        logger.error(f"Failed to create Dataset: {exc}")
        raise
def compress_sessions(source_dir: Path, output_file: Path) -> None:
    """
    Bundle every *.json session file under source_dir into a tar.gz archive.

    Args:
        source_dir: Directory tree containing session JSON files
        output_file: Path of the tar.gz archive to write
    """
    with tarfile.open(output_file, "w:gz") as archive:
        for json_path in source_dir.rglob("*.json"):
            # Store paths relative to source_dir so extraction is portable.
            archive.add(json_path, arcname=json_path.relative_to(source_dir))
    logger.info(f"Compressed {output_file} ({output_file.stat().st_size} bytes)")
def extract_sessions(archive_file: Path, dest_dir: Path) -> None:
    """
    Extract a session tar.gz archive into dest_dir.

    SECURITY: Uses the tarfile "data" extraction filter where available
    (Python 3.12+, backported to 3.11.4) to reject absolute paths, "..",
    and links escaping dest_dir (tar path-traversal, CVE-2007-4559 class).

    Args:
        archive_file: Input tar.gz file
        dest_dir: Destination directory
    """
    with tarfile.open(archive_file, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # BUG FIX: bare extractall() trusts archive member paths;
            # the "data" filter sanitizes them.
            tar.extractall(dest_dir, filter="data")
        else:
            # NOTE(review): unsafe fallback on old interpreters - the
            # archive comes from our own private Dataset, but upgrade
            # to a filter-capable Python when possible.
            tar.extractall(dest_dir)
    logger.info(f"Extracted sessions to {dest_dir}")
def restore_sessions_from_dataset() -> None:
    """
    Download and restore sessions from the HF Dataset at startup.

    SECURITY: Verifies the Dataset is private before downloading.

    Raises:
        SessionSyncError: If Dataset is public
    """
    api = HfApi()
    # CRITICAL: never touch a Dataset that isn't private.
    check_dataset_is_private(api, DATASET_REPO_ID)
    try:
        logger.info(f"Downloading sessions from Dataset: {DATASET_REPO_ID}")
        archive = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            filename=BACKUP_FILENAME,
            local_dir="/tmp",
        )
        extract_sessions(Path(archive), SESSION_DIR)
        logger.info(f"βœ“ Restored sessions from Dataset")
    except Exception as exc:
        # A missing archive just means first boot; anything else is
        # logged but never fatal - the app starts with empty sessions.
        if "not found" in str(exc).lower():
            logger.info("No existing sessions in Dataset (first boot)")
        else:
            logger.error(f"Failed to restore sessions: {exc}")
def backup_sessions_to_dataset() -> None:
    """
    Compress and upload sessions to the HF Dataset.

    SECURITY: Verifies the Dataset is private before uploading.

    Raises:
        SessionSyncError: If Dataset is public
    """
    api = HfApi()
    # CRITICAL: never upload to a Dataset that isn't private.
    check_dataset_is_private(api, DATASET_REPO_ID)
    try:
        session_files = list(SESSION_DIR.rglob("*.json"))
        if not session_files:
            # Nothing on disk - skip the upload entirely.
            logger.info("No sessions to backup")
            return
        archive = Path("/tmp") / BACKUP_FILENAME
        compress_sessions(SESSION_DIR, archive)
        logger.info(f"Uploading sessions to Dataset: {DATASET_REPO_ID}")
        upload_file(
            path_or_fileobj=str(archive),
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            path_in_repo=BACKUP_FILENAME,
            commit_message=f"Backup sessions ({len(session_files)} files)",
        )
        logger.info(f"βœ“ Backed up {len(session_files)} session files")
    except Exception as exc:
        # Backups are best-effort: log and carry on rather than crash.
        logger.error(f"Failed to backup sessions: {exc}")
def ensure_dataset_exists() -> None:
    """
    Ensure the private Dataset exists before any operations.

    Call this at startup to initialize the Dataset if needed.

    Raises:
        SessionSyncError: If the Dataset exists but is public, or the
            existence check fails for a reason other than "not found"
    """
    api = HfApi()
    # BUG FIX: check_dataset_is_private() returns normally (does NOT
    # raise) when the repo is missing, so the previous `except Exception`
    # branch that created the Dataset was unreachable and the Dataset was
    # never auto-created. Probe existence directly instead.
    try:
        api.repo_info(repo_id=DATASET_REPO_ID, repo_type="dataset")
    except Exception as e:
        if "not found" not in str(e).lower():
            raise SessionSyncError(f"Failed to check Dataset existence: {e}") from e
        # Dataset doesn't exist - create it as private
        logger.info(f"Creating private Dataset: {DATASET_REPO_ID}")
        create_private_dataset(api, DATASET_REPO_ID)
    # Verify - fails loudly if the Dataset is (or was created) public.
    check_dataset_is_private(api, DATASET_REPO_ID)
def main():
    """CLI entry point for session sync operations."""
    import argparse
    import sys

    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s: %(message)s'
    )
    parser = argparse.ArgumentParser(description="Session sync with HF Dataset")
    parser.add_argument(
        "command",
        choices=["restore", "backup"],
        help="restore: Download sessions from Dataset | backup: Upload sessions to Dataset",
    )
    command = parser.parse_args().command
    banner = "=" * 60
    try:
        if command == "restore":
            logger.info(banner)
            logger.info("Session Restore - Starting")
            logger.info(banner)
            if not os.environ.get("HF_TOKEN"):
                # Without a token we cannot reach the private Dataset;
                # treat it as "nothing to restore" and exit cleanly.
                logger.warning("HF_TOKEN not set - skipping session restore")
                logger.info("βœ“ No sessions to restore")
                sys.exit(0)
            ensure_dataset_exists()
            restore_sessions_from_dataset()
            logger.info(banner)
            logger.info("βœ“ Session restore complete")
            logger.info(banner)
        elif command == "backup":
            logger.info("Starting session backup...")
            ensure_dataset_exists()
            backup_sessions_to_dataset()
            logger.info("βœ“ Session backup complete")
    except SessionSyncError as e:
        logger.error(f"🚨 SECURITY ERROR: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Session sync failed: {e}")
        sys.exit(1)
if __name__ == "__main__":
main()