# Phase 6 Implementation Spec: Embeddings & Semantic Search

**Goal**: Add vector search for semantic evidence retrieval.

**Philosophy**: "Find what you mean, not just what you type."

**Prerequisite**: Phase 5 complete (Magentic working)

---

## 1. Why Embeddings?

Current limitation: **Keyword-only search misses semantically related papers.**

Example problem:

- User searches: "metformin alzheimer"
- PubMed returns: Papers with exact keywords
- MISSED: Papers about "AMPK activation neuroprotection" (same mechanism, different words)

With embeddings:

- Embed the query AND all evidence
- Find semantically similar papers even without keyword match
- Deduplicate by meaning, not just URL

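
To make "similar by meaning" concrete, the sketch below shows the comparison a vector search performs. The three-dimensional vectors are invented for illustration (a real model such as `all-MiniLM-L6-v2` produces 384-dimensional vectors), and `cosine_similarity` is a hypothetical helper, not part of this spec.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Toy vectors, made up for this example only.
query = [0.9, 0.1, 0.0]      # stands in for "metformin alzheimer"
related = [0.8, 0.3, 0.1]    # stands in for "AMPK activation neuroprotection"
unrelated = [0.0, 0.2, 0.9]  # stands in for an off-topic text

sim_related = cosine_similarity(query, related)
sim_unrelated = cosine_similarity(query, unrelated)

# The related text scores far higher even though it shares no keywords.
print(f"related={sim_related:.3f} unrelated={sim_unrelated:.3f}")
```

Deduplication by meaning uses the same measure: two pieces of evidence whose similarity exceeds a chosen threshold are treated as one.
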
---

## 2. Architecture

### Current (Phase 5)

```
Query → SearchAgent → PubMed/Web (keyword) → Evidence
```

### Phase 6

```
Query → Embed(Query) → SearchAgent
                          ├── PubMed/Web (keyword) → Evidence
                          └── VectorDB (semantic) → Related Evidence
                                    ↑
                          Evidence → Embed → Store
```

### Shared Context Enhancement

```python
# Current
evidence_store = {"current": []}

# Phase 6
evidence_store = {
    "current": [],         # Raw evidence
    "embeddings": {},      # URL -> embedding vector
    "vector_index": None,  # ChromaDB collection
}
```

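
One way to keep the shape of the Phase 6 store explicit is a typed dictionary plus a small helper. This is a sketch only: `EvidenceStore`, `new_evidence_store`, and `record_embedding` are hypothetical names that do not appear in the spec.

```python
from typing import Any, Optional, TypedDict


class EvidenceStore(TypedDict):
    """Shape of the shared context after Phase 6 (hypothetical type)."""

    current: list[Any]                  # Raw evidence objects
    embeddings: dict[str, list[float]]  # URL -> embedding vector
    vector_index: Optional[Any]         # ChromaDB collection, once created


def new_evidence_store() -> EvidenceStore:
    """Create an empty store with all three keys present."""
    return {"current": [], "embeddings": {}, "vector_index": None}


def record_embedding(store: EvidenceStore, url: str, vector: list[float]) -> bool:
    """Save a vector under its URL; return False if the URL was already stored."""
    if url in store["embeddings"]:
        return False
    store["embeddings"][url] = vector
    return True


store = new_evidence_store()
first = record_embedding(store, "https://example.org/paper-1", [0.1, 0.2])
second = record_embedding(store, "https://example.org/paper-1", [0.3, 0.4])
```
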
---

## 3. Technology Choice

### ChromaDB (Recommended)

- **Free**, open-source, local-first
- No API keys, no cloud dependency
- Supports sentence-transformers out of the box
- Well suited to a hackathon (no infra setup)

### Embedding Model

- `sentence-transformers/all-MiniLM-L6-v2` (fast, good quality)
- Or `BAAI/bge-small-en-v1.5` (better quality, still fast)

---

## 4. Implementation

### 4.1 Dependencies

Add to `pyproject.toml`:

```toml
[project.optional-dependencies]
embeddings = [
    "chromadb>=0.4.0",
    "sentence-transformers>=2.2.0",
]
```

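
Because these packages are optional, calling code may want to check for them before building the service. A minimal sketch, assuming a hypothetical helper named `embeddings_available`; it only checks that the modules can be found and does not import them.

```python
from importlib.util import find_spec


def embeddings_available(
    modules: tuple[str, ...] = ("chromadb", "sentence_transformers"),
) -> bool:
    """Return True only if every listed top-level module is installed."""
    return all(find_spec(name) is not None for name in modules)


if embeddings_available():
    print("Semantic search enabled")
else:
    print("Embeddings extra not installed; falling back to keyword search")
```

With a check like this, the agent can be given `None` instead of an embedding service when the extra is missing, which matches the optional parameter used in section 4.3.
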
### 4.2 Embedding Service (`src/services/embeddings.py`)

> **CRITICAL: Async Pattern Required**
>
> `sentence-transformers` is synchronous and CPU-bound. Running it directly in async code
> will **block the event loop**, freezing the UI and halting all concurrent operations.
>
> **Solution**: Use `loop.run_in_executor()` to offload the work to a thread pool.
> This pattern already exists in `src/tools/websearch.py:28-34`.

```python
"""Embedding service for semantic search.

IMPORTANT: All public methods are async to avoid blocking the event loop.
The sentence-transformers model is CPU-bound, so we use run_in_executor().
"""
import asyncio
import uuid
from typing import List

import chromadb
from sentence_transformers import SentenceTransformer


class EmbeddingService:
    """Handles text embedding and vector storage.

    All embedding operations run in a thread pool to avoid blocking
    the async event loop. See src/tools/websearch.py for the pattern.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self._model = SentenceTransformer(model_name)
        self._client = chromadb.Client()  # In-memory for hackathon
        # Unique name per instance: in-memory clients in one process can share
        # state, so a fixed name may collide when a second service is created
        # (for example, one per test).
        self._collection = self._client.create_collection(
            name=f"evidence_{uuid.uuid4().hex}",
            metadata={"hnsw:space": "cosine"},
        )

    # ─────────────────────────────────────────────────────────────────
    # Sync internal methods (run in thread pool)
    # ─────────────────────────────────────────────────────────────────

    def _sync_embed(self, text: str) -> List[float]:
        """Synchronous embedding - DO NOT call directly from async code."""
        return self._model.encode(text).tolist()

    def _sync_batch_embed(self, texts: List[str]) -> List[List[float]]:
        """Batch embedding for efficiency - DO NOT call directly from async code."""
        return [e.tolist() for e in self._model.encode(texts)]

    # ─────────────────────────────────────────────────────────────────
    # Async public methods (safe for event loop)
    # ─────────────────────────────────────────────────────────────────

    async def embed(self, text: str) -> List[float]:
        """Embed a single text (async-safe).

        Uses run_in_executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_embed, text)

    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Batch embed multiple texts (async-safe, more efficient)."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_batch_embed, texts)

    async def add_evidence(self, evidence_id: str, content: str, metadata: dict) -> None:
        """Add evidence to vector store (async-safe)."""
        embedding = await self.embed(content)
        # ChromaDB operations are fast, but wrap for consistency
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: self._collection.add(
                ids=[evidence_id],
                embeddings=[embedding],
                metadatas=[metadata],
                documents=[content],
            ),
        )

    async def search_similar(self, query: str, n_results: int = 5) -> List[dict]:
        """Find semantically similar evidence (async-safe)."""
        query_embedding = await self.embed(query)

        def _query() -> dict:
            # Skip the query on an empty collection and clamp n_results to the
            # number of stored items, so behavior does not depend on how the
            # installed ChromaDB version handles those cases.
            count = self._collection.count()
            if count == 0:
                return {}
            return self._collection.query(
                query_embeddings=[query_embedding],
                n_results=min(n_results, count),
            )

        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(None, _query)

        # Handle empty results gracefully
        if not results.get("ids") or not results["ids"][0]:
            return []

        return [
            {"id": id_, "content": doc, "metadata": meta, "distance": dist}
            for id_, doc, meta, dist in zip(
                results["ids"][0],
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            )
        ]

    async def deduplicate(self, new_evidence: List, threshold: float = 0.9) -> List:
        """Remove semantically duplicate evidence (async-safe).

        `threshold` is a cosine similarity; ChromaDB returns cosine distance,
        which is 1 - similarity, hence the comparison against (1 - threshold).
        """
        unique = []
        for evidence in new_evidence:
            similar = await self.search_similar(evidence.content, n_results=1)
            if not similar or similar[0]["distance"] > (1 - threshold):
                unique.append(evidence)
                await self.add_evidence(
                    evidence_id=evidence.citation.url,
                    content=evidence.content,
                    metadata={"source": evidence.citation.source},
                )
        return unique
```

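
The thread-pool pattern used by the service can be tried in isolation, without downloading a model. In this sketch `slow_encode` is a hypothetical stand-in for the blocking `encode()` call, and `heartbeat` plays the role of other work on the event loop (the UI, other agents).

```python
import asyncio
import time


def slow_encode(text: str) -> list[float]:
    """Blocking stand-in for a model call (hypothetical, not a real encoder)."""
    time.sleep(0.2)
    return [float(len(text))]


async def heartbeat(ticks: list[float]) -> None:
    """Other event-loop work that should keep running during encoding."""
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main() -> tuple[list[float], int]:
    loop = asyncio.get_running_loop()
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    # Offload the blocking call to the default thread pool.
    vector = await loop.run_in_executor(None, slow_encode, "metformin")
    ticks_during_encode = len(ticks)  # heartbeat ran while encoding was in flight
    await beat
    return vector, ticks_during_encode


vector, ticks_during_encode = asyncio.run(main())
print(vector, ticks_during_encode)
```

If `slow_encode` were called directly inside `main()`, the heartbeat would not tick at all until the call returned, which is the freeze the note above warns about.
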
### 4.3 Enhanced SearchAgent (`src/agents/search_agent.py`)

Update SearchAgent to use embeddings. **Note**: All embedding calls are `await`ed:

```python
from src.services.embeddings import EmbeddingService


class SearchAgent(BaseAgent):
    def __init__(
        self,
        search_handler: SearchHandlerProtocol,
        evidence_store: dict,
        embedding_service: EmbeddingService | None = None,  # NEW
    ):
        # ... existing init ...
        self._embeddings = embedding_service

    async def run(self, messages, *, thread=None, **kwargs) -> AgentRunResponse:
        # ... extract query ...

        # Execute keyword search
        result = await self._handler.execute(query, max_results_per_tool=10)

        # Semantic deduplication (NEW) - ALL CALLS ARE AWAITED
        if self._embeddings:
            # Deduplicate by semantic similarity (async-safe)
            unique_evidence = await self._embeddings.deduplicate(result.evidence)

            # Also search for semantically related evidence (async-safe)
            related = await self._embeddings.search_similar(query, n_results=5)

            # Merge related evidence not already in results
            existing_urls = {e.citation.url for e in unique_evidence}
            for item in related:
                if item["id"] not in existing_urls:
                    # Reconstruct Evidence from stored data
                    # ... merge logic ...
                    pass  # placeholder so the excerpt parses

        # ... rest of method ...
```

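
The merge step elided above could look roughly like the sketch below. The `Evidence` and `Citation` dataclasses here are simplified, hypothetical stand-ins that carry only the attributes this spec uses (`content`, `citation.url`, `citation.source`); the project's real models may have more required fields. `merge_related` is also a hypothetical name.

```python
from dataclasses import dataclass


@dataclass
class Citation:
    """Simplified stand-in for the project's citation model."""

    url: str
    source: str


@dataclass
class Evidence:
    """Simplified stand-in for the project's evidence model."""

    content: str
    citation: Citation


def merge_related(unique_evidence: list[Evidence], related: list[dict]) -> list[Evidence]:
    """Append vector-store hits whose URL is not already in the results."""
    merged = list(unique_evidence)
    seen = {e.citation.url for e in merged}
    for item in related:
        if item["id"] in seen:
            continue
        # Rebuild an Evidence object from what search_similar() returns.
        source = item["metadata"].get("source", "unknown")
        merged.append(Evidence(item["content"], Citation(item["id"], source)))
        seen.add(item["id"])
    return merged


found = [Evidence("Metformin and cognition", Citation("https://example.org/a", "pubmed"))]
hits = [
    {"id": "https://example.org/a", "content": "Metformin and cognition",
     "metadata": {"source": "pubmed"}, "distance": 0.0},
    {"id": "https://example.org/b", "content": "AMPK activation is neuroprotective",
     "metadata": {"source": "pubmed"}, "distance": 0.2},
]
merged = merge_related(found, hits)
```
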
### 4.4 Semantic Expansion in Orchestrator

The MagenticOrchestrator can use embeddings to expand queries:

```python
# In task instruction
task = f"""Research drug repurposing opportunities for: {query}

The system has semantic search enabled. When evidence is found:
1. Related concepts will be automatically surfaced
2. Duplicates are removed by meaning, not just URL
3. Use the surfaced related concepts to refine searches
"""
```

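
If the surfaced concepts should appear in the instruction itself, one option is to build the task text from the results of `search_similar()`. This is a sketch under that assumption: `build_task` and its `max_related` parameter are hypothetical, and the spec does not say how the orchestrator receives related evidence.

```python
def build_task(query: str, related: list[dict], max_related: int = 3) -> str:
    """Compose the task instruction, listing surfaced concepts when present."""
    lines = [
        f"Research drug repurposing opportunities for: {query}",
        "",
        "The system has semantic search enabled. When evidence is found:",
        "1. Related concepts will be automatically surfaced",
        "2. Duplicates are removed by meaning, not just URL",
        "3. Use the surfaced related concepts to refine searches",
    ]
    snippets = [item["content"] for item in related[:max_related]]
    if snippets:
        lines.append("")
        lines.append("Related concepts surfaced so far:")
        lines.extend(f"- {snippet}" for snippet in snippets)
    return "\n".join(lines)


example_task = build_task(
    "metformin alzheimer",
    [{"content": "AMPK activation and neuroprotection"}],
)
print(example_task)
```
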
### 4.5 HuggingFace Spaces Deployment

> **⚠️ Important for HF Spaces**
>
> `sentence-transformers` downloads models (~500MB) to `~/.cache` on first use.
> HuggingFace Spaces have **ephemeral storage** - the cache is wiped on restart.
> This causes slow cold starts and repeated bandwidth usage.

**Solution**: Pre-download the model in your Dockerfile:

```dockerfile
# In Dockerfile
FROM python:3.11-slim

# Set cache directory
ENV HF_HOME=/app/.cache
ENV TRANSFORMERS_CACHE=/app/.cache

# Pre-download the embedding model during build
RUN pip install sentence-transformers && \
    python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('all-MiniLM-L6-v2')"

# ... rest of Dockerfile
```

**Alternative**: Use an environment variable to point the cache at a persistent path:

```yaml
# In HF Spaces settings or app.yaml
env:
  - name: HF_HOME
    value: /data/.cache  # Persistent volume
```

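
A startup check can confirm that the cache location is the one you intended and already holds files. The sketch below is an assumption-laden illustration: `resolve_hf_cache` and `cache_is_warm` are hypothetical helpers, the fallback path mirrors the usual Hugging Face default of `~/.cache/huggingface`, and a non-empty directory does not prove the right model is present.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_hf_cache(env: Optional[Mapping[str, str]] = None) -> Path:
    """Return the cache directory implied by HF_HOME, or the usual default."""
    env = os.environ if env is None else env
    hf_home = env.get("HF_HOME")
    if hf_home:
        return Path(hf_home)
    return Path.home() / ".cache" / "huggingface"


def cache_is_warm(cache_dir: Path) -> bool:
    """True if the directory exists and contains at least one entry."""
    return cache_dir.is_dir() and any(cache_dir.iterdir())


cache_dir = resolve_hf_cache()
state = "warm" if cache_is_warm(cache_dir) else "cold (model will download on first use)"
print(f"Model cache at {cache_dir}: {state}")
```
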
---

## 5. Directory Structure After Phase 6

```
src/
├── services/               # NEW
│   ├── __init__.py
│   └── embeddings.py       # EmbeddingService
├── agents/
│   ├── search_agent.py     # Updated with embeddings
│   └── judge_agent.py
└── ...
```

---

## 6. Tests

### 6.1 Unit Tests (`tests/unit/services/test_embeddings.py`)

> **Note**: All tests are async since the EmbeddingService methods are async.
> The `@pytest.mark.asyncio` marker requires the `pytest-asyncio` plugin.

```python
"""Unit tests for EmbeddingService."""
import numpy as np
import pytest

from src.services.embeddings import EmbeddingService


def cosine(a, b) -> float:
    """Cosine similarity between two vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


class TestEmbeddingService:
    @pytest.mark.asyncio
    async def test_embed_returns_vector(self):
        """Embedding should return a float vector."""
        service = EmbeddingService()
        embedding = await service.embed("metformin diabetes")
        assert isinstance(embedding, list)
        assert len(embedding) > 0
        assert all(isinstance(x, float) for x in embedding)

    @pytest.mark.asyncio
    async def test_similar_texts_have_close_embeddings(self):
        """Semantically similar texts should have similar embeddings."""
        service = EmbeddingService()
        e1 = await service.embed("metformin treats diabetes")
        e2 = await service.embed("metformin is used for diabetes treatment")
        e3 = await service.embed("the weather is sunny today")

        # Similar texts should be closer
        assert cosine(e1, e2) > cosine(e1, e3)

    @pytest.mark.asyncio
    async def test_batch_embed_returns_one_vector_per_text(self):
        """Batch embedding should return one vector for each input text."""
        service = EmbeddingService()
        texts = ["text one", "text two", "text three"]

        # Batch embed
        batch_results = await service.embed_batch(texts)
        assert len(batch_results) == 3
        assert all(isinstance(e, list) for e in batch_results)

    @pytest.mark.asyncio
    async def test_add_and_search(self):
        """Should be able to add evidence and search for similar."""
        service = EmbeddingService()
        await service.add_evidence(
            evidence_id="test1",
            content="Metformin activates AMPK pathway",
            metadata={"source": "pubmed"},
        )
        results = await service.search_similar("AMPK activation drugs", n_results=1)
        assert len(results) == 1
        assert "AMPK" in results[0]["content"]

    @pytest.mark.asyncio
    async def test_search_similar_empty_collection(self):
        """Search on empty collection should return empty list, not error."""
        service = EmbeddingService()
        results = await service.search_similar("anything", n_results=5)
        assert results == []
```

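
The tests above load the real model, which is slow on a cold cache. For tests that only check wiring (storage, merging, empty results), a deterministic stand-in embedder may be enough. The sketch below is one such option: `fake_embed` is a hypothetical helper that hashes tokens into buckets, so it captures shared words, not meaning, and cannot replace the semantic-similarity test.

```python
import hashlib
import math


def fake_embed(text: str, dim: int = 8) -> list[float]:
    """Deterministic bag-of-words hashing vector, normalized to unit length."""
    vec = [0.0] * dim
    for token in text.lower().split():
        # Stable across runs, unlike the built-in hash() for strings.
        bucket = int(hashlib.sha256(token.encode("utf-8")).hexdigest(), 16) % dim
        vec[bucket] += 1.0
    norm = math.sqrt(sum(x * x for x in vec)) or 1.0
    return [x / norm for x in vec]


a = fake_embed("metformin diabetes")
b = fake_embed("diabetes metformin")  # same tokens, different order
empty = fake_embed("")
```

In a test, a function like this could be patched over `_sync_embed` (for example with pytest's `monkeypatch`), which keeps the async code paths intact while skipping the model.
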
---

## 7. Definition of Done

Phase 6 is **COMPLETE** when:

1. `EmbeddingService` implemented with ChromaDB
2. SearchAgent uses embeddings for deduplication
3. Semantic search surfaces related evidence
4. All unit tests pass
5. Integration test shows improved recall (finds related papers)

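
"Improved recall" in item 5 can be made measurable with a small helper. This is a sketch: `recall_at_k` is a hypothetical function, and the paper IDs below are invented placeholders, not results from a real run.

```python
def recall_at_k(retrieved_ids: list[str], relevant_ids: set[str], k: int) -> float:
    """Fraction of the relevant items that appear in the top-k retrieved IDs."""
    if not relevant_ids:
        return 0.0
    top_k = set(retrieved_ids[:k])
    return len(top_k & relevant_ids) / len(relevant_ids)


# Placeholder IDs for illustration only.
relevant = {"p1", "p2", "p3", "p4"}
keyword_only = ["p1", "p2"]
keyword_plus_semantic = ["p1", "p3", "p2"]

before = recall_at_k(keyword_only, relevant, k=5)
after = recall_at_k(keyword_plus_semantic, relevant, k=5)
print(f"recall@5 before={before:.2f} after={after:.2f}")
```

An integration test could then assert that recall with embeddings enabled is at least as high as keyword-only recall on a hand-labelled query.
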

---

## 8. Value Delivered

| Before (Phase 5) | After (Phase 6) |
|------------------|-----------------|
| Keyword-only search | Semantic + keyword search |
| URL-based deduplication | Meaning-based deduplication |
| Miss related papers | Surface related concepts |
| Exact match required | Fuzzy semantic matching |

**Example of the expected improvement:**

- Query: "metformin alzheimer"
- Before: Only papers mentioning both words
- After: Also finds "AMPK neuroprotection", "biguanide cognitive", etc.