"""LlamaIndex RAG service for evidence retrieval and indexing.

Requires optional dependencies: uv sync --extra rag

Migration Note (v1.0 rebrand):
    Default collection_name changed from "deepcritical_evidence" to "deepboner_evidence".
    To preserve existing data, explicitly pass collection_name="deepcritical_evidence".

Protocol Compliance:
    This service implements EmbeddingServiceProtocol via async wrapper methods:

    - add_evidence() - async wrapper for ingest_evidence()
    - search_similar() - async wrapper for retrieve()
    - deduplicate() - async wrapper using search_similar() + add_evidence()

    These wrappers use asyncio.run_in_executor() to avoid blocking the event loop.
"""

import asyncio
from typing import Any

import structlog

from src.utils.config import settings
from src.utils.exceptions import ConfigurationError, EmbeddingError
from src.utils.models import Citation, Evidence

logger = structlog.get_logger()


class LlamaIndexRAGService:
    """RAG service using LlamaIndex with ChromaDB vector store.

    Note:
        This service is currently OpenAI-only. It uses OpenAI embeddings and an
        OpenAI LLM regardless of the global `settings.llm_provider` configuration.
        Requires OPENAI_API_KEY to be set.
    """

    def __init__(
        self,
        collection_name: str = "deepboner_evidence",
        persist_dir: str | None = None,
        embedding_model: str | None = None,
        similarity_top_k: int = 5,
        api_key: str | None = None,
    ) -> None:
        """Initialize LlamaIndex RAG service.

        Args:
            collection_name: Name of the ChromaDB collection
            persist_dir: Directory to persist ChromaDB data
            embedding_model: OpenAI embedding model (defaults to settings.openai_embedding_model)
            similarity_top_k: Number of top results to retrieve
            api_key: Optional BYOK OpenAI key. Prioritized over env var.
        """
        try:
            import chromadb
            from llama_index.core import Document, Settings, StorageContext, VectorStoreIndex
            from llama_index.core.retrievers import VectorIndexRetriever
            from llama_index.embeddings.openai import OpenAIEmbedding
            from llama_index.llms.openai import OpenAI
            from llama_index.vector_stores.chroma import ChromaVectorStore
        except ImportError as e:
            raise ImportError(
                "LlamaIndex dependencies not installed. Run: uv sync --extra rag"
            ) from e

        self._chromadb = chromadb
        self._Document = Document
        self._Settings = Settings
        self._StorageContext = StorageContext
        self._VectorStoreIndex = VectorStoreIndex
        self._VectorIndexRetriever = VectorIndexRetriever
        self._ChromaVectorStore = ChromaVectorStore

        self.collection_name = collection_name
        self.persist_dir = persist_dir or settings.chroma_db_path
        self.similarity_top_k = similarity_top_k
        self.embedding_model = embedding_model or settings.openai_embedding_model

        self.api_key = api_key
        if not self.api_key and settings.has_openai_key:
            self.api_key = settings.openai_api_key

        if not self.api_key:
            raise ConfigurationError("OPENAI_API_KEY required for LlamaIndex RAG service")

        if not self.api_key.startswith("sk-"):
            raise ConfigurationError(
                "Invalid API key format. Expected OpenAI key starting with 'sk-', "
                f"got key starting with '{self.api_key[:8]}...'."
            )

        self._Settings.llm = OpenAI(
            model=settings.openai_model,
            api_key=self.api_key,
        )
        self._Settings.embed_model = OpenAIEmbedding(
            model=self.embedding_model,
            api_key=self.api_key,
        )

        self.chroma_client = self._chromadb.PersistentClient(path=self.persist_dir)
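
        # Get-or-create: get_collection raises when the collection is missing,
        # and the exception type and message vary across chromadb versions, so
        # the broad catch below checks the message before deciding to create.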
        try:
            self.collection = self.chroma_client.get_collection(self.collection_name)
            logger.info("loaded_existing_collection", name=self.collection_name)
        except Exception as e:
            if (
                "not exist" in str(e).lower()
                or "not found" in str(e).lower()
                or isinstance(e, ValueError)
            ):
                self.collection = self.chroma_client.create_collection(self.collection_name)
                logger.info("created_new_collection", name=self.collection_name)
            else:
                raise

        self.vector_store = self._ChromaVectorStore(chroma_collection=self.collection)
        self.storage_context = self._StorageContext.from_defaults(vector_store=self.vector_store)
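
        # Rebuild the index on top of whatever the Chroma collection already
        # holds; fall back to an empty index if there is nothing to load.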
        try:
            self.index = self._VectorStoreIndex.from_vector_store(
                vector_store=self.vector_store,
                storage_context=self.storage_context,
            )
            logger.info("loaded_existing_index")
        except (ValueError, KeyError):
            self.index = self._VectorStoreIndex([], storage_context=self.storage_context)
            logger.info("created_new_index")

    def ingest_evidence(self, evidence_list: list[Evidence]) -> None:
        """Ingest evidence into the vector store.

        Args:
            evidence_list: List of Evidence objects to ingest
        """
        if not evidence_list:
            logger.warning("no_evidence_to_ingest")
            return

        documents = []
        for evidence in evidence_list:
            metadata = {
                "source": evidence.citation.source,
                "title": evidence.citation.title,
                "url": evidence.citation.url,
                "date": evidence.citation.date,
                "authors": ", ".join(evidence.citation.authors),
            }
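
            # The citation URL doubles as a stable document ID for this entry.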
            doc = self._Document(
                text=evidence.content,
                metadata=metadata,
                doc_id=evidence.citation.url,
            )
            documents.append(doc)

        try:
            for doc in documents:
                self.index.insert(doc)
            logger.info("ingested_evidence", count=len(documents))
        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_ingest_evidence", error=str(e))
            raise EmbeddingError(f"Failed to ingest evidence: {e}") from e

    def ingest_documents(self, documents: list[Any]) -> None:
        """Ingest raw LlamaIndex Documents.

        Args:
            documents: List of LlamaIndex Document objects
        """
        if not documents:
            logger.warning("no_documents_to_ingest")
            return

        try:
            for doc in documents:
                self.index.insert(doc)
            logger.info("ingested_documents", count=len(documents))
        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_ingest_documents", error=str(e))
            raise EmbeddingError(f"Failed to ingest documents: {e}") from e

    def retrieve(self, query: str, top_k: int | None = None) -> list[dict[str, Any]]:
        """Retrieve relevant documents for a query.

        Args:
            query: Query string
            top_k: Number of results to return (defaults to similarity_top_k)

        Returns:
            List of retrieved documents with metadata and scores
        """
        k = top_k or self.similarity_top_k

        retriever = self._VectorIndexRetriever(
            index=self.index,
            similarity_top_k=k,
        )
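
        # retrieve() returns NodeWithScore objects; flatten each into a plain
        # dict so callers do not need LlamaIndex types.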
        try:
            nodes = retriever.retrieve(query)

            results = []
            for node in nodes:
                results.append(
                    {
                        "text": node.node.get_content(),
                        "score": node.score,
                        "metadata": node.node.metadata,
                    }
                )

            logger.info("retrieved_documents", query=query[:50], count=len(results))
            return results

        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_retrieve", error=str(e), query=query[:50])
            raise EmbeddingError(f"Failed to retrieve documents: {e}") from e

    def query(self, query_str: str, top_k: int | None = None) -> str:
        """Query the RAG system and get a synthesized response.

        Args:
            query_str: Query string
            top_k: Number of results to use (defaults to similarity_top_k)

        Returns:
            Synthesized response string
        """
        k = top_k or self.similarity_top_k

        query_engine = self.index.as_query_engine(
            similarity_top_k=k,
        )

        try:
            response = query_engine.query(query_str)
            logger.info("generated_response", query=query_str[:50])
            return str(response)
        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_query", error=str(e), query=query_str[:50])
            raise EmbeddingError(f"Failed to query RAG system: {e}") from e

    def clear_collection(self) -> None:
        """Clear all documents from the collection."""
        try:
            self.chroma_client.delete_collection(self.collection_name)
            self.collection = self.chroma_client.create_collection(self.collection_name)
            self.vector_store = self._ChromaVectorStore(chroma_collection=self.collection)
            self.storage_context = self._StorageContext.from_defaults(
                vector_store=self.vector_store
            )
            self.index = self._VectorStoreIndex([], storage_context=self.storage_context)
            logger.info("cleared_collection", name=self.collection_name)
        except (ValueError, RuntimeError) as e:
            logger.error("failed_to_clear_collection", error=str(e))
            raise EmbeddingError(f"Failed to clear collection: {e}") from e

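    # --- EmbeddingServiceProtocol-compatible async wrappers ---
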
    async def embed(self, text: str) -> list[float]:
        """Embed a single text using OpenAI embeddings (Protocol-compatible).

        Uses the LlamaIndex Settings.embed_model which was configured in __init__.

        Args:
            text: Text to embed

        Returns:
            Embedding vector as list of floats
        """
        loop = asyncio.get_running_loop()
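
        # get_text_embedding is a blocking network call; run it in the default
        # executor so the event loop stays responsive.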
        embedding = await loop.run_in_executor(
            None, self._Settings.embed_model.get_text_embedding, text
        )
        return list(embedding)

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed multiple texts efficiently (Protocol-compatible).

        Uses LlamaIndex's batch embedding rather than one call per text.

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors
        """
        if not texts:
            return []

        loop = asyncio.get_running_loop()
        embeddings = await loop.run_in_executor(
            None, self._Settings.embed_model.get_text_embedding_batch, texts
        )
        return [list(emb) for emb in embeddings]

    async def add_evidence(self, evidence_id: str, content: str, metadata: dict[str, Any]) -> None:
        """Async wrapper for adding evidence (Protocol-compatible).

        Converts the sync ingest_evidence pattern to the async protocol interface.
        Uses run_in_executor to avoid blocking the event loop.

        Args:
            evidence_id: Unique identifier (typically URL)
            content: Text content to embed and store
            metadata: Additional metadata (source, title, date, authors)
        """
        authors_str = metadata.get("authors", "")
        authors = [a.strip() for a in authors_str.split(",")] if authors_str else []

        citation = Citation(
            source=metadata.get("source", "web"),
            title=metadata.get("title", "Unknown"),
            url=evidence_id,
            date=metadata.get("date", "Unknown"),
            authors=authors,
        )
        evidence = Evidence(content=content, citation=citation)

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.ingest_evidence, [evidence])

    async def search_similar(self, query: str, n_results: int = 5) -> list[dict[str, Any]]:
        """Async wrapper for retrieve (Protocol-compatible).

        Returns results in the same format as EmbeddingService.search_similar()
        for seamless interchangeability.

        Args:
            query: Search query text
            n_results: Maximum number of results to return

        Returns:
            List of dicts with keys: id, content, metadata, distance
        """
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(None, self.retrieve, query, n_results)
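
        # retrieve() reports a similarity score (higher is more similar); the
        # protocol expects a distance (lower is closer), so convert with
        # distance = 1 - score. A missing score falls back to a neutral 0.5.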
        return [
            {
                "id": r.get("metadata", {}).get("url", ""),
                "content": r.get("text", ""),
                "metadata": r.get("metadata", {}),
                "distance": 1.0 - (r["score"] if r.get("score") is not None else 0.5),
            }
            for r in results
        ]

    async def deduplicate(self, evidence: list[Evidence], threshold: float = 0.9) -> list[Evidence]:
        """Async wrapper for deduplication (Protocol-compatible).

        Uses search_similar() to check for existing similar content.
        Stores unique evidence and returns the deduplicated list.

        Args:
            evidence: List of evidence items to deduplicate
            threshold: Similarity threshold (0.9 = 90% similar is duplicate).
                Distance range: 0-1 (0 = identical, 1 = orthogonal).
                Duplicate if: distance < (1 - threshold), e.g., < 0.1 for 90%.

        Returns:
            List of unique evidence items (duplicates removed)
        """
        unique = []
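
        # For each item: find the nearest stored neighbor; if it is closer than
        # the duplicate cutoff, drop the item, otherwise keep it and persist it
        # so later items are deduplicated against it too. Failures fail open
        # (the item is kept) so retrieval errors never lose evidence.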
        for ev in evidence:
            try:
                similar = await self.search_similar(ev.content, n_results=1)
                is_duplicate = similar and similar[0]["distance"] < (1 - threshold)

                if not is_duplicate:
                    unique.append(ev)
                    await self.add_evidence(
                        evidence_id=ev.citation.url,
                        content=ev.content,
                        metadata={
                            "source": ev.citation.source,
                            "title": ev.citation.title,
                            "date": ev.citation.date,
                            "authors": ",".join(ev.citation.authors or []),
                        },
                    )
            except Exception as e:
                logger.warning(
                    "Failed to process evidence in deduplicate",
                    url=ev.citation.url,
                    error=str(e),
                )
                unique.append(ev)

        return unique


def get_rag_service(
    collection_name: str = "deepboner_evidence",
    api_key: str | None = None,
    **kwargs: Any,
) -> LlamaIndexRAGService:
    """Create a configured RAG service instance.

    A new instance is constructed on every call; nothing is cached.

    Args:
        collection_name: Name of the ChromaDB collection
        api_key: Optional BYOK OpenAI key
        **kwargs: Additional arguments for LlamaIndexRAGService

    Returns:
        Configured LlamaIndexRAGService instance
    """
    return LlamaIndexRAGService(collection_name=collection_name, api_key=api_key, **kwargs)
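

# Illustrative BYOK usage (a sketch; the key is a placeholder):
#
#   service = get_rag_service(api_key="sk-...", similarity_top_k=10)
#   answer = service.query("What does the evidence say about X?")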