"""ChatHandler — top-level Phase 2 chat orchestrator. End-to-end flow per user message: 1. `OrchestratorAgent.classify` → RouterDecision (one of six intents). 2. Route by intent: - `chat` → no context. Pass straight to ChatbotAgent. - `structured_flow` → CatalogReader → slow path / QueryService. - `unstructured_flow` → DocumentRetriever (RAG over PGVector) → list[DocumentChunk]. - `check` → check_data / check_knowledge tool → rendered table. - `help` → Help skill: analysis state + history → streamed guidance. (`problem_statement` was removed 2026-06-24 — the goal is now user-entered `objective` + `business_questions` captured at onboarding, with no agent skill.) 3. `ChatbotAgent.astream` → yield text tokens. 4. Wrap each step into an SSE-style event dict so the API endpoint can stream them as Server-Sent Events. The chat endpoint (`src/api/v1/chat.py`) calls `ChatHandler.handle(...)` per request, behind two endpoint-level pre-filters: a greeting/farewell short-circuit and a Redis response cache (both skip the LLM on a hit). All dependencies are injectable for tests. Default constructors lazy-build production deps (no `Settings()` triggered at import time as long as you inject mocks). """ from __future__ import annotations import asyncio import json from collections.abc import AsyncIterator, Callable from typing import TYPE_CHECKING, Any from langchain_core.messages import BaseMessage from src.middlewares.logging import get_logger from src.retrieval.base import RetrievalResult from .chatbot import ChatbotAgent, DocumentChunk from .handlers.check import run_check from .handlers.help import HelpAgent # `run_problem_statement` unwired 2026-06-24 (problem_statement removed from the router). # `ProblemStatementAgent` kept — still referenced by the constructor + _get_ps_agent. 
from .handlers.problem_statement import ProblemStatementAgent
from .orchestration import OrchestratorAgent

if TYPE_CHECKING:
    from ..catalog.reader import CatalogReader
    from ..query.service import QueryService
    from ..retrieval.router import RetrievalRouter
    from .gate import AnalysisState
    from .slow_path.coordinator import SlowPathCoordinator
    from .slow_path.store import ReportInputStore

logger = get_logger("chat_handler")

# Strong references to fire-and-forget tasks (currently the slow-path DB
# prewarm). The event loop keeps only weak references to tasks, so a task that
# nobody holds can be garbage-collected before it finishes. Each task removes
# itself from this set when it completes (see `_on_background_task_done`).
_BACKGROUND_TASKS: set[asyncio.Task[Any]] = set()


def _on_background_task_done(task: asyncio.Task[Any]) -> None:
    """Release a finished background task and log (never raise) its failure."""
    _BACKGROUND_TASKS.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        # Retrieving the exception here also prevents asyncio's
        # "Task exception was never retrieved" warning at GC time.
        logger.warning("background task failed", error=str(exc))


def _end_trace(tracer: Any) -> None:
    """End the request trace; observability must never break the answer stream."""
    try:
        tracer.end()
    except Exception as e:  # noqa: BLE001 — tracing is best-effort
        logger.warning("trace end failed", error=str(e))


class ChatHandler:
    """Top-level chat orchestrator.

    `handle()` returns an `AsyncIterator[dict]` of SSE-style events shaped
    `{"event": <type>, "data": <payload>}`. Event types:

    - `intent`  — emitted once after classification (JSON-encoded decision,
      with `intent` set to the intent that actually ran)
    - `status`  — slow path only: a progress stage name reported by the
      coordinator while the analysis runs
    - `sources` — JSON array of source refs (one per structured table, or per
      (document_id, page_label) for unstructured)
    - `chunk`   — text fragment of the answer (a streamed token, or the whole
      text for non-streaming skills such as `check`)
    - `done`    — end of stream (data is empty string)
    - `error`   — failure; data is a user-facing message. No further events
      follow an `error`.
    """

    def __init__(
        self,
        intent_router: OrchestratorAgent | None = None,
        answer_agent: ChatbotAgent | None = None,
        catalog_reader: CatalogReader | None = None,
        query_service: QueryService | None = None,
        document_retriever: RetrievalRouter | None = None,
        *,
        enable_slow_path: bool = False,
        slow_path_coordinator_factory: (
            Callable[[str], SlowPathCoordinator] | None
        ) = None,
        analysis_store: ReportInputStore | None = None,
        check_invoker_factory: Callable[[str], Any] | None = None,
        ps_agent: ProblemStatementAgent | None = None,
        help_agent: HelpAgent | None = None,
        state_store: Any | None = None,
        binding_store: Any | None = None,
        enable_gate: bool = False,
        enable_tracing: bool = False,
    ) -> None:
        self._intent_router = intent_router
        self._answer_agent = answer_agent
        self._catalog_reader = catalog_reader
        self._query_service = query_service
        self._document_retriever = document_retriever
        # Langfuse tracing (tokens + latency). OFF by default so tests never hit
        # Langfuse; the live endpoint opts in with ChatHandler(enable_tracing=True).
        self._enable_tracing = enable_tracing
        # Slow analytical path (Planner -> TaskRunner -> Assembler). OFF by default:
        # gated until the lead's real BusinessContext lands. When True, `structured`
        # intents route here instead of the single-query QueryService path. The
        # factory + store are injectable for tests.
        self._enable_slow_path = enable_slow_path
        self._slow_path_factory = slow_path_coordinator_factory
        self._analysis_store = analysis_store
        # `check` skill: builds the data-access invoker (check_data/check_knowledge)
        # per request with the authenticated user_id. Injectable for tests.
        self._check_invoker_factory = check_invoker_factory
        # `problem_statement` skill: LLM drafter + the Analysis State store it writes
        # `problem_validated` to. Both injectable for tests.
        self._ps_agent = ps_agent
        # `help` skill: LLM guide that reads the Analysis State + chat history.
        self._help_agent = help_agent
        self._state_store = state_store
        # `#10` data-source binding: scopes structured_flow's catalog to the sources
        # the analysis is bound to. Injectable for tests; fail-open when absent.
        self._binding_store = binding_store
        # Deterministic gate — DEPRECATED 2026-06-24 (problem_validated gate removed).
        # Unused flag; the gate call site in handle() is commented out.
        self._enable_gate = enable_gate

    # ------------------------------------------------------------------
    # Lazy default-dep builders
    # ------------------------------------------------------------------

    def _get_intent_router(self) -> OrchestratorAgent:
        if self._intent_router is None:
            self._intent_router = OrchestratorAgent()
        return self._intent_router

    def _get_answer_agent(self) -> ChatbotAgent:
        if self._answer_agent is None:
            self._answer_agent = ChatbotAgent()
        return self._answer_agent

    def _get_catalog_reader(self) -> CatalogReader:
        if self._catalog_reader is None:
            from ..catalog.reader import CatalogReader
            from ..catalog.store import CatalogStore

            self._catalog_reader = CatalogReader(CatalogStore())
        return self._catalog_reader

    def _get_query_service(self) -> QueryService:
        if self._query_service is None:
            from ..query.service import QueryService

            self._query_service = QueryService()
        return self._query_service

    def _get_document_retriever(self) -> RetrievalRouter:
        if self._document_retriever is None:
            from ..retrieval.router import RetrievalRouter

            self._document_retriever = RetrievalRouter()
        return self._document_retriever

    def _get_check_invoker(self, user_id: str) -> Any:
        """Build the per-request data-access invoker for the `check` skill."""
        if self._check_invoker_factory is not None:
            return self._check_invoker_factory(user_id)
        from ..tools.data_access import DataAccessToolInvoker

        return DataAccessToolInvoker(user_id, self._get_catalog_reader())

    def _get_ps_agent(self) -> ProblemStatementAgent:
        if self._ps_agent is None:
            self._ps_agent = ProblemStatementAgent()
        return self._ps_agent

    def _get_help_agent(self) -> HelpAgent:
        if self._help_agent is None:
            self._help_agent = HelpAgent()
        return self._help_agent

    def _get_state_store(self) -> Any:
        if self._state_store is None:
            from .state_store import AnalysisStateStore

            self._state_store = AnalysisStateStore()
        return self._state_store

    def _get_binding_store(self) -> Any:
        if self._binding_store is None:
            from .binding_store import AnalysisDataSourceStore

            self._binding_store = AnalysisDataSourceStore()
        return self._binding_store

    async def _bound_source_ids(self, analysis_id: str | None) -> set[str]:
        """#10: the catalog source_ids this analysis is bound to (empty = unscoped).

        Fail-open: no analysis_id, no binding rows (legacy room / FE not sending
        ids), or a read error → empty set, which the caller treats as "whole
        catalog". Used to build a `_ScopedCatalogReader` so the Planner AND the
        data-access tools (which re-read the catalog themselves) see the same scope.
        """
        if not analysis_id:
            return set()
        try:
            # `or ()` treats a None result as "no bindings" instead of logging a
            # spurious read failure.
            return set(await self._get_binding_store().get(analysis_id) or ())
        except Exception as e:  # noqa: BLE001 — never block the query on this
            logger.warning("binding read failed — unscoped", analysis_id=analysis_id, error=str(e))
            return set()

    async def _load_analysis_state(self, analysis_id: str | None) -> AnalysisState:
        """Load Analysis State for the Help skill; fail closed to a not-validated stub.

        Mirrors the gate's never-throw fallback so Help degrades gracefully on a
        missing row, a read error, or a legacy room with no `analysis_id`.
        """
        from .gate import stub_analysis_state

        if not analysis_id:
            return stub_analysis_state(problem_validated=False)
        try:
            state = await self._get_state_store().get(analysis_id)
        except Exception as e:
            logger.warning(
                "help state read failed — not-validated", analysis_id=analysis_id, error=str(e)
            )
            state = None
        return state if state is not None else stub_analysis_state(problem_validated=False)

    # ------------------------------------------------------------------
    # Public entry
    # ------------------------------------------------------------------

    async def handle(
        self,
        message: str,
        user_id: str,
        history: list[BaseMessage] | None = None,
        analysis_id: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Classify, route and answer one user message as a stream of SSE events.

        Args:
            message: The raw user message.
            user_id: Authenticated user; scopes catalog reads, retrieval and tools.
            history: Prior chat turns, passed to the router and the answering agents.
            analysis_id: Session / room id. When present, an Analysis State row is
                ensured and the structured catalog is scoped to the bound sources.

        Yields:
            Event dicts as described on the class. On success the stream ends with
            `done`; on failure it ends with a single `error`. The request trace is
            ended on every terminal path (the slow path ends it itself).
        """
        tracer = self._make_tracer(user_id, message)

        # ---- 1. Classify intent --------------------------------------
        try:
            oc = tracer.callbacks()  # orchestrator: PII-safe, full capture
            ckw = {"callbacks": oc} if oc else {}
            decision = await self._get_intent_router().classify(message, history, **ckw)
        except Exception as e:
            logger.error("intent classification failed", error=str(e))
            _end_trace(tracer)
            yield {"event": "error", "data": f"Could not classify message: {e}"}
            return
        intent = decision.intent

        # ---- 1a. Ensure session state row (T-A) ----------------------
        # Rooms created via /room/create have no `analysis` row. Without one, Help and
        # the report_id write-back silently no-op. Lazily get-or-create it (idempotent).
        analysis_state: AnalysisState | None = None
        if analysis_id:
            try:
                analysis_state = await self._get_state_store().ensure(analysis_id, user_id)
            except Exception as e:
                logger.warning(
                    "analysis state ensure failed", analysis_id=analysis_id, error=str(e)
                )

        # ---- 1b. Gate (REMOVED 2026-06-24) ---------------------------
        # The problem_validated gate was dropped: structured_flow is no longer
        # redirected to problem_statement (the goal is now user-entered objective +
        # business_questions, no agent validation). `gate()` is neutered to a no-op; the
        # call site is left commented for restorability.
        # if self._enable_gate and analysis_id:
        #     from .gate import gate, stub_analysis_state
        #
        #     intent = gate(
        #         intent,
        #         analysis_state
        #         if analysis_state is not None
        #         else stub_analysis_state(problem_validated=False),
        #     )

        # The `intent` event is consumed by the endpoint (it gates response caching
        # on the effective intent) and is NOT forwarded to the frontend. We emit the
        # post-gate intent so the cache keys on what actually ran.
        event_data = decision.model_dump()
        event_data["intent"] = intent
        yield {"event": "intent", "data": json.dumps(event_data)}

        rewritten = decision.rewritten_query or message
        query_result = None
        chunks: list[DocumentChunk] | None = None
        raw_chunks: Any = None

        # ---- 2. Route ------------------------------------------------
        if intent == "structured_flow":
            try:
                # One memoizing reader per request: the same catalog is otherwise
                # re-fetched from the catalog DB 4-5x across the slow-path run. This
                # collapses those to one round-trip per source_hint and pins a single
                # consistent snapshot for plan + execution.
                from ..catalog.reader import MemoizingCatalogReader

                req_reader = MemoizingCatalogReader(self._get_catalog_reader())
                # #10: scope every catalog read — the Planner's AND the data-access
                # tools' own re-reads — to the analysis's bound sources, so binding
                # is a boundary, not just a planner hint (T-B). Fail-open (T-C).
                bound = await self._bound_source_ids(analysis_id)
                reader = _ScopedCatalogReader(req_reader, bound) if bound else req_reader
                catalog = await reader.read(user_id, "structured")
                if self._enable_slow_path:
                    # The slow path emits its own sources/chunk/done and ends the
                    # trace on its normal and run-failure paths.
                    async for event in self._run_slow_path(
                        user_id, rewritten, catalog, tracer, reader, analysis_id
                    ):
                        yield event
                    return
                query_result = await self._get_query_service().run(
                    user_id, rewritten, catalog
                )
            except Exception as e:
                logger.error(
                    "structured route failed",
                    user_id=user_id,
                    error=str(e),
                )
                _end_trace(tracer)
                yield {"event": "error", "data": f"Structured query failed: {e}"}
                return
        elif intent == "unstructured_flow":
            try:
                raw_chunks = await self._get_document_retriever().retrieve(
                    rewritten, user_id
                )
                chunks = _normalize_chunks(raw_chunks)
            except Exception as e:
                logger.error(
                    "unstructured route failed", user_id=user_id, error=str(e)
                )
                _end_trace(tracer)
                yield {"event": "error", "data": f"Document retrieval failed: {e}"}
                return
        elif intent == "check":
            try:
                invoker = self._get_check_invoker(user_id)
                text = await run_check(rewritten, invoker)
            except Exception as e:
                logger.error("check route failed", user_id=user_id, error=str(e))
                _end_trace(tracer)
                yield {"event": "error", "data": f"Lookup failed: {e}"}
                return
            # `check` renders its whole answer at once: a single chunk, no sources.
            _end_trace(tracer)
            yield {"event": "chunk", "data": text}
            yield {"event": "done", "data": ""}
            return
        # problem_statement dispatch removed 2026-06-24 (skill unwired; intent no longer
        # emitted by the router). Branch kept commented for restorability.
        # elif intent == "problem_statement":
        #     try:
        #         text = await run_problem_statement(
        #             message,
        #             analysis_id,
        #             agent=self._get_ps_agent(),
        #             store=self._get_state_store(),
        #             history=history,
        #         )
        #     except Exception as e:
        #         logger.error("problem_statement route failed", user_id=user_id, error=str(e))
        #         yield {"event": "error", "data": f"Problem statement failed: {e}"}
        #         return
        #     yield {"event": "chunk", "data": text}
        #     yield {"event": "done", "data": ""}
        #     return
        elif intent == "help":
            try:
                state = analysis_state or await self._load_analysis_state(analysis_id)
            except Exception as e:
                logger.error("help route failed", user_id=user_id, error=str(e))
                _end_trace(tracer)
                yield {"event": "error", "data": f"Help failed: {e}"}
                return
            # report_ready (seam #5): deterministic — validated goal + ≥1 recorded
            # analysis (mirrors the report API's own 409 gate). Never-throws (fails
            # closed to not-ready), so Help degrades safely. The consistency guard in
            # HelpAgent only offers `generate_report` when this says ready.
            from .report.readiness import is_report_ready

            report_ready = await is_report_ready(analysis_id, state)
            # The prompt sees chat history -> masked.
            hc = tracer.callbacks(masked=True)
            hkw = {"callbacks": hc} if hc else {}
            try:
                async for token in self._get_help_agent().astream(
                    state,
                    history=history,
                    message=message,
                    report_ready=report_ready,
                    **hkw,
                ):
                    yield {"event": "chunk", "data": token}
            except Exception as e:
                logger.error("help streaming failed", user_id=user_id, error=str(e))
                _end_trace(tracer)
                yield {"event": "error", "data": f"Help generation failed: {e}"}
                return
            _end_trace(tracer)
            yield {"event": "done", "data": ""}
            return
        # else: chat path — no context

        # ---- 2b. Emit sources ---------------------------------------
        sources = _build_sources(intent, user_id, query_result, raw_chunks)
        logger.info(
            "built sources",
            intent=intent,
            sources_count=len(sources),
            raw_chunks_count=len(raw_chunks) if raw_chunks else 0,
        )
        yield {"event": "sources", "data": json.dumps(sources)}

        # ---- 3. Stream answer ----------------------------------------
        # masked: the answer call sees real query rows / doc chunks (possible PII).
        mc = tracer.callbacks(masked=True)
        akw = {"callbacks": mc} if mc else {}
        try:
            async for token in self._get_answer_agent().astream(
                message,
                history=history,
                query_result=query_result,
                chunks=chunks,
                **akw,
            ):
                yield {"event": "chunk", "data": token}
        except Exception as e:
            logger.error("answer streaming failed", user_id=user_id, error=str(e))
            _end_trace(tracer)
            yield {"event": "error", "data": f"Answer generation failed: {e}"}
            return

        _end_trace(tracer)
        yield {"event": "done", "data": ""}

    # ------------------------------------------------------------------
    # Tracing
    # ------------------------------------------------------------------

    def _make_tracer(self, user_id: str, question: str) -> Any:
        """One Langfuse trace per request (or a NullTracer when disabled)."""
        if not self._enable_tracing:
            from ..observability.langfuse.tracing import NullTracer

            return NullTracer()
        from ..observability.langfuse.tracing import RequestTracer

        return RequestTracer.start(user_id=user_id, question=question)

    # ------------------------------------------------------------------
    # Slow analytical path (gated, off by default)
    # ------------------------------------------------------------------

    def _get_slow_path_coordinator(
        self, user_id: str, tracer: Any = None, catalog_reader: CatalogReader | None = None
    ) -> SlowPathCoordinator:
        """Build the per-request slow-path coordinator (composition root).

        The data-access tools need the authenticated `user_id` + `CatalogReader`,
        so the `CompositeToolInvoker` is constructed per request. The slow-path
        agent code stays tool-agnostic (INV-7) — only here, the composition root,
        do we name concrete tool implementations.

        When tracing is active the invoker is wrapped so each tool call records a
        metadata-only span. An injected `slow_path_coordinator_factory` takes
        precedence and receives only `user_id` (tracer and reader are not passed).
        """
        if self._slow_path_factory is not None:
            return self._slow_path_factory(user_id)
        from ..tools.data_access import DataAccessToolInvoker
        from ..tools.invoker import AnalyticsToolInvoker, CompositeToolInvoker
        from .planner.registry import default_registry
        from .planner.service import PlannerService
        from .slow_path.assembler import Assembler
        from .slow_path.coordinator import SlowPathCoordinator
        from .slow_path.task_runner import TaskRunner

        invoker: Any = CompositeToolInvoker(
            DataAccessToolInvoker(user_id, catalog_reader or self._get_catalog_reader()),
            AnalyticsToolInvoker(),
        )
        if tracer is not None and getattr(tracer, "active", False):
            from ..observability.langfuse.tracing import TracingToolInvoker

            invoker = TracingToolInvoker(invoker, tracer)
        registry = default_registry()
        return SlowPathCoordinator(
            PlannerService(), TaskRunner(invoker, registry), Assembler(), registry
        )

    def _get_analysis_store(self) -> ReportInputStore:
        if self._analysis_store is None:
            from .slow_path.store import PostgresReportInputStore

            self._analysis_store = PostgresReportInputStore()
        return self._analysis_store

    async def _run_slow_path(
        self,
        user_id: str,
        query: str,
        catalog: Any,
        tracer: Any = None,
        catalog_reader: CatalogReader | None = None,
        analysis_id: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Run the slow path and stream its assembled answer as SSE events.

        Context comes from the `get_business_context` seam (a stub today); the
        `analysis_record` is persisted via the `ReportInputStore` seam
        (PostgresReportInputStore), stamped with the request's user_id + analysis_id
        so the report can group it. `chat_answer` is emitted as a single `chunk`
        (the Assembler returns the whole object — true token streaming is a later
        step).

        While the coordinator runs, each progress stage it reports is forwarded as
        a `status` event. If the consumer closes the stream early, the pending
        coordinator run and queue reader are cancelled rather than left dangling.
        """
        from .planner.business_context import get_business_context
        from .planner.inputs import Constraints

        if tracer is None:
            from ..observability.langfuse.tracing import NullTracer

            tracer = NullTracer()

        coordinator = self._get_slow_path_coordinator(user_id, tracer, catalog_reader)
        context = await get_business_context(user_id)

        # DB3: warm the user's DB connection in parallel with planning so the
        # handshake overlaps the ~4s Planner call. Default path only — an injected
        # coordinator factory (tests / custom) may not use the real DbExecutor.
        if self._slow_path_factory is None:
            from ..query.executor.db import DbExecutor

            prewarm = asyncio.create_task(DbExecutor.prewarm(catalog, user_id))
            # Hold a strong reference until done so the task cannot be GC'd mid-run.
            _BACKGROUND_TASKS.add(prewarm)
            prewarm.add_done_callback(_on_background_task_done)

        pc = tracer.callbacks()  # planner: PII-safe, full capture
        ac = tracer.callbacks(masked=True)  # assembler: sees real rows -> masked
        run_kw: dict[str, Any] = {}
        if pc:
            run_kw["planner_callbacks"] = pc
        if ac:
            run_kw["assembler_callbacks"] = ac

        # R4: bridge the coordinator's per-stage progress callback to SSE `status`
        # events so the stream isn't silent for ~12s (and proxies don't drop the
        # idle connection). Status events only appear if the coordinator calls back.
        progress_q: asyncio.Queue[str] = asyncio.Queue()

        async def _progress(stage: str) -> None:
            await progress_q.put(stage)

        run_task = asyncio.create_task(
            coordinator.run(
                context, catalog, query, Constraints(), progress=_progress, **run_kw
            )
        )
        getter: asyncio.Task = asyncio.create_task(progress_q.get())
        pending: set[asyncio.Task] = {run_task, getter}
        try:
            while True:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                if getter in done:
                    yield {"event": "status", "data": getter.result()}
                    getter = asyncio.create_task(progress_q.get())
                    pending = pending | {getter}
                if run_task in done:
                    break
        finally:
            # Runs on normal completion AND when the consumer closes this generator
            # mid-run (client disconnect): never leave the queue reader waiting
            # forever, nor an unobserved coordinator run whose result can no
            # longer be delivered or persisted.
            getter.cancel()
            if not run_task.done():
                run_task.cancel()

        # Flush stages reported after the last getter fired but before the run ended.
        while not progress_q.empty():
            yield {"event": "status", "data": progress_q.get_nowait()}

        try:
            result = run_task.result()
        except Exception as e:
            logger.error("slow path failed", user_id=user_id, error=str(e))
            _end_trace(tracer)
            yield {"event": "error", "data": f"Analysis failed: {e}"}
            return

        yield {"event": "sources", "data": json.dumps([])}  # TODO: derive from record
        yield {"event": "chunk", "data": result.chat_answer}

        try:
            # Stamp identity from the request scope: owner + the shared session id
            # (analysis_id == room_id). Without analysis_id the record is orphaned —
            # list_for_analysis can't find it, so the report + is_report_ready go
            # blind. The store is never-throw.
            record = result.analysis_record.model_copy(
                update={"user_id": user_id, "analysis_id": analysis_id}
            )
            await self._get_analysis_store().save(record)
        except Exception as e:  # persistence must never break the user's answer
            logger.error("analysis_record persist failed", user_id=user_id, error=str(e))

        _end_trace(tracer)  # output omitted (chat_answer may contain PII on Cloud)
        yield {"event": "done", "data": ""}


class _ScopedCatalogReader:
    """Wraps a CatalogReader, restricting `structured` reads to an analysis's bound sources (#10).

    Scoping lives here — not at a single call site — so the Planner AND the
    data-access tools (which re-read the catalog themselves) see the same scoped
    view; otherwise binding is only a hint to the Planner while the executor runs
    against the full catalog.

    Fail-open: an empty or fully-disjoint binding yields the whole catalog, so a
    stale / cross-source binding degrades instead of emptying the catalog. Only
    `structured` reads are scoped (all #10 binds today); `unstructured` / retrieval
    reads pass through.
    """

    def __init__(self, inner: Any, bound: set[str]) -> None:
        self._inner = inner
        self._bound = bound

    async def read(self, user_id: str, source_hint: str) -> Any:
        catalog = await self._inner.read(user_id, source_hint)
        if not self._bound or source_hint != "structured":
            return catalog
        scoped = [s for s in catalog.sources if s.source_id in self._bound]
        return catalog.model_copy(update={"sources": scoped or catalog.sources})


def _build_sources(
    intent: str,
    user_id: str,
    query_result: Any,
    raw_chunks: Any,
) -> list[dict[str, Any]]:
    """Build the sources payload for the SSE `sources` event.

    - structured_flow: one entry per executed table (table_name only).
    - unstructured_flow: deduped by (document_id, page_label), Phase 1 shape.
      Items that are neither `RetrievalResult` nor dict are skipped.
    - chat or error: empty list.
    """
    if intent == "structured_flow":
        if query_result is None or getattr(query_result, "error", None):
            return []
        table_name = getattr(query_result, "table_name", "") or ""
        if not table_name:
            return []
        return [{
            "document_id": f"{user_id}_{table_name}",
            "filename": table_name,
            "page_label": None,
        }]

    if intent == "unstructured_flow" and raw_chunks:
        seen: set[tuple[Any, Any]] = set()
        sources: list[dict[str, Any]] = []
        for item in raw_chunks:
            if isinstance(item, RetrievalResult):
                # `or {}`: tolerate metadata that carries an explicit None payload.
                data = item.metadata.get("data") or {}
            elif isinstance(item, dict):
                data = item
            else:
                continue
            key = (data.get("document_id"), data.get("page_label"))
            if key in seen or key == (None, None):
                continue
            seen.add(key)
            sources.append({
                "document_id": data.get("document_id"),
                "filename": data.get("filename", "Unknown"),
                "page_label": data.get("page_label", "Unknown"),
            })
        return sources

    return []


def _normalize_chunks(raw: Any) -> list[DocumentChunk]:
    """Convert whatever the retriever returns into list[DocumentChunk].

    The Phase 2 `DocumentRetriever.retrieve` interface is a stub today; when TAB
    owner ships it, it should return `list[DocumentChunk]` directly so this
    normalizer becomes a no-op. Until then we coerce common shapes (DocumentChunk,
    dict-with-content, RetrievalResult, plain string) defensively; anything else
    is dropped.
    """
    if not raw:
        return []
    if isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw):
        return raw
    chunks: list[DocumentChunk] = []
    for item in raw:
        if isinstance(item, DocumentChunk):
            chunks.append(item)
        elif isinstance(item, dict):
            chunks.append(
                DocumentChunk(
                    content=str(item.get("content", "")),
                    filename=item.get("filename"),
                    page_label=item.get("page_label"),
                )
            )
        elif isinstance(item, RetrievalResult):
            # `or {}`: tolerate metadata that carries an explicit None payload.
            data = item.metadata.get("data") or {}
            page = data.get("page_label")
            chunks.append(DocumentChunk(
                content=item.content,
                filename=data.get("filename"),
                page_label=str(page) if page is not None else None,
            ))
        elif isinstance(item, str):
            chunks.append(DocumentChunk(content=item))
    return chunks