# Upstream: ishaq101 — "feat/Catalog Retrieval System (#1)", commit 6bff5d9 (5.41 kB).
"""QueryService — orchestrates plan → validate → compile → execute.
Top-level entry point for catalog-driven structured queries. Wired into
the chat endpoint when source_hint == "structured".
Flow per call:
1. Plan (LLM): question + catalog → QueryIR
2. Validate IR against catalog. On failure, re-prompt the planner with the
error context and retry (up to `max_retries` total attempts).
3. Dispatch IR to the right executor by `source.source_type`.
4. Execute. Any exception (including NotImplementedError from the
TabularExecutor placeholder) is caught and surfaced via
`QueryResult.error` so the chatbot can branch on success / failure.
The service never raises — every code path returns a `QueryResult`.
"""
from __future__ import annotations
from collections.abc import Callable
from src.middlewares.logging import get_logger
from ..catalog.models import Catalog
from .executor.base import QueryResult
from .executor.dispatcher import ExecutorDispatcher
from .ir.validator import IRValidationError, IRValidator
from .planner.service import QueryPlannerService
# Module-level logger (name "query_service") used by QueryService.run for
# planner / validator / dispatch / executor diagnostics.
logger = get_logger("query_service")
class QueryService:
    """End-to-end runner for a user question against a catalog.

    All heavy dependencies are injectable so unit tests don't need real
    LLMs or DB engines. The production defaults are only built when a
    ``QueryService`` is instantiated, so importing this module is
    side-effect-free.
    """

    def __init__(
        self,
        planner: QueryPlannerService | None = None,
        validator: IRValidator | None = None,
        dispatcher_factory: Callable[[Catalog], ExecutorDispatcher] | None = None,
        max_retries: int = 3,
    ) -> None:
        """Store the injected collaborators, building defaults for missing ones.

        Args:
            planner: Turns a question + catalog into a QueryIR. Defaults to a
                new ``QueryPlannerService``.
            validator: Checks an IR against the catalog. Defaults to a new
                ``IRValidator``.
            dispatcher_factory: Called with the catalog to build the
                dispatcher that picks an executor. Defaults to the
                ``ExecutorDispatcher`` class itself.
            max_retries: Total number of plan/validate attempts. The first
                attempt counts, so ``3`` means at most three planner calls.
                Values below 1 are clamped to 1.
        """
        # `is None` rather than `or`: an injected collaborator that happens to
        # be falsy (e.g. a fake defining __len__) must not be silently replaced.
        self._planner = planner if planner is not None else QueryPlannerService()
        self._validator = validator if validator is not None else IRValidator()
        self._dispatcher_factory = (
            dispatcher_factory if dispatcher_factory is not None else ExecutorDispatcher
        )
        self._max_retries = max(1, max_retries)

    async def run(self, user_id: str, question: str, catalog: Catalog) -> QueryResult:
        """Plan, validate, dispatch and execute ``question`` against ``catalog``.

        Args:
            user_id: Accepted for interface compatibility; not read here.
            question: The user's natural-language question.
            catalog: Registered structured sources the question may target.

        Returns:
            The executor's ``QueryResult`` on success. Otherwise, an error
            ``QueryResult`` built by ``_error_result``. Exceptions from the
            planner, validator, dispatcher and executor are caught
            (``except Exception``) and reported through ``error`` instead of
            being raised.
        """
        if not catalog.sources:
            return _error_result(
                source_id="",
                error="No structured data registered yet — connect a database "
                "or upload a CSV/XLSX before asking data questions.",
            )

        # ---------- planner + validator with retry ------------------
        # `ir` is bound only on the `break` path; the `else` clause returns
        # when every attempt failed validation.
        previous_error: str | None = None
        for attempt in range(1, self._max_retries + 1):
            try:
                candidate = await self._planner.plan(question, catalog, previous_error)
            except Exception as e:
                # Planner crashes end the run immediately; only validation
                # failures are retried.
                logger.error("planner crashed", attempt=attempt, error=str(e))
                return _error_result(source_id="", error=f"planner failed: {e}")

            try:
                self._validator.validate(candidate, catalog)
            except IRValidationError as e:
                # Feed the validation message back into the next planner call.
                previous_error = str(e)
                logger.warning(
                    "ir validation failed",
                    attempt=attempt,
                    error=previous_error,
                )
                continue
            except Exception as e:
                # Any other validator exception is reported instead of
                # propagating, keeping `run` non-raising.
                logger.error("validator crashed", attempt=attempt, error=str(e))
                return _error_result(source_id="", error=f"validator failed: {e}")

            ir = candidate
            logger.info(
                "ir planned and validated",
                attempt=attempt,
                source_id=ir.source_id,
                table_id=ir.table_id,
                select=[s.model_dump() for s in ir.select],
                filters=[f.model_dump() for f in ir.filters],
                group_by=ir.group_by,
            )
            break
        else:
            return _error_result(
                source_id="",
                error=(
                    f"Planner could not produce a valid IR after "
                    f"{self._max_retries} attempts. Last error: {previous_error}"
                ),
            )

        # ---------- dispatch + execute ------------------------------
        try:
            dispatcher = self._dispatcher_factory(catalog)
            executor = dispatcher.pick(ir)
        except Exception as e:
            logger.error("dispatch failed", source_id=ir.source_id, error=str(e))
            return _error_result(source_id=ir.source_id, error=f"dispatch failed: {e}")

        try:
            return await executor.run(ir)
        except NotImplementedError as e:
            # TabularExecutor placeholder — TAB owner ships PR3-TAB.
            # NOTE(review): any executor raising NotImplementedError gets the
            # tabular-specific message below.
            logger.warning(
                "executor not yet implemented",
                source_id=ir.source_id,
                error=str(e),
            )
            return _error_result(
                source_id=ir.source_id,
                error="Tabular execution is not yet available — pending PR3-TAB.",
            )
        except Exception as e:
            logger.error("executor crashed", source_id=ir.source_id, error=str(e))
            return _error_result(source_id=ir.source_id, error=f"executor failed: {e}")
def _error_result(source_id: str, error: str) -> QueryResult:
    """Wrap a failure message in a ``QueryResult``.

    Every error return in ``QueryService.run`` is built here, so all error
    results share one shape. ``backend`` is always the empty string. This
    also holds for failures that happen after an executor has been picked,
    so callers should test ``error`` rather than ``backend`` to detect
    failure.
    """
    no_backend = ""
    return QueryResult(error=error, source_id=source_id, backend=no_backend)