File size: 5,948 Bytes
a91323c
 
 
 
 
 
 
 
71793d1
a91323c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71793d1
a91323c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from __future__ import annotations

from typing import Any, Dict, List, Mapping, Sequence, Union
import os
import logging

import chromadb
from chromadb.config import Settings
from chromadb.api.types import Metadata
import numpy as np


class ActionVectorStore:
    """Persistent ChromaDB store for action embeddings.

    - Collection name: "actions" by default (override with ``collection_name``)
    - Persistent directory: "chroma_db/" by default
    - Uses cosine distance and converts to similarity (1 - distance)
    """

    def __init__(
        self,
        persist_directory: str = "chroma_db",
        collection_name: str = "actions",
        *,
        reset_on_error: bool = True,
    ) -> None:
        """Open (or create) the persistent collection.

        Args:
            persist_directory: Directory holding the Chroma data; resolved to
                an absolute path before use.
            collection_name: Name of the collection to get or create.
            reset_on_error: When ``True`` (the default, matching the historic
                behaviour) a ``ValueError`` from ``PersistentClient`` causes
                the directory to be DELETED and recreated before retrying.
                Pass ``False`` to let the ``ValueError`` propagate instead of
                discarding existing data.

        Raises:
            ValueError: If the client cannot be opened and ``reset_on_error``
                is ``False``.
        """
        self._disable_telemetry()
        abs_path = os.path.abspath(persist_directory)
        self.client = self._open_client(abs_path, reset_on_error)
        # Ensure cosine space for distances
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    @staticmethod
    def _disable_telemetry() -> None:
        """Best-effort switch-off of Chroma telemetry and its log noise."""
        # setdefault: values already present in the environment win.
        # NOTE(review): which of these variables a given Chroma release
        # actually reads is not verified here - they are set defensively.
        for name, value in (
            ("CHROMADB_ANONYMIZED_TELEMETRY", "false"),
            ("ANONYMIZED_TELEMETRY", "false"),
            ("CHROMADB_DISABLE_TELEMETRY", "1"),
            ("CHROMADB_TELEMETRY_IMPLEMENTATION", "noop"),
            # Default tenant/database environment variables for Chroma 0.5+
            ("CHROMADB_DEFAULT_TENANT", "default_tenant"),
            ("CHROMADB_DEFAULT_DATABASE", "default_database"),
        ):
            os.environ.setdefault(name, value)

        # Monkeypatch PostHog capture to a no-op to avoid signature errors
        try:  # pragma: no cover
            import posthog  # type: ignore

            def _noop(*args: Any, **kwargs: Any) -> None:
                return None

            posthog.capture = _noop  # type: ignore[attr-defined]
            posthog.identify = _noop  # type: ignore[attr-defined]
        except Exception:
            # Telemetry patching is optional; never let it block start-up.
            logging.getLogger(__name__).debug(
                "posthog not patched; continuing", exc_info=True
            )

        # Silence telemetry/log noise
        logging.getLogger("chromadb").setLevel(logging.ERROR)
        logging.getLogger("chromadb.telemetry").setLevel(logging.ERROR)

    @staticmethod
    def _open_client(abs_path: str, reset_on_error: bool) -> Any:
        """Return a Chroma client for ``abs_path``, with reset/fallback retries."""
        log = logging.getLogger(__name__)
        try:
            return chromadb.PersistentClient(
                path=abs_path,
                settings=Settings(anonymized_telemetry=False),
            )
        except ValueError:
            if not reset_on_error:
                raise
            log.warning(
                "PersistentClient failed for %s; resetting directory and retrying",
                abs_path,
                exc_info=True,
            )

        # Fallback: wipe the directory and retry. Destructive by design, but
        # the removal itself stays best-effort.
        try:
            import shutil

            shutil.rmtree(abs_path, ignore_errors=True)
        except Exception:
            log.warning("Could not remove %s", abs_path, exc_info=True)
        os.makedirs(abs_path, exist_ok=True)

        try:
            return chromadb.PersistentClient(
                path=abs_path,
                settings=Settings(anonymized_telemetry=False),
            )
        except ValueError:
            log.warning(
                "PersistentClient failed again for %s; using local Client",
                abs_path,
                exc_info=True,
            )
            # Final fallback to non-tenant local client
            return chromadb.Client(
                Settings(
                    anonymized_telemetry=False,
                    chroma_api_impl="local",
                    persist_directory=abs_path,
                )
            )

    @staticmethod
    def _sanitize_metadata(
        md: Mapping[str, Any],
    ) -> Dict[str, Union[str, int, float, bool]]:
        """Coerce metadata values to primitives (str/int/float/bool)."""
        out: Dict[str, Union[str, int, float, bool]] = {}
        for key, value in md.items():
            if isinstance(value, np.generic):
                # Unwrap numpy scalars so e.g. np.int64(3) stays the number 3
                # instead of becoming the string "3".
                value = value.item()
            if value is None:
                out[key] = ""
            elif isinstance(value, (str, int, float, bool)):
                out[key] = value
            else:
                out[key] = str(value)
        return out

    def upsert_actions(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        embeddings: Any,
        metadatas: Sequence[Mapping[str, Union[str, int, float, bool]]],
    ) -> None:
        """Upsert action documents with embeddings and metadata.

        Args:
            ids: Unique id per action.
            documents: Document text per action, aligned with ``ids``.
            embeddings: Array-like of vectors; converted to ``float32``.
            metadatas: One mapping per action. ``None`` values become ``""``,
                numpy scalars are unwrapped, other non-primitives are
                stringified.

        Raises:
            ValueError: If the inputs do not all describe the same number of
                actions.

        An empty batch is a no-op; the collection is not called.
        """
        id_list = list(ids)
        doc_list = list(documents)
        meta_list = list(metadatas)
        # Convert to float32 numpy array to satisfy Chroma's expected types
        embeddings_np = np.asarray(embeddings, dtype=np.float32)

        count = len(id_list)
        if len(doc_list) != count or len(meta_list) != count:
            raise ValueError(
                "ids, documents and metadatas must have the same length "
                f"(got {count}, {len(doc_list)}, {len(meta_list)})"
            )
        # Only a 2-D batch has an unambiguous row count to compare against.
        if embeddings_np.ndim == 2 and embeddings_np.shape[0] != count:
            raise ValueError(
                f"expected {count} embeddings, got {embeddings_np.shape[0]}"
            )
        if count == 0:
            return

        metadatas_sanitized: List[Metadata] = [
            self._sanitize_metadata(m) for m in meta_list
        ]
        # Chroma 0.5+ supports upsert; fall back to add if needed.
        writer = getattr(self.collection, "upsert", None)
        if writer is None:  # pragma: no cover
            writer = self.collection.add
        writer(
            ids=id_list,
            documents=doc_list,
            embeddings=embeddings_np,
            metadatas=metadatas_sanitized,
        )

    @staticmethod
    def _first_row(res: Mapping[str, Any], key: str) -> Any:
        """Return the first (only) query's row for ``key``, or ``[]`` if absent."""
        rows = res.get(key)
        # Explicit None/len checks: truthiness is ambiguous for array-likes.
        if rows is None or len(rows) == 0:
            return []
        first = rows[0]
        return [] if first is None else first

    @staticmethod
    def _to_similarity(distance: Any) -> float:
        """Convert a cosine distance to a similarity clamped to [0, 1]."""
        dist = float(distance)
        if np.isnan(dist):
            # min()/max() let NaN slip through as 1.0; treat it as "no match".
            return 0.0
        return max(0.0, min(1.0, 1.0 - dist))

    def query_by_embedding(
        self, embedding: List[float], top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """Query similar actions by embedding.

        Args:
            embedding: Query vector.
            top_k: Maximum number of results; values <= 0 yield ``[]`` without
                querying the collection.

        Returns list of dicts: {id, similarity, metadata, document}.
        Missing or ``None`` metadata/document entries are returned as ``{}``
        and ``""``; a missing distance counts as 1.0 (similarity 0.0).
        """
        if top_k <= 0:
            return []

        res = self.collection.query(
            # Plain Python floats, so numpy vectors are accepted as well.
            query_embeddings=[[float(x) for x in embedding]],
            n_results=top_k,
            include=["distances", "metadatas", "documents"],
        )
        ids = self._first_row(res, "ids")
        dists = self._first_row(res, "distances")
        metas = self._first_row(res, "metadatas")
        docs = self._first_row(res, "documents")

        out: List[Dict[str, Any]] = []
        for i, _id in enumerate(ids):
            dist = dists[i] if i < len(dists) else 1.0
            meta = metas[i] if i < len(metas) else None
            doc = docs[i] if i < len(docs) else None
            out.append(
                {
                    "id": _id,
                    "similarity": self._to_similarity(dist),
                    "metadata": {} if meta is None else meta,
                    "document": "" if doc is None else doc,
                }
            )
        return out