"""QueryService — orchestrates plan → validate → compile → execute.

Top-level entry point for catalog-driven structured queries. Wired into
the chat endpoint when source_hint == "structured".

Flow per call:
  1. Plan (LLM): question + catalog → QueryIR
  2. Validate IR against catalog. On failure, re-prompt the planner with the
     error context and retry (up to `max_retries` total attempts).
  3. Dispatch IR to the right executor by `source.source_type`.
  4. Execute. Any exception (including NotImplementedError from the
     TabularExecutor placeholder) is caught and surfaced via
     `QueryResult.error` so the chatbot can branch on success / failure.

The service never raises — every code path returns a `QueryResult`.
"""
|
|
| from __future__ import annotations |
|
|
| from collections.abc import Callable |
|
|
| from src.middlewares.logging import get_logger |
|
|
| from ..catalog.models import Catalog |
| from .executor.base import QueryResult |
| from .executor.dispatcher import ExecutorDispatcher |
| from .ir.validator import IRValidationError, IRValidator |
| from .planner.service import QueryPlannerService |
|
|
# Module-level logger shared by QueryService; obtained from the project's
# logging middleware under the name "query_service".
logger = get_logger("query_service")
|
|
|
|
class QueryService:
    """End-to-end runner for a user question against a catalog.

    All heavy dependencies are injectable so unit tests don't need real
    LLMs or DB engines. Any dependency left as ``None`` is replaced by its
    production default when the service is constructed, so importing this
    module is side-effect-free.
    """

    def __init__(
        self,
        planner: QueryPlannerService | None = None,
        validator: IRValidator | None = None,
        dispatcher_factory: Callable[[Catalog], ExecutorDispatcher] | None = None,
        max_retries: int = 3,
    ) -> None:
        """Store the collaborators, building defaults for any left as ``None``.

        Args:
            planner: Turns a question plus catalog into an IR. Defaults to
                ``QueryPlannerService()``.
            validator: Checks an IR against the catalog. Defaults to
                ``IRValidator()``.
            dispatcher_factory: Callable that builds a dispatcher for a
                catalog. Defaults to the ``ExecutorDispatcher`` class.
            max_retries: Total number of planning attempts; values below 1
                are clamped to 1.
        """
        # Explicit ``is None`` checks: an injected dependency that happens to
        # be falsy (e.g. a test double defining ``__len__``) must not be
        # silently swapped for the production default.
        self._planner = planner if planner is not None else QueryPlannerService()
        self._validator = validator if validator is not None else IRValidator()
        self._dispatcher_factory = (
            dispatcher_factory if dispatcher_factory is not None else ExecutorDispatcher
        )
        self._max_retries = max(1, max_retries)

    async def run(self, user_id: str, question: str, catalog: Catalog) -> QueryResult:
        """Plan, validate, dispatch and execute one question.

        Failures in planning, validation, dispatch and execution are
        reported through ``QueryResult.error`` instead of being raised.

        Args:
            user_id: Accepted for interface compatibility; not read by this
                method.
            question: The user's natural-language question, forwarded to
                the planner.
            catalog: Catalog the question is planned and validated against.

        Returns:
            The executor's ``QueryResult`` on success, otherwise an error
            result built by ``_error_result``.
        """
        if not catalog.sources:
            return _error_result(
                source_id="",
                error="No structured data registered yet — connect a database "
                "or upload a CSV/XLSX before asking data questions.",
            )

        # --- Plan + validate, feeding each validation error back to the planner.
        previous_error: str | None = None
        ir = None
        for attempt in range(1, self._max_retries + 1):
            # ``except Exception`` does not intercept asyncio.CancelledError
            # (a BaseException since Python 3.8), so cancellation propagates.
            try:
                ir = await self._planner.plan(question, catalog, previous_error)
            except Exception as e:
                logger.error("planner crashed", attempt=attempt, error=str(e))
                return _error_result(source_id="", error=f"planner failed: {e}")

            try:
                self._validator.validate(ir, catalog)
            except IRValidationError as e:
                # Expected failure mode: remember the message so the next
                # planner call can correct itself, then try again.
                previous_error = str(e)
                logger.warning(
                    "ir validation failed",
                    attempt=attempt,
                    error=previous_error,
                )
                ir = None
            except Exception as e:
                # Anything else is a validator bug, not a bad plan; retrying
                # would not help, so surface it as an error result.
                logger.error("validator crashed", attempt=attempt, error=str(e))
                return _error_result(source_id="", error=f"validator failed: {e}")
            else:
                logger.info(
                    "ir planned and validated",
                    attempt=attempt,
                    source_id=ir.source_id,
                    table_id=ir.table_id,
                    select=[s.model_dump() for s in ir.select],
                    filters=[f.model_dump() for f in ir.filters],
                    group_by=ir.group_by,
                )
                break
        else:
            # Loop finished without ``break``: every attempt failed validation.
            return _error_result(
                source_id="",
                error=(
                    f"Planner could not produce a valid IR after "
                    f"{self._max_retries} attempts. Last error: {previous_error}"
                ),
            )

        # Defensive guard instead of ``assert``: assertions are stripped under
        # ``python -O`` and would raise rather than return a QueryResult.
        if ir is None:
            return _error_result(source_id="", error="planner returned no IR")

        # --- Dispatch: build the dispatcher for this catalog and pick an executor.
        try:
            dispatcher = self._dispatcher_factory(catalog)
            executor = dispatcher.pick(ir)
        except Exception as e:
            logger.error("dispatch failed", source_id=ir.source_id, error=str(e))
            return _error_result(source_id=ir.source_id, error=f"dispatch failed: {e}")

        # --- Execute.
        try:
            return await executor.run(ir)
        except NotImplementedError as e:
            # Reported separately from a crash so the caller gets a friendlier
            # message for executors that are still placeholders.
            logger.warning(
                "executor not yet implemented",
                source_id=ir.source_id,
                error=str(e),
            )
            return _error_result(
                source_id=ir.source_id,
                error="Tabular execution is not yet available — pending PR3-TAB.",
            )
        except Exception as e:
            logger.error("executor crashed", source_id=ir.source_id, error=str(e))
            return _error_result(
                source_id=ir.source_id, error=f"executor failed: {e}"
            )
|
|
|
|
def _error_result(source_id: str, error: str) -> QueryResult:
    """Wrap a failure message in a `QueryResult`.

    The `backend` field is always set to the empty string here, which also
    covers failures that occur before any executor has been chosen; callers
    tell failures apart through `error`.
    """
    fields = {"source_id": source_id, "backend": "", "error": error}
    return QueryResult(**fields)
|
|