Agentic-Service-Data-Eyond / src /query /query_executor.py
Rifqi Hafizuddin
[KM-438-439] add retriever feature
ba550a5
raw
history blame
1.7 kB
"""QueryExecutor — dispatches retrieval results to the appropriate executor by source_type."""
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from src.middlewares.logging import get_logger
from src.query.base import QueryResult
from src.query.executors.db_executor import db_executor
from src.query.executors.tabular import tabular_executor
from src.rag.base import RetrievalResult
# Module-level logger, registered under the name "query_executor".
logger = get_logger("query_executor")
class QueryExecutor:
    """Route retrieval results to the executor matching their source type.

    Results whose ``source_type`` is ``"database"`` are handed to
    ``db_executor``; results whose ``source_type`` is ``"document"`` and whose
    ``metadata["data"]["file_type"]`` is ``"csv"`` or ``"xlsx"`` are handed to
    ``tabular_executor``. Any other result is ignored.
    """

    # Values of ``metadata["data"]["file_type"]`` that mark a tabular document.
    # Kept as a tuple so membership tests use ``==`` and never require the
    # looked-up value to be hashable.
    _TABULAR_FILE_TYPES = ("csv", "xlsx")

    @classmethod
    def _is_tabular_document(cls, result: RetrievalResult) -> bool:
        """Return True when *result* is a document with a tabular file type."""
        if result.source_type != "document":
            return False
        # ``metadata`` itself, or its "data" entry, may be None/missing; treat
        # that as "not tabular" instead of raising AttributeError.
        data = (result.metadata or {}).get("data") or {}
        return data.get("file_type") in cls._TABULAR_FILE_TYPES

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        question: str,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Run the matching executors concurrently and merge their output.

        Args:
            results: Retrieval results to dispatch.
            user_id: Forwarded unchanged to each executor.
            db: Async session forwarded unchanged to each executor.
            question: Forwarded unchanged to each executor.
            limit: Forwarded unchanged to each executor (default 100).

        Returns:
            The concatenation of every successful executor's results, database
            results first. An executor that fails is logged and skipped, so
            the returned list may be partial or empty.
        """
        db_results = [r for r in results if r.source_type == "database"]
        tabular_results = [r for r in results if self._is_tabular_document(r)]

        # Schedule only the executors that have work. The label travels with
        # each coroutine so a failure can be attributed in the log.
        pending = []
        if db_results:
            pending.append(
                ("database", db_executor.execute(db_results, user_id, db, question, limit))
            )
        if tabular_results:
            pending.append(
                ("tabular", tabular_executor.execute(tabular_results, user_id, db, question, limit))
            )

        # NOTE(review): both executors receive the same AsyncSession while
        # running concurrently. SQLAlchemy documents AsyncSession as unsafe to
        # share between concurrent tasks -- confirm the executors never issue
        # overlapping operations on `db`.
        batches = await asyncio.gather(
            *(coro for _, coro in pending),
            return_exceptions=True,
        )

        query_results: list[QueryResult] = []
        for (executor_name, _), batch in zip(pending, batches):
            # gather(return_exceptions=True) can hand back BaseException
            # instances (e.g. asyncio.CancelledError, which is not an
            # Exception subclass); checking only Exception would let those
            # reach extend() and raise TypeError.
            if isinstance(batch, BaseException):
                logger.error(
                    "executor failed",
                    executor=executor_name,
                    error_type=type(batch).__name__,
                    error=str(batch),
                )
                continue
            query_results.extend(batch)

        logger.info("query execution complete", total=len(query_results))
        return query_results
# Module-level QueryExecutor instance, created at import time
# (presumably the shared instance other modules import -- confirm against callers).
query_executor = QueryExecutor()