Rifqi Hafizuddin
[NOTICKET] fix(db): reconcile analyses schema + migrate chat to analyses_messages
283eb0e | """ChatHandler — top-level Phase 2 chat orchestrator. | |
| End-to-end flow per user message: | |
| 1. `OrchestratorAgent.classify` → RouterDecision (one of the intents routed below). | |
| 2. Route by intent: | |
| - `chat` → no context. Pass straight to ChatbotAgent. | |
| - `structured_flow` → CatalogReader → slow path / QueryService. | |
| - `unstructured_flow` → DocumentRetriever (RAG over PGVector) → | |
| list[DocumentChunk]. | |
| - `check` → check_data / check_knowledge tool → rendered table. | |
| - `help` → Help skill: analysis state + history → streamed guidance. | |
| (`problem_statement` was removed 2026-06-24 — the goal is now user-entered | |
| `objective` + `business_questions` captured at onboarding, with no agent skill.) | |
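| 3. `ChatbotAgent.astream` → yield text tokens (chat, unstructured, and single-query structured; `check`, `help`, and the slow analytical path emit their own text). | |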
| 4. Wrap each step into an SSE-style event dict so the API endpoint can | |
| stream them as Server-Sent Events. | |
| The chat endpoint (`src/api/v1/chat.py`) calls `ChatHandler.handle(...)` per | |
| request, behind two endpoint-level pre-filters: a greeting/farewell | |
| short-circuit and a Redis response cache (both skip the LLM on a hit). | |
| All dependencies are injectable for tests. Default constructors lazy-build | |
| production deps (no `Settings()` triggered at import time as long as you | |
| inject mocks). | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| from collections.abc import AsyncIterator, Callable | |
| from typing import TYPE_CHECKING, Any | |
| from langchain_core.messages import BaseMessage | |
| from src.middlewares.logging import get_logger | |
| from src.retrieval.base import RetrievalResult | |
| from .chatbot import ChatbotAgent, DocumentChunk | |
| from .handlers.check import run_check | |
| from .handlers.help import HelpAgent | |
| # `run_problem_statement` unwired 2026-06-24 (problem_statement removed from the router). | |
| # `ProblemStatementAgent` kept — still referenced by the constructor + _get_ps_agent. | |
| from .handlers.problem_statement import ProblemStatementAgent | |
| from .orchestration import OrchestratorAgent | |
| if TYPE_CHECKING: | |
| from ..catalog.reader import CatalogReader | |
| from ..query.service import QueryService | |
| from ..retrieval.router import RetrievalRouter | |
| from .gate import AnalysisState | |
| from .slow_path.coordinator import SlowPathCoordinator | |
| from .slow_path.store import ReportInputStore | |
# Module logger used by ChatHandler; call sites attach context (user_id,
# analysis_id, error, ...) as keyword arguments.
logger = get_logger("chat_handler")
class ChatHandler:
    """Top-level chat orchestrator.

    Returns an `AsyncIterator[dict]` of SSE-style events with shape
    `{"event": <name>, "data": <str>}`. Event types:
      - `intent`  — emitted once after classification (JSON-encoded decision)
      - `status`  — slow-path progress stage (only when the slow path runs and
                    its coordinator reports progress)
      - `sources` — JSON array of source refs (one per structured table, or
                    per (document_id, page_label) for unstructured)
      - `chunk`   — text fragment of the streaming answer (one per token)
      - `done`    — end of stream (data is empty string)
      - `error`   — failure; data is a user-facing message
    """

    # Strong references to fire-and-forget tasks (e.g. the DB pre-warm). The event
    # loop only keeps weak references to tasks, so an unreferenced task can be
    # garbage-collected mid-flight. Intentionally class-level: it must outlive any
    # single (possibly per-request) handler instance.
    _background_tasks: set[asyncio.Task[Any]] = set()

    def __init__(
        self,
        intent_router: OrchestratorAgent | None = None,
        answer_agent: ChatbotAgent | None = None,
        catalog_reader: CatalogReader | None = None,
        query_service: QueryService | None = None,
        document_retriever: RetrievalRouter | None = None,
        *,
        enable_slow_path: bool = False,
        slow_path_coordinator_factory: (
            Callable[[str], SlowPathCoordinator] | None
        ) = None,
        analysis_store: ReportInputStore | None = None,
        check_invoker_factory: Callable[[str], Any] | None = None,
        ps_agent: ProblemStatementAgent | None = None,
        help_agent: HelpAgent | None = None,
        state_store: Any | None = None,
        binding_store: Any | None = None,
        enable_gate: bool = False,
        enable_tracing: bool = False,
    ) -> None:
        """Store injected dependencies; anything left as None is lazily built on first use."""
        self._intent_router = intent_router
        self._answer_agent = answer_agent
        self._catalog_reader = catalog_reader
        self._query_service = query_service
        self._document_retriever = document_retriever
        # Langfuse tracing (tokens + latency). OFF by default so tests never hit
        # Langfuse; the live endpoint opts in with ChatHandler(enable_tracing=True).
        self._enable_tracing = enable_tracing
        # Slow analytical path (Planner -> TaskRunner -> Assembler). OFF by default:
        # gated until the lead's real BusinessContext lands. When True, `structured`
        # intents route here instead of the single-query QueryService path. The
        # factory + store are injectable for tests.
        self._enable_slow_path = enable_slow_path
        self._slow_path_factory = slow_path_coordinator_factory
        self._analysis_store = analysis_store
        # `check` skill: builds the data-access invoker (check_data/check_knowledge)
        # per request with the authenticated user_id. Injectable for tests.
        self._check_invoker_factory = check_invoker_factory
        # `problem_statement` skill: LLM drafter + the Analysis State store it writes
        # `problem_validated` to. Both injectable for tests.
        self._ps_agent = ps_agent
        # `help` skill: LLM guide that reads the Analysis State + chat history.
        self._help_agent = help_agent
        self._state_store = state_store
        # `#10` data-source binding: scopes structured_flow's catalog to the sources
        # the analysis is bound to. Injectable for tests; fail-open when absent.
        self._binding_store = binding_store
        # Deterministic gate — DEPRECATED 2026-06-24 (problem_validated gate removed).
        # Unused flag; the gate call site in handle() is commented out.
        self._enable_gate = enable_gate

    # ------------------------------------------------------------------
    # Lazy default-dep builders
    # ------------------------------------------------------------------

    def _get_intent_router(self) -> OrchestratorAgent:
        if self._intent_router is None:
            self._intent_router = OrchestratorAgent()
        return self._intent_router

    def _get_answer_agent(self) -> ChatbotAgent:
        if self._answer_agent is None:
            self._answer_agent = ChatbotAgent()
        return self._answer_agent

    def _get_catalog_reader(self) -> CatalogReader:
        if self._catalog_reader is None:
            from ..catalog.reader import CatalogReader
            from ..catalog.store import CatalogStore

            self._catalog_reader = CatalogReader(CatalogStore())
        return self._catalog_reader

    def _get_query_service(self) -> QueryService:
        if self._query_service is None:
            from ..query.service import QueryService

            self._query_service = QueryService()
        return self._query_service

    def _get_document_retriever(self) -> RetrievalRouter:
        if self._document_retriever is None:
            from ..retrieval.router import RetrievalRouter

            self._document_retriever = RetrievalRouter()
        return self._document_retriever

    def _get_check_invoker(self, user_id: str) -> Any:
        """Build the per-request data-access invoker for the `check` skill."""
        if self._check_invoker_factory is not None:
            return self._check_invoker_factory(user_id)
        from ..tools.data_access import DataAccessToolInvoker

        return DataAccessToolInvoker(user_id, self._get_catalog_reader())

    def _get_ps_agent(self) -> ProblemStatementAgent:
        if self._ps_agent is None:
            self._ps_agent = ProblemStatementAgent()
        return self._ps_agent

    def _get_help_agent(self) -> HelpAgent:
        if self._help_agent is None:
            self._help_agent = HelpAgent()
        return self._help_agent

    def _get_state_store(self) -> Any:
        if self._state_store is None:
            from .state_store import AnalysisStateStore

            self._state_store = AnalysisStateStore()
        return self._state_store

    def _get_binding_store(self) -> Any:
        if self._binding_store is None:
            from .binding_store import AnalysisDataSourceStore

            self._binding_store = AnalysisDataSourceStore()
        return self._binding_store

    async def _bound_source_ids(self, analysis_id: str | None) -> set[str]:
        """#10: the catalog source_ids this analysis is bound to (empty = unscoped).

        Fail-open: no analysis_id, no binding rows (legacy room / FE not sending
        ids), or a read error → empty set, which the caller treats as "whole
        catalog". Used to build a `_ScopedCatalogReader` so the Planner AND the
        data-access tools (which re-read the catalog themselves) see the same scope.
        """
        if not analysis_id:
            return set()
        try:
            return set(await self._get_binding_store().get(analysis_id))
        except Exception as e:  # noqa: BLE001 — never block the query on this
            logger.warning("binding read failed — unscoped", analysis_id=analysis_id, error=str(e))
            return set()

    async def _load_analysis_state(self, analysis_id: str | None) -> AnalysisState:
        """Load Analysis State for the Help skill; fail closed to a not-validated stub.

        Mirrors the gate's never-throw fallback so Help degrades gracefully on a
        missing row, a read error, or a legacy room with no `analysis_id`.
        """
        from .gate import stub_analysis_state

        if not analysis_id:
            return stub_analysis_state()
        try:
            state = await self._get_state_store().get(analysis_id)
        except Exception as e:
            logger.warning("help state read failed — not-validated", error=str(e))
            state = None
        return state if state is not None else stub_analysis_state()

    # ------------------------------------------------------------------
    # Background task bookkeeping
    # ------------------------------------------------------------------

    @classmethod
    def _spawn_background(cls, coro: Any) -> asyncio.Task[Any]:
        """Schedule a best-effort coroutine, keeping a strong reference until it ends."""
        task = asyncio.create_task(coro)
        cls._background_tasks.add(task)
        task.add_done_callback(cls._reap_background)
        return task

    @classmethod
    def _reap_background(cls, task: asyncio.Task[Any]) -> None:
        """Done-callback: drop the reference and surface (not raise) any failure."""
        cls._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()  # retrieving it silences "exception never retrieved"
        if exc is not None:
            logger.warning("background task failed", error=str(exc))

    # ------------------------------------------------------------------
    # Public entry
    # ------------------------------------------------------------------

    async def stream_help(
        self,
        user_id: str,
        analysis_id: str | None,
        history: list[BaseMessage] | None = None,
        message: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Deterministic `help` dispatch for the dedicated `/api/v1/tools/help` endpoint.

        Bypasses the intent router — the slash command IS the intent, so there is no
        classify round-trip and no misclassification risk. Streams the same guidance as
        the `help` branch of `handle()`, reusing the warm HelpAgent + state store.

        Emits SSE-style events: `sources` (always `[]` — help never references
        documents), `chunk`*, then `done` (data left empty; the endpoint stamps the
        `message_id`). On failure, yields a terminal `error` event.
        """
        # Load (or lazily create) the analysis state; fail closed to a not-validated
        # stub so help degrades gracefully on a missing row / read error / legacy id.
        state: AnalysisState | None = None
        if analysis_id:
            try:
                state = await self._get_state_store().ensure(analysis_id, user_id)
            except Exception as e:  # noqa: BLE001 — never block help on a state read
                logger.warning("help state ensure failed", analysis_id=analysis_id, error=str(e))
        if state is None:
            state = await self._load_analysis_state(analysis_id)

        # report_ready (seam #5): deterministic, never-throws (fails closed to
        # not-ready) — the HelpAgent guard only offers generate_report when ready.
        from .report.readiness import is_report_ready

        report_ready = await is_report_ready(analysis_id, state)

        yield {"event": "sources", "data": json.dumps([])}
        try:
            async for token in self._get_help_agent().astream(
                state,
                history=history,
                message=message,
                report_ready=report_ready,
            ):
                yield {"event": "chunk", "data": token}
        except Exception as e:  # noqa: BLE001
            logger.error("help streaming failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Help generation failed: {e}"}
            return
        yield {"event": "done", "data": ""}

    async def handle(
        self,
        message: str,
        user_id: str,
        history: list[BaseMessage] | None = None,
        analysis_id: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Classify `message`, route it by intent, and stream SSE-style events.

        Args:
            message: the user's raw message (the router, answer agent and Help see it).
            user_id: authenticated owner; scopes catalog reads, retrieval and tools.
            history: prior chat turns, passed to the router and the answering agent.
            analysis_id: shared session / room id; enables Analysis State and
                data-source binding. None for legacy rooms.

        Yields `intent` first (unless classification fails), then per route:
        `status`* (slow path only), `sources`, `chunk`*, and a terminal `done`
        or `error`. The `check` and `help` branches emit no `sources` event.
        """
        tracer = self._make_tracer(user_id, message)

        # ---- 1. Classify intent --------------------------------------
        try:
            oc = tracer.callbacks()  # orchestrator: PII-safe, full capture
            ckw = {"callbacks": oc} if oc else {}
            decision = await self._get_intent_router().classify(message, history, **ckw)
        except Exception as e:
            logger.error("intent classification failed", error=str(e))
            yield {"event": "error", "data": f"Could not classify message: {e}"}
            return

        intent = decision.intent

        # ---- 1a. Ensure session state row (T-A) ----------------------
        # Rooms created via /room/create have no `analysis` row. Without one, Help and
        # the report_id write-back silently no-op. Lazily get-or-create it (idempotent).
        analysis_state: AnalysisState | None = None
        if analysis_id:
            try:
                analysis_state = await self._get_state_store().ensure(analysis_id, user_id)
            except Exception as e:
                logger.warning(
                    "analysis state ensure failed", analysis_id=analysis_id, error=str(e)
                )

        # ---- 1b. Gate (REMOVED 2026-06-24) ---------------------------
        # The problem_validated gate was dropped: structured_flow is no longer
        # redirected to problem_statement (the goal is now user-entered objective +
        # business_questions, no agent validation). `gate()` is neutered to a no-op; the
        # call site is left commented for restorability.
        # if self._enable_gate and analysis_id:
        #     from .gate import gate, stub_analysis_state
        #
        #     intent = gate(
        #         intent,
        #         analysis_state
        #         if analysis_state is not None
        #         else stub_analysis_state(),
        #     )

        # The `intent` event is consumed by the endpoint (it gates response caching
        # on the effective intent) and is NOT forwarded to the frontend. We emit the
        # post-gate intent so the cache keys on what actually ran.
        event_data = decision.model_dump()
        event_data["intent"] = intent
        yield {"event": "intent", "data": json.dumps(event_data)}

        rewritten = decision.rewritten_query or message
        query_result = None
        chunks: list[DocumentChunk] | None = None
        raw_chunks: Any = None

        # ---- 2. Route ------------------------------------------------
        if intent == "structured_flow":
            try:
                # One memoizing reader per request: the same catalog is otherwise
                # re-fetched from the catalog DB 4-5x across the slow-path run. This
                # collapses those to one round-trip per source_hint and pins a single
                # consistent snapshot for plan + execution.
                from ..catalog.reader import MemoizingCatalogReader

                req_reader = MemoizingCatalogReader(self._get_catalog_reader())
                # #10: scope every catalog read — the Planner's AND the data-access
                # tools' own re-reads — to the analysis's bound sources, so binding
                # is a boundary, not just a planner hint (T-B). Fail-open (T-C).
                bound = await self._bound_source_ids(analysis_id)
                reader = _ScopedCatalogReader(req_reader, bound) if bound else req_reader
                catalog = await reader.read(user_id, "structured")
                if self._enable_slow_path:
                    async for event in self._run_slow_path(
                        user_id, rewritten, catalog, tracer, reader, analysis_id
                    ):
                        yield event
                    return
                query_result = await self._get_query_service().run(
                    user_id, rewritten, catalog
                )
            except Exception as e:
                logger.error(
                    "structured route failed",
                    user_id=user_id,
                    error=str(e),
                )
                yield {"event": "error", "data": f"Structured query failed: {e}"}
                return
        elif intent == "unstructured_flow":
            try:
                raw_chunks = await self._get_document_retriever().retrieve(
                    rewritten, user_id
                )
                chunks = _normalize_chunks(raw_chunks)
            except Exception as e:
                logger.error(
                    "unstructured route failed", user_id=user_id, error=str(e)
                )
                yield {"event": "error", "data": f"Document retrieval failed: {e}"}
                return
        elif intent == "check":
            try:
                invoker = self._get_check_invoker(user_id)
                text = await run_check(rewritten, invoker)
            except Exception as e:
                logger.error("check route failed", user_id=user_id, error=str(e))
                yield {"event": "error", "data": f"Lookup failed: {e}"}
                return
            yield {"event": "chunk", "data": text}
            # End the trace before `done`, as the help and answer paths do.
            tracer.end()
            yield {"event": "done", "data": ""}
            return
        # problem_statement dispatch removed 2026-06-24 (skill unwired; intent no longer
        # emitted by the router). Branch kept commented for restorability.
        # elif intent == "problem_statement":
        #     try:
        #         text = await run_problem_statement(
        #             message,
        #             analysis_id,
        #             agent=self._get_ps_agent(),
        #             store=self._get_state_store(),
        #             history=history,
        #         )
        #     except Exception as e:
        #         logger.error("problem_statement route failed", user_id=user_id, error=str(e))
        #         yield {"event": "error", "data": f"Problem statement failed: {e}"}
        #         return
        #     yield {"event": "chunk", "data": text}
        #     yield {"event": "done", "data": ""}
        #     return
        elif intent == "help":
            try:
                state = analysis_state or await self._load_analysis_state(analysis_id)
            except Exception as e:
                logger.error("help route failed", user_id=user_id, error=str(e))
                yield {"event": "error", "data": f"Help failed: {e}"}
                return
            # report_ready (seam #5): deterministic — validated goal + ≥1 recorded
            # analysis (mirrors the report API's own 409 gate). Never-throws (fails
            # closed to not-ready), so Help degrades safely. The consistency guard in
            # HelpAgent only offers `generate_report` when this says ready.
            from .report.readiness import is_report_ready

            report_ready = await is_report_ready(analysis_id, state)
            # The prompt sees chat history -> masked.
            hc = tracer.callbacks(masked=True)
            hkw = {"callbacks": hc} if hc else {}
            try:
                async for token in self._get_help_agent().astream(
                    state,
                    history=history,
                    message=message,
                    report_ready=report_ready,
                    **hkw,
                ):
                    yield {"event": "chunk", "data": token}
            except Exception as e:
                logger.error("help streaming failed", user_id=user_id, error=str(e))
                yield {"event": "error", "data": f"Help generation failed: {e}"}
                return
            tracer.end()
            yield {"event": "done", "data": ""}
            return
        # else: chat path — no context

        # ---- 2b. Emit sources ---------------------------------------
        sources = _build_sources(intent, user_id, query_result, raw_chunks)
        logger.info(
            "built sources",
            intent=intent,
            sources_count=len(sources),
            raw_chunks_count=len(raw_chunks) if raw_chunks else 0,
        )
        yield {"event": "sources", "data": json.dumps(sources)}

        # ---- 3. Stream answer ----------------------------------------
        # masked: the answer call sees real query rows / doc chunks (possible PII).
        mc = tracer.callbacks(masked=True)
        akw = {"callbacks": mc} if mc else {}
        try:
            async for token in self._get_answer_agent().astream(
                message,
                history=history,
                query_result=query_result,
                chunks=chunks,
                **akw,
            ):
                yield {"event": "chunk", "data": token}
        except Exception as e:
            logger.error("answer streaming failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Answer generation failed: {e}"}
            return

        tracer.end()
        yield {"event": "done", "data": ""}

    # ------------------------------------------------------------------
    # Slow analytical path (gated, off by default)
    # ------------------------------------------------------------------

    def _make_tracer(self, user_id: str, question: str) -> Any:
        """One Langfuse trace per request (or a NullTracer when disabled)."""
        if not self._enable_tracing:
            from ..observability.langfuse.tracing import NullTracer

            return NullTracer()
        from ..observability.langfuse.tracing import RequestTracer

        return RequestTracer.start(user_id=user_id, question=question)

    def _get_slow_path_coordinator(
        self, user_id: str, tracer: Any = None, catalog_reader: CatalogReader | None = None
    ) -> SlowPathCoordinator:
        """Build the per-request slow-path coordinator (composition root).

        The data-access tools need the authenticated `user_id` + `CatalogReader`,
        so the `CompositeToolInvoker` is constructed per request. The slow-path
        agent code stays tool-agnostic (INV-7) — only here, the composition root,
        do we name concrete tool implementations. When tracing is active the invoker
        is wrapped so each tool call records a metadata-only span.
        """
        if self._slow_path_factory is not None:
            return self._slow_path_factory(user_id)
        from ..tools.data_access import DataAccessToolInvoker
        from ..tools.invoker import AnalyticsToolInvoker, CompositeToolInvoker
        from .planner.registry import default_registry
        from .planner.service import PlannerService
        from .slow_path.assembler import Assembler
        from .slow_path.coordinator import SlowPathCoordinator
        from .slow_path.task_runner import TaskRunner

        invoker: Any = CompositeToolInvoker(
            DataAccessToolInvoker(user_id, catalog_reader or self._get_catalog_reader()),
            AnalyticsToolInvoker(),
        )
        if tracer is not None and getattr(tracer, "active", False):
            from ..observability.langfuse.tracing import TracingToolInvoker

            invoker = TracingToolInvoker(invoker, tracer)
        registry = default_registry()
        return SlowPathCoordinator(
            PlannerService(), TaskRunner(invoker, registry), Assembler(), registry
        )

    def _get_analysis_store(self) -> ReportInputStore:
        if self._analysis_store is None:
            from .slow_path.store import PostgresReportInputStore

            self._analysis_store = PostgresReportInputStore()
        return self._analysis_store

    async def _run_slow_path(
        self,
        user_id: str,
        query: str,
        catalog: Any,
        tracer: Any = None,
        catalog_reader: CatalogReader | None = None,
        analysis_id: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Run the slow path and stream its assembled answer as SSE events.

        Context comes from the `get_business_context` seam (a stub today); the
        `analysis_record` is persisted via the `ReportInputStore` seam (PostgresReportInputStore),
        stamped with the request's user_id + analysis_id so the report can group it.
        `chat_answer` is emitted as a single `chunk` (the Assembler returns the whole
        object — true token streaming is a later step).

        If the stream is abandoned mid-run, the coordinator task is cancelled rather
        than left running detached.
        """
        from .planner.business_context import get_business_context
        from .planner.inputs import Constraints

        if tracer is None:
            from ..observability.langfuse.tracing import NullTracer

            tracer = NullTracer()
        coordinator = self._get_slow_path_coordinator(user_id, tracer, catalog_reader)
        context = await get_business_context(user_id)

        # DB3: warm the user's DB connection in parallel with planning so the
        # handshake overlaps the ~4s Planner call. Default path only — an injected
        # coordinator factory (tests / custom) may not use the real DbExecutor.
        # Spawned via the registry so the task can't be garbage-collected mid-flight.
        if self._slow_path_factory is None:
            from ..query.executor.db import DbExecutor

            self._spawn_background(DbExecutor.prewarm(catalog, user_id))

        pc = tracer.callbacks()  # planner: PII-safe, full capture
        ac = tracer.callbacks(masked=True)  # assembler: sees real rows -> masked
        run_kw: dict[str, Any] = {}
        if pc:
            run_kw["planner_callbacks"] = pc
        if ac:
            run_kw["assembler_callbacks"] = ac

        # R4: bridge the coordinator's per-stage progress callback to SSE `status`
        # events so the stream isn't silent for ~12s (and proxies don't drop the
        # idle connection). Status events only appear if the coordinator calls back.
        progress_q: asyncio.Queue[str] = asyncio.Queue()

        async def _progress(stage: str) -> None:
            await progress_q.put(stage)

        run_task = asyncio.create_task(
            coordinator.run(
                context, catalog, query, Constraints(), progress=_progress, **run_kw
            )
        )
        getter: asyncio.Task = asyncio.create_task(progress_q.get())
        pending: set[asyncio.Task] = {run_task, getter}
        try:
            while True:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                if getter in done:
                    yield {"event": "status", "data": getter.result()}
                    getter = asyncio.create_task(progress_q.get())
                    pending = pending | {getter}
                if run_task in done:
                    getter.cancel()
                    # Flush stages queued after the last getter fired.
                    while not progress_q.empty():
                        yield {"event": "status", "data": progress_q.get_nowait()}
                    break
        finally:
            # `asyncio.wait` never cancels what it waits on. If this generator is
            # closed at a `yield` or its task is cancelled (e.g. client disconnect),
            # stop the coordinator and the queue getter instead of leaking them.
            # No-op on the normal path (run_task done, getter already cancelled).
            for task in (run_task, getter):
                if not task.done():
                    task.cancel()

        try:
            result = run_task.result()
        except Exception as e:
            logger.error("slow path failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Analysis failed: {e}"}
            return

        yield {"event": "sources", "data": json.dumps([])}  # TODO: derive from record
        yield {"event": "chunk", "data": result.chat_answer}
        try:
            # Stamp identity from the request scope: owner + the shared session id
            # (analysis_id == room_id). Without analysis_id the record is orphaned —
            # list_for_analysis can't find it, so the report + is_report_ready go
            # blind. The store is never-throw.
            record = result.analysis_record.model_copy(
                update={"user_id": user_id, "analysis_id": analysis_id}
            )
            await self._get_analysis_store().save(record)
        except Exception as e:  # persistence must never break the user's answer
            logger.error("analysis_record persist failed", user_id=user_id, error=str(e))
        tracer.end()  # output omitted (chat_answer may contain PII on Cloud)
        yield {"event": "done", "data": ""}
| class _ScopedCatalogReader: | |
| """Wraps a CatalogReader, restricting `structured` reads to an analysis's bound | |
| sources (#10). | |
| Scoping lives here — not at a single call site — so the Planner AND the | |
| data-access tools (which re-read the catalog themselves) see the same scoped | |
| view; otherwise binding is only a hint to the Planner while the executor runs | |
| against the full catalog. Fail-open: an empty or fully-disjoint binding yields | |
| the whole catalog, so a stale / cross-source binding degrades instead of | |
| emptying the catalog. Only `structured` reads are scoped (all #10 binds today); | |
| `unstructured` / retrieval reads pass through. | |
| """ | |
| def __init__(self, inner: Any, bound: set[str]) -> None: | |
| self._inner = inner | |
| self._bound = bound | |
| async def read(self, user_id: str, source_hint: str) -> Any: | |
| catalog = await self._inner.read(user_id, source_hint) | |
| if not self._bound or source_hint != "structured": | |
| return catalog | |
| scoped = [s for s in catalog.sources if s.source_id in self._bound] | |
| return catalog.model_copy(update={"sources": scoped or catalog.sources}) | |
| def _build_sources( | |
| intent: str, | |
| user_id: str, | |
| query_result: Any, | |
| raw_chunks: Any, | |
| ) -> list[dict[str, Any]]: | |
| """Build the sources payload for the SSE `sources` event. | |
| - structured_flow: one entry per executed table (table_name only). | |
| - unstructured_flow: deduped by (document_id, page_label), Phase 1 shape. | |
| - chat or error: empty list. | |
| """ | |
| if intent == "structured_flow": | |
| if query_result is None or getattr(query_result, "error", None): | |
| return [] | |
| table_name = getattr(query_result, "table_name", "") or "" | |
| if not table_name: | |
| return [] | |
| return [{ | |
| "document_id": f"{user_id}_{table_name}", | |
| "filename": table_name, | |
| "page_label": None, | |
| }] | |
| if intent == "unstructured_flow" and raw_chunks: | |
| seen: set[tuple[Any, Any]] = set() | |
| sources: list[dict[str, Any]] = [] | |
| for item in raw_chunks: | |
| if isinstance(item, RetrievalResult): | |
| data = item.metadata.get("data", {}) | |
| elif isinstance(item, dict): | |
| data = item | |
| else: | |
| continue | |
| key = (data.get("document_id"), data.get("page_label")) | |
| if key in seen or key == (None, None): | |
| continue | |
| seen.add(key) | |
| sources.append({ | |
| "document_id": data.get("document_id"), | |
| "filename": data.get("filename", "Unknown"), | |
| "page_label": data.get("page_label", "Unknown"), | |
| }) | |
| return sources | |
| return [] | |
| def _normalize_chunks(raw: Any) -> list[DocumentChunk]: | |
| """Convert whatever the retriever returns into list[DocumentChunk]. | |
| The Phase 2 `DocumentRetriever.retrieve` interface is a stub today; | |
| when TAB owner ships it, it should return `list[DocumentChunk]` | |
| directly so this normalizer becomes a no-op. Until then we coerce | |
| common shapes (dict-with-content, plain string) defensively. | |
| """ | |
| if not raw: | |
| return [] | |
| if isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw): | |
| return raw | |
| chunks: list[DocumentChunk] = [] | |
| for item in raw: | |
| if isinstance(item, DocumentChunk): | |
| chunks.append(item) | |
| elif isinstance(item, dict): | |
| chunks.append( | |
| DocumentChunk( | |
| content=str(item.get("content", "")), | |
| filename=item.get("filename"), | |
| page_label=item.get("page_label"), | |
| ) | |
| ) | |
| elif isinstance(item, RetrievalResult): | |
| data = item.metadata.get("data", {}) | |
| page = data.get("page_label") | |
| chunks.append(DocumentChunk( | |
| content=item.content, | |
| filename=data.get("filename"), | |
| page_label=str(page) if page is not None else None, | |
| )) | |
| elif isinstance(item, str): | |
| chunks.append(DocumentChunk(content=item)) | |
| return chunks | |