Janus-backend / backend /app /services /curation /persistence_manager.py
DevodG's picture
feat: stable janus intelligence with kaggle distillation
5f91e0b
"""
Persistence Manager for HF Spaces.
Syncs local models and metrics with an HF dataset repo to survive restarts.
"""
import os
import json
import logging
import shutil
from pathlib import Path
from typing import List, Dict, Any
from huggingface_hub import HfApi
logger = logging.getLogger(__name__)
class PersistenceManager:
"""Manages syncing of critical data to HF Dataset repository."""
def __init__(self):
self.repo_id = os.getenv("HF_STORE_REPO") # e.g., "username/janus-memory"
self.token = os.getenv("HUGGINGFACE_API_KEY") or os.getenv("HF_TOKEN")
from app.config import DATA_DIR
self.data_dir = Path(DATA_DIR)
# Directories to sync
self.sync_dirs = [
"distilled_models",
"metrics",
"learning",
"knowledge",
"skills"
]
self.api = None
if self.repo_id and self.token:
try:
self.api = HfApi(token=self.token)
logger.info(f"βœ“ PersistenceManager initialized for repo: {self.repo_id}")
except Exception as e:
logger.error(f"Failed to init HF API for persistence: {e}")
def download_all(self):
"""Download all synced directories from HF on startup."""
if not self.api:
logger.warning("Persistence sync skipped: HF_STORE_REPO or HF_TOKEN not set")
return
logger.info(f"Pulling persisted data from {self.repo_id}...")
for folder in self.sync_dirs:
try:
local_path = self.data_dir / folder
local_path.mkdir(parents=True, exist_ok=True)
# Check if folder exists in repo
files = self.api.list_repo_tree(self.repo_id, path_in_repo=folder, repo_type="dataset")
if not files:
continue
for file_info in files:
if file_info.path.endswith('.json') or file_info.path.endswith('.jsonl'):
self.api.hf_hub_download(
repo_id=self.repo_id,
filename=file_info.path,
repo_type="dataset",
local_dir=str(self.data_dir),
force_download=True
)
logger.info(f" βœ“ Synced {folder}")
except Exception as e:
logger.error(f" βœ— Failed to sync {folder}: {e}")
def upload_all(self):
"""Upload all synced directories to HF."""
if not self.api:
return
logger.info(f"Pushing data to {self.repo_id}...")
for folder in self.sync_dirs:
local_path = self.data_dir / folder
if not local_path.exists():
continue
try:
self.api.upload_folder(
folder_path=str(local_path),
path_in_repo=folder,
repo_id=self.repo_id,
repo_type="dataset",
commit_message=f"Sync {folder} from Janus instance"
)
logger.info(f" βœ“ Pushed {folder}")
except Exception as e:
logger.error(f" βœ— Failed to push {folder}: {e}")
def upload_file(self, local_file_path: str, path_in_repo: str):
"""Upload a specific file to the repo."""
if not self.api:
return
try:
self.api.upload_file(
path_or_fileobj=local_file_path,
path_in_repo=path_in_repo,
repo_id=self.repo_id,
repo_type="dataset"
)
except Exception as e:
logger.error(f"Failed to upload file {local_file_path}: {e}")