"""ChatHandler — top-level Phase 2 chat orchestrator.

End-to-end flow per user message:

  1. `OrchestratorAgent.classify` → RouterDecision (one of six intents).
  2. Route by intent:
       - `chat`              → no context. Pass straight to ChatbotAgent.
       - `structured_flow`   → CatalogReader → slow path / QueryService.
       - `unstructured_flow` → DocumentRetriever (RAG over PGVector) →
                               list[DocumentChunk].
       - `check`             → check_data / check_knowledge tool → rendered table.
       - `help`              → Help skill: analysis state + history → streamed guidance.

     (`problem_statement` was removed 2026-06-24 — the goal is now user-entered
     `objective` + `business_questions` captured at onboarding, with no agent skill.)
  3. `ChatbotAgent.astream` → yield text tokens.
  4. Wrap each step into an SSE-style event dict so the API endpoint can
     stream them as Server-Sent Events.

The chat endpoint (`src/api/v1/chat.py`) calls `ChatHandler.handle(...)` per
request, behind two endpoint-level pre-filters: a greeting/farewell
short-circuit and a Redis response cache (both skip the LLM on a hit).

All dependencies are injectable for tests. Default constructors lazy-build
production deps (no `Settings()` triggered at import time as long as you
inject mocks).
"""
|
|
from __future__ import annotations

# --- standard library ---
import asyncio
import json
from collections.abc import AsyncIterator, Callable
from typing import TYPE_CHECKING, Any

# --- third party ---
from langchain_core.messages import BaseMessage

# --- project (absolute) ---
from src.middlewares.logging import get_logger
from src.retrieval.base import RetrievalResult

# --- project (package-relative); these names are used at runtime ---
from .chatbot import ChatbotAgent, DocumentChunk
from .handlers.check import run_check
from .handlers.help import HelpAgent
from .handlers.problem_statement import ProblemStatementAgent
from .orchestration import OrchestratorAgent

# Names needed only for annotations; the real imports happen lazily inside the
# functions that build the corresponding objects.
if TYPE_CHECKING:
    from ..catalog.reader import CatalogReader
    from ..query.service import QueryService
    from ..retrieval.router import RetrievalRouter
    from .gate import AnalysisState
    from .slow_path.coordinator import SlowPathCoordinator
    from .slow_path.store import ReportInputStore

# Module-level logger; called below as `logger.<level>(message, key=value, ...)`.
logger = get_logger("chat_handler")
|
|
|
|
class ChatHandler:
    """Top-level chat orchestrator.

    `handle()` and `stream_help()` are async generators yielding SSE-style
    events with shape `{"event": <name>, "data": <str>}`. Event types:
      - `intent`  — emitted once after classification (JSON-encoded decision)
      - `status`  — slow-path progress: `data` is the stage string the
                    coordinator passed to its progress callback
      - `sources` — JSON array of source refs (one per structured table, or
                    per (document_id, page_label) for unstructured)
      - `chunk`   — text fragment of the answer (one per streamed token; the
                    `check` route and the slow path emit a single chunk)
      - `done`    — end of stream (data is empty string)
      - `error`   — failure; data is a user-facing message

    Every collaborator is optional: when not injected it is built lazily by the
    matching `_get_*` method on first use.
    """

    def __init__(
        self,
        intent_router: OrchestratorAgent | None = None,
        answer_agent: ChatbotAgent | None = None,
        catalog_reader: CatalogReader | None = None,
        query_service: QueryService | None = None,
        document_retriever: RetrievalRouter | None = None,
        *,
        enable_slow_path: bool = False,
        slow_path_coordinator_factory: (
            Callable[[str], SlowPathCoordinator] | None
        ) = None,
        analysis_store: ReportInputStore | None = None,
        check_invoker_factory: Callable[[str], Any] | None = None,
        ps_agent: ProblemStatementAgent | None = None,
        help_agent: HelpAgent | None = None,
        state_store: Any | None = None,
        binding_store: Any | None = None,
        enable_gate: bool = False,
        enable_tracing: bool = False,
    ) -> None:
        """Store the injected collaborators and feature flags.

        Args:
            intent_router: Classifier used by `handle()`; built lazily if None.
            answer_agent: Streams the final answer; built lazily if None.
            catalog_reader: Catalog access for structured reads; lazy if None.
            query_service: Runs structured queries when the slow path is off.
            document_retriever: Retrieval backend for `unstructured_flow`.
            enable_slow_path: When True, `structured_flow` is served by
                `_run_slow_path` instead of `QueryService.run`.
            slow_path_coordinator_factory: Optional `user_id -> coordinator`
                override; when set it replaces the default composition in
                `_get_slow_path_coordinator` and disables the DB prewarm.
            analysis_store: Where slow-path analysis records are saved.
            check_invoker_factory: Optional `user_id -> invoker` override for
                the `check` route.
            ps_agent: Stored and exposed through `_get_ps_agent`; no route in
                this class uses it.
            help_agent: Streams guidance for the `help` route.
            state_store: Analysis-state store (`ensure` / `get` are called).
            binding_store: Analysis → bound source-id store (`get` is called).
            enable_gate: Stored only; not read anywhere in this class.
            enable_tracing: When True, `_make_tracer` starts a `RequestTracer`;
                otherwise a `NullTracer` is used.
        """
        self._intent_router = intent_router
        self._answer_agent = answer_agent
        self._catalog_reader = catalog_reader
        self._query_service = query_service
        self._document_retriever = document_retriever

        self._enable_tracing = enable_tracing

        self._enable_slow_path = enable_slow_path
        self._slow_path_factory = slow_path_coordinator_factory
        self._analysis_store = analysis_store

        self._check_invoker_factory = check_invoker_factory

        self._ps_agent = ps_agent

        self._help_agent = help_agent
        self._state_store = state_store

        self._binding_store = binding_store

        self._enable_gate = enable_gate

        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references to tasks, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task[Any]] = set()

    # ------------------------------------------------------------------
    # Lazy dependency accessors
    # ------------------------------------------------------------------

    def _get_intent_router(self) -> OrchestratorAgent:
        """Return the injected intent router, creating a default one on first use."""
        if self._intent_router is None:
            self._intent_router = OrchestratorAgent()
        return self._intent_router

    def _get_answer_agent(self) -> ChatbotAgent:
        """Return the injected answer agent, creating a default one on first use."""
        if self._answer_agent is None:
            self._answer_agent = ChatbotAgent()
        return self._answer_agent

    def _get_catalog_reader(self) -> CatalogReader:
        """Return the catalog reader, building `CatalogReader(CatalogStore())` if absent."""
        if self._catalog_reader is None:
            from ..catalog.reader import CatalogReader
            from ..catalog.store import CatalogStore

            self._catalog_reader = CatalogReader(CatalogStore())
        return self._catalog_reader

    def _get_query_service(self) -> QueryService:
        """Return the query service, creating a default one on first use."""
        if self._query_service is None:
            from ..query.service import QueryService

            self._query_service = QueryService()
        return self._query_service

    def _get_document_retriever(self) -> RetrievalRouter:
        """Return the document retriever, creating a `RetrievalRouter` on first use."""
        if self._document_retriever is None:
            from ..retrieval.router import RetrievalRouter

            self._document_retriever = RetrievalRouter()
        return self._document_retriever

    def _get_check_invoker(self, user_id: str) -> Any:
        """Build the per-request data-access invoker for the `check` skill.

        Uses the injected factory when present; otherwise constructs a
        `DataAccessToolInvoker` bound to `user_id` and the shared catalog reader.
        """
        if self._check_invoker_factory is not None:
            return self._check_invoker_factory(user_id)
        from ..tools.data_access import DataAccessToolInvoker

        return DataAccessToolInvoker(user_id, self._get_catalog_reader())

    def _get_ps_agent(self) -> ProblemStatementAgent:
        """Return the problem-statement agent, creating a default one on first use."""
        if self._ps_agent is None:
            self._ps_agent = ProblemStatementAgent()
        return self._ps_agent

    def _get_help_agent(self) -> HelpAgent:
        """Return the help agent, creating a default one on first use."""
        if self._help_agent is None:
            self._help_agent = HelpAgent()
        return self._help_agent

    def _get_state_store(self) -> Any:
        """Return the analysis-state store, creating an `AnalysisStateStore` on first use."""
        if self._state_store is None:
            from .state_store import AnalysisStateStore

            self._state_store = AnalysisStateStore()
        return self._state_store

    def _get_binding_store(self) -> Any:
        """Return the binding store, creating an `AnalysisDataSourceStore` on first use."""
        if self._binding_store is None:
            from .binding_store import AnalysisDataSourceStore

            self._binding_store = AnalysisDataSourceStore()
        return self._binding_store

    def _get_analysis_store(self) -> ReportInputStore:
        """Return the analysis-record store, creating a `PostgresReportInputStore` on first use."""
        if self._analysis_store is None:
            from .slow_path.store import PostgresReportInputStore

            self._analysis_store = PostgresReportInputStore()
        return self._analysis_store

    def _on_background_task_done(self, task: asyncio.Task[Any]) -> None:
        """Done-callback for fire-and-forget tasks: drop the reference, log failures.

        Reading `task.exception()` also marks the exception as retrieved, so a
        failed background task is reported through `logger` rather than through
        asyncio's "exception was never retrieved" message.
        """
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        failure = task.exception()
        if failure is not None:
            logger.warning("background task failed", error=str(failure))

    # ------------------------------------------------------------------
    # Analysis-scoped lookups
    # ------------------------------------------------------------------

    async def _bound_source_ids(self, analysis_id: str | None) -> set[str]:
        """#10: the catalog source_ids this analysis is bound to (empty = unscoped).

        Fail-open: no analysis_id, no binding rows (legacy room / FE not sending
        ids), or a read error → empty set, which the caller treats as "whole
        catalog". Used to build a `_ScopedCatalogReader` so the Planner AND the
        data-access tools (which re-read the catalog themselves) see the same scope.
        """
        if not analysis_id:
            return set()
        try:
            return set(await self._get_binding_store().get(analysis_id))
        except Exception as e:
            logger.warning("binding read failed — unscoped", analysis_id=analysis_id, error=str(e))
            return set()

    async def _load_analysis_state(self, analysis_id: str | None) -> AnalysisState:
        """Load Analysis State for the Help skill; fail closed to a not-validated stub.

        Mirrors the gate's never-throw fallback so Help degrades gracefully on a
        missing row, a read error, or a legacy room with no `analysis_id`.
        """
        from .gate import stub_analysis_state

        if not analysis_id:
            return stub_analysis_state(problem_validated=False)
        try:
            state = await self._get_state_store().get(analysis_id)
        except Exception as e:
            logger.warning("help state read failed — not-validated", error=str(e))
            state = None
        return state if state is not None else stub_analysis_state(problem_validated=False)

    # ------------------------------------------------------------------
    # Public entry points
    # ------------------------------------------------------------------

    async def stream_help(
        self,
        user_id: str,
        analysis_id: str | None,
        history: list[BaseMessage] | None = None,
        message: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Deterministic `help` dispatch for the dedicated `/api/v1/tools/help` endpoint.

        Bypasses the intent router — the slash command IS the intent, so there is no
        classify round-trip and no misclassification risk. Streams the same guidance as
        the `help` branch of `handle()`, reusing the warm HelpAgent + state store.

        Emits SSE-style events: `sources` (always `[]` — help never references
        documents), `chunk`*, then `done` (data left empty; the endpoint stamps the
        `message_id`). On failure, yields a terminal `error` event; a failure of
        the report-readiness check is reported before `sources` is emitted.
        """
        # Prefer `ensure` (which is given the user id); fall back to the
        # never-throwing loader when there is no id or `ensure` fails.
        state: AnalysisState | None = None
        if analysis_id:
            try:
                state = await self._get_state_store().ensure(analysis_id, user_id)
            except Exception as e:
                logger.warning("help state ensure failed", analysis_id=analysis_id, error=str(e))
        if state is None:
            state = await self._load_analysis_state(analysis_id)

        # Guarded so a readiness failure becomes the documented terminal `error`
        # event instead of an exception escaping the generator.
        try:
            from .report.readiness import is_report_ready

            report_ready = await is_report_ready(analysis_id, state)
        except Exception as e:
            logger.error("help readiness check failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Help generation failed: {e}"}
            return

        yield {"event": "sources", "data": json.dumps([])}
        try:
            async for token in self._get_help_agent().astream(
                state,
                history=history,
                message=message,
                report_ready=report_ready,
            ):
                yield {"event": "chunk", "data": token}
        except Exception as e:
            logger.error("help streaming failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Help generation failed: {e}"}
            return
        yield {"event": "done", "data": ""}

    async def handle(
        self,
        message: str,
        user_id: str,
        history: list[BaseMessage] | None = None,
        analysis_id: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Classify one user message, route it, and stream the answer as events.

        Flow:
          1. Classify `message` with the intent router → `intent` event.
          2. Route on the intent:
             - `structured_flow`: read the structured catalog (restricted to the
               analysis's bound sources when there are any), then either hand
               off to `_run_slow_path` (when enabled) or run `QueryService`.
             - `unstructured_flow`: retrieve documents and normalise them.
             - `check`: run the check tool; its text is sent as one `chunk`.
             - `help`: stream guidance from the help agent.
             - anything else: no extra context.
          3. For the routes that fall through (everything except `check`,
             `help` and the slow path): emit `sources`, then stream the answer
             agent's tokens as `chunk` events, then `done`.

        Routing uses the router's rewritten query when it provides one; the
        answer agent and help agent always receive the original `message`.

        Any failure ends the stream with a single `error` event. The request
        trace is ended on every terminal path handled here.

        Args:
            message: Raw user message.
            user_id: Authenticated user the request runs as.
            history: Prior conversation messages, if any.
            analysis_id: Analysis the chat belongs to, if any.
        """
        tracer = self._make_tracer(user_id, message)

        # --- 1. classify -------------------------------------------------
        try:
            router_callbacks = tracer.callbacks()
            classify_kwargs = {"callbacks": router_callbacks} if router_callbacks else {}
            decision = await self._get_intent_router().classify(
                message, history, **classify_kwargs
            )
        except Exception as e:
            logger.error("intent classification failed", error=str(e))
            tracer.end()
            yield {"event": "error", "data": f"Could not classify message: {e}"}
            return

        intent = decision.intent

        # Best effort: `ensure` is attempted for every request that carries an
        # analysis id; only the `help` route below reads the result.
        analysis_state: AnalysisState | None = None
        if analysis_id:
            try:
                analysis_state = await self._get_state_store().ensure(analysis_id, user_id)
            except Exception as e:
                logger.warning(
                    "analysis state ensure failed", analysis_id=analysis_id, error=str(e)
                )

        event_data = decision.model_dump()
        event_data["intent"] = intent
        yield {"event": "intent", "data": json.dumps(event_data)}

        rewritten = decision.rewritten_query or message
        query_result = None
        chunks: list[DocumentChunk] | None = None
        raw_chunks: Any = None

        # --- 2. route ----------------------------------------------------
        if intent == "structured_flow":
            try:
                from ..catalog.reader import MemoizingCatalogReader

                # One memoizing reader per request, shared by every catalog
                # consumer of this request.
                req_reader = MemoizingCatalogReader(self._get_catalog_reader())
                bound = await self._bound_source_ids(analysis_id)
                reader = _ScopedCatalogReader(req_reader, bound) if bound else req_reader
                catalog = await reader.read(user_id, "structured")
                if self._enable_slow_path:
                    # The slow path emits its own sources/chunk/done (or error)
                    # events and ends the trace itself.
                    async for event in self._run_slow_path(
                        user_id, rewritten, catalog, tracer, reader, analysis_id
                    ):
                        yield event
                    return
                query_result = await self._get_query_service().run(
                    user_id, rewritten, catalog
                )
            except Exception as e:
                logger.error(
                    "structured route failed",
                    user_id=user_id,
                    error=str(e),
                )
                tracer.end()
                yield {"event": "error", "data": f"Structured query failed: {e}"}
                return
        elif intent == "unstructured_flow":
            try:
                raw_chunks = await self._get_document_retriever().retrieve(
                    rewritten, user_id
                )
                chunks = _normalize_chunks(raw_chunks)
            except Exception as e:
                logger.error(
                    "unstructured route failed", user_id=user_id, error=str(e)
                )
                tracer.end()
                yield {"event": "error", "data": f"Document retrieval failed: {e}"}
                return
        elif intent == "check":
            try:
                invoker = self._get_check_invoker(user_id)
                text = await run_check(rewritten, invoker)
            except Exception as e:
                logger.error("check route failed", user_id=user_id, error=str(e))
                tracer.end()
                yield {"event": "error", "data": f"Lookup failed: {e}"}
                return
            tracer.end()
            yield {"event": "chunk", "data": text}
            yield {"event": "done", "data": ""}
            return
        elif intent == "help":
            try:
                state = analysis_state or await self._load_analysis_state(analysis_id)

                from .report.readiness import is_report_ready

                # Inside the guard so a readiness failure yields an `error`
                # event rather than escaping the generator.
                report_ready = await is_report_ready(analysis_id, state)
            except Exception as e:
                logger.error("help route failed", user_id=user_id, error=str(e))
                tracer.end()
                yield {"event": "error", "data": f"Help failed: {e}"}
                return

            help_callbacks = tracer.callbacks(masked=True)
            help_kwargs = {"callbacks": help_callbacks} if help_callbacks else {}
            try:
                async for token in self._get_help_agent().astream(
                    state,
                    history=history,
                    message=message,
                    report_ready=report_ready,
                    **help_kwargs,
                ):
                    yield {"event": "chunk", "data": token}
            except Exception as e:
                logger.error("help streaming failed", user_id=user_id, error=str(e))
                tracer.end()
                yield {"event": "error", "data": f"Help generation failed: {e}"}
                return
            tracer.end()
            yield {"event": "done", "data": ""}
            return

        # --- 3. sources + streamed answer --------------------------------
        sources = _build_sources(intent, user_id, query_result, raw_chunks)
        logger.info(
            "built sources",
            intent=intent,
            sources_count=len(sources),
            raw_chunks_count=len(raw_chunks) if raw_chunks else 0,
        )
        yield {"event": "sources", "data": json.dumps(sources)}

        answer_callbacks = tracer.callbacks(masked=True)
        answer_kwargs = {"callbacks": answer_callbacks} if answer_callbacks else {}
        try:
            async for token in self._get_answer_agent().astream(
                message,
                history=history,
                query_result=query_result,
                chunks=chunks,
                **answer_kwargs,
            ):
                yield {"event": "chunk", "data": token}
        except Exception as e:
            logger.error("answer streaming failed", user_id=user_id, error=str(e))
            tracer.end()
            yield {"event": "error", "data": f"Answer generation failed: {e}"}
            return

        tracer.end()
        yield {"event": "done", "data": ""}

    # ------------------------------------------------------------------
    # Tracing and slow path
    # ------------------------------------------------------------------

    def _make_tracer(self, user_id: str, question: str) -> Any:
        """One Langfuse trace per request (or a NullTracer when disabled)."""
        if not self._enable_tracing:
            from ..observability.langfuse.tracing import NullTracer

            return NullTracer()
        from ..observability.langfuse.tracing import RequestTracer

        return RequestTracer.start(user_id=user_id, question=question)

    def _get_slow_path_coordinator(
        self, user_id: str, tracer: Any = None, catalog_reader: CatalogReader | None = None
    ) -> SlowPathCoordinator:
        """Build the per-request slow-path coordinator (composition root).

        The data-access tools need the authenticated `user_id` + `CatalogReader`,
        so the `CompositeToolInvoker` is constructed per request. The slow-path
        agent code stays tool-agnostic (INV-7) — only here, the composition root,
        do we name concrete tool implementations. When tracing is active the invoker
        is wrapped so each tool call records a metadata-only span.

        An injected `slow_path_coordinator_factory` takes precedence and receives
        only `user_id`; `tracer` and `catalog_reader` are ignored in that case.
        """
        if self._slow_path_factory is not None:
            return self._slow_path_factory(user_id)

        from ..tools.data_access import DataAccessToolInvoker
        from ..tools.invoker import AnalyticsToolInvoker, CompositeToolInvoker
        from .planner.registry import default_registry
        from .planner.service import PlannerService
        from .slow_path.assembler import Assembler
        from .slow_path.coordinator import SlowPathCoordinator
        from .slow_path.task_runner import TaskRunner

        invoker: Any = CompositeToolInvoker(
            DataAccessToolInvoker(user_id, catalog_reader or self._get_catalog_reader()),
            AnalyticsToolInvoker(),
        )
        if tracer is not None and getattr(tracer, "active", False):
            from ..observability.langfuse.tracing import TracingToolInvoker

            invoker = TracingToolInvoker(invoker, tracer)
        registry = default_registry()
        return SlowPathCoordinator(
            PlannerService(), TaskRunner(invoker, registry), Assembler(), registry
        )

    async def _run_slow_path(
        self,
        user_id: str,
        query: str,
        catalog: Any,
        tracer: Any = None,
        catalog_reader: CatalogReader | None = None,
        analysis_id: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Run the slow path and stream its assembled answer as SSE events.

        Context comes from the `get_business_context` seam (a stub today); the
        `analysis_record` is persisted via the `ReportInputStore` seam (PostgresReportInputStore),
        stamped with the request's user_id + analysis_id so the report can group it.
        `chat_answer` is emitted as a single `chunk` (the Assembler returns the whole
        object — true token streaming is a later step).

        Events: zero or more `status` (one per progress stage reported by the
        coordinator), then either `error`, or `sources` (always `[]`), `chunk`,
        `done`. A persistence failure is logged but does not fail the stream.
        The trace is ended before the terminal event on both outcomes.

        If the consumer stops iterating before the run finishes, the coordinator
        task and the pending queue read are cancelled rather than left running.
        """
        from .planner.business_context import get_business_context
        from .planner.inputs import Constraints

        if tracer is None:
            from ..observability.langfuse.tracing import NullTracer

            tracer = NullTracer()

        coordinator = self._get_slow_path_coordinator(user_id, tracer, catalog_reader)
        context = await get_business_context(user_id)

        # Prewarm runs concurrently with planning and is skipped when a custom
        # coordinator factory is injected. Keep a strong reference until it
        # completes so the task cannot be collected mid-flight.
        if self._slow_path_factory is None:
            from ..query.executor.db import DbExecutor

            prewarm = asyncio.create_task(DbExecutor.prewarm(catalog, user_id))
            self._background_tasks.add(prewarm)
            prewarm.add_done_callback(self._on_background_task_done)

        planner_callbacks = tracer.callbacks()
        assembler_callbacks = tracer.callbacks(masked=True)
        run_kw: dict[str, Any] = {}
        if planner_callbacks:
            run_kw["planner_callbacks"] = planner_callbacks
        if assembler_callbacks:
            run_kw["assembler_callbacks"] = assembler_callbacks

        # The coordinator reports progress through a callback; the queue turns
        # those calls into events this generator can yield while the run is
        # still in flight.
        progress_q: asyncio.Queue[str] = asyncio.Queue()

        async def _progress(stage: str) -> None:
            await progress_q.put(stage)

        run_task = asyncio.create_task(
            coordinator.run(
                context, catalog, query, Constraints(), progress=_progress, **run_kw
            )
        )
        getter: asyncio.Task = asyncio.create_task(progress_q.get())
        pending: set[asyncio.Task] = {run_task, getter}
        try:
            while True:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                if getter in done:
                    yield {"event": "status", "data": getter.result()}
                    getter = asyncio.create_task(progress_q.get())
                    pending = pending | {getter}
                if run_task in done:
                    getter.cancel()
                    # Flush stages that were queued but not yet picked up.
                    while not progress_q.empty():
                        yield {"event": "status", "data": progress_q.get_nowait()}
                    break
        finally:
            # No-ops after a normal finish. When the stream is closed or
            # cancelled mid-run this stops both tasks instead of leaking them.
            getter.cancel()
            if not run_task.done():
                run_task.cancel()

        try:
            result = run_task.result()
        except Exception as e:
            logger.error("slow path failed", user_id=user_id, error=str(e))
            tracer.end()
            yield {"event": "error", "data": f"Analysis failed: {e}"}
            return

        yield {"event": "sources", "data": json.dumps([])}
        yield {"event": "chunk", "data": result.chat_answer}
        try:
            # Stamp ownership onto a copy of the record before persisting it.
            record = result.analysis_record.model_copy(
                update={"user_id": user_id, "analysis_id": analysis_id}
            )
            await self._get_analysis_store().save(record)
        except Exception as e:
            logger.error("analysis_record persist failed", user_id=user_id, error=str(e))
        tracer.end()
        yield {"event": "done", "data": ""}
|
|
|
|
class _ScopedCatalogReader:
    """Catalog-reader wrapper that narrows `structured` reads to bound sources (#10).

    The restriction is applied inside the reader rather than at one call site,
    so whoever is handed this reader — the Planner as well as data-access tools
    that re-read the catalog on their own — gets the same scoped view.

    Fail-open rules:
      - an empty binding leaves the catalog untouched;
      - a binding that matches none of the catalog's sources also yields the
        whole catalog, so a stale / cross-source binding degrades instead of
        emptying it.

    Only reads with `source_hint == "structured"` are filtered; any other hint
    is passed through unchanged.
    """

    def __init__(self, inner: Any, bound: set[str]) -> None:
        """Keep the wrapped reader and the set of allowed `source_id`s."""
        self._inner = inner
        self._bound = bound

    async def read(self, user_id: str, source_hint: str) -> Any:
        """Read via the wrapped reader and, for structured reads, keep bound sources only.

        Returns the inner catalog as-is when scoping does not apply; otherwise a
        `model_copy` of it whose `sources` are the bound ones (or the original
        sources when the filter would leave nothing).
        """
        full_catalog = await self._inner.read(user_id, source_hint)

        scoping_applies = bool(self._bound) and source_hint == "structured"
        if not scoping_applies:
            return full_catalog

        allowed = self._bound
        kept = [src for src in full_catalog.sources if src.source_id in allowed]
        if not kept:
            # Disjoint binding: fall back to the full source list.
            kept = full_catalog.sources
        return full_catalog.model_copy(update={"sources": kept})
|
|
|
|
def _build_sources(
    intent: str,
    user_id: str,
    query_result: Any,
    raw_chunks: Any,
) -> list[dict[str, Any]]:
    """Assemble the payload of the SSE `sources` event.

    - `structured_flow`: a single entry naming the query result's table, or
      nothing when there is no result, the result carries an error, or it has
      no table name. The entry's `document_id` is `"<user_id>_<table_name>"`.
    - `unstructured_flow`: one entry per distinct (document_id, page_label)
      pair, in first-seen order. Items that are neither `RetrievalResult` nor
      `dict` are ignored, as are items where both key parts are None.
    - any other intent: empty list.
    """
    if intent == "structured_flow":
        unusable = query_result is None or getattr(query_result, "error", None)
        table_name = "" if unusable else (getattr(query_result, "table_name", "") or "")
        if not table_name:
            return []
        entry = {
            "document_id": f"{user_id}_{table_name}",
            "filename": table_name,
            "page_label": None,
        }
        return [entry]

    if intent != "unstructured_flow" or not raw_chunks:
        return []

    refs: list[dict[str, Any]] = []
    emitted: set[tuple[Any, Any]] = set()
    for entry in raw_chunks:
        # Source fields live under metadata["data"] for RetrievalResult items
        # and at the top level for plain dicts.
        if isinstance(entry, RetrievalResult):
            payload = entry.metadata.get("data", {})
        elif isinstance(entry, dict):
            payload = entry
        else:
            continue

        ref_key = (payload.get("document_id"), payload.get("page_label"))
        if ref_key in emitted or ref_key == (None, None):
            continue
        emitted.add(ref_key)
        refs.append(
            {
                "document_id": payload.get("document_id"),
                "filename": payload.get("filename", "Unknown"),
                "page_label": payload.get("page_label", "Unknown"),
            }
        )
    return refs
|
|
|
|
def _normalize_chunks(raw: Any) -> list[DocumentChunk]:
    """Convert whatever the retriever returns into list[DocumentChunk].

    The Phase 2 `DocumentRetriever.retrieve` interface is a stub today;
    when TAB owner ships it, it should return `list[DocumentChunk]`
    directly so this normalizer becomes a no-op. Until then we coerce
    common shapes (dict-with-content, plain string) defensively.

    Accepted shapes:
      - falsy (`None`, `[]`, `""`, ...)       → `[]`
      - a list made only of `DocumentChunk`  → returned as-is (same object)
      - an iterable of supported items       → one chunk per supported item;
        unsupported items are skipped
      - a single supported item (a bare `str`, `dict`, `DocumentChunk` or
        `RetrievalResult`)                   → treated as a one-element list

    Supported items: `DocumentChunk` (kept), `dict` (`content` / `filename` /
    `page_label` keys), `RetrievalResult` (`content` plus `filename` /
    `page_label` from `metadata["data"]`, page label stringified), and `str`
    (used as the chunk content).
    """
    if not raw:
        return []

    # A lone item must not be iterated: looping over a str yields one chunk per
    # character, and looping over a dict yields its keys as chunk contents.
    if isinstance(raw, (str, dict, DocumentChunk, RetrievalResult)):
        raw = [raw]

    if isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw):
        return raw

    chunks: list[DocumentChunk] = []
    for item in raw:
        if isinstance(item, DocumentChunk):
            chunks.append(item)
        elif isinstance(item, dict):
            chunks.append(
                DocumentChunk(
                    content=str(item.get("content", "")),
                    filename=item.get("filename"),
                    page_label=item.get("page_label"),
                )
            )
        elif isinstance(item, RetrievalResult):
            data = item.metadata.get("data", {})
            page = data.get("page_label")
            chunks.append(
                DocumentChunk(
                    content=item.content,
                    filename=data.get("filename"),
                    page_label=str(page) if page is not None else None,
                )
            )
        elif isinstance(item, str):
            chunks.append(DocumentChunk(content=item))
    return chunks
|
|