"""Shared-state store for brain runtime via Google Drive, local files, or Firebase.""" from __future__ import annotations import json import logging import os import threading import time from pathlib import Path from typing import Any from shared.google_drive_state import GoogleDriveStateClient class FirebaseStore: def __init__(self, component: str, logger_name: str = "kapo.brain.firebase") -> None: self.component = component self.logger = logging.getLogger(logger_name) self._db = None self._lock = threading.Lock() self._read_cache: dict[str, tuple[float, Any]] = {} self._list_cache: dict[str, tuple[float, list[dict[str, Any]]]] = {} self._write_cache: dict[str, tuple[float, str]] = {} self._quota_backoff_until: float = 0.0 self._drive = GoogleDriveStateClient(self.logger) def backend(self) -> str: configured = str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower() if configured in {"google_drive", "drive", "gdrive"}: return "google_drive" if configured in {"file", "files"}: return "file" if configured in {"firebase", "firestore"}: return "firebase" if configured in {"disabled", "off", "none"}: return "disabled" if self._drive.enabled(): return "google_drive" if str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"}: return "firebase" return "file" def enabled(self) -> bool: return self.backend() != "disabled" def namespace(self) -> str: return str(os.getenv("FIREBASE_NAMESPACE", "kapo")).strip() or "kapo" def storage_root(self) -> Path: configured = str(os.getenv("KAPO_SHARED_STATE_DIR", "")).strip() if configured: root = Path(configured).expanduser() if not root.is_absolute(): root = Path.cwd().resolve() / root else: root = (Path.cwd().resolve() / "data" / "local" / "shared_state").resolve() root.mkdir(parents=True, exist_ok=True) return root def _service_payload(self) -> dict[str, Any] | None: raw = str(os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON", "")).strip() if not raw: return None try: return json.loads(raw) except Exception: self.logger.exception("Invalid Firebase service account JSON") return None @staticmethod def _service_path() -> str: return str(os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH", "")).strip() def _client(self): if self.backend() != "firebase": return None with self._lock: if self._db is not None: return self._db try: import firebase_admin from firebase_admin import credentials, firestore if not firebase_admin._apps: payload = self._service_payload() if payload: cred = credentials.Certificate(payload) else: service_path = self._service_path() if not service_path: return None path_obj = Path(service_path).expanduser() if not path_obj.exists() or not path_obj.is_file(): self.logger.warning( "Firebase service account path is unavailable on this runtime: %s", service_path, ) return None cred = credentials.Certificate(str(path_obj.resolve())) options = {} project_id = str(os.getenv("FIREBASE_PROJECT_ID", "")).strip() if project_id: options["projectId"] = project_id firebase_admin.initialize_app(cred, options or None) self._db = firestore.client() return self._db except Exception: self.logger.exception("Failed to initialize Firebase client") return None def _collection(self, name: str) -> str: return f"{self.namespace()}_{name}" @staticmethod def _safe_id(value: str, default: str = "default") -> str: text = str(value or "").strip() or default return "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text)[:180] @staticmethod def _payload_hash(payload: Any) -> str: return json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str) @staticmethod def _is_quota_error(exc: Exception) -> bool: text = str(exc or "").lower() return "resourceexhausted" in text or "quota exceeded" in text or "429" in text def _quota_backoff_active(self) -> bool: return time.time() < self._quota_backoff_until def _activate_quota_backoff(self, seconds: float | None = None) -> None: delay = float(seconds or os.getenv("FIREBASE_QUOTA_BACKOFF_SEC", "120") or 120) self._quota_backoff_until = max(self._quota_backoff_until, time.time() + max(5.0, delay)) def _should_skip_write(self, key: str, payload: Any, min_interval_sec: float) -> bool: now = time.time() payload_hash = self._payload_hash(payload) last = self._write_cache.get(key) if last and last[1] == payload_hash and (now - last[0]) < min_interval_sec: return True self._write_cache[key] = (now, payload_hash) return False def _file_collection_dir(self, collection: str) -> Path: path = self.storage_root() / self._safe_id(collection, "collection") path.mkdir(parents=True, exist_ok=True) return path def _file_doc_path(self, collection: str, doc_id: str) -> Path: return self._file_collection_dir(collection) / f"{self._safe_id(doc_id)}.json" def _write_json_atomic(self, path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(f"{path.suffix}.tmp") tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8") tmp.replace(path) def get_document(self, collection: str, doc_id: str, ttl_sec: float = 12.0) -> dict[str, Any]: safe_doc = self._safe_id(doc_id) cache_key = f"{collection}:{safe_doc}" now = time.time() cached = self._read_cache.get(cache_key) if cached and (now - cached[0]) < ttl_sec: return dict(cached[1] or {}) backend = self.backend() if backend == "google_drive": payload = self._drive.get_document(collection, safe_doc) self._read_cache[cache_key] = (now, payload) return dict(payload or {}) if backend == "file": path = self._file_doc_path(collection, safe_doc) if not path.exists(): return {} try: payload = json.loads(path.read_text(encoding="utf-8")) except Exception: self.logger.warning("Failed to read shared-state file %s", path, exc_info=True) return {} self._read_cache[cache_key] = (now, payload) return dict(payload or {}) if self._quota_backoff_active(): return dict(cached[1] or {}) if cached else {} db = self._client() if db is None: return dict(cached[1] or {}) if cached else {} try: snapshot = db.collection(self._collection(collection)).document(safe_doc).get() payload = snapshot.to_dict() if snapshot.exists else {} self._read_cache[cache_key] = (now, payload) return dict(payload or {}) except Exception as exc: if self._is_quota_error(exc): self._activate_quota_backoff() self.logger.warning("Firebase quota exceeded while reading %s/%s; using cache/backoff", collection, safe_doc) else: self.logger.exception("Failed to read Firebase document %s/%s", collection, safe_doc) return dict(cached[1] or {}) if cached else {} def set_document(self, collection: str, doc_id: str, payload: dict[str, Any], merge: bool = True, min_interval_sec: float = 5.0) -> bool: safe_doc = self._safe_id(doc_id) cache_key = f"{collection}:{safe_doc}" body = dict(payload or {}) body["component"] = self.component body["updated_at"] = time.time() if self._should_skip_write(cache_key, body, min_interval_sec): return True backend = self.backend() if backend == "google_drive": stored = self._drive.set_document(collection, safe_doc, body, merge=merge) if stored: self._read_cache.pop(cache_key, None) stale_prefix = f"{collection}:list:" for key in list(self._list_cache.keys()): if key.startswith(stale_prefix): self._list_cache.pop(key, None) return stored if backend == "file": try: path = self._file_doc_path(collection, safe_doc) existing = {} if merge and path.exists(): existing = json.loads(path.read_text(encoding="utf-8")) combined = {**existing, **body} if merge else body combined.setdefault("id", safe_doc) self._write_json_atomic(path, combined) self._read_cache.pop(cache_key, None) stale_prefix = f"{collection}:list:" for key in list(self._list_cache.keys()): if key.startswith(stale_prefix): self._list_cache.pop(key, None) return True except Exception: self.logger.warning("Failed to write shared-state file %s/%s", collection, safe_doc, exc_info=True) return False if self._quota_backoff_active(): return False db = self._client() if db is None: return False try: if self._should_skip_write(cache_key, body, min_interval_sec): return True db.collection(self._collection(collection)).document(safe_doc).set(body, merge=merge) self._read_cache.pop(cache_key, None) stale_prefix = f"{collection}:list:" for key in list(self._list_cache.keys()): if key.startswith(stale_prefix): self._list_cache.pop(key, None) return True except Exception as exc: if self._is_quota_error(exc): self._activate_quota_backoff() self.logger.warning("Firebase quota exceeded while writing %s/%s; write skipped", collection, safe_doc) else: self.logger.exception("Failed to write Firebase document %s/%s", collection, safe_doc) return False def list_documents(self, collection: str, limit: int = 200, ttl_sec: float = 30.0) -> list[dict[str, Any]]: cache_key = f"{collection}:list:{max(1, int(limit))}" now = time.time() cached = self._list_cache.get(cache_key) if cached and (now - cached[0]) < ttl_sec: return [dict(item) for item in (cached[1] or [])] backend = self.backend() if backend == "google_drive": items = self._drive.list_documents(collection, limit=max(1, int(limit))) self._list_cache[cache_key] = (now, [dict(item) for item in items]) return items if backend == "file": items: list[dict[str, Any]] = [] try: paths = sorted( self._file_collection_dir(collection).glob("*.json"), key=lambda path: path.stat().st_mtime, reverse=True, ) for path in paths[: max(1, int(limit))]: payload = json.loads(path.read_text(encoding="utf-8")) payload.setdefault("id", path.stem) items.append(payload) except Exception: self.logger.warning("Failed to list shared-state files for %s", collection, exc_info=True) self._list_cache[cache_key] = (now, [dict(item) for item in items]) return items if self._quota_backoff_active(): return [dict(item) for item in ((cached[1] if cached else []) or [])] db = self._client() if db is None: return [dict(item) for item in ((cached[1] if cached else []) or [])] try: docs = db.collection(self._collection(collection)).limit(max(1, int(limit))).stream() items: list[dict[str, Any]] = [] for doc in docs: payload = doc.to_dict() or {} payload.setdefault("id", doc.id) items.append(payload) self._list_cache[cache_key] = (now, [dict(item) for item in items]) return items except Exception as exc: if self._is_quota_error(exc): self._activate_quota_backoff() self.logger.warning("Firebase quota exceeded while listing %s; using cache/backoff", collection) else: self.logger.exception("Failed to list Firebase collection %s", collection) return [dict(item) for item in ((cached[1] if cached else []) or [])] def delete_document(self, collection: str, doc_id: str) -> bool: safe_doc = self._safe_id(doc_id) backend = self.backend() if backend == "google_drive": deleted = self._drive.delete_document(collection, safe_doc) if deleted: self._read_cache.pop(f"{collection}:{safe_doc}", None) self._write_cache.pop(f"{collection}:{safe_doc}", None) stale_prefix = f"{collection}:list:" for key in list(self._list_cache.keys()): if key.startswith(stale_prefix): self._list_cache.pop(key, None) return deleted if backend == "file": try: path = self._file_doc_path(collection, safe_doc) if path.exists(): path.unlink() self._read_cache.pop(f"{collection}:{safe_doc}", None) self._write_cache.pop(f"{collection}:{safe_doc}", None) stale_prefix = f"{collection}:list:" for key in list(self._list_cache.keys()): if key.startswith(stale_prefix): self._list_cache.pop(key, None) return True except Exception: self.logger.warning("Failed to delete shared-state file %s/%s", collection, safe_doc, exc_info=True) return False if self._quota_backoff_active(): return False db = self._client() if db is None: return False try: db.collection(self._collection(collection)).document(safe_doc).delete() self._read_cache.pop(f"{collection}:{safe_doc}", None) self._write_cache.pop(f"{collection}:{safe_doc}", None) stale_prefix = f"{collection}:list:" for key in list(self._list_cache.keys()): if key.startswith(stale_prefix): self._list_cache.pop(key, None) return True except Exception as exc: if self._is_quota_error(exc): self._activate_quota_backoff() self.logger.warning("Firebase quota exceeded while deleting %s/%s; delete skipped", collection, safe_doc) else: self.logger.exception("Failed to delete Firebase document %s/%s", collection, safe_doc) return False