Spaces:
Sleeping
Sleeping
"""Shared-state store for brain runtime via Google Drive, local files, or Firebase."""
from __future__ import annotations

import json
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any

from shared.google_drive_state import GoogleDriveStateClient
class FirebaseStore:
    """Shared-state document store with pluggable backends.

    Depending on environment configuration, documents are persisted to
    Google Drive (via ``GoogleDriveStateClient``), local JSON files, or
    Firebase Firestore; the store can also be disabled entirely.  Reads
    and listings are cached in-process with a TTL, writes are deduplicated
    by payload hash, and Firestore quota errors open a temporary backoff
    window during which cached data is served instead.
    """

    def __init__(self, component: str, logger_name: str = "kapo.brain.firebase") -> None:
        """Create a store; *component* is stamped into every written document."""
        self.component = component
        self.logger = logging.getLogger(logger_name)
        self._db = None  # lazily-initialized Firestore client
        self._lock = threading.Lock()  # guards Firestore client creation
        # cache_key -> (timestamp, payload) for single-document reads.
        self._read_cache: dict[str, tuple[float, Any]] = {}
        # cache_key -> (timestamp, documents) for collection listings.
        self._list_cache: dict[str, tuple[float, list[dict[str, Any]]]] = {}
        # cache_key -> (timestamp, payload hash) for write deduplication.
        self._write_cache: dict[str, tuple[float, str]] = {}
        self._quota_backoff_until: float = 0.0  # epoch seconds; 0.0 = inactive
        self._drive = GoogleDriveStateClient(self.logger)

    def backend(self) -> str:
        """Resolve the active backend: ``google_drive``, ``file``, ``firebase``, or ``disabled``."""
        configured = str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower()
        if configured in {"google_drive", "drive", "gdrive"}:
            return "google_drive"
        if configured in {"file", "files"}:
            return "file"
        if configured in {"firebase", "firestore"}:
            return "firebase"
        if configured in {"disabled", "off", "none"}:
            return "disabled"
        # No explicit choice: prefer Drive when available, then Firebase,
        # falling back to local files.
        if self._drive.enabled():
            return "google_drive"
        if str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"}:
            return "firebase"
        return "file"

    def enabled(self) -> bool:
        """Return True unless the shared-state store is explicitly disabled."""
        return self.backend() != "disabled"

    def namespace(self) -> str:
        """Return the collection namespace prefix (default ``kapo``)."""
        return str(os.getenv("FIREBASE_NAMESPACE", "kapo")).strip() or "kapo"

    def storage_root(self) -> Path:
        """Return (and create) the root directory used by the file backend."""
        configured = str(os.getenv("KAPO_SHARED_STATE_DIR", "")).strip()
        if configured:
            root = Path(configured).expanduser()
            if not root.is_absolute():
                root = Path.cwd().resolve() / root
        else:
            root = (Path.cwd().resolve() / "data" / "local" / "shared_state").resolve()
        root.mkdir(parents=True, exist_ok=True)
        return root

    def _service_payload(self) -> dict[str, Any] | None:
        """Parse FIREBASE_SERVICE_ACCOUNT_JSON; None when unset or invalid."""
        raw = str(os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON", "")).strip()
        if not raw:
            return None
        try:
            return json.loads(raw)
        except Exception:
            self.logger.exception("Invalid Firebase service account JSON")
            return None

    @staticmethod
    def _service_path() -> str:
        """Return the configured service-account file path ('' when unset)."""
        # BUG FIX: previously defined without ``self`` yet invoked as a bound
        # method, which raised TypeError; @staticmethod keeps bound calls valid.
        return str(os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH", "")).strip()

    def _client(self):
        """Lazily create and return the Firestore client, or None on failure."""
        if self.backend() != "firebase":
            return None
        with self._lock:
            if self._db is not None:
                return self._db
            try:
                import firebase_admin
                from firebase_admin import credentials, firestore

                if not firebase_admin._apps:
                    # Prefer inline JSON credentials, then a file path.
                    payload = self._service_payload()
                    if payload:
                        cred = credentials.Certificate(payload)
                    else:
                        service_path = self._service_path()
                        if not service_path:
                            return None
                        path_obj = Path(service_path).expanduser()
                        if not path_obj.exists() or not path_obj.is_file():
                            self.logger.warning(
                                "Firebase service account path is unavailable on this runtime: %s",
                                service_path,
                            )
                            return None
                        cred = credentials.Certificate(str(path_obj.resolve()))
                    options = {}
                    project_id = str(os.getenv("FIREBASE_PROJECT_ID", "")).strip()
                    if project_id:
                        options["projectId"] = project_id
                    firebase_admin.initialize_app(cred, options or None)
                self._db = firestore.client()
                return self._db
            except Exception:
                self.logger.exception("Failed to initialize Firebase client")
                return None

    def _collection(self, name: str) -> str:
        """Namespace a collection name for Firestore."""
        return f"{self.namespace()}_{name}"

    @staticmethod
    def _safe_id(value: str, default: str = "default") -> str:
        """Sanitize *value* into a filesystem/Firestore-safe document id (max 180 chars)."""
        # BUG FIX: previously defined without ``self`` but called as a bound
        # method, so the instance itself was sanitized instead of *value*.
        text = str(value or "").strip() or default
        return "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text)[:180]

    @staticmethod
    def _payload_hash(payload: Any) -> str:
        """Return a canonical JSON serialization used as a write-dedup key."""
        # BUG FIX: was missing ``self``, so bound calls serialized the instance
        # and every payload compared equal for deduplication purposes.
        return json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str)

    @staticmethod
    def _is_quota_error(exc: Exception) -> bool:
        """Heuristically detect Firestore quota/rate-limit errors."""
        # BUG FIX: was missing ``self``; bound calls inspected the instance
        # repr rather than the exception, so quota errors were never detected.
        text = str(exc or "").lower()
        return "resourceexhausted" in text or "quota exceeded" in text or "429" in text

    def _quota_backoff_active(self) -> bool:
        """True while the quota backoff window is still open."""
        return time.time() < self._quota_backoff_until

    def _activate_quota_backoff(self, seconds: float | None = None) -> None:
        """Open (or extend) the quota backoff window; never shorter than 5 seconds."""
        delay = float(seconds or os.getenv("FIREBASE_QUOTA_BACKOFF_SEC", "120") or 120)
        self._quota_backoff_until = max(self._quota_backoff_until, time.time() + max(5.0, delay))

    def _should_skip_write(self, key: str, payload: Any, min_interval_sec: float) -> bool:
        """Return True when an identical payload was recorded within the interval.

        Otherwise records ``(now, hash)`` for *key*.  NOTE(review): the write
        is recorded before the backend call, so a failed backend write can
        still suppress an identical retry within the interval — confirm this
        best-effort behavior is intended.
        """
        now = time.time()
        payload_hash = self._payload_hash(payload)
        last = self._write_cache.get(key)
        if last and last[1] == payload_hash and (now - last[0]) < min_interval_sec:
            return True
        self._write_cache[key] = (now, payload_hash)
        return False

    def _invalidate_caches(self, collection: str, cache_key: str, *, drop_write: bool = False) -> None:
        """Drop read (and optionally write-dedup) entries plus all listings for *collection*."""
        self._read_cache.pop(cache_key, None)
        if drop_write:
            self._write_cache.pop(cache_key, None)
        stale_prefix = f"{collection}:list:"
        for key in list(self._list_cache.keys()):
            if key.startswith(stale_prefix):
                self._list_cache.pop(key, None)

    def _file_collection_dir(self, collection: str) -> Path:
        """Return (and create) the directory backing *collection* on disk."""
        path = self.storage_root() / self._safe_id(collection, "collection")
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _file_doc_path(self, collection: str, doc_id: str) -> Path:
        """Return the JSON file path for a document in the file backend."""
        return self._file_collection_dir(collection) / f"{self._safe_id(doc_id)}.json"

    def _write_json_atomic(self, path: Path, payload: dict[str, Any]) -> None:
        """Write JSON via a temp file + rename so readers never see partial data."""
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(f"{path.suffix}.tmp")
        tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
        tmp.replace(path)

    def get_document(self, collection: str, doc_id: str, ttl_sec: float = 12.0) -> dict[str, Any]:
        """Fetch one document as a dict ({} when missing/unavailable), cached for *ttl_sec*."""
        safe_doc = self._safe_id(doc_id)
        cache_key = f"{collection}:{safe_doc}"
        now = time.time()
        cached = self._read_cache.get(cache_key)
        if cached and (now - cached[0]) < ttl_sec:
            return dict(cached[1] or {})
        backend = self.backend()
        if backend == "google_drive":
            payload = self._drive.get_document(collection, safe_doc)
            self._read_cache[cache_key] = (now, payload)
            return dict(payload or {})
        if backend == "file":
            path = self._file_doc_path(collection, safe_doc)
            if not path.exists():
                return {}
            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
            except Exception:
                self.logger.warning("Failed to read shared-state file %s", path, exc_info=True)
                return {}
            self._read_cache[cache_key] = (now, payload)
            return dict(payload or {})
        # Firebase backend: serve stale cache during quota backoff or client failure.
        if self._quota_backoff_active():
            return dict(cached[1] or {}) if cached else {}
        db = self._client()
        if db is None:
            return dict(cached[1] or {}) if cached else {}
        try:
            snapshot = db.collection(self._collection(collection)).document(safe_doc).get()
            payload = snapshot.to_dict() if snapshot.exists else {}
            self._read_cache[cache_key] = (now, payload)
            return dict(payload or {})
        except Exception as exc:
            if self._is_quota_error(exc):
                self._activate_quota_backoff()
                self.logger.warning("Firebase quota exceeded while reading %s/%s; using cache/backoff", collection, safe_doc)
            else:
                self.logger.exception("Failed to read Firebase document %s/%s", collection, safe_doc)
            return dict(cached[1] or {}) if cached else {}

    def set_document(self, collection: str, doc_id: str, payload: dict[str, Any], merge: bool = True, min_interval_sec: float = 5.0) -> bool:
        """Write a document, stamping component/updated_at; True on success or dedup skip."""
        safe_doc = self._safe_id(doc_id)
        cache_key = f"{collection}:{safe_doc}"
        body = dict(payload or {})
        body["component"] = self.component
        body["updated_at"] = time.time()
        if self._should_skip_write(cache_key, body, min_interval_sec):
            return True
        backend = self.backend()
        if backend == "google_drive":
            stored = self._drive.set_document(collection, safe_doc, body, merge=merge)
            if stored:
                self._invalidate_caches(collection, cache_key)
            return stored
        if backend == "file":
            try:
                path = self._file_doc_path(collection, safe_doc)
                existing = {}
                if merge and path.exists():
                    existing = json.loads(path.read_text(encoding="utf-8"))
                combined = {**existing, **body} if merge else body
                combined.setdefault("id", safe_doc)
                self._write_json_atomic(path, combined)
                self._invalidate_caches(collection, cache_key)
                return True
            except Exception:
                self.logger.warning("Failed to write shared-state file %s/%s", collection, safe_doc, exc_info=True)
                return False
        if self._quota_backoff_active():
            return False
        db = self._client()
        if db is None:
            return False
        try:
            # BUG FIX: a second _should_skip_write() call here always matched
            # the entry recorded by the check above, so the Firestore write was
            # unconditionally skipped.  The single check at the top suffices.
            db.collection(self._collection(collection)).document(safe_doc).set(body, merge=merge)
            self._invalidate_caches(collection, cache_key)
            return True
        except Exception as exc:
            if self._is_quota_error(exc):
                self._activate_quota_backoff()
                self.logger.warning("Firebase quota exceeded while writing %s/%s; write skipped", collection, safe_doc)
            else:
                self.logger.exception("Failed to write Firebase document %s/%s", collection, safe_doc)
            return False

    def list_documents(self, collection: str, limit: int = 200, ttl_sec: float = 30.0) -> list[dict[str, Any]]:
        """List up to *limit* documents (each carrying an ``id`` key), cached for *ttl_sec*."""
        cache_key = f"{collection}:list:{max(1, int(limit))}"
        now = time.time()
        cached = self._list_cache.get(cache_key)
        if cached and (now - cached[0]) < ttl_sec:
            return [dict(item) for item in (cached[1] or [])]
        backend = self.backend()
        if backend == "google_drive":
            items = self._drive.list_documents(collection, limit=max(1, int(limit)))
            self._list_cache[cache_key] = (now, [dict(item) for item in items])
            return items
        if backend == "file":
            items: list[dict[str, Any]] = []
            try:
                # Newest-first by file modification time.
                paths = sorted(
                    self._file_collection_dir(collection).glob("*.json"),
                    key=lambda path: path.stat().st_mtime,
                    reverse=True,
                )
                for path in paths[: max(1, int(limit))]:
                    payload = json.loads(path.read_text(encoding="utf-8"))
                    payload.setdefault("id", path.stem)
                    items.append(payload)
            except Exception:
                self.logger.warning("Failed to list shared-state files for %s", collection, exc_info=True)
            self._list_cache[cache_key] = (now, [dict(item) for item in items])
            return items
        # Firebase backend: serve stale cache during quota backoff or client failure.
        if self._quota_backoff_active():
            return [dict(item) for item in ((cached[1] if cached else []) or [])]
        db = self._client()
        if db is None:
            return [dict(item) for item in ((cached[1] if cached else []) or [])]
        try:
            docs = db.collection(self._collection(collection)).limit(max(1, int(limit))).stream()
            items: list[dict[str, Any]] = []
            for doc in docs:
                payload = doc.to_dict() or {}
                payload.setdefault("id", doc.id)
                items.append(payload)
            self._list_cache[cache_key] = (now, [dict(item) for item in items])
            return items
        except Exception as exc:
            if self._is_quota_error(exc):
                self._activate_quota_backoff()
                self.logger.warning("Firebase quota exceeded while listing %s; using cache/backoff", collection)
            else:
                self.logger.exception("Failed to list Firebase collection %s", collection)
            return [dict(item) for item in ((cached[1] if cached else []) or [])]

    def delete_document(self, collection: str, doc_id: str) -> bool:
        """Delete a document; True on success (missing files count as deleted)."""
        safe_doc = self._safe_id(doc_id)
        cache_key = f"{collection}:{safe_doc}"
        backend = self.backend()
        if backend == "google_drive":
            deleted = self._drive.delete_document(collection, safe_doc)
            if deleted:
                self._invalidate_caches(collection, cache_key, drop_write=True)
            return deleted
        if backend == "file":
            try:
                path = self._file_doc_path(collection, safe_doc)
                if path.exists():
                    path.unlink()
                self._invalidate_caches(collection, cache_key, drop_write=True)
                return True
            except Exception:
                self.logger.warning("Failed to delete shared-state file %s/%s", collection, safe_doc, exc_info=True)
                return False
        if self._quota_backoff_active():
            return False
        db = self._client()
        if db is None:
            return False
        try:
            db.collection(self._collection(collection)).document(safe_doc).delete()
            self._invalidate_caches(collection, cache_key, drop_write=True)
            return True
        except Exception as exc:
            if self._is_quota_error(exc):
                self._activate_quota_backoff()
                self.logger.warning("Firebase quota exceeded while deleting %s/%s; delete skipped", collection, safe_doc)
            else:
                self.logger.exception("Failed to delete Firebase document %s/%s", collection, safe_doc)
            return False