""" persistent_store.py — HF Spaces-aware persistent storage THE PROBLEM: HF Spaces has an ephemeral filesystem. Every time the Space sleeps and wakes, or is restarted, ALL files in data/ are deleted. This means: - All case memory is lost - All cached responses are lost - All learned skills are lost - All simulation history is lost THE SOLUTION: Use HF Datasets as a key-value store. It's free, persistent across restarts, and works with your existing HF_TOKEN secret. Fallback: if HF_TOKEN is not set (local dev), uses local JSON files as before. USAGE: from app.services.persistent_store import store # Save store.set("cases:abc123", {"query": "...", "answer": "..."}) # Load case = store.get("cases:abc123") # List all keys with prefix keys = store.list_prefix("cases:") """ from __future__ import annotations import json import logging import os import pathlib import threading import time from typing import Any, Optional logger = logging.getLogger(__name__) HF_TOKEN = os.getenv("HF_TOKEN", "") HF_REPO_ID = os.getenv("HF_STORE_REPO", "") # e.g. "DevodG/janus-memory" IS_HF_SPACE = os.getenv("SPACE_ID", "") != "" # HF injects SPACE_ID automatically # Local fallback path try: from app.config import DATA_DIR except ImportError: DATA_DIR = pathlib.Path(__file__).parent.parent / "data" class _LocalStore: """Local JSON file store — for development.""" def __init__(self): self._base = pathlib.Path(DATA_DIR) self._lock = threading.RLock() def _path(self, key: str) -> pathlib.Path: # key like "cases:abc123" → data/cases/abc123.json parts = key.split(":", 1) if len(parts) == 2: folder, name = parts else: folder, name = "misc", parts[0] p = self._base / folder p.mkdir(parents=True, exist_ok=True) return p / f"{name}.json" def get(self, key: str) -> Optional[Any]: with self._lock: p = self._path(key) if not p.exists(): return None try: return json.loads(p.read_text()) except Exception: return None def set(self, key: str, value: Any) -> bool: with self._lock: try: p = self._path(key) p.write_text(json.dumps(value, indent=2, default=str)) return True except Exception as e: logger.warning("LocalStore.set(%s) failed: %s", key, e) return False def delete(self, key: str) -> bool: with self._lock: p = self._path(key) if p.exists(): p.unlink() return True return False def list_prefix(self, prefix: str) -> list[str]: with self._lock: parts = prefix.rstrip(":").split(":", 1) folder = parts[0] folder_path = self._base / folder if not folder_path.exists(): return [] return [ f"{folder}:{f.stem}" for f in folder_path.glob("*.json") ] class _HFDatasetStore: """ Persistent store backed by a private HF Dataset repo. Each key is stored as a file in the dataset repo: cases/abc123.json skills/pattern_xyz.json memory/index.json etc. Writes are batched and committed every 60s to avoid rate limits. Reads always check local cache first. """ def __init__(self): from huggingface_hub import HfApi self._api = HfApi(token=HF_TOKEN) self._repo = HF_REPO_ID self._cache: dict[str, Any] = {} self._dirty: dict[str, Any] = {} # pending writes self._lock = threading.RLock() self._last_commit = 0.0 self._commit_interval = 60 # seconds # Ensure repo exists try: self._api.repo_info(repo_id=self._repo, repo_type="dataset") except Exception: logger.info("Creating HF dataset repo: %s", self._repo) try: self._api.create_repo(repo_id=self._repo, repo_type="dataset", private=True) except Exception as e: logger.error("Could not create HF dataset repo: %s", e) # Background commit thread t = threading.Thread(target=self._commit_loop, daemon=True) t.start() def get(self, key: str) -> Optional[Any]: with self._lock: if key in self._dirty: return self._dirty[key] if key in self._cache: return self._cache[key] # Fetch from HF try: from huggingface_hub import hf_hub_download path = hf_hub_download( repo_id=self._repo, filename=self._key_to_filename(key), repo_type="dataset", token=HF_TOKEN, ) data = json.loads(pathlib.Path(path).read_text()) with self._lock: self._cache[key] = data return data except Exception: return None def set(self, key: str, value: Any) -> bool: with self._lock: self._dirty[key] = value self._cache[key] = value return True def delete(self, key: str) -> bool: with self._lock: self._dirty.pop(key, None) self._cache.pop(key, None) try: self._api.delete_file( path_in_repo=self._key_to_filename(key), repo_id=self._repo, repo_type="dataset", ) return True except Exception: return False def list_prefix(self, prefix: str) -> list[str]: try: files = self._api.list_repo_files(repo_id=self._repo, repo_type="dataset") folder = prefix.rstrip(":").replace(":", "/") return [ self._filename_to_key(f) for f in files if f.startswith(folder + "/") ] except Exception: return [] def flush(self): """Force-commit all pending writes now.""" self._commit_dirty() def _commit_loop(self): while True: time.sleep(self._commit_interval) self._commit_dirty() def _commit_dirty(self): with self._lock: if not self._dirty: return batch = dict(self._dirty) self._dirty.clear() import tempfile ops = [] tmp_files = [] try: for key, value in batch.items(): f = tempfile.NamedTemporaryFile( mode="w", suffix=".json", delete=False ) json.dump(value, f, indent=2, default=str) f.close() tmp_files.append(f.name) ops.append({ "path_in_repo": self._key_to_filename(key), "path_or_fileobj": f.name, }) self._api.upload_folder( repo_id=self._repo, repo_type="dataset", folder_path=None, # type: ignore path_in_repo="", # use individual file uploads instead ) # Simpler: upload each file for op in ops: self._api.upload_file( repo_id=self._repo, repo_type="dataset", path_in_repo=op["path_in_repo"], path_or_fileobj=op["path_or_fileobj"], token=HF_TOKEN, ) logger.info("PersistentStore: committed %d keys to HF", len(batch)) except Exception as e: logger.error("PersistentStore: commit failed: %s", e) # Re-queue failed writes with self._lock: for key, value in batch.items(): if key not in self._dirty: self._dirty[key] = value finally: for f in tmp_files: try: os.unlink(f) except Exception: pass @staticmethod def _key_to_filename(key: str) -> str: """cases:abc123 → cases/abc123.json""" return key.replace(":", "/") + ".json" @staticmethod def _filename_to_key(filename: str) -> str: """cases/abc123.json → cases:abc123""" return filename.replace("/", ":").removesuffix(".json") def _build_store(): """Pick the right backend based on environment.""" if IS_HF_SPACE and HF_TOKEN and HF_REPO_ID: logger.info("PersistentStore: using HF Datasets backend (repo: %s)", HF_REPO_ID) try: return _HFDatasetStore() except ImportError: logger.warning("huggingface_hub not installed — falling back to local store") if IS_HF_SPACE and not HF_REPO_ID: logger.warning( "PersistentStore: running on HF Space but HF_STORE_REPO is not set! " "All data will be lost on restart. Set HF_STORE_REPO=YourUsername/janus-memory " "in Space Secrets." ) return _LocalStore() # Module-level singleton store = _build_store()