Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Any, Dict, List, Mapping, Sequence, Union | |
| import os | |
| import logging | |
| import chromadb | |
| from chromadb.config import Settings | |
| from chromadb.api.types import Metadata | |
| import numpy as np | |
| class ActionVectorStore: | |
| """Persistent ChromaDB store for action embeddings. | |
| - Collection name: "actions" | |
| - Persistent directory: "chroma_db/" | |
| - Uses cosine distance and converts to similarity (1 - distance) | |
| """ | |
| def __init__(self, persist_directory: str = "chroma_db") -> None: | |
| # Hard-disable ChromaDB telemetry to avoid PostHog capture errors | |
| os.environ.setdefault("CHROMADB_ANONYMIZED_TELEMETRY", "false") | |
| os.environ.setdefault("ANONYMIZED_TELEMETRY", "false") | |
| os.environ.setdefault("CHROMADB_DISABLE_TELEMETRY", "1") | |
| os.environ.setdefault("CHROMADB_TELEMETRY_IMPLEMENTATION", "noop") | |
| # Ensure default tenant/database environment variables for Chroma 0.5+ | |
| os.environ.setdefault("CHROMADB_DEFAULT_TENANT", "default_tenant") | |
| os.environ.setdefault("CHROMADB_DEFAULT_DATABASE", "default_database") | |
| # Monkeypatch PostHog capture to a no-op to avoid signature errors | |
| try: # pragma: no cover | |
| import posthog # type: ignore | |
| def _silent_capture(*args: Any, **kwargs: Any) -> None: | |
| return None | |
| def _silent_identify(*args: Any, **kwargs: Any) -> None: | |
| return None | |
| posthog.capture = _silent_capture # type: ignore[attr-defined] | |
| posthog.identify = _silent_identify # type: ignore[attr-defined] | |
| except Exception: | |
| pass | |
| # Silence telemetry/log noise | |
| logging.getLogger("chromadb").setLevel(logging.ERROR) | |
| logging.getLogger("chromadb.telemetry").setLevel(logging.ERROR) | |
| # Disable telemetry via client settings too, and use absolute path | |
| abs_path = os.path.abspath(persist_directory) | |
| try: | |
| self.client = chromadb.PersistentClient( | |
| path=abs_path, | |
| settings=Settings(anonymized_telemetry=False), | |
| ) | |
| except ValueError: | |
| # Fallback: reset directory and retry PersistentClient; if still failing, use local Client | |
| try: | |
| import shutil | |
| shutil.rmtree(abs_path, ignore_errors=True) | |
| except Exception: | |
| pass | |
| os.makedirs(abs_path, exist_ok=True) | |
| try: | |
| self.client = chromadb.PersistentClient( | |
| path=abs_path, | |
| settings=Settings(anonymized_telemetry=False), | |
| ) | |
| except ValueError: | |
| # Final fallback to non-tenant local client | |
| self.client = chromadb.Client( | |
| Settings( | |
| anonymized_telemetry=False, | |
| chroma_api_impl="local", | |
| persist_directory=abs_path, | |
| ) | |
| ) | |
| # Ensure cosine space for distances | |
| self.collection = self.client.get_or_create_collection( | |
| name="actions", | |
| metadata={"hnsw:space": "cosine"}, | |
| ) | |
| def upsert_actions( | |
| self, | |
| ids: Sequence[str], | |
| documents: Sequence[str], | |
| embeddings: Any, | |
| metadatas: Sequence[Mapping[str, Union[str, int, float, bool]]], | |
| ) -> None: | |
| """Upsert action documents with embeddings and metadata.""" | |
| # Convert to float32 numpy array to satisfy Chroma's expected types | |
| embeddings_np = np.asarray(embeddings, dtype=np.float32) | |
| # Sanitize metadata values to primitives (str/int/float/bool) | |
| def _sanitize(md: Mapping[str, Any]) -> Dict[str, Union[str, int, float, bool]]: | |
| out: Dict[str, Union[str, int, float, bool]] = {} | |
| for k, v in md.items(): | |
| if v is None: | |
| out[k] = "" | |
| elif isinstance(v, (str, int, float, bool)): | |
| out[k] = v | |
| else: | |
| out[k] = str(v) | |
| return out | |
| metadatas_sanitized: List[Metadata] = [_sanitize(m) for m in list(metadatas)] | |
| # Chroma 0.5+ supports upsert; fall back to add if needed. | |
| if hasattr(self.collection, "upsert"): | |
| self.collection.upsert( | |
| ids=list(ids), | |
| documents=list(documents), | |
| embeddings=embeddings_np, | |
| metadatas=metadatas_sanitized, | |
| ) | |
| else: # pragma: no cover | |
| self.collection.add( | |
| ids=list(ids), | |
| documents=list(documents), | |
| embeddings=embeddings_np, | |
| metadatas=metadatas_sanitized, | |
| ) | |
| def query_by_embedding( | |
| self, embedding: List[float], top_k: int = 5 | |
| ) -> List[Dict[str, Any]]: | |
| """Query similar actions by embedding. | |
| Returns list of dicts: {id, similarity, metadata, document} | |
| """ | |
| res = self.collection.query( | |
| query_embeddings=[list(embedding)], | |
| n_results=top_k, | |
| include=["distances", "metadatas", "documents"], | |
| ) | |
| ids = (res.get("ids") or [[]])[0] | |
| dists = (res.get("distances") or [[]])[0] | |
| metas = (res.get("metadatas") or [[]])[0] | |
| docs = (res.get("documents") or [[]])[0] | |
| out: List[Dict[str, Any]] = [] | |
| for i, _id in enumerate(ids): | |
| dist = float(dists[i]) if i < len(dists) else 1.0 | |
| sim = max(0.0, min(1.0, 1.0 - dist)) # convert cosine distance → similarity | |
| out.append( | |
| { | |
| "id": _id, | |
| "similarity": sim, | |
| "metadata": metas[i] if i < len(metas) else {}, | |
| "document": docs[i] if i < len(docs) else "", | |
| } | |
| ) | |
| return out | |