| # LlamaIndex RAG Integration Specification | |
| **Version:** 1.0.0 | |
| **Date:** 2025-11-30 | |
| **Author:** Claude (DeepBoner Singularity Initiative) | |
| **Status:** IMPLEMENTATION READY | |
| ## Executive Summary | |
| This specification details the integration of LlamaIndex RAG into DeepBoner's embedding infrastructure following SOLID principles, DRY patterns, and Gang of Four design patterns. The goal is to wire the orphaned `LlamaIndexRAGService` into the system via a tiered service selection mechanism. | |
| --- | |
| ## Architecture Overview | |
| ### Current State (Problem) | |
| ``` | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| β CURRENT ARCHITECTURE β | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ | |
| β β | |
| β ResearchMemory βββββββββββββββΊ EmbeddingService (always) β | |
| β β β β | |
| β β βββ sentence-transformers β | |
| β β βββ ChromaDB (in-memory) β | |
| β β βββ NO persistence β | |
| β β β | |
| β β β | |
| β LlamaIndexRAGService βββββββββββΊ ORPHANED (never called) β | |
| β β β β | |
| β β βββ OpenAI embeddings β | |
| β β βββ ChromaDB (persistent) β | |
| β β βββ LlamaIndex RAG β | |
| β β | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| ``` | |
| ### Target State (Solution) | |
| ``` | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| β TARGET ARCHITECTURE β | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ | |
| β β | |
| β ResearchMemory βββββββββββββββΊ get_embedding_service() β | |
| β β β β | |
| β β βΌ β | |
| β β βββββββββββββββββββββββ β | |
| β β β Service Selection β β | |
| β β β (Strategy Pattern) β β | |
| β β βββββββββββββββββββββββ β | |
| β β β β β | |
| β β ββββββββββββ ββββββββββββ β | |
| β β βΌ βΌ β | |
| β β βββββββββββββββββββ βββββββββββββββββββββ β | |
| β β β EmbeddingServiceβ βLlamaIndexRAGServiceβ β | |
| β β β (Free Tier) β β(Premium Tier) β β | |
| β β βββββββββββββββββββ€ βββββββββββββββββββββ€ β | |
| β β β sentence-trans. β β OpenAI embeddings β β | |
| β β β In-memory β β Persistent storage β β | |
| β β β No API key req. β β Requires OPENAI_KEYβ β | |
| β β βββββββββββββββββββ βββββββββββββββββββββ β | |
| β β β | |
| β βΌ β | |
| β EmbeddingServiceProtocol βββββ Common Interface (Protocol) β | |
| β β | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| ``` | |
| --- | |
| ## Design Patterns Applied | |
| ### 1. Strategy Pattern (Gang of Four) | |
| **Purpose:** Allow interchangeable embedding services at runtime. | |
| ```python | |
| # EmbeddingServiceProtocol defines the interface | |
| # EmbeddingService and LlamaIndexRAGService are concrete strategies | |
| # get_embedding_service() is the context that selects the strategy | |
| ``` | |
| ### 2. Protocol Pattern (Structural Typing) | |
| **Purpose:** Define interface without inheritance using Python's `typing.Protocol`. | |
| ```python | |
| from typing import Protocol, Any | |
| from src.utils.models import Evidence | |
| class EmbeddingServiceProtocol(Protocol): | |
| """Duck-typed interface for embedding services.""" | |
| async def add_evidence(self, evidence_id: str, content: str, | |
| metadata: dict[str, Any]) -> None: ... | |
| async def search_similar(self, query: str, | |
| n_results: int = 5) -> list[dict[str, Any]]: ... | |
| async def deduplicate(self, evidence: list[Evidence]) -> list[Evidence]: ... | |
| ``` | |
| ### 3. Factory Method Pattern | |
| **Purpose:** Encapsulate service creation logic. | |
| ```python | |
| def get_embedding_service() -> EmbeddingServiceProtocol: | |
| """Factory method that returns the best available service.""" | |
| if settings.has_openai_key: | |
| return _create_llamaindex_service() | |
| return _create_local_service() | |
| ``` | |
| ### 4. Adapter Pattern | |
| **Purpose:** Make LlamaIndexRAGService async-compatible with the protocol. | |
| ```python | |
| # Wrap sync methods with async wrappers using run_in_executor | |
| async def add_evidence(self, evidence_id: str, content: str, | |
| metadata: dict[str, Any]) -> None: | |
| loop = asyncio.get_running_loop() | |
| await loop.run_in_executor(None, self._sync_add_evidence, | |
| evidence_id, content, metadata) | |
| ``` | |
| ### 5. Dependency Injection | |
| **Purpose:** Allow ResearchMemory to receive any compatible embedding service. | |
| ```python | |
| class ResearchMemory: | |
| def __init__(self, query: str, | |
| embedding_service: EmbeddingServiceProtocol | None = None): | |
| self._embedding_service = embedding_service or get_embedding_service() | |
| ``` | |
| --- | |
| ## SOLID Principles Applied | |
| ### Single Responsibility Principle (SRP) | |
| - `EmbeddingService`: Handles local embeddings only | |
| - `LlamaIndexRAGService`: Handles OpenAI embeddings + persistence only | |
| - `service_loader`: Handles service selection only | |
| - `EmbeddingServiceProtocol`: Defines interface only | |
| ### Open/Closed Principle (OCP) | |
| - New embedding services can be added without modifying existing code | |
| - Just implement `EmbeddingServiceProtocol` and register in `service_loader` | |
| ### Liskov Substitution Principle (LSP) | |
| - Both `EmbeddingService` and `LlamaIndexRAGService` are substitutable | |
| - They implement identical async interfaces | |
| ### Interface Segregation Principle (ISP) | |
| - Protocol includes only methods needed by ResearchMemory | |
| - No "fat interface" with unused methods | |
| ### Dependency Inversion Principle (DIP) | |
| - ResearchMemory depends on `EmbeddingServiceProtocol` (abstraction) | |
| - Not on concrete `EmbeddingService` or `LlamaIndexRAGService` | |
| --- | |
| ## DRY Principle Applied | |
| ### Before (Violation) | |
| ```python | |
| # In EmbeddingService | |
| await self.add_evidence(ev_id, content, { | |
| "source": ev.citation.source, | |
| "title": ev.citation.title, | |
| ... | |
| }) | |
| # In LlamaIndexRAGService - DUPLICATE metadata building | |
| doc = Document(text=ev.content, metadata={ | |
| "source": evidence.citation.source, | |
| "title": evidence.citation.title, | |
| ... | |
| }) | |
| ``` | |
| ### After (DRY) | |
| ```python | |
| # In utils/models.py | |
| class Evidence: | |
| def to_metadata(self) -> dict[str, Any]: | |
| """Convert to storage metadata format.""" | |
| return { | |
| "source": self.citation.source, | |
| "title": self.citation.title, | |
| "date": self.citation.date, | |
| "authors": ",".join(self.citation.authors or []), | |
| "url": self.citation.url, | |
| } | |
| ``` | |
| --- | |
| ## Implementation Files | |
| ### File 1: `src/services/embedding_protocol.py` (NEW) | |
| ```python | |
| """Protocol definition for embedding services. | |
| This module defines the common interface that all embedding services must implement. | |
| Using Protocol (PEP 544) for structural subtyping - no inheritance required. | |
| """ | |
| from typing import Any, Protocol | |
| from src.utils.models import Evidence | |
| class EmbeddingServiceProtocol(Protocol): | |
| """Common interface for embedding services. | |
| Both EmbeddingService (local/free) and LlamaIndexRAGService (OpenAI/premium) | |
| implement this interface, allowing seamless swapping via get_embedding_service(). | |
| Design Pattern: Strategy Pattern (Gang of Four) | |
| - Each implementation is a concrete strategy | |
| - Protocol defines the strategy interface | |
| - service_loader selects the appropriate strategy at runtime | |
| """ | |
| async def add_evidence( | |
| self, evidence_id: str, content: str, metadata: dict[str, Any] | |
| ) -> None: | |
| """Store evidence with embeddings. | |
| Args: | |
| evidence_id: Unique identifier (typically URL) | |
| content: Text content to embed | |
| metadata: Additional metadata for retrieval | |
| """ | |
| ... | |
| async def search_similar( | |
| self, query: str, n_results: int = 5 | |
| ) -> list[dict[str, Any]]: | |
| """Search for semantically similar content. | |
| Args: | |
| query: Search query | |
| n_results: Number of results to return | |
| Returns: | |
| List of dicts with keys: id, content, metadata, distance | |
| """ | |
| ... | |
| async def deduplicate( | |
| self, evidence: list[Evidence], threshold: float = 0.9 | |
| ) -> list[Evidence]: | |
| """Remove duplicate evidence based on semantic similarity. | |
| Args: | |
| evidence: List of evidence items to deduplicate | |
| threshold: Similarity threshold (0.9 = 90% similar is duplicate) | |
| Returns: | |
| List of unique evidence items | |
| """ | |
| ... | |
| ``` | |
| ### File 2: `src/utils/service_loader.py` (MODIFIED) | |
| ```python | |
| """Service loader utility for safe, lazy loading of optional services. | |
| This module handles the import and initialization of services that may | |
| have missing optional dependencies (like Modal or Sentence Transformers), | |
| preventing the application from crashing if they are not available. | |
| Design Patterns: | |
| - Factory Method: get_embedding_service() creates appropriate service | |
| - Strategy Pattern: Selects between EmbeddingService and LlamaIndexRAGService | |
| """ | |
| from typing import TYPE_CHECKING | |
| import structlog | |
| from src.utils.config import settings | |
| if TYPE_CHECKING: | |
| from src.services.embedding_protocol import EmbeddingServiceProtocol | |
| from src.services.embeddings import EmbeddingService | |
| from src.services.llamaindex_rag import LlamaIndexRAGService | |
| from src.services.statistical_analyzer import StatisticalAnalyzer | |
| logger = structlog.get_logger() | |
| def get_embedding_service() -> "EmbeddingServiceProtocol": | |
| """Get the best available embedding service. | |
| Strategy selection (ordered by preference): | |
| 1. LlamaIndexRAGService if OPENAI_API_KEY present (better quality + persistence) | |
| 2. EmbeddingService (free, local, in-memory) as fallback | |
| Design Pattern: Factory Method + Strategy Pattern | |
| - Factory Method: Creates service instance | |
| - Strategy Pattern: Selects between implementations at runtime | |
| Returns: | |
| EmbeddingServiceProtocol: Either LlamaIndexRAGService or EmbeddingService | |
| Raises: | |
| ImportError: If no embedding service dependencies are available | |
| """ | |
| # Try premium tier first (OpenAI + persistence) | |
| if settings.has_openai_key: | |
| try: | |
| from src.services.llamaindex_rag import get_rag_service | |
| service = get_rag_service() | |
| logger.info( | |
| "Using LlamaIndex RAG service", | |
| tier="premium", | |
| persistence="enabled", | |
| embeddings="openai", | |
| ) | |
| return service | |
| except ImportError as e: | |
| logger.info( | |
| "LlamaIndex deps not installed, falling back to local embeddings", | |
| missing=str(e), | |
| ) | |
| except Exception as e: | |
| logger.warning( | |
| "LlamaIndex service failed to initialize, falling back", | |
| error=str(e), | |
| error_type=type(e).__name__, | |
| ) | |
| # Fallback to free tier (local embeddings, in-memory) | |
| try: | |
| from src.services.embeddings import get_embedding_service as get_local_service | |
| service = get_local_service() | |
| logger.info( | |
| "Using local embedding service", | |
| tier="free", | |
| persistence="disabled", | |
| embeddings="sentence-transformers", | |
| ) | |
| return service | |
| except ImportError as e: | |
| logger.error( | |
| "No embedding service available", | |
| error=str(e), | |
| ) | |
| raise ImportError( | |
| "No embedding service available. Install either:\n" | |
| " - uv sync --extra embeddings (for local embeddings)\n" | |
| " - uv sync --extra modal (for LlamaIndex with OpenAI)" | |
| ) from e | |
| def get_embedding_service_if_available() -> "EmbeddingServiceProtocol | None": | |
| """ | |
| Safely attempt to load and initialize an embedding service. | |
| Returns: | |
| EmbeddingServiceProtocol instance if dependencies are met, else None. | |
| """ | |
| try: | |
| return get_embedding_service() | |
| except ImportError as e: | |
| logger.info( | |
| "Embedding service not available (optional dependencies missing)", | |
| missing_dependency=str(e), | |
| ) | |
| except Exception as e: | |
| logger.warning( | |
| "Embedding service initialization failed unexpectedly", | |
| error=str(e), | |
| error_type=type(e).__name__, | |
| ) | |
| return None | |
| def get_analyzer_if_available() -> "StatisticalAnalyzer | None": | |
| """ | |
| Safely attempt to load and initialize the StatisticalAnalyzer. | |
| Returns: | |
| StatisticalAnalyzer instance if Modal is available, else None. | |
| """ | |
| try: | |
| from src.services.statistical_analyzer import get_statistical_analyzer | |
| analyzer = get_statistical_analyzer() | |
| logger.info("StatisticalAnalyzer initialized successfully") | |
| return analyzer | |
| except ImportError as e: | |
| logger.info( | |
| "StatisticalAnalyzer not available (Modal dependencies missing)", | |
| missing_dependency=str(e), | |
| ) | |
| except Exception as e: | |
| logger.warning( | |
| "StatisticalAnalyzer initialization failed unexpectedly", | |
| error=str(e), | |
| error_type=type(e).__name__, | |
| ) | |
| return None | |
| ``` | |
| ### File 3: `src/services/llamaindex_rag.py` (MODIFIED - add async wrappers) | |
| Add these methods to `LlamaIndexRAGService` class: | |
| ```python | |
| # Add to imports at top | |
| import asyncio | |
| # Add these async wrapper methods to the class | |
| async def add_evidence( | |
| self, evidence_id: str, content: str, metadata: dict[str, Any] | |
| ) -> None: | |
| """Async wrapper for adding evidence (Protocol-compatible). | |
| Converts the sync ingest_evidence pattern to the async protocol interface. | |
| Uses run_in_executor to avoid blocking the event loop. | |
| """ | |
| from src.utils.models import Citation, Evidence | |
| # Reconstruct Evidence from parts | |
| citation = Citation( | |
| source=metadata.get("source", "web"), | |
| title=metadata.get("title", "Unknown"), | |
| url=evidence_id, | |
| date=metadata.get("date", "Unknown"), | |
| authors=(metadata.get("authors", "") or "").split(",") if metadata.get("authors") else [], | |
| ) | |
| evidence = Evidence(content=content, citation=citation) | |
| loop = asyncio.get_running_loop() | |
| await loop.run_in_executor(None, self.ingest_evidence, [evidence]) | |
| async def search_similar( | |
| self, query: str, n_results: int = 5 | |
| ) -> list[dict[str, Any]]: | |
| """Async wrapper for retrieve (Protocol-compatible). | |
| Returns results in the same format as EmbeddingService.search_similar(). | |
| """ | |
| loop = asyncio.get_running_loop() | |
| results = await loop.run_in_executor(None, self.retrieve, query, n_results) | |
| # Convert to EmbeddingService format for compatibility | |
| return [ | |
| { | |
| "id": r.get("metadata", {}).get("url", ""), | |
| "content": r.get("text", ""), | |
| "metadata": r.get("metadata", {}), | |
| "distance": 1.0 - (r.get("score", 0.5) or 0.5), # Convert score to distance | |
| } | |
| for r in results | |
| ] | |
| async def deduplicate( | |
| self, evidence: list["Evidence"], threshold: float = 0.9 | |
| ) -> list["Evidence"]: | |
| """Async wrapper for deduplication (Protocol-compatible). | |
| Uses retrieve() to check for existing similar content. | |
| Stores unique evidence and returns the deduplicated list. | |
| """ | |
| unique = [] | |
| for ev in evidence: | |
| try: | |
| # Check for similar existing content | |
| similar = await self.search_similar(ev.content, n_results=1) | |
| # Check similarity threshold | |
| # distance 0 = identical, higher = more different | |
| is_duplicate = similar and similar[0]["distance"] < (1 - threshold) | |
| if not is_duplicate: | |
| unique.append(ev) | |
| # Store the new evidence | |
| await self.add_evidence( | |
| evidence_id=ev.citation.url, | |
| content=ev.content, | |
| metadata={ | |
| "source": ev.citation.source, | |
| "title": ev.citation.title, | |
| "date": ev.citation.date, | |
| "authors": ",".join(ev.citation.authors or []), | |
| }, | |
| ) | |
| except Exception as e: | |
| # Log but don't fail - better to have duplicates than lose data | |
| logger.warning( | |
| "Failed to process evidence in deduplicate", | |
| url=ev.citation.url, | |
| error=str(e), | |
| ) | |
| unique.append(ev) | |
| return unique | |
| ``` | |
| ### File 4: `src/services/research_memory.py` (MODIFIED) | |
| ```python | |
| """Shared research memory layer for all orchestration modes.""" | |
| from typing import TYPE_CHECKING, Any | |
| import structlog | |
| from src.agents.graph.state import Conflict, Hypothesis | |
| from src.utils.models import Citation, Evidence | |
| if TYPE_CHECKING: | |
| from src.services.embedding_protocol import EmbeddingServiceProtocol | |
| logger = structlog.get_logger() | |
| class ResearchMemory: | |
| """Shared cognitive state for research workflows. | |
| This is the memory layer that ALL modes use. | |
| It mimics the LangGraph state management but for manual orchestration. | |
| Design Pattern: Dependency Injection | |
| - Receives embedding service via constructor | |
| - Uses service_loader.get_embedding_service() as default | |
| - Allows testing with mock services | |
| """ | |
| def __init__( | |
| self, | |
| query: str, | |
| embedding_service: "EmbeddingServiceProtocol | None" = None | |
| ): | |
| """Initialize ResearchMemory with a query and optional embedding service. | |
| Args: | |
| query: The research query to track evidence for. | |
| embedding_service: Service for semantic search and deduplication. | |
| Uses get_embedding_service() if not provided. | |
| """ | |
| self.query = query | |
| self.hypotheses: list[Hypothesis] = [] | |
| self.conflicts: list[Conflict] = [] | |
| self.evidence_ids: list[str] = [] | |
| self._evidence_cache: dict[str, Evidence] = {} | |
| self.iteration_count: int = 0 | |
| # Lazy import to avoid circular dependencies | |
| if embedding_service is None: | |
| from src.utils.service_loader import get_embedding_service | |
| self._embedding_service = get_embedding_service() | |
| else: | |
| self._embedding_service = embedding_service | |
| # ... rest of the class remains the same ... | |
| ``` | |
| ### File 5: `tests/unit/services/test_service_loader.py` (NEW) | |
| ```python | |
| """Tests for service loader embedding service selection.""" | |
| from unittest.mock import MagicMock, patch | |
| import pytest | |
| class TestGetEmbeddingService: | |
| """Tests for get_embedding_service() tiered selection.""" | |
| def test_uses_llamaindex_when_openai_key_present(self, monkeypatch): | |
| """Should return LlamaIndexRAGService when OPENAI_API_KEY is set.""" | |
| monkeypatch.setenv("OPENAI_API_KEY", "sk-test-key-12345") | |
| # Reset settings singleton to pick up new env var | |
| with patch("src.utils.service_loader.settings") as mock_settings: | |
| mock_settings.has_openai_key = True | |
| # Mock LlamaIndex service | |
| mock_rag_service = MagicMock() | |
| with patch( | |
| "src.utils.service_loader.get_rag_service", | |
| return_value=mock_rag_service | |
| ): | |
| from src.utils.service_loader import get_embedding_service | |
| service = get_embedding_service() | |
| # Should be the LlamaIndex service | |
| assert service is mock_rag_service | |
| def test_falls_back_to_local_when_no_openai_key(self, monkeypatch): | |
| """Should return EmbeddingService when no OpenAI key.""" | |
| monkeypatch.delenv("OPENAI_API_KEY", raising=False) | |
| with patch("src.utils.service_loader.settings") as mock_settings: | |
| mock_settings.has_openai_key = False | |
| # Mock local service | |
| mock_local_service = MagicMock() | |
| with patch( | |
| "src.services.embeddings.get_embedding_service", | |
| return_value=mock_local_service | |
| ): | |
| from src.utils.service_loader import get_embedding_service | |
| service = get_embedding_service() | |
| # Should be the local service | |
| assert service is mock_local_service | |
| def test_falls_back_when_llamaindex_import_fails(self, monkeypatch): | |
| """Should fallback to local if LlamaIndex deps missing.""" | |
| monkeypatch.setenv("OPENAI_API_KEY", "sk-test-key-12345") | |
| with patch("src.utils.service_loader.settings") as mock_settings: | |
| mock_settings.has_openai_key = True | |
| # LlamaIndex import fails | |
| def raise_import_error(*args, **kwargs): | |
| raise ImportError("llama_index not installed") | |
| mock_local_service = MagicMock() | |
| with patch.dict( | |
| "sys.modules", | |
| {"src.services.llamaindex_rag": None} | |
| ): | |
| with patch( | |
| "src.services.embeddings.get_embedding_service", | |
| return_value=mock_local_service | |
| ): | |
| from src.utils.service_loader import get_embedding_service | |
| # Should fallback gracefully | |
| service = get_embedding_service() | |
| assert service is mock_local_service | |
| def test_raises_when_no_embedding_service_available(self, monkeypatch): | |
| """Should raise ImportError when no embedding service can be loaded.""" | |
| monkeypatch.delenv("OPENAI_API_KEY", raising=False) | |
| with patch("src.utils.service_loader.settings") as mock_settings: | |
| mock_settings.has_openai_key = False | |
| # Both imports fail | |
| with patch.dict( | |
| "sys.modules", | |
| { | |
| "src.services.llamaindex_rag": None, | |
| "src.services.embeddings": None, | |
| } | |
| ): | |
| from src.utils.service_loader import get_embedding_service | |
| with pytest.raises(ImportError) as exc_info: | |
| get_embedding_service() | |
| assert "No embedding service available" in str(exc_info.value) | |
| class TestGetEmbeddingServiceIfAvailable: | |
| """Tests for get_embedding_service_if_available() safe wrapper.""" | |
| def test_returns_none_when_no_service_available(self, monkeypatch): | |
| """Should return None instead of raising when no service available.""" | |
| monkeypatch.delenv("OPENAI_API_KEY", raising=False) | |
| with patch("src.utils.service_loader.settings") as mock_settings: | |
| mock_settings.has_openai_key = False | |
| with patch( | |
| "src.utils.service_loader.get_embedding_service", | |
| side_effect=ImportError("no deps") | |
| ): | |
| from src.utils.service_loader import get_embedding_service_if_available | |
| result = get_embedding_service_if_available() | |
| assert result is None | |
| def test_returns_service_when_available(self, monkeypatch): | |
| """Should return the service when available.""" | |
| mock_service = MagicMock() | |
| with patch( | |
| "src.utils.service_loader.get_embedding_service", | |
| return_value=mock_service | |
| ): | |
| from src.utils.service_loader import get_embedding_service_if_available | |
| result = get_embedding_service_if_available() | |
| assert result is mock_service | |
| ``` | |
| ### File 6: `tests/unit/services/test_llamaindex_rag_protocol.py` (NEW) | |
| ```python | |
| """Tests for LlamaIndexRAGService protocol compliance.""" | |
| from unittest.mock import AsyncMock, MagicMock, patch | |
| import asyncio | |
| import pytest | |
| # Skip if LlamaIndex dependencies not installed | |
| pytest.importorskip("llama_index") | |
| pytest.importorskip("chromadb") | |
| class TestLlamaIndexProtocolCompliance: | |
| """Verify LlamaIndexRAGService implements EmbeddingServiceProtocol.""" | |
| @pytest.fixture | |
| def mock_openai_key(self, monkeypatch): | |
| """Provide a mock OpenAI key.""" | |
| monkeypatch.setenv("OPENAI_API_KEY", "sk-test-key-12345") | |
| @pytest.fixture | |
| def mock_llamaindex_deps(self): | |
| """Mock all LlamaIndex dependencies.""" | |
| with patch("chromadb.PersistentClient") as mock_chroma: | |
| mock_collection = MagicMock() | |
| mock_chroma.return_value.get_collection.return_value = mock_collection | |
| mock_chroma.return_value.create_collection.return_value = mock_collection | |
| with patch("llama_index.core.VectorStoreIndex") as mock_index: | |
| with patch("llama_index.core.Settings"): | |
| with patch("llama_index.embeddings.openai.OpenAIEmbedding"): | |
| with patch("llama_index.llms.openai.OpenAI"): | |
| with patch("llama_index.vector_stores.chroma.ChromaVectorStore"): | |
| yield { | |
| "chroma": mock_chroma, | |
| "collection": mock_collection, | |
| "index": mock_index, | |
| } | |
| @pytest.mark.asyncio | |
| async def test_add_evidence_is_async(self, mock_openai_key, mock_llamaindex_deps): | |
| """add_evidence should be an async method.""" | |
| from src.services.llamaindex_rag import LlamaIndexRAGService | |
| service = LlamaIndexRAGService() | |
| # Should be callable as async | |
| result = service.add_evidence("id", "content", {"source": "pubmed"}) | |
| assert asyncio.iscoroutine(result) | |
| await result # Clean up coroutine | |
| @pytest.mark.asyncio | |
| async def test_search_similar_is_async(self, mock_openai_key, mock_llamaindex_deps): | |
| """search_similar should be an async method.""" | |
| from src.services.llamaindex_rag import LlamaIndexRAGService | |
| service = LlamaIndexRAGService() | |
| # Mock retrieve to avoid actual API call | |
| service.retrieve = MagicMock(return_value=[]) | |
| result = service.search_similar("query", n_results=5) | |
| assert asyncio.iscoroutine(result) | |
| results = await result | |
| assert isinstance(results, list) | |
| @pytest.mark.asyncio | |
| async def test_deduplicate_is_async(self, mock_openai_key, mock_llamaindex_deps): | |
| """deduplicate should be an async method.""" | |
| from src.services.llamaindex_rag import LlamaIndexRAGService | |
| from src.utils.models import Citation, Evidence | |
| service = LlamaIndexRAGService() | |
| # Mock search_similar | |
| service.search_similar = AsyncMock(return_value=[]) | |
| service.add_evidence = AsyncMock() | |
| evidence = [ | |
| Evidence( | |
| content="test", | |
| citation=Citation(source="pubmed", url="u1", title="t1", date="2024"), | |
| ) | |
| ] | |
| result = service.deduplicate(evidence) | |
| assert asyncio.iscoroutine(result) | |
| unique = await result | |
| assert len(unique) == 1 | |
| @pytest.mark.asyncio | |
| async def test_search_similar_returns_correct_format( | |
| self, mock_openai_key, mock_llamaindex_deps | |
| ): | |
| """search_similar should return EmbeddingService-compatible format.""" | |
| from src.services.llamaindex_rag import LlamaIndexRAGService | |
| service = LlamaIndexRAGService() | |
| # Mock retrieve to return LlamaIndex format | |
| service.retrieve = MagicMock(return_value=[ | |
| { | |
| "text": "some content", | |
| "score": 0.9, | |
| "metadata": { | |
| "source": "pubmed", | |
| "title": "Test", | |
| "url": "http://example.com", | |
| }, | |
| } | |
| ]) | |
| results = await service.search_similar("query") | |
| assert len(results) == 1 | |
| result = results[0] | |
| # Verify correct format | |
| assert "id" in result | |
| assert "content" in result | |
| assert "metadata" in result | |
| assert "distance" in result | |
| # Distance should be 1 - score | |
| assert result["distance"] == pytest.approx(0.1, abs=0.01) | |
| ``` | |
| --- | |
| ## Bug Inventory (P0-P3) | |
| ### P0 - Critical (Must Fix) | |
| **BUG-001: LlamaIndexRAGService not async-compatible** | |
| - **Location:** `src/services/llamaindex_rag.py` | |
| - **Issue:** All methods are sync, but ResearchMemory expects async | |
| - **Fix:** Add async wrappers using `run_in_executor()` | |
| - **Status:** PLANNED (this spec) | |
| ### P1 - High (Should Fix) | |
| **BUG-002: ResearchMemory always creates new EmbeddingService** | |
| - **Location:** `src/services/research_memory.py:37` | |
| - **Issue:** `EmbeddingService()` called directly, bypassing service selection | |
| - **Fix:** Use `get_embedding_service()` instead | |
| - **Status:** PLANNED (this spec) | |
| **BUG-003: Duplicate metadata construction logic** | |
| - **Location:** `embeddings.py:156-161`, `llamaindex_rag.py:128-134` | |
| - **Issue:** Same metadata dict built in multiple places (DRY violation) | |
| - **Fix:** Add `Evidence.to_metadata()` method | |
| - **Status:** OPTIONAL (nice-to-have) | |
| ### P2 - Medium (Could Fix) | |
| **BUG-004: LlamaIndex score-to-distance conversion unclear** | |
| - **Location:** `llamaindex_rag.py` (new code) | |
| - **Issue:** LlamaIndex uses similarity scores (higher = better), EmbeddingService uses distance (lower = better) | |
| - **Fix:** Document and test conversion: `distance = 1 - score` | |
| - **Status:** PLANNED (this spec) | |
| **BUG-005: No type hints for EmbeddingServiceProtocol in ResearchMemory** | |
| - **Location:** `src/services/research_memory.py` | |
| - **Issue:** `embedding_service` parameter typed as `EmbeddingService | None` | |
| - **Fix:** Type as `EmbeddingServiceProtocol | None` | |
| - **Status:** PLANNED (this spec) | |
| ### P3 - Low (Nice to Have) | |
| **BUG-006: Singleton pattern for LlamaIndex service not implemented** | |
| - **Location:** `src/services/llamaindex_rag.py` | |
| - **Issue:** Each call to `get_rag_service()` creates new instance | |
| - **Fix:** Add module-level singleton like `_shared_model` in `embeddings.py` | |
| - **Status:** DEFERRED (not critical for hackathon) | |
| **BUG-007: Missing integration test for tiered service selection** | |
| - **Location:** `tests/integration/` | |
| - **Issue:** No test verifies actual service switching with real keys | |
| - **Fix:** Add integration test with conditional skip based on env | |
| - **Status:** DEFERRED | |
| --- | |
| ## Implementation Order (TDD) | |
| ### Phase 1: Tests First (Red) | |
| 1. Create `tests/unit/services/test_service_loader.py` | |
| 2. Create `tests/unit/services/test_llamaindex_rag_protocol.py` | |
| 3. Run tests - all should fail (no implementation yet) | |
| ### Phase 2: Protocol (Green - Part 1) | |
| 1. Create `src/services/embedding_protocol.py` | |
| 2. Verify type checking passes | |
| ### Phase 3: LlamaIndex Async (Green - Part 2) | |
| 1. Add async wrappers to `src/services/llamaindex_rag.py` | |
| 2. Run protocol tests - should pass | |
| ### Phase 4: Service Loader (Green - Part 3) | |
| 1. Update `src/utils/service_loader.py` | |
| 2. Run service loader tests - should pass | |
| ### Phase 5: ResearchMemory (Green - Part 4) | |
| 1. Update `src/services/research_memory.py` | |
| 2. Run existing tests - all should pass | |
| ### Phase 6: Integration (Refactor) | |
| 1. Run `make check` | |
| 2. Fix any type errors or lint issues | |
| 3. Commit with clear message | |
| --- | |
| ## Acceptance Criteria | |
| - [ ] `get_embedding_service()` returns `LlamaIndexRAGService` when `OPENAI_API_KEY` present | |
| - [ ] Falls back to `EmbeddingService` when no OpenAI key | |
| - [ ] Both services have compatible async interfaces (Protocol compliance) | |
| - [ ] Persistence works (evidence survives restart with OpenAI key) | |
| - [ ] All existing tests pass | |
| - [ ] New tests for service selection | |
| - [ ] `make check` passes (lint + typecheck + test) | |
| - [ ] No regression in Gradio app functionality | |
| --- | |
| ## Sources & References | |
| ### LlamaIndex Best Practices 2025 | |
| - [LlamaIndex Production RAG Guide](https://developers.llamaindex.ai/python/framework/optimizing/production_rag/) | |
| - [LlamaIndex + ChromaDB Integration](https://docs.trychroma.com/integrations/frameworks/llamaindex) | |
| - [LlamaIndex Embeddings Documentation](https://developers.llamaindex.ai/python/framework/module_guides/models/embeddings/) | |
| ### Design Patterns | |
| - Gang of Four: Strategy Pattern for service selection | |
| - Python Protocol (PEP 544) for structural typing | |
| - Factory Method for service creation | |
| ### SOLID Principles | |
| - Single Responsibility: Each service has one job | |
| - Open/Closed: New services don't require changes to existing code | |
| - Liskov Substitution: Services are interchangeable | |
| - Interface Segregation: Protocol has minimal methods | |
| - Dependency Inversion: Depend on Protocol, not concrete classes | |
| --- | |
| ## Appendix: Full File Listing | |
| After implementation, the following files will be modified or created: | |
| | File | Status | Purpose | | |
| |------|--------|---------| | |
| | `src/services/embedding_protocol.py` | NEW | Protocol interface definition | | |
| | `src/utils/service_loader.py` | MODIFIED | Add `get_embedding_service()` | | |
| | `src/services/llamaindex_rag.py` | MODIFIED | Add async wrapper methods | | |
| | `src/services/research_memory.py` | MODIFIED | Use service loader | | |
| | `tests/unit/services/test_service_loader.py` | NEW | Service selection tests | | |
| | `tests/unit/services/test_llamaindex_rag_protocol.py` | NEW | Protocol compliance tests | | |