"""LlamaIndex RAG service for evidence retrieval and indexing.

Requires optional dependencies: uv sync --extra modal

Migration Note (v1.0 rebrand):
    Default collection_name changed from "deepcritical_evidence" to "deepboner_evidence".
    To preserve existing data, explicitly pass collection_name="deepcritical_evidence".

Protocol Compliance:
    This service implements EmbeddingServiceProtocol via async wrapper methods:
    - add_evidence() - async wrapper for ingest_evidence()
    - search_similar() - async wrapper for retrieve()
    - deduplicate() - async wrapper using search_similar() + add_evidence()

    These wrappers use loop.run_in_executor() to avoid blocking the event loop.
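
Example:
    A minimal usage sketch (assumes OPENAI_API_KEY is set, the optional
    dependencies are installed, and "evidence_items" is a hypothetical list
    of Evidence objects from src.utils.models):

        service = get_rag_service()
        service.ingest_evidence(evidence_items)
        hits = service.retrieve("example research question", top_k=3)
        answer = service.query("Summarize the retrieved evidence")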
"""

import asyncio
from typing import Any

import structlog

from src.utils.config import settings
from src.utils.exceptions import ConfigurationError, EmbeddingError
from src.utils.models import Citation, Evidence

logger = structlog.get_logger()


class LlamaIndexRAGService:
    """RAG service using LlamaIndex with ChromaDB vector store.

    Note:
        This service is currently OpenAI-only. It uses OpenAI embeddings and LLM
        regardless of the global `settings.llm_provider` configuration.
        Requires OPENAI_API_KEY to be set.
    """

    def __init__(
        self,
        collection_name: str = "deepboner_evidence",
        persist_dir: str | None = None,
        embedding_model: str | None = None,
        similarity_top_k: int = 5,
    ) -> None:
        """
        Initialize LlamaIndex RAG service.

        Args:
            collection_name: Name of the ChromaDB collection (default changed from
                "deepcritical_evidence" to "deepboner_evidence" in v1.0 rebrand)
            persist_dir: Directory to persist ChromaDB data
            embedding_model: OpenAI embedding model (defaults to settings.openai_embedding_model)
            similarity_top_k: Number of top results to retrieve
        """
        # Lazy import - only when instantiated
        try:
            import chromadb
            from llama_index.core import Document, Settings, StorageContext, VectorStoreIndex
            from llama_index.core.retrievers import VectorIndexRetriever
            from llama_index.embeddings.openai import OpenAIEmbedding
            from llama_index.llms.openai import OpenAI
            from llama_index.vector_stores.chroma import ChromaVectorStore
        except ImportError as e:
            raise ImportError(
                "LlamaIndex dependencies not installed. Run: uv sync --extra modal"
            ) from e

        # Store references for use in other methods
        self._chromadb = chromadb
        self._Document = Document
        self._Settings = Settings
        self._StorageContext = StorageContext
        self._VectorStoreIndex = VectorStoreIndex
        self._VectorIndexRetriever = VectorIndexRetriever
        self._ChromaVectorStore = ChromaVectorStore

        self.collection_name = collection_name
        self.persist_dir = persist_dir or settings.chroma_db_path
        self.similarity_top_k = similarity_top_k
        self.embedding_model = embedding_model or settings.openai_embedding_model

        # Validate API key before use
        if not settings.openai_api_key:
            raise ConfigurationError("OPENAI_API_KEY required for LlamaIndex RAG service")

        # Configure LlamaIndex settings (use centralized config)
        self._Settings.llm = OpenAI(
            model=settings.openai_model,
            api_key=settings.openai_api_key,
        )
        self._Settings.embed_model = OpenAIEmbedding(
            model=self.embedding_model,
            api_key=settings.openai_api_key,
        )

        # Initialize ChromaDB client
        self.chroma_client = self._chromadb.PersistentClient(path=self.persist_dir)

        # Get or create collection
        # ChromaDB raises different exceptions depending on version:
        # - ValueError (older versions)
        # - InvalidCollectionException / NotFoundError (newer versions)
        try:
            self.collection = self.chroma_client.get_collection(self.collection_name)
            logger.info("loaded_existing_collection", name=self.collection_name)
        except Exception as e:
            # Catch any collection-not-found error and create it
            if (
                "not exist" in str(e).lower()
                or "not found" in str(e).lower()
                or isinstance(e, ValueError)
            ):
                self.collection = self.chroma_client.create_collection(self.collection_name)
                logger.info("created_new_collection", name=self.collection_name)
            else:
                raise

        # Initialize vector store and index
        self.vector_store = self._ChromaVectorStore(chroma_collection=self.collection)
        self.storage_context = self._StorageContext.from_defaults(vector_store=self.vector_store)

        # Try to load existing index, or create empty one
        # LlamaIndex raises ValueError for empty/invalid stores
        try:
            self.index = self._VectorStoreIndex.from_vector_store(
                vector_store=self.vector_store,
                storage_context=self.storage_context,
            )
            logger.info("loaded_existing_index")
        except (ValueError, KeyError):
            # Empty or newly created store - create fresh index
            self.index = self._VectorStoreIndex([], storage_context=self.storage_context)
            logger.info("created_new_index")

    def ingest_evidence(self, evidence_list: list[Evidence]) -> None:
        """
        Ingest evidence into the vector store.

        Args:
            evidence_list: List of Evidence objects to ingest
        """
        if not evidence_list:
            logger.warning("no_evidence_to_ingest")
            return

        # Convert Evidence objects to LlamaIndex Documents
        documents = []
        for evidence in evidence_list:
            metadata = {
                "source": evidence.citation.source,
                "title": evidence.citation.title,
                "url": evidence.citation.url,
                "date": evidence.citation.date,
                "authors": ", ".join(evidence.citation.authors),
            }

            doc = self._Document(
                text=evidence.content,
                metadata=metadata,
                doc_id=evidence.citation.url,  # Use URL as unique ID
            )
            documents.append(doc)

        # Insert documents into index
        try:
            for doc in documents:
                self.index.insert(doc)
            logger.info("ingested_evidence", count=len(documents))
        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_ingest_evidence", error=str(e))
            raise EmbeddingError(f"Failed to ingest evidence: {e}") from e

    def ingest_documents(self, documents: list[Any]) -> None:
        """
        Ingest raw LlamaIndex Documents.

        Args:
            documents: List of LlamaIndex Document objects
        """
        if not documents:
            logger.warning("no_documents_to_ingest")
            return

        try:
            for doc in documents:
                self.index.insert(doc)
            logger.info("ingested_documents", count=len(documents))
        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_ingest_documents", error=str(e))
            raise EmbeddingError(f"Failed to ingest documents: {e}") from e

    def retrieve(self, query: str, top_k: int | None = None) -> list[dict[str, Any]]:
        """
        Retrieve relevant documents for a query.

        Args:
            query: Query string
            top_k: Number of results to return (defaults to similarity_top_k)

        Returns:
            List of dicts with keys "text", "score" (may be None for some
            vector stores), and "metadata"
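
        Example:
            Illustrative result shape (values are made up):

                [{"text": "Evidence passage...",
                  "score": 0.83,
                  "metadata": {"source": "web", "title": "...", "url": "..."}}]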
        """
        k = top_k or self.similarity_top_k

        # Create retriever
        retriever = self._VectorIndexRetriever(
            index=self.index,
            similarity_top_k=k,
        )

        try:
            # Retrieve nodes
            nodes = retriever.retrieve(query)

            # Convert to dict format
            results = []
            for node in nodes:
                results.append(
                    {
                        "text": node.node.get_content(),
                        "score": node.score,
                        "metadata": node.node.metadata,
                    }
                )

            logger.info("retrieved_documents", query=query[:50], count=len(results))
            return results

        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_retrieve", error=str(e), query=query[:50])
            raise EmbeddingError(f"Failed to retrieve documents: {e}") from e

    def query(self, query_str: str, top_k: int | None = None) -> str:
        """
        Query the RAG system and get a synthesized response.

        Args:
            query_str: Query string
            top_k: Number of results to use (defaults to similarity_top_k)

        Returns:
            Synthesized response string
        """
        k = top_k or self.similarity_top_k

        # Create query engine
        query_engine = self.index.as_query_engine(
            similarity_top_k=k,
        )

        try:
            response = query_engine.query(query_str)
            logger.info("generated_response", query=query_str[:50])
            return str(response)

        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_query", error=str(e), query=query_str[:50])
            raise EmbeddingError(f"Failed to query RAG system: {e}") from e

    def clear_collection(self) -> None:
        """Clear all documents from the collection."""
        try:
            self.chroma_client.delete_collection(self.collection_name)
            self.collection = self.chroma_client.create_collection(self.collection_name)
            self.vector_store = self._ChromaVectorStore(chroma_collection=self.collection)
            self.storage_context = self._StorageContext.from_defaults(
                vector_store=self.vector_store
            )
            self.index = self._VectorStoreIndex([], storage_context=self.storage_context)
            logger.info("cleared_collection", name=self.collection_name)
        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_clear_collection", error=str(e))
            raise EmbeddingError(f"Failed to clear collection: {e}") from e

    # ─────────────────────────────────────────────────────────────────
    # Async Protocol Methods (EmbeddingServiceProtocol compliance)
    # ─────────────────────────────────────────────────────────────────

    async def embed(self, text: str) -> list[float]:
        """Embed a single text using OpenAI embeddings (Protocol-compatible).

        Uses the LlamaIndex Settings.embed_model which was configured in __init__.

        Args:
            text: Text to embed

        Returns:
            Embedding vector as list of floats
        """
        loop = asyncio.get_running_loop()
        # LlamaIndex embed_model has get_text_embedding method
        embedding = await loop.run_in_executor(
            None, self._Settings.embed_model.get_text_embedding, text
        )
        return list(embedding)

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed multiple texts efficiently (Protocol-compatible).

        Uses LlamaIndex's batch embedding for efficiency.

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors
        """
        if not texts:
            return []

        loop = asyncio.get_running_loop()
        # LlamaIndex embed_model has get_text_embedding_batch method
        embeddings = await loop.run_in_executor(
            None, self._Settings.embed_model.get_text_embedding_batch, texts
        )
        return [list(emb) for emb in embeddings]

    async def add_evidence(self, evidence_id: str, content: str, metadata: dict[str, Any]) -> None:
        """Async wrapper for adding evidence (Protocol-compatible).

        Converts the sync ingest_evidence pattern to the async protocol interface.
        Uses run_in_executor to avoid blocking the event loop.

        Args:
            evidence_id: Unique identifier (typically URL)
            content: Text content to embed and store
            metadata: Additional metadata (source, title, date, authors)
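
        Example:
            Illustrative call; note that "authors" is a single comma-separated
            string, which this method splits back into a list:

                await service.add_evidence(
                    evidence_id="https://example.org/paper",
                    content="Key finding text...",
                    metadata={"source": "web", "title": "A Paper",
                              "date": "2024-01-01", "authors": "Smith J, Doe A"},
                )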
        """
        # Reconstruct Evidence from parts
        authors_str = metadata.get("authors", "")
        authors = [a.strip() for a in authors_str.split(",")] if authors_str else []

        citation = Citation(
            source=metadata.get("source", "web"),
            title=metadata.get("title", "Unknown"),
            url=evidence_id,
            date=metadata.get("date", "Unknown"),
            authors=authors,
        )
        evidence = Evidence(content=content, citation=citation)

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.ingest_evidence, [evidence])

    async def search_similar(self, query: str, n_results: int = 5) -> list[dict[str, Any]]:
        """Async wrapper for retrieve (Protocol-compatible).

        Returns results in the same format as EmbeddingService.search_similar()
        for seamless interchangeability.

        Args:
            query: Search query text
            n_results: Maximum number of results to return

        Returns:
            List of dicts with keys: id, content, metadata, distance
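
        Example:
            A retrieve() hit such as {"text": "...", "score": 0.9,
            "metadata": {...}} is returned as (values illustrative):

                {"id": "https://example.org/paper", "content": "...",
                 "metadata": {...}, "distance": 0.1}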
        """
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(None, self.retrieve, query, n_results)

        # Convert LlamaIndex format to EmbeddingService format for compatibility
        # LlamaIndex: {"text": ..., "score": ..., "metadata": ...}
        # EmbeddingService: {"id": ..., "content": ..., "metadata": ..., "distance": ...}
        return [
            {
                "id": r.get("metadata", {}).get("url", ""),
                "content": r.get("text", ""),
                "metadata": r.get("metadata", {}),
                # Convert similarity score to distance
                # LlamaIndex score: 0-1 (higher = more similar), may be None
                # Output distance: 0-1 (lower = more similar, matches ChromaDB behavior)
                "distance": 1.0 - (r["score"] if r.get("score") is not None else 0.5),
            }
            for r in results
        ]

    async def deduplicate(self, evidence: list[Evidence], threshold: float = 0.9) -> list[Evidence]:
        """Async wrapper for deduplication (Protocol-compatible).

        Uses search_similar() to check for existing similar content.
        Stores unique evidence and returns the deduplicated list.

        Args:
            evidence: List of evidence items to deduplicate
            threshold: Similarity threshold (0.9 = 90% similar is duplicate)
                Distance range: 0-1 (0 = identical, 1 = orthogonal)
                Duplicate if: distance < (1 - threshold), e.g., < 0.1 for 90%

        Returns:
            List of unique evidence items (duplicates removed)
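
        Example:
            With threshold=0.9, an item whose nearest stored neighbor is at
            distance 0.05 (95% similar) is dropped as a duplicate, while one
            at distance 0.2 is kept and stored.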
        """
        unique = []

        for ev in evidence:
            try:
                # Check for similar existing content
                similar = await self.search_similar(ev.content, n_results=1)

                # Check similarity threshold
                # distance 0 = identical, higher = more different
                is_duplicate = similar and similar[0]["distance"] < (1 - threshold)

                if not is_duplicate:
                    unique.append(ev)
                    # Store the new evidence
                    await self.add_evidence(
                        evidence_id=ev.citation.url,
                        content=ev.content,
                        metadata={
                            "source": ev.citation.source,
                            "title": ev.citation.title,
                            "date": ev.citation.date,
                            "authors": ",".join(ev.citation.authors or []),
                        },
                    )
            except Exception as e:
                # Log but don't fail - better to have duplicates than lose data
                logger.warning(
                    "Failed to process evidence in deduplicate",
                    url=ev.citation.url,
                    error=str(e),
                )
                unique.append(ev)

        return unique


def get_rag_service(
    collection_name: str = "deepboner_evidence",
    **kwargs: Any,
) -> LlamaIndexRAGService:
    """
    Create a configured RAG service instance (a new instance per call).

    Args:
        collection_name: Name of the ChromaDB collection
        **kwargs: Additional arguments for LlamaIndexRAGService

    Returns:
        Configured LlamaIndexRAGService instance
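
    Example:
        To keep reading data indexed before the v1.0 rebrand (see the module
        docstring), pass the old collection name explicitly:

            service = get_rag_service(collection_name="deepcritical_evidence")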
    """
    return LlamaIndexRAGService(collection_name=collection_name, **kwargs)