# AiDebuggerClean / brain_server / api / firebase_store.py
# Upload: MrA7A3 — "Initial modernized KAPO runtime upload" (commit 564b5ea, verified)
"""Shared-state store for brain runtime via Google Drive, local files, or Firebase."""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any
from shared.google_drive_state import GoogleDriveStateClient
class FirebaseStore:
def __init__(self, component: str, logger_name: str = "kapo.brain.firebase") -> None:
self.component = component
self.logger = logging.getLogger(logger_name)
self._db = None
self._lock = threading.Lock()
self._read_cache: dict[str, tuple[float, Any]] = {}
self._list_cache: dict[str, tuple[float, list[dict[str, Any]]]] = {}
self._write_cache: dict[str, tuple[float, str]] = {}
self._quota_backoff_until: float = 0.0
self._drive = GoogleDriveStateClient(self.logger)
def backend(self) -> str:
configured = str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower()
if configured in {"google_drive", "drive", "gdrive"}:
return "google_drive"
if configured in {"file", "files"}:
return "file"
if configured in {"firebase", "firestore"}:
return "firebase"
if configured in {"disabled", "off", "none"}:
return "disabled"
if self._drive.enabled():
return "google_drive"
if str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"}:
return "firebase"
return "file"
def enabled(self) -> bool:
return self.backend() != "disabled"
def namespace(self) -> str:
return str(os.getenv("FIREBASE_NAMESPACE", "kapo")).strip() or "kapo"
def storage_root(self) -> Path:
configured = str(os.getenv("KAPO_SHARED_STATE_DIR", "")).strip()
if configured:
root = Path(configured).expanduser()
if not root.is_absolute():
root = Path.cwd().resolve() / root
else:
root = (Path.cwd().resolve() / "data" / "local" / "shared_state").resolve()
root.mkdir(parents=True, exist_ok=True)
return root
def _service_payload(self) -> dict[str, Any] | None:
raw = str(os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON", "")).strip()
if not raw:
return None
try:
return json.loads(raw)
except Exception:
self.logger.exception("Invalid Firebase service account JSON")
return None
@staticmethod
def _service_path() -> str:
return str(os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH", "")).strip()
def _client(self):
if self.backend() != "firebase":
return None
with self._lock:
if self._db is not None:
return self._db
try:
import firebase_admin
from firebase_admin import credentials, firestore
if not firebase_admin._apps:
payload = self._service_payload()
if payload:
cred = credentials.Certificate(payload)
else:
service_path = self._service_path()
if not service_path:
return None
path_obj = Path(service_path).expanduser()
if not path_obj.exists() or not path_obj.is_file():
self.logger.warning(
"Firebase service account path is unavailable on this runtime: %s",
service_path,
)
return None
cred = credentials.Certificate(str(path_obj.resolve()))
options = {}
project_id = str(os.getenv("FIREBASE_PROJECT_ID", "")).strip()
if project_id:
options["projectId"] = project_id
firebase_admin.initialize_app(cred, options or None)
self._db = firestore.client()
return self._db
except Exception:
self.logger.exception("Failed to initialize Firebase client")
return None
def _collection(self, name: str) -> str:
return f"{self.namespace()}_{name}"
@staticmethod
def _safe_id(value: str, default: str = "default") -> str:
text = str(value or "").strip() or default
return "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text)[:180]
@staticmethod
def _payload_hash(payload: Any) -> str:
return json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str)
@staticmethod
def _is_quota_error(exc: Exception) -> bool:
text = str(exc or "").lower()
return "resourceexhausted" in text or "quota exceeded" in text or "429" in text
def _quota_backoff_active(self) -> bool:
return time.time() < self._quota_backoff_until
def _activate_quota_backoff(self, seconds: float | None = None) -> None:
delay = float(seconds or os.getenv("FIREBASE_QUOTA_BACKOFF_SEC", "120") or 120)
self._quota_backoff_until = max(self._quota_backoff_until, time.time() + max(5.0, delay))
def _should_skip_write(self, key: str, payload: Any, min_interval_sec: float) -> bool:
now = time.time()
payload_hash = self._payload_hash(payload)
last = self._write_cache.get(key)
if last and last[1] == payload_hash and (now - last[0]) < min_interval_sec:
return True
self._write_cache[key] = (now, payload_hash)
return False
def _file_collection_dir(self, collection: str) -> Path:
path = self.storage_root() / self._safe_id(collection, "collection")
path.mkdir(parents=True, exist_ok=True)
return path
def _file_doc_path(self, collection: str, doc_id: str) -> Path:
return self._file_collection_dir(collection) / f"{self._safe_id(doc_id)}.json"
def _write_json_atomic(self, path: Path, payload: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(f"{path.suffix}.tmp")
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
tmp.replace(path)
def get_document(self, collection: str, doc_id: str, ttl_sec: float = 12.0) -> dict[str, Any]:
safe_doc = self._safe_id(doc_id)
cache_key = f"{collection}:{safe_doc}"
now = time.time()
cached = self._read_cache.get(cache_key)
if cached and (now - cached[0]) < ttl_sec:
return dict(cached[1] or {})
backend = self.backend()
if backend == "google_drive":
payload = self._drive.get_document(collection, safe_doc)
self._read_cache[cache_key] = (now, payload)
return dict(payload or {})
if backend == "file":
path = self._file_doc_path(collection, safe_doc)
if not path.exists():
return {}
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception:
self.logger.warning("Failed to read shared-state file %s", path, exc_info=True)
return {}
self._read_cache[cache_key] = (now, payload)
return dict(payload or {})
if self._quota_backoff_active():
return dict(cached[1] or {}) if cached else {}
db = self._client()
if db is None:
return dict(cached[1] or {}) if cached else {}
try:
snapshot = db.collection(self._collection(collection)).document(safe_doc).get()
payload = snapshot.to_dict() if snapshot.exists else {}
self._read_cache[cache_key] = (now, payload)
return dict(payload or {})
except Exception as exc:
if self._is_quota_error(exc):
self._activate_quota_backoff()
self.logger.warning("Firebase quota exceeded while reading %s/%s; using cache/backoff", collection, safe_doc)
else:
self.logger.exception("Failed to read Firebase document %s/%s", collection, safe_doc)
return dict(cached[1] or {}) if cached else {}
def set_document(self, collection: str, doc_id: str, payload: dict[str, Any], merge: bool = True, min_interval_sec: float = 5.0) -> bool:
safe_doc = self._safe_id(doc_id)
cache_key = f"{collection}:{safe_doc}"
body = dict(payload or {})
body["component"] = self.component
body["updated_at"] = time.time()
if self._should_skip_write(cache_key, body, min_interval_sec):
return True
backend = self.backend()
if backend == "google_drive":
stored = self._drive.set_document(collection, safe_doc, body, merge=merge)
if stored:
self._read_cache.pop(cache_key, None)
stale_prefix = f"{collection}:list:"
for key in list(self._list_cache.keys()):
if key.startswith(stale_prefix):
self._list_cache.pop(key, None)
return stored
if backend == "file":
try:
path = self._file_doc_path(collection, safe_doc)
existing = {}
if merge and path.exists():
existing = json.loads(path.read_text(encoding="utf-8"))
combined = {**existing, **body} if merge else body
combined.setdefault("id", safe_doc)
self._write_json_atomic(path, combined)
self._read_cache.pop(cache_key, None)
stale_prefix = f"{collection}:list:"
for key in list(self._list_cache.keys()):
if key.startswith(stale_prefix):
self._list_cache.pop(key, None)
return True
except Exception:
self.logger.warning("Failed to write shared-state file %s/%s", collection, safe_doc, exc_info=True)
return False
if self._quota_backoff_active():
return False
db = self._client()
if db is None:
return False
try:
if self._should_skip_write(cache_key, body, min_interval_sec):
return True
db.collection(self._collection(collection)).document(safe_doc).set(body, merge=merge)
self._read_cache.pop(cache_key, None)
stale_prefix = f"{collection}:list:"
for key in list(self._list_cache.keys()):
if key.startswith(stale_prefix):
self._list_cache.pop(key, None)
return True
except Exception as exc:
if self._is_quota_error(exc):
self._activate_quota_backoff()
self.logger.warning("Firebase quota exceeded while writing %s/%s; write skipped", collection, safe_doc)
else:
self.logger.exception("Failed to write Firebase document %s/%s", collection, safe_doc)
return False
def list_documents(self, collection: str, limit: int = 200, ttl_sec: float = 30.0) -> list[dict[str, Any]]:
cache_key = f"{collection}:list:{max(1, int(limit))}"
now = time.time()
cached = self._list_cache.get(cache_key)
if cached and (now - cached[0]) < ttl_sec:
return [dict(item) for item in (cached[1] or [])]
backend = self.backend()
if backend == "google_drive":
items = self._drive.list_documents(collection, limit=max(1, int(limit)))
self._list_cache[cache_key] = (now, [dict(item) for item in items])
return items
if backend == "file":
items: list[dict[str, Any]] = []
try:
paths = sorted(
self._file_collection_dir(collection).glob("*.json"),
key=lambda path: path.stat().st_mtime,
reverse=True,
)
for path in paths[: max(1, int(limit))]:
payload = json.loads(path.read_text(encoding="utf-8"))
payload.setdefault("id", path.stem)
items.append(payload)
except Exception:
self.logger.warning("Failed to list shared-state files for %s", collection, exc_info=True)
self._list_cache[cache_key] = (now, [dict(item) for item in items])
return items
if self._quota_backoff_active():
return [dict(item) for item in ((cached[1] if cached else []) or [])]
db = self._client()
if db is None:
return [dict(item) for item in ((cached[1] if cached else []) or [])]
try:
docs = db.collection(self._collection(collection)).limit(max(1, int(limit))).stream()
items: list[dict[str, Any]] = []
for doc in docs:
payload = doc.to_dict() or {}
payload.setdefault("id", doc.id)
items.append(payload)
self._list_cache[cache_key] = (now, [dict(item) for item in items])
return items
except Exception as exc:
if self._is_quota_error(exc):
self._activate_quota_backoff()
self.logger.warning("Firebase quota exceeded while listing %s; using cache/backoff", collection)
else:
self.logger.exception("Failed to list Firebase collection %s", collection)
return [dict(item) for item in ((cached[1] if cached else []) or [])]
def delete_document(self, collection: str, doc_id: str) -> bool:
safe_doc = self._safe_id(doc_id)
backend = self.backend()
if backend == "google_drive":
deleted = self._drive.delete_document(collection, safe_doc)
if deleted:
self._read_cache.pop(f"{collection}:{safe_doc}", None)
self._write_cache.pop(f"{collection}:{safe_doc}", None)
stale_prefix = f"{collection}:list:"
for key in list(self._list_cache.keys()):
if key.startswith(stale_prefix):
self._list_cache.pop(key, None)
return deleted
if backend == "file":
try:
path = self._file_doc_path(collection, safe_doc)
if path.exists():
path.unlink()
self._read_cache.pop(f"{collection}:{safe_doc}", None)
self._write_cache.pop(f"{collection}:{safe_doc}", None)
stale_prefix = f"{collection}:list:"
for key in list(self._list_cache.keys()):
if key.startswith(stale_prefix):
self._list_cache.pop(key, None)
return True
except Exception:
self.logger.warning("Failed to delete shared-state file %s/%s", collection, safe_doc, exc_info=True)
return False
if self._quota_backoff_active():
return False
db = self._client()
if db is None:
return False
try:
db.collection(self._collection(collection)).document(safe_doc).delete()
self._read_cache.pop(f"{collection}:{safe_doc}", None)
self._write_cache.pop(f"{collection}:{safe_doc}", None)
stale_prefix = f"{collection}:list:"
for key in list(self._list_cache.keys()):
if key.startswith(stale_prefix):
self._list_cache.pop(key, None)
return True
except Exception as exc:
if self._is_quota_error(exc):
self._activate_quota_backoff()
self.logger.warning("Firebase quota exceeded while deleting %s/%s; delete skipped", collection, safe_doc)
else:
self.logger.exception("Failed to delete Firebase document %s/%s", collection, safe_doc)
return False