Spaces:
Sleeping
Sleeping
| """ | |
| MEXAR - Hybrid Search Module | |
| Combines semantic (vector) search with keyword (full-text) search using RRF. | |
| """ | |
| import logging | |
| from typing import List, Tuple, Optional | |
| from sqlalchemy import text | |
| from core.database import SessionLocal | |
| from models.chunk import DocumentChunk | |
| logger = logging.getLogger(__name__) | |
class HybridSearcher:
    """
    Hybrid search combining:

    1. Semantic search (pgvector cosine similarity)
    2. Keyword search (PostgreSQL tsvector)
    3. Reciprocal Rank Fusion (RRF) to merge results
    """

    def __init__(self, embedding_model):
        """
        Initialize hybrid searcher.

        Args:
            embedding_model: FastEmbed model for query embedding
        """
        self.embedding_model = embedding_model

    def _embed_query(self, query: str) -> List[float]:
        """Embed *query* and return the vector as a plain list of floats.

        embed() yields vector objects; we take the first and convert it
        with .tolist() (the vectors are numpy-array-like — TODO confirm).
        """
        return list(self.embedding_model.embed([query]))[0].tolist()

    def search(
        self,
        query: str,
        agent_id: int,
        top_k: int = 20
    ) -> "List[Tuple[DocumentChunk, float]]":
        """
        Perform hybrid search using the `hybrid_search` SQL function.

        Args:
            query: User's search query
            agent_id: ID of the agent to search within
            top_k: Number of results to return

        Returns:
            List of (DocumentChunk, rrf_score) tuples, best match first.
            Empty list for a blank query.
        """
        if not query.strip():
            return []

        try:
            query_embedding = self._embed_query(query)
            db = SessionLocal()
            try:
                # Call the hybrid_search function created in migration.
                # CAST(:embedding AS vector) is used instead of `::vector`
                # because SQLAlchemy's text() can mis-parse `::` as a bind
                # parameter.
                result = db.execute(text("""
                    SELECT * FROM hybrid_search(
                        CAST(:embedding AS vector),
                        :query_text,
                        :agent_id,
                        :match_count
                    )
                """), {
                    "embedding": query_embedding,
                    "query_text": query,
                    "agent_id": agent_id,
                    "match_count": top_k
                })
                rows = result.fetchall()
                if not rows:
                    # Fallback to pure semantic search if hybrid returns nothing
                    return self._semantic_only_search(db, query_embedding, agent_id, top_k)

                # Fetch the full ORM chunk objects for the returned ids in one query.
                chunk_ids = [row.id for row in rows]
                chunks = db.query(DocumentChunk).filter(
                    DocumentChunk.id.in_(chunk_ids)
                ).all()
                chunk_map = {c.id: c for c in chunks}

                # Preserve the RRF ranking order produced by the SQL function;
                # silently drop ids whose chunk row has vanished in between.
                results = [
                    (chunk_map[row.id], row.rrf_score)
                    for row in rows
                    if row.id in chunk_map
                ]
                logger.info("Hybrid search found %d results for agent %s",
                            len(results), agent_id)
                return results
            finally:
                db.close()
        except Exception:
            # logger.exception keeps the traceback (the previous
            # logger.error(f"...") discarded it).
            logger.exception("Hybrid search failed")
            # Fallback to simple semantic search (e.g. hybrid_search() SQL
            # function not installed).
            return self._fallback_semantic_search(query, agent_id, top_k)

    def _semantic_only_search(
        self,
        db,
        query_embedding: List[float],
        agent_id: int,
        top_k: int
    ) -> "List[Tuple[DocumentChunk, float]]":
        """Pure semantic search fallback using pgvector cosine distance.

        Args:
            db: An open SQLAlchemy session (caller owns its lifecycle).
            query_embedding: Pre-computed query vector.
            agent_id: ID of the agent to search within.
            top_k: Number of results to return.

        Returns:
            List of (DocumentChunk, score) tuples; scores are rank-based
            approximations (1 / (1 + 0.1 * rank)), not true similarities.
        """
        try:
            chunks = db.query(DocumentChunk).filter(
                DocumentChunk.agent_id == agent_id
            ).order_by(
                DocumentChunk.embedding.cosine_distance(query_embedding)
            ).limit(top_k).all()

            # Approximate score from rank alone — the distance itself is not
            # fetched here, so the score monotonically decays with position.
            return [
                (chunk, 1.0 / (1 + rank * 0.1))
                for rank, chunk in enumerate(chunks)
            ]
        except Exception:
            logger.exception("Semantic search failed")
            return []

    def _fallback_semantic_search(
        self,
        query: str,
        agent_id: int,
        top_k: int
    ) -> "List[Tuple[DocumentChunk, float]]":
        """Fallback when the hybrid_search SQL function is not available.

        Opens its own session, embeds the query, and delegates to
        :meth:`_semantic_only_search`.
        """
        try:
            query_embedding = self._embed_query(query)
            db = SessionLocal()
            try:
                return self._semantic_only_search(db, query_embedding, agent_id, top_k)
            finally:
                db.close()
        except Exception:
            logger.exception("Fallback search failed")
            return []
def create_hybrid_searcher(embedding_model) -> HybridSearcher:
    """Build and return a :class:`HybridSearcher` wired to *embedding_model*."""
    searcher = HybridSearcher(embedding_model)
    return searcher