| """QueryExecutor — dispatches retrieval results to the appropriate executor by source_type.""" |
|
|
| import asyncio |
|
|
| from sqlalchemy.ext.asyncio import AsyncSession |
|
|
| from src.middlewares.logging import get_logger |
| from src.query.base import QueryResult |
| from src.query.executors.db_executor import db_executor |
| from src.query.executors.tabular import tabular_executor |
| from src.rag.base import RetrievalResult |
|
|
# Module-level logger for this file, obtained from get_logger under the name "query_executor".
| logger = get_logger("query_executor") |
|
|
|
|
class QueryExecutor:
    """Fan retrieval results out to every executor and merge their output."""

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        question: str,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Run all executors concurrently and return their combined results.

        Every executor receives the same arguments, including the full
        ``results`` list. An executor that fails is logged and skipped so
        that the results of the remaining executors are still returned.

        Args:
            results: Retrieval results, passed unchanged to each executor.
            user_id: Passed unchanged to each executor.
            db: Session passed unchanged to each executor.
            question: Passed unchanged to each executor.
            limit: Passed unchanged to each executor (defaults to 100).

        Returns:
            The successful executors' results concatenated in executor
            order (db executor first, then tabular executor). Empty when
            every executor failed or returned nothing.
        """
        # NOTE(review): both executors are handed the same `db` session and
        # run concurrently. SQLAlchemy documents AsyncSession as unsafe for
        # use from concurrent tasks — confirm that at most one executor
        # actually touches the session, or give each its own.
        executors = (
            ("db", db_executor),
            ("tabular", tabular_executor),
        )
        batches = await asyncio.gather(
            *(
                executor.execute(results, user_id, db, question, limit)
                for _, executor in executors
            ),
            return_exceptions=True,
        )

        query_results: list[QueryResult] = []
        for (name, _), batch in zip(executors, batches):
            # gather(return_exceptions=True) may hand back any BaseException
            # (e.g. asyncio.CancelledError), not only Exception subclasses;
            # letting one through would make extend() raise TypeError.
            if isinstance(batch, BaseException):
                logger.error(
                    "executor failed",
                    executor=name,
                    error=str(batch),
                    # str() of some exceptions is empty, so record the type too.
                    error_type=type(batch).__name__,
                )
                continue
            query_results.extend(batch)

        logger.info("query execution complete", total=len(query_results))
        return query_results
|
|
|
|
# Module-level QueryExecutor instance, created when this module is imported.
| query_executor = QueryExecutor() |
|
|