"""QueryExecutor — dispatches retrieval results to the appropriate executor by source_type."""

import asyncio

from sqlalchemy.ext.asyncio import AsyncSession

from src.middlewares.logging import get_logger
from src.query.base import QueryResult
from src.query.executors.db_executor import db_executor
from src.query.executors.tabular import tabular_executor
from src.rag.base import RetrievalResult

logger = get_logger("query_executor")

# File types of "document" results that are routed to the tabular executor.
_TABULAR_FILE_TYPES = ("csv", "xlsx")


def _is_tabular(result: RetrievalResult) -> bool:
    """Return True for a "document" result whose metadata file_type is csv/xlsx."""
    if result.source_type != "document":
        return False
    # ``or {}`` also covers a "data" key that is present but set to None,
    # which would otherwise raise AttributeError on the chained ``.get``.
    data = result.metadata.get("data") or {}
    return data.get("file_type") in _TABULAR_FILE_TYPES


class QueryExecutor:
    """Split retrieval results by source and run the matching executors."""

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        question: str,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Run the database and tabular executors and merge their output.

        Results with ``source_type == "database"`` go to ``db_executor``;
        results with ``source_type == "document"`` whose
        ``metadata["data"]["file_type"]`` is ``csv`` or ``xlsx`` go to
        ``tabular_executor``. Anything else is ignored. An executor is only
        invoked when it has at least one result to process.

        Args:
            results: Retrieval results to dispatch.
            user_id: Forwarded unchanged to each executor.
            db: Forwarded unchanged to each executor.
            question: Forwarded unchanged to each executor.
            limit: Forwarded unchanged to each executor.

        Returns:
            The concatenated results of every executor that succeeded,
            database results first. A failing executor is logged and
            skipped rather than failing the whole call.
        """
        db_results = [r for r in results if r.source_type == "database"]
        tabular_results = [r for r in results if _is_tabular(r)]

        # Only schedule executors that have work; order (db, then tabular)
        # is preserved because gather() returns results in argument order.
        pending = []
        if db_results:
            pending.append(
                db_executor.execute(db_results, user_id, db, question, limit)
            )
        if tabular_results:
            pending.append(
                tabular_executor.execute(
                    tabular_results, user_id, db, question, limit
                )
            )

        # NOTE(review): both executors receive the same AsyncSession while
        # running concurrently. SQLAlchemy documents that a single
        # AsyncSession must not be shared across concurrent tasks — confirm
        # the executors do not issue overlapping statements on ``db``.
        batches = await asyncio.gather(*pending, return_exceptions=True)

        query_results: list[QueryResult] = []
        for batch in batches:
            # BaseException, not Exception: a cancelled child surfaces here as
            # asyncio.CancelledError, which is not an Exception subclass and
            # would otherwise reach extend() and raise TypeError.
            if isinstance(batch, BaseException):
                logger.error("executor failed", error=str(batch))
                continue
            query_results.extend(batch)

        logger.info("query execution complete", total=len(query_results))
        return query_results


query_executor = QueryExecutor()