| """Public retrieval API — thin wrapper around RetrievalRouter.""" | |
| from typing import Any | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from src.middlewares.logging import get_logger | |
| from src.rag.retrievers.document import document_retriever | |
| from src.rag.retrievers.schema import schema_retriever | |
| from src.rag.router import RetrievalRouter, SourceHint | |
| logger = get_logger("retriever") | |
| class RetrieverService: | |
| """Public retrieval service used by chat.py and search tools. | |
| Delegates to RetrievalRouter which dispatches based on source_hint. | |
| Returns List[Dict] to preserve backward compatibility with chat.py. | |
| """ | |
| def __init__(self): | |
| self._router = RetrievalRouter( | |
| schema_retriever=schema_retriever, | |
| document_retriever=document_retriever, | |
| ) | |
| async def retrieve( | |
| self, | |
| query: str, | |
| user_id: str, | |
| db: AsyncSession, | |
| k: int = 5, | |
| source_hint: SourceHint = "both", | |
| ) -> list[dict[str, Any]]: | |
| try: | |
| results = await self._router.retrieve(query, user_id, source_hint, k) | |
| return [{"content": r.content, "metadata": r.metadata} for r in results] | |
| except Exception as e: | |
| logger.error("retrieval failed", error=str(e)) | |
| return [] | |
| retriever = RetrieverService() | |