File size: 3,861 Bytes
5f91e0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
Persistence Manager for HF Spaces.
Syncs local models and metrics with an HF dataset repo to survive restarts.
"""

import os
import json
import logging
import shutil
from pathlib import Path
from typing import List, Dict, Any
from huggingface_hub import HfApi

logger = logging.getLogger(__name__)

class PersistenceManager:
    """Manages syncing of critical data to HF Dataset repository."""
    
    def __init__(self):
        self.repo_id = os.getenv("HF_STORE_REPO")  # e.g., "username/janus-memory"
        self.token = os.getenv("HUGGINGFACE_API_KEY") or os.getenv("HF_TOKEN")
        
        from app.config import DATA_DIR
        self.data_dir = Path(DATA_DIR)
        
        # Directories to sync
        self.sync_dirs = [
            "distilled_models",
            "metrics",
            "learning",
            "knowledge",
            "skills"
        ]
        
        self.api = None
        if self.repo_id and self.token:
            try:
                self.api = HfApi(token=self.token)
                logger.info(f"✓ PersistenceManager initialized for repo: {self.repo_id}")
            except Exception as e:
                logger.error(f"Failed to init HF API for persistence: {e}")

    def download_all(self):
        """Download all synced directories from HF on startup."""
        if not self.api:
            logger.warning("Persistence sync skipped: HF_STORE_REPO or HF_TOKEN not set")
            return

        logger.info(f"Pulling persisted data from {self.repo_id}...")
        for folder in self.sync_dirs:
            try:
                local_path = self.data_dir / folder
                local_path.mkdir(parents=True, exist_ok=True)
                
                # Check if folder exists in repo
                files = self.api.list_repo_tree(self.repo_id, path_in_repo=folder, repo_type="dataset")
                if not files:
                    continue

                for file_info in files:
                    if file_info.path.endswith('.json') or file_info.path.endswith('.jsonl'):
                        self.api.hf_hub_download(
                            repo_id=self.repo_id,
                            filename=file_info.path,
                            repo_type="dataset",
                            local_dir=str(self.data_dir),
                            force_download=True
                        )
                logger.info(f"  ✓ Synced {folder}")
            except Exception as e:
                logger.error(f"  ✗ Failed to sync {folder}: {e}")

    def upload_all(self):
        """Upload all synced directories to HF."""
        if not self.api:
            return

        logger.info(f"Pushing data to {self.repo_id}...")
        for folder in self.sync_dirs:
            local_path = self.data_dir / folder
            if not local_path.exists():
                continue

            try:
                self.api.upload_folder(
                    folder_path=str(local_path),
                    path_in_repo=folder,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Sync {folder} from Janus instance"
                )
                logger.info(f"  ✓ Pushed {folder}")
            except Exception as e:
                logger.error(f"  ✗ Failed to push {folder}: {e}")

    def upload_file(self, local_file_path: str, path_in_repo: str):
        """Upload a specific file to the repo."""
        if not self.api:
            return
        
        try:
            self.api.upload_file(
                path_or_fileobj=local_file_path,
                path_in_repo=path_in_repo,
                repo_id=self.repo_id,
                repo_type="dataset"
            )
        except Exception as e:
            logger.error(f"Failed to upload file {local_file_path}: {e}")