Spaces:
Sleeping
Sleeping
File size: 5,948 Bytes
a91323c 71793d1 a91323c 71793d1 a91323c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | from __future__ import annotations
from typing import Any, Dict, List, Mapping, Sequence, Union
import os
import logging
import chromadb
from chromadb.config import Settings
from chromadb.api.types import Metadata
import numpy as np
class ActionVectorStore:
    """Persistent ChromaDB store for action embeddings.

    - Collection name: "actions" (override with ``collection_name``)
    - Persistent directory: "chroma_db/" (override with ``persist_directory``)
    - Uses cosine distance and converts to similarity (1 - distance)
    """

    def __init__(
        self,
        persist_directory: str = "chroma_db",
        collection_name: str = "actions",
    ) -> None:
        """Open (or create) the persistent client and its collection.

        Args:
            persist_directory: Directory holding the Chroma data. It is
                resolved to an absolute path before use.
            collection_name: Name of the collection to get or create.
        """
        self._disable_telemetry()
        self.client = self._create_client(os.path.abspath(persist_directory))
        # Ensure cosine space for distances
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    @staticmethod
    def _disable_telemetry() -> None:
        """Turn off ChromaDB telemetry and quiet its loggers (best effort)."""
        env_defaults = (
            # Hard-disable ChromaDB telemetry to avoid PostHog capture errors
            ("CHROMADB_ANONYMIZED_TELEMETRY", "false"),
            ("ANONYMIZED_TELEMETRY", "false"),
            ("CHROMADB_DISABLE_TELEMETRY", "1"),
            ("CHROMADB_TELEMETRY_IMPLEMENTATION", "noop"),
            # Ensure default tenant/database environment variables for Chroma 0.5+
            ("CHROMADB_DEFAULT_TENANT", "default_tenant"),
            ("CHROMADB_DEFAULT_DATABASE", "default_database"),
        )
        for name, value in env_defaults:
            # setdefault: values already exported by the user win.
            os.environ.setdefault(name, value)

        # Monkeypatch PostHog capture to a no-op to avoid signature errors
        try:  # pragma: no cover
            import posthog  # type: ignore

            def _silent(*args: Any, **kwargs: Any) -> None:
                return None

            posthog.capture = _silent  # type: ignore[attr-defined]
            posthog.identify = _silent  # type: ignore[attr-defined]
        except Exception:
            # posthog is optional; failing to patch it must never prevent
            # the store from opening.
            logging.getLogger(__name__).debug(
                "posthog not patched", exc_info=True
            )

        # Silence telemetry/log noise
        logging.getLogger("chromadb").setLevel(logging.ERROR)
        logging.getLogger("chromadb.telemetry").setLevel(logging.ERROR)

    @staticmethod
    def _create_client(abs_path: str) -> Any:
        """Build a Chroma client for ``abs_path``, falling back when needed.

        Order of attempts:
        1. ``PersistentClient`` on the existing directory.
        2. On ``ValueError``: wipe and recreate the directory, then retry.
        3. On a second ``ValueError``: non-tenant local ``Client``.
        """
        try:
            return chromadb.PersistentClient(
                path=abs_path,
                settings=Settings(anonymized_telemetry=False),
            )
        except ValueError as exc:
            # The reset below is destructive, so leave a trace of why the
            # existing data is being discarded.
            logging.getLogger(__name__).warning(
                "ChromaDB could not open %s (%s); "
                "resetting the directory and retrying",
                abs_path,
                exc,
            )

        import shutil

        # ignore_errors=True already makes this best-effort.
        shutil.rmtree(abs_path, ignore_errors=True)
        os.makedirs(abs_path, exist_ok=True)
        try:
            return chromadb.PersistentClient(
                path=abs_path,
                settings=Settings(anonymized_telemetry=False),
            )
        except ValueError:
            # Final fallback to non-tenant local client
            return chromadb.Client(
                Settings(
                    anonymized_telemetry=False,
                    chroma_api_impl="local",
                    persist_directory=abs_path,
                )
            )

    @staticmethod
    def _sanitize_metadata(
        md: Mapping[str, Any],
    ) -> Dict[str, Union[str, int, float, bool]]:
        """Coerce metadata values to primitives (str/int/float/bool).

        ``None`` becomes ``""``; NumPy scalars are unwrapped to the matching
        Python scalar; anything else that is not a primitive is stringified.
        """
        out: Dict[str, Union[str, int, float, bool]] = {}
        for key, value in md.items():
            if isinstance(value, np.generic):
                # e.g. np.int64 / np.float32 are not int/float instances and
                # would otherwise be stored as text.
                value = value.item()
            if value is None:
                out[key] = ""
            elif isinstance(value, (str, int, float, bool)):
                out[key] = value
            else:
                out[key] = str(value)
        return out

    def upsert_actions(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        embeddings: Any,
        metadatas: Sequence[Mapping[str, Union[str, int, float, bool]]],
    ) -> None:
        """Upsert action documents with embeddings and metadata.

        Args:
            ids: Unique identifiers, one per action.
            documents: Document text, parallel to ``ids``.
            embeddings: Array-like of embedding vectors, parallel to ``ids``;
                converted to a float32 NumPy array.
            metadatas: Metadata mappings, parallel to ``ids``; values are
                sanitized to primitives before being stored.

        An empty ``ids`` batch is a no-op.
        """
        id_list = list(ids)
        if not id_list:
            # Nothing to write; Chroma rejects empty ID lists.
            return

        # Convert to float32 numpy array to satisfy Chroma's expected types
        embeddings_np = np.asarray(embeddings, dtype=np.float32)
        metadatas_sanitized: List[Metadata] = [
            self._sanitize_metadata(m) for m in metadatas
        ]

        # Chroma 0.5+ supports upsert; fall back to add if needed.
        write = getattr(self.collection, "upsert", None)
        if write is None:  # pragma: no cover
            write = self.collection.add
        write(
            ids=id_list,
            documents=list(documents),
            embeddings=embeddings_np,
            metadatas=metadatas_sanitized,
        )

    @staticmethod
    def _first_batch(res: Mapping[str, Any], key: str) -> Any:
        """Return the result list for the first (only) query under ``key``."""
        batches = res.get(key)
        if batches is None or len(batches) == 0:
            return []
        first = batches[0]
        return [] if first is None else first

    @staticmethod
    def _item_or(seq: Any, index: int, default: Any) -> Any:
        """Return ``seq[index]``, or ``default`` if it is missing or ``None``."""
        if index >= len(seq):
            return default
        value = seq[index]
        return default if value is None else value

    def query_by_embedding(
        self, embedding: List[float], top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """Query similar actions by embedding.

        Args:
            embedding: Query vector.
            top_k: Maximum number of hits; values <= 0 yield an empty list.

        Returns list of dicts: {id, similarity, metadata, document}.
        ``similarity`` is ``1 - cosine distance`` clamped to [0, 1]; missing
        metadata/document entries are returned as ``{}`` / ``""``.
        """
        if top_k <= 0:
            return []

        res = self.collection.query(
            query_embeddings=[list(embedding)],
            n_results=top_k,
            include=["distances", "metadatas", "documents"],
        )
        ids = self._first_batch(res, "ids")
        dists = self._first_batch(res, "distances")
        metas = self._first_batch(res, "metadatas")
        docs = self._first_batch(res, "documents")

        out: List[Dict[str, Any]] = []
        for i, _id in enumerate(ids):
            # A missing distance is treated as 1.0, i.e. similarity 0.
            dist = float(self._item_or(dists, i, 1.0))
            # convert cosine distance → similarity
            sim = max(0.0, min(1.0, 1.0 - dist))
            out.append(
                {
                    "id": _id,
                    "similarity": sim,
                    "metadata": self._item_or(metas, i, {}),
                    "document": self._item_or(docs, i, ""),
                }
            )
        return out
|