"""QueryService — orchestrates plan → validate → compile → execute. Top-level entry point for catalog-driven structured queries. Wired into the chat endpoint when source_hint == "structured". Flow per call: 1. Plan (LLM): question + catalog → QueryIR 2. Validate IR against catalog. On failure, re-prompt the planner with the error context and retry (up to `max_retries` total attempts). 3. Dispatch IR to the right executor by `source.source_type`. 4. Execute. Any exception (including NotImplementedError from the TabularExecutor placeholder) is caught and surfaced via `QueryResult.error` so the chatbot can branch on success / failure. The service never raises — every code path returns a `QueryResult`. """ from __future__ import annotations from collections.abc import Callable from src.middlewares.logging import get_logger from ..catalog.models import Catalog from .executor.base import QueryResult from .executor.dispatcher import ExecutorDispatcher from .ir.validator import IRValidationError, IRValidator from .planner.service import QueryPlannerService logger = get_logger("query_service") class QueryService: """End-to-end runner for a user question against a catalog. All heavy dependencies are injectable so unit tests don't need real LLMs or DB engines. Default constructors lazy-build the production deps so importing this module is side-effect-free. """ def __init__( self, planner: QueryPlannerService | None = None, validator: IRValidator | None = None, dispatcher_factory: Callable[[Catalog], ExecutorDispatcher] | None = None, max_retries: int = 3, ) -> None: self._planner = planner or QueryPlannerService() self._validator = validator or IRValidator() self._dispatcher_factory = dispatcher_factory or ExecutorDispatcher self._max_retries = max(1, max_retries) async def run(self, user_id: str, question: str, catalog: Catalog) -> QueryResult: if not catalog.sources: return _error_result( source_id="", error="No structured data registered yet — connect a database " "or upload a CSV/XLSX before asking data questions.", ) # ---------- planner + validator with retry ------------------ previous_error: str | None = None ir = None for attempt in range(1, self._max_retries + 1): try: ir = await self._planner.plan(question, catalog, previous_error) except Exception as e: logger.error("planner crashed", attempt=attempt, error=str(e)) return _error_result(source_id="", error=f"planner failed: {e}") try: self._validator.validate(ir, catalog) logger.info( "ir planned and validated", attempt=attempt, source_id=ir.source_id, table_id=ir.table_id, select=[s.model_dump() for s in ir.select], filters=[f.model_dump() for f in ir.filters], group_by=ir.group_by, ) break except IRValidationError as e: previous_error = str(e) logger.warning( "ir validation failed", attempt=attempt, error=previous_error, ) ir = None # discard invalid IR continue else: return _error_result( source_id="", error=( f"Planner could not produce a valid IR after " f"{self._max_retries} attempts. Last error: {previous_error}" ), ) # `ir` is non-None and valid here (guarded by the for/else above) assert ir is not None # ---------- dispatch + execute ------------------------------ try: dispatcher = self._dispatcher_factory(catalog) executor = dispatcher.pick(ir) except Exception as e: logger.error("dispatch failed", source_id=ir.source_id, error=str(e)) return _error_result(source_id=ir.source_id, error=f"dispatch failed: {e}") try: return await executor.run(ir) except NotImplementedError as e: # TabularExecutor placeholder — TAB owner ships PR3-TAB logger.warning( "executor not yet implemented", source_id=ir.source_id, error=str(e), ) return _error_result( source_id=ir.source_id, error="Tabular execution is not yet available — pending PR3-TAB.", ) except Exception as e: logger.error("executor crashed", source_id=ir.source_id, error=str(e)) return _error_result( source_id=ir.source_id, error=f"executor failed: {e}" ) def _error_result(source_id: str, error: str) -> QueryResult: """Build a uniform error QueryResult. `backend` is intentionally empty when the failure happens before an executor is picked — the caller can still distinguish via `error`. """ return QueryResult(source_id=source_id, backend="", error=error)