Spaces:
Running
Running
| """ | |
| Persistence Manager for HF Spaces. | |
| Syncs local models and metrics with an HF dataset repo to survive restarts. | |
| """ | |
| import os | |
| import json | |
| import logging | |
| import shutil | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| from huggingface_hub import HfApi | |
logger = logging.getLogger(__name__)


class PersistenceManager:
    """Manage syncing of critical data to an HF dataset repository.

    Configuration is read from the environment:

    * ``HF_STORE_REPO`` -- id of the dataset repo used as storage.
    * ``HUGGINGFACE_API_KEY`` or ``HF_TOKEN`` -- access token.

    When either value is missing (or the client cannot be created),
    ``self.api`` stays ``None`` and every sync method is a no-op.
    All sync methods are best-effort: failures are logged, never raised.
    """

    # Only files with one of these suffixes are pulled back by download_all().
    _SYNC_SUFFIXES = (".json", ".jsonl")

    def __init__(self):
        """Read settings from the environment and create the HF client."""
        self.repo_id = os.getenv("HF_STORE_REPO")  # e.g., "username/janus-memory"
        self.token = os.getenv("HUGGINGFACE_API_KEY") or os.getenv("HF_TOKEN")

        # NOTE(review): imported at call time rather than at module level,
        # presumably to avoid an import cycle with app.config -- confirm.
        from app.config import DATA_DIR
        self.data_dir = Path(DATA_DIR)

        # Directories (relative to data_dir) to sync
        self.sync_dirs = [
            "distilled_models",
            "metrics",
            "learning",
            "knowledge",
            "skills",
        ]

        # Stays None unless both the repo id and a token are configured.
        self.api = None
        if self.repo_id and self.token:
            try:
                self.api = HfApi(token=self.token)
                logger.info("✅ PersistenceManager initialized for repo: %s", self.repo_id)
            except Exception as e:
                logger.error("Failed to init HF API for persistence: %s", e)

    def download_all(self) -> None:
        """Download all synced directories from HF on startup.

        For each entry of ``self.sync_dirs`` the local directory is created
        if needed, the matching repo folder is listed recursively, and every
        ``.json`` / ``.jsonl`` file found is downloaded below
        ``self.data_dir`` at its repo-relative path. A failure in one folder
        is logged and does not stop the remaining folders.
        """
        if not self.api:
            logger.warning("Persistence sync skipped: HF_STORE_REPO or HF_TOKEN not set")
            return

        logger.info("Pulling persisted data from %s...", self.repo_id)
        for folder in self.sync_dirs:
            try:
                (self.data_dir / folder).mkdir(parents=True, exist_ok=True)

                # list_repo_tree() yields lazily, and a generator is always
                # truthy, so the listing is materialized before the emptiness
                # check. recursive=True mirrors upload_folder(), which pushes
                # the whole directory tree.
                entries = list(
                    self.api.list_repo_tree(
                        self.repo_id,
                        path_in_repo=folder,
                        recursive=True,
                        repo_type="dataset",
                    )
                )
                if not entries:
                    continue

                for entry in entries:
                    if entry.path.endswith(self._SYNC_SUFFIXES):
                        self.api.hf_hub_download(
                            repo_id=self.repo_id,
                            filename=entry.path,
                            repo_type="dataset",
                            local_dir=str(self.data_dir),
                            force_download=True,
                        )
                logger.info(" ✓ Synced %s", folder)
            except Exception as e:
                # Best-effort: also reached when the folder is absent from the repo.
                logger.error(" ✗ Failed to sync %s: %s", folder, e)

    def upload_all(self) -> None:
        """Upload all synced directories to HF.

        Entries of ``self.sync_dirs`` that are not existing local
        directories are skipped. Each folder is pushed with its own
        commit; a failure is logged and the remaining folders are still
        attempted.
        """
        if not self.api:
            return

        logger.info("Pushing data to %s...", self.repo_id)
        for folder in self.sync_dirs:
            local_path = self.data_dir / folder
            # upload_folder() needs a directory; skip missing paths and stray files.
            if not local_path.is_dir():
                continue
            try:
                self.api.upload_folder(
                    folder_path=str(local_path),
                    path_in_repo=folder,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Sync {folder} from Janus instance",
                )
                logger.info(" ✓ Pushed %s", folder)
            except Exception as e:
                logger.error(" ✗ Failed to push %s: %s", folder, e)

    def upload_file(self, local_file_path: str, path_in_repo: str) -> None:
        """Upload a specific file to the repo.

        Args:
            local_file_path: Path of the local file to upload.
            path_in_repo: Destination path inside the dataset repo.

        Does nothing when persistence is not configured. Upload errors are
        logged, not raised.
        """
        if not self.api:
            return
        try:
            self.api.upload_file(
                path_or_fileobj=local_file_path,
                path_in_repo=path_in_repo,
                repo_id=self.repo_id,
                repo_type="dataset",
            )
        except Exception as e:
            logger.error("Failed to upload file %s: %s", local_file_path, e)