Rifqi Hafizuddin
[KM-512] connect query executor to user question. add logging for db_executor
abc494f | """QueryExecutor — dispatches retrieval results to the appropriate executor by source_type.""" | |
| import asyncio | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from src.middlewares.logging import get_logger | |
| from src.query.base import QueryResult | |
| from src.query.executors.db_executor import db_executor | |
| from src.query.executors.tabular import tabular_executor | |
| from src.rag.base import RetrievalResult | |
# Module-level logger for this module. QueryExecutor.execute calls it with
# keyword fields (e.g. error=..., total=...), so get_logger presumably returns a
# structured (structlog-style) logger — confirm against src.middlewares.logging.
logger = get_logger("query_executor")
class QueryExecutor:
    """Route retrieval results to the executor matching their source type.

    Results with ``source_type == "database"`` go to ``db_executor``.
    Results with ``source_type == "document"`` whose
    ``metadata["data"]["file_type"]`` is ``"csv"`` or ``"xlsx"`` go to
    ``tabular_executor``. Any other result is not executed.
    """

    # Values of metadata["data"]["file_type"] handled by the tabular executor.
    _TABULAR_FILE_TYPES = ("csv", "xlsx")

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        question: str,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Run every applicable executor and merge their query results.

        Args:
            results: Retrieval hits to dispatch.
            user_id: Forwarded unchanged to each executor.
            db: Session forwarded unchanged to each executor.
            question: The user's question, forwarded to each executor.
            limit: Forwarded unchanged to each executor (default 100).

        Returns:
            The database executor's results followed by the tabular
            executor's results. An executor that raises an ``Exception`` is
            logged and contributes nothing, so one failing source does not
            discard the other's results. Cancellation is not swallowed.
        """
        db_results = [r for r in results if r.source_type == "database"]
        tabular_results = [r for r in results if self._is_tabular(r)]

        # Executors run one after another instead of via asyncio.gather: both
        # receive the same AsyncSession, and SQLAlchemy does not allow a single
        # AsyncSession to be used from concurrent tasks.
        dispatch = (
            ("database", db_executor, db_results),
            ("tabular", tabular_executor, tabular_results),
        )

        query_results: list[QueryResult] = []
        for name, executor, subset in dispatch:
            if not subset:
                continue
            try:
                batch = await executor.execute(subset, user_id, db, question, limit)
            except Exception as exc:
                # Best-effort per executor: keep the other executor's results.
                # CancelledError is a BaseException and deliberately propagates.
                logger.error(
                    "executor failed",
                    executor=name,
                    error_type=type(exc).__name__,
                    error=str(exc),
                )
                continue
            query_results.extend(batch)

        logger.info("query execution complete", total=len(query_results))
        return query_results

    @staticmethod
    def _is_tabular(result: RetrievalResult) -> bool:
        """Return True for document results backed by a CSV or XLSX file."""
        if result.source_type != "document":
            return False
        # "data" (or metadata itself) may be missing or None; treat that as
        # "no file_type" instead of raising AttributeError.
        data = (result.metadata or {}).get("data") or {}
        return data.get("file_type") in QueryExecutor._TABULAR_FILE_TYPES
# Shared module-level instance for callers to import. QueryExecutor defines no
# __init__ and keeps no per-instance state, so one instance can be reused.
query_executor = QueryExecutor()