# Source: strategy-sync-ai / src/vector_store.py
# Uploaded by hirumunasinghe ("Upload folder using huggingface_hub"),
# commit 71793d1 (verified).
from __future__ import annotations
from typing import Any, Dict, List, Mapping, Sequence, Union
import os
import logging
import chromadb
from chromadb.config import Settings
from chromadb.api.types import Metadata
import numpy as np
class ActionVectorStore:
    """Persistent ChromaDB store for action embeddings.

    - Collection name: "actions"
    - Persistent directory: "chroma_db/" by default
    - Uses cosine distance and converts to similarity (1 - distance),
      clamped to the range [0, 1]
    """

    def __init__(self, persist_directory: str = "chroma_db") -> None:
        """Open (or create) the on-disk store and its "actions" collection.

        Args:
            persist_directory: Directory holding the ChromaDB data. It is
                resolved to an absolute path before being handed to Chroma.
        """
        self._disable_telemetry()
        abs_path = os.path.abspath(persist_directory)
        self.client = self._create_client(abs_path)
        # Ensure cosine space for distances
        self.collection = self.client.get_or_create_collection(
            name="actions",
            metadata={"hnsw:space": "cosine"},
        )

    @staticmethod
    def _disable_telemetry() -> None:
        """Best-effort switch-off of ChromaDB telemetry and its log noise."""
        env_defaults = (
            # Hard-disable ChromaDB telemetry to avoid PostHog capture errors
            ("CHROMADB_ANONYMIZED_TELEMETRY", "false"),
            ("ANONYMIZED_TELEMETRY", "false"),
            ("CHROMADB_DISABLE_TELEMETRY", "1"),
            ("CHROMADB_TELEMETRY_IMPLEMENTATION", "noop"),
            # Default tenant/database environment variables for Chroma 0.5+
            ("CHROMADB_DEFAULT_TENANT", "default_tenant"),
            ("CHROMADB_DEFAULT_DATABASE", "default_database"),
        )
        for var, value in env_defaults:
            # setdefault: never override a value the operator set explicitly.
            os.environ.setdefault(var, value)

        # Monkeypatch PostHog capture to a no-op to avoid signature errors
        try:  # pragma: no cover
            import posthog  # type: ignore

            def _noop(*args: Any, **kwargs: Any) -> None:
                return None

            posthog.capture = _noop  # type: ignore[attr-defined]
            posthog.identify = _noop  # type: ignore[attr-defined]
        except Exception:
            # Deliberately best effort: failing to patch must never prevent
            # the store from opening, but leave a trace for debugging.
            logging.getLogger(__name__).debug(
                "PostHog telemetry patch skipped", exc_info=True
            )

        # Silence telemetry/log noise
        logging.getLogger("chromadb").setLevel(logging.ERROR)
        logging.getLogger("chromadb.telemetry").setLevel(logging.ERROR)

    @staticmethod
    def _create_client(abs_path: str) -> Any:
        """Build a Chroma client for ``abs_path`` using a three-step fallback.

        1. ``PersistentClient`` on the existing directory.
        2. On ``ValueError``: wipe and recreate the directory, then retry.
        3. On a second ``ValueError``: fall back to a local ``Client``.

        Step 2 destroys any previously persisted data, so every fallback is
        logged as a warning instead of happening silently.
        """
        log = logging.getLogger(__name__)
        try:
            return chromadb.PersistentClient(
                path=abs_path,
                settings=Settings(anonymized_telemetry=False),
            )
        except ValueError as err:
            log.warning(
                "ChromaDB could not open %s (%s); resetting the directory "
                "and retrying. Previously persisted data will be removed.",
                abs_path,
                err,
            )

        try:
            import shutil

            shutil.rmtree(abs_path, ignore_errors=True)
        except Exception:
            # Keep the original best-effort semantics; the retry below will
            # surface any remaining problem with the directory.
            log.warning("Could not fully reset %s", abs_path, exc_info=True)
        os.makedirs(abs_path, exist_ok=True)

        try:
            return chromadb.PersistentClient(
                path=abs_path,
                settings=Settings(anonymized_telemetry=False),
            )
        except ValueError as err:
            log.warning(
                "ChromaDB PersistentClient failed again for %s (%s); "
                "falling back to a local client.",
                abs_path,
                err,
            )

        # Final fallback to non-tenant local client
        return chromadb.Client(
            Settings(
                anonymized_telemetry=False,
                chroma_api_impl="local",
                persist_directory=abs_path,
            )
        )

    @staticmethod
    def _sanitize_metadata(
        md: Mapping[str, Any],
    ) -> Dict[str, Union[str, int, float, bool]]:
        """Coerce metadata values to primitives (str/int/float/bool).

        ``None`` becomes ``""``; any other non-primitive value is stringified.
        """
        clean: Dict[str, Union[str, int, float, bool]] = {}
        for key, value in md.items():
            if value is None:
                clean[key] = ""
            elif isinstance(value, (str, int, float, bool)):
                clean[key] = value
            else:
                clean[key] = str(value)
        return clean

    @staticmethod
    def _distance_to_similarity(dist: Any) -> float:
        """Convert a cosine distance to a similarity clamped to [0, 1].

        A missing (``None``) or NaN distance maps to 0.0. Without the NaN
        guard, ``min(1.0, nan)`` evaluates to 1.0 and a broken distance would
        be reported as a perfect match.
        """
        if dist is None:
            return 0.0
        value = float(dist)
        if np.isnan(value):
            return 0.0
        return max(0.0, min(1.0, 1.0 - value))

    @staticmethod
    def _first_result(res: Mapping[str, Any], key: str) -> Any:
        """Return the result row for the first (only) query, or ``[]``."""
        rows = res.get(key) or [[]]
        first = rows[0]
        return [] if first is None else first

    def upsert_actions(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        embeddings: Any,
        metadatas: Sequence[Mapping[str, Union[str, int, float, bool]]],
    ) -> None:
        """Upsert action documents with embeddings and metadata.

        Args:
            ids: Unique identifier per action.
            documents: Text of each action, aligned with ``ids``.
            embeddings: Array-like of embeddings, one row per id; converted
                to a float32 numpy array.
            metadatas: One mapping per action; values are sanitized to
                primitives before storage.

        Raises:
            ValueError: If the inputs do not all describe the same number of
                records.
        """
        id_list = list(ids)
        doc_list = list(documents)
        metadatas_sanitized: List[Metadata] = [
            self._sanitize_metadata(m) for m in metadatas
        ]

        if not (len(id_list) == len(doc_list) == len(metadatas_sanitized)):
            raise ValueError(
                "ids, documents and metadatas must have the same length "
                f"(got {len(id_list)}, {len(doc_list)}, "
                f"{len(metadatas_sanitized)})"
            )
        if not id_list:
            # Nothing to store; an empty batch is a no-op, not an error.
            return

        # Convert to float32 numpy array to satisfy Chroma's expected types
        embeddings_np = np.asarray(embeddings, dtype=np.float32)
        if embeddings_np.ndim == 2 and embeddings_np.shape[0] != len(id_list):
            raise ValueError(
                f"got {embeddings_np.shape[0]} embeddings for "
                f"{len(id_list)} ids"
            )

        # Chroma 0.5+ supports upsert; fall back to add if needed.
        if hasattr(self.collection, "upsert"):
            write = self.collection.upsert
        else:  # pragma: no cover
            write = self.collection.add
        write(
            ids=id_list,
            documents=doc_list,
            embeddings=embeddings_np,
            metadatas=metadatas_sanitized,
        )

    def query_by_embedding(
        self, embedding: List[float], top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """Query similar actions by embedding.

        Args:
            embedding: Query vector.
            top_k: Maximum number of neighbours to return. Values <= 0 yield
                an empty list without querying the collection.

        Returns:
            List of dicts ``{id, similarity, metadata, document}`` in the
            order returned by the collection. ``similarity`` is
            ``1 - distance`` clamped to [0, 1]; missing metadata/documents
            are reported as ``{}`` / ``""``.
        """
        if top_k <= 0:
            return []

        res = self.collection.query(
            query_embeddings=[list(embedding)],
            n_results=top_k,
            include=["distances", "metadatas", "documents"],
        )
        ids = self._first_result(res, "ids")
        dists = self._first_result(res, "distances")
        metas = self._first_result(res, "metadatas")
        docs = self._first_result(res, "documents")

        out: List[Dict[str, Any]] = []
        for i, _id in enumerate(ids):
            # Any parallel list may be shorter than ``ids``; treat the gap
            # like a missing value.
            dist = dists[i] if i < len(dists) else None
            meta = metas[i] if i < len(metas) else None
            doc = docs[i] if i < len(docs) else None
            out.append(
                {
                    "id": _id,
                    "similarity": self._distance_to_similarity(dist),
                    "metadata": {} if meta is None else meta,
                    "document": "" if doc is None else doc,
                }
            )
        return out