Rifqi Hafizuddin
[NOTICKET] IntentRouter + planner + dispatcher + QueryService + AnswerAgent + ChatHandler
f65aee0 | """Picks DB vs Tabular executor based on the source_type of the IR's source. | |
| This is the only place in the structured query path where the schema/tabular | |
| distinction matters. Every step before this is source-type-agnostic. | |
| Production executors are imported lazily so the module is import-safe for | |
| tests (DbExecutor transitively imports `Settings` which fails without `.env`). | |
| Tests can inject their own `executor_factories` to bypass production deps | |
| entirely. | |
| Until TAB owner ships the real `TabularExecutor` body, dispatching to a | |
| tabular source returns the existing stub which raises `NotImplementedError` | |
| on `.run()`. `QueryService` catches this and surfaces a graceful error in | |
| `QueryResult.error`. | |
| """ | |
| from __future__ import annotations | |
| from collections.abc import Callable | |
| from ...catalog.models import Catalog, Source | |
| from ..ir.models import QueryIR | |
| from .base import BaseExecutor | |
| ExecutorFactory = Callable[[Catalog], BaseExecutor] | |
| class ExecutorDispatcher: | |
| """Picks the right `BaseExecutor` for an IR. | |
| One executor instance per source_type per dispatcher (cached internally), | |
| since both `DbExecutor` and `TabularExecutor` are stateless beyond the | |
| catalog they hold. | |
| """ | |
| def __init__( | |
| self, | |
| catalog: Catalog, | |
| executor_factories: dict[str, ExecutorFactory] | None = None, | |
| ) -> None: | |
| self._catalog = catalog | |
| self._factories = executor_factories | |
| self._cache: dict[str, BaseExecutor] = {} | |
| def pick(self, ir: QueryIR) -> BaseExecutor: | |
| source = self._find_source(ir.source_id) | |
| if source.source_type in self._cache: | |
| return self._cache[source.source_type] | |
| factory = self._get_factory(source.source_type) | |
| executor = factory(self._catalog) | |
| self._cache[source.source_type] = executor | |
| return executor | |
| def _get_factory(self, source_type: str) -> ExecutorFactory: | |
| if self._factories is not None: | |
| factory = self._factories.get(source_type) | |
| if factory is None: | |
| raise ValueError( | |
| f"no executor factory injected for source_type={source_type!r}" | |
| ) | |
| return factory | |
| # Default factories — lazy-imported so importing this module is cheap | |
| if source_type == "schema": | |
| from .db import DbExecutor | |
| return DbExecutor # type: ignore[return-value] | |
| if source_type == "tabular": | |
| from .tabular import TabularExecutor | |
| return TabularExecutor # type: ignore[return-value] | |
| raise ValueError(f"unsupported source_type={source_type!r}") | |
| def _find_source(self, source_id: str) -> Source: | |
| for s in self._catalog.sources: | |
| if s.source_id == source_id: | |
| return s | |
| raise ValueError(f"source_id {source_id!r} not in catalog") | |