Rifqi Hafizuddin
[NOTICKET] fix(db): reconcile analyses schema + migrate chat to analyses_messages
283eb0e
Raw
History Blame
32.7 kB
"""ChatHandler — top-level Phase 2 chat orchestrator.
End-to-end flow per user message:
1. `OrchestratorAgent.classify` → RouterDecision (one of the five intents routed below).
2. Route by intent:
- `chat` → no context. Pass straight to ChatbotAgent.
- `structured_flow` → CatalogReader → slow path / QueryService.
- `unstructured_flow` → DocumentRetriever (RAG over PGVector) →
list[DocumentChunk].
- `check` → check_data / check_knowledge tool → rendered table.
- `help` → Help skill: analysis state + history → streamed guidance.
(`problem_statement` was removed 2026-06-24 — the goal is now user-entered
`objective` + `business_questions` captured at onboarding, with no agent skill.)
3. `ChatbotAgent.astream` → yield text tokens.
4. Wrap each step into an SSE-style event dict so the API endpoint can
stream them as Server-Sent Events.
The chat endpoint (`src/api/v1/chat.py`) calls `ChatHandler.handle(...)` per
request, behind two endpoint-level pre-filters: a greeting/farewell
short-circuit and a Redis response cache (both skip the LLM on a hit).
All dependencies are injectable for tests. Default constructors lazy-build
production deps (no `Settings()` triggered at import time as long as you
inject mocks).
"""
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator, Callable
from typing import TYPE_CHECKING, Any
from langchain_core.messages import BaseMessage
from src.middlewares.logging import get_logger
from src.retrieval.base import RetrievalResult
from .chatbot import ChatbotAgent, DocumentChunk
from .handlers.check import run_check
from .handlers.help import HelpAgent
# `run_problem_statement` unwired 2026-06-24 (problem_statement removed from the router).
# `ProblemStatementAgent` kept — still referenced by the constructor + _get_ps_agent.
from .handlers.problem_statement import ProblemStatementAgent
from .orchestration import OrchestratorAgent
if TYPE_CHECKING:
from ..catalog.reader import CatalogReader
from ..query.service import QueryService
from ..retrieval.router import RetrievalRouter
from .gate import AnalysisState
from .slow_path.coordinator import SlowPathCoordinator
from .slow_path.store import ReportInputStore
logger = get_logger("chat_handler")
class ChatHandler:
"""Top-level chat orchestrator.
Returns an `AsyncIterator[dict]` of SSE-style events with shape
`{"event": <name>, "data": <str>}`. Event types:
- `intent` — emitted once after classification (JSON-encoded decision)
- `sources` — JSON array of source refs (one per structured table, or
per (document_id, page_label) for unstructured)
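- `chunk` — text fragment of the streaming answer (one per token; the `check`
route and the slow path emit the whole answer as a single chunk)
- `status` — slow-path progress stage name (only emitted when the slow path runs
and its coordinator reports progress)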
- `done` — end of stream (data is empty string)
- `error` — failure; data is a user-facing message
"""
def __init__(
    self,
    intent_router: OrchestratorAgent | None = None,
    answer_agent: ChatbotAgent | None = None,
    catalog_reader: CatalogReader | None = None,
    query_service: QueryService | None = None,
    document_retriever: RetrievalRouter | None = None,
    *,
    enable_slow_path: bool = False,
    slow_path_coordinator_factory: (
        Callable[[str], SlowPathCoordinator] | None
    ) = None,
    analysis_store: ReportInputStore | None = None,
    check_invoker_factory: Callable[[str], Any] | None = None,
    ps_agent: ProblemStatementAgent | None = None,
    help_agent: HelpAgent | None = None,
    state_store: Any | None = None,
    binding_store: Any | None = None,
    enable_gate: bool = False,
    enable_tracing: bool = False,
) -> None:
    """Record the injected collaborators and feature flags.

    Nothing is constructed here: the body only stores its arguments. Any
    collaborator left as `None` is built on first use by the matching
    `_get_*` helper.

    Args:
        intent_router: Classifier used by `handle()` to pick the intent.
        answer_agent: Agent that streams the final answer tokens.
        catalog_reader: Catalog reader for structured reads and data tools.
        query_service: Single-query service for the non-slow structured path.
        document_retriever: Retriever for the `unstructured_flow` route.
        enable_slow_path: When True, `structured_flow` runs the slow path
            instead of `QueryService`.
        slow_path_coordinator_factory: Optional factory, called with the
            `user_id`, that replaces the default coordinator wiring.
        analysis_store: Store the slow path saves its analysis record to.
        check_invoker_factory: Optional factory, called with the `user_id`,
            that builds the invoker for the `check` route.
        ps_agent: Problem-statement agent. Its dispatch branch in `handle()`
            is commented out; the attribute and its getter are retained.
        help_agent: Agent that streams guidance for the `help` route.
        state_store: Analysis-state store (used through `ensure` and `get`).
        binding_store: Data-source binding store (used through `get`).
        enable_gate: Stored only; the gate call site in `handle()` is
            commented out.
        enable_tracing: When True, `_make_tracer` starts a real request
            tracer instead of a `NullTracer`.
    """
    # --- Feature flags -------------------------------------------------
    # Tracing is opt-in so tests never reach the tracing backend.
    self._enable_tracing = enable_tracing
    # Slow analytical path (Planner -> TaskRunner -> Assembler); off by default.
    self._enable_slow_path = enable_slow_path
    # Deprecated flag: kept on the instance, read by nothing in this class.
    self._enable_gate = enable_gate

    # --- Core routing collaborators -------------------------------------
    self._intent_router = intent_router
    self._answer_agent = answer_agent
    self._catalog_reader = catalog_reader
    self._query_service = query_service
    self._document_retriever = document_retriever

    # --- Slow-path seams (both injectable for tests) --------------------
    self._slow_path_factory = slow_path_coordinator_factory
    self._analysis_store = analysis_store

    # --- Skill collaborators ---------------------------------------------
    # `check`: the invoker is built per request with the authenticated user_id.
    self._check_invoker_factory = check_invoker_factory
    # `problem_statement`: skill unwired from the router; agent slot retained.
    self._ps_agent = ps_agent
    # `help`: reads the analysis state plus chat history.
    self._help_agent = help_agent

    # --- Stores ------------------------------------------------------------
    self._state_store = state_store
    # #10 data-source binding: scopes the structured catalog to bound sources.
    self._binding_store = binding_store
# ------------------------------------------------------------------
# Lazy default-dep builders
# ------------------------------------------------------------------
def _get_intent_router(self) -> OrchestratorAgent:
    """Return the intent router, creating a default `OrchestratorAgent` on first use."""
    router = self._intent_router
    if router is not None:
        return router
    # Cache the default so later calls reuse the same instance.
    router = self._intent_router = OrchestratorAgent()
    return router
def _get_answer_agent(self) -> ChatbotAgent:
    """Return the answer agent, creating a default `ChatbotAgent` on first use."""
    agent = self._answer_agent
    if agent is not None:
        return agent
    # Cache the default so later calls reuse the same instance.
    agent = self._answer_agent = ChatbotAgent()
    return agent
def _get_catalog_reader(self) -> CatalogReader:
    """Return the catalog reader, building `CatalogReader(CatalogStore())` on first use."""
    if self._catalog_reader is not None:
        return self._catalog_reader
    # Imported only when the default is actually needed, so an injected reader
    # never pulls in the catalog modules.
    from ..catalog.reader import CatalogReader
    from ..catalog.store import CatalogStore

    self._catalog_reader = CatalogReader(CatalogStore())
    return self._catalog_reader
def _get_query_service(self) -> QueryService:
    """Return the query service, building a default `QueryService` on first use."""
    if self._query_service is not None:
        return self._query_service
    # Imported only when the default is actually needed.
    from ..query.service import QueryService

    self._query_service = QueryService()
    return self._query_service
def _get_document_retriever(self) -> RetrievalRouter:
    """Return the document retriever, building a default `RetrievalRouter` on first use."""
    if self._document_retriever is not None:
        return self._document_retriever
    # Imported only when the default is actually needed.
    from ..retrieval.router import RetrievalRouter

    self._document_retriever = RetrievalRouter()
    return self._document_retriever
def _get_check_invoker(self, user_id: str) -> Any:
    """Build the data-access invoker the `check` route uses for this request.

    An injected `check_invoker_factory` wins and is called with `user_id`;
    otherwise a `DataAccessToolInvoker` is built from `user_id` and the
    handler's catalog reader. The result is not cached.
    """
    factory = self._check_invoker_factory
    if factory is None:
        # Default wiring; imported here so an injected factory avoids the import.
        from ..tools.data_access import DataAccessToolInvoker

        return DataAccessToolInvoker(user_id, self._get_catalog_reader())
    return factory(user_id)
def _get_ps_agent(self) -> ProblemStatementAgent:
    """Return the problem-statement agent, creating a default on first use.

    The `problem_statement` dispatch in `handle()` is commented out; this
    getter is retained alongside the constructor argument.
    """
    agent = self._ps_agent
    if agent is not None:
        return agent
    agent = self._ps_agent = ProblemStatementAgent()
    return agent
def _get_help_agent(self) -> HelpAgent:
    """Return the help agent, creating a default `HelpAgent` on first use."""
    agent = self._help_agent
    if agent is not None:
        return agent
    # Cache the default so `handle()` and `stream_help()` share one instance.
    agent = self._help_agent = HelpAgent()
    return agent
def _get_state_store(self) -> Any:
    """Return the analysis-state store, building `AnalysisStateStore()` on first use."""
    if self._state_store is not None:
        return self._state_store
    # Imported only when the default is actually needed.
    from .state_store import AnalysisStateStore

    self._state_store = AnalysisStateStore()
    return self._state_store
def _get_binding_store(self) -> Any:
    """Return the data-source binding store, building `AnalysisDataSourceStore()` on first use."""
    if self._binding_store is not None:
        return self._binding_store
    # Imported only when the default is actually needed.
    from .binding_store import AnalysisDataSourceStore

    self._binding_store = AnalysisDataSourceStore()
    return self._binding_store
async def _bound_source_ids(self, analysis_id: str | None) -> set[str]:
    """Return the catalog source ids bound to `analysis_id` (#10).

    Fail-open by design: a falsy `analysis_id`, or any exception while
    reading the binding store, yields an empty set. `handle()` treats an
    empty set as "do not scope the catalog".

    Args:
        analysis_id: The analysis/session id, or `None` / empty when unknown.

    Returns:
        The ids returned by the binding store's `get`, as a set.
    """
    if analysis_id:
        try:
            bound_ids = await self._get_binding_store().get(analysis_id)
            return set(bound_ids)
        except Exception as exc:  # noqa: BLE001 — never block the query on this
            logger.warning("binding read failed — unscoped", analysis_id=analysis_id, error=str(exc))
    return set()
async def _load_analysis_state(self, analysis_id: str | None) -> AnalysisState:
    """Load the analysis state for the Help skill, falling back to a stub.

    The stub from `stub_analysis_state()` is returned when `analysis_id` is
    falsy, when the state store's `get` raises, or when it returns `None`.
    Store errors are logged as a warning and not re-raised.
    """
    from .gate import stub_analysis_state

    loaded = None
    if analysis_id:
        try:
            loaded = await self._get_state_store().get(analysis_id)
        except Exception as exc:
            logger.warning("help state read failed — not-validated", error=str(exc))
    if loaded is None:
        return stub_analysis_state()
    return loaded
# ------------------------------------------------------------------
# Public entry
# ------------------------------------------------------------------
async def stream_help(
    self,
    user_id: str,
    analysis_id: str | None,
    history: list[BaseMessage] | None = None,
    message: str | None = None,
) -> AsyncIterator[dict[str, Any]]:
    """Stream Help guidance without going through the intent router.

    Intended for the dedicated help endpoint, where the command itself is the
    intent, so no `classify` call is made and no tracer is created.

    Args:
        user_id: Owner passed to the state store's `ensure`.
        analysis_id: Session id; when falsy the stub state is used.
        history: Prior conversation turns forwarded to the HelpAgent.
        message: Optional user message forwarded to the HelpAgent.

    Yields:
        One `sources` event (always an empty JSON array), then one `chunk`
        event per token from the HelpAgent, then `done` with empty data. If
        streaming raises, a single `error` event is yielded instead of `done`.
    """
    # Get-or-create the state row. A failure here is only logged; the
    # fallback below then supplies a state so Help can still answer.
    state: AnalysisState | None = None
    if analysis_id:
        try:
            state = await self._get_state_store().ensure(analysis_id, user_id)
        except Exception as exc:  # noqa: BLE001 — never block help on a state read
            logger.warning("help state ensure failed", analysis_id=analysis_id, error=str(exc))
    if state is None:
        state = await self._load_analysis_state(analysis_id)

    # report_ready (seam #5) is passed straight through to the HelpAgent.
    from .report.readiness import is_report_ready

    report_ready = await is_report_ready(analysis_id, state)

    yield {"event": "sources", "data": json.dumps([])}
    try:
        guidance = self._get_help_agent().astream(
            state,
            history=history,
            message=message,
            report_ready=report_ready,
        )
        async for piece in guidance:
            yield {"event": "chunk", "data": piece}
    except Exception as exc:  # noqa: BLE001
        logger.error("help streaming failed", user_id=user_id, error=str(exc))
        yield {"event": "error", "data": f"Help generation failed: {exc}"}
    else:
        yield {"event": "done", "data": ""}
async def handle(
    self,
    message: str,
    user_id: str,
    history: list[BaseMessage] | None = None,
    analysis_id: str | None = None,
) -> AsyncIterator[dict[str, Any]]:
    """Classify one user message, route it by intent, and stream SSE-style events.

    Args:
        message: The raw user message.
        user_id: Authenticated user id, passed to catalog reads, retrieval,
            the check invoker and the slow path.
        history: Prior conversation turns, forwarded to the router and agents.
        analysis_id: Session/analysis id. When falsy, no analysis-state row is
            ensured and structured catalog reads are left unscoped.

    Yields:
        `{"event": <name>, "data": <str>}` dicts. After a successful
        classification the first event is always `intent`. Then, per route:

        - `structured_flow` with the slow path enabled: whatever
          `_run_slow_path` yields.
        - `structured_flow` (single query), `unstructured_flow`, `chat`:
          `sources`, `chunk`*, `done`.
        - `check`: one `chunk` with the rendered result, then `done`.
        - `help`: `chunk`*, then `done`.

        Any failure yields a single terminal `error` event and stops.
    """
    tracer = self._make_tracer(user_id, message)
    # ---- 1. Classify intent --------------------------------------
    try:
        oc = tracer.callbacks()  # orchestrator: PII-safe, full capture
        ckw = {"callbacks": oc} if oc else {}
        decision = await self._get_intent_router().classify(message, history, **ckw)
    except Exception as e:
        logger.error("intent classification failed", error=str(e))
        yield {"event": "error", "data": f"Could not classify message: {e}"}
        return
    intent = decision.intent
    # ---- 1a. Ensure session state row (T-A) ----------------------
    # Rooms created via /room/create have no `analysis` row. Without one, Help and
    # the report_id write-back silently no-op. Lazily get-or-create it (idempotent).
    analysis_state: AnalysisState | None = None
    if analysis_id:
        try:
            analysis_state = await self._get_state_store().ensure(analysis_id, user_id)
        except Exception as e:
            logger.warning(
                "analysis state ensure failed", analysis_id=analysis_id, error=str(e)
            )
    # ---- 1b. Gate (REMOVED 2026-06-24) ---------------------------
    # The problem_validated gate was dropped: structured_flow is no longer
    # redirected to problem_statement (the goal is now user-entered objective +
    # business_questions, no agent validation). `gate()` is neutered to a no-op; the
    # call site is left commented for restorability.
    # if self._enable_gate and analysis_id:
    #     from .gate import gate, stub_analysis_state
    #
    #     intent = gate(
    #         intent,
    #         analysis_state
    #         if analysis_state is not None
    #         else stub_analysis_state(),
    #     )
    # The `intent` event is consumed by the endpoint (it gates response caching
    # on the effective intent) and is NOT forwarded to the frontend. We emit the
    # post-gate intent so the cache keys on what actually ran.
    event_data = decision.model_dump()
    event_data["intent"] = intent
    yield {"event": "intent", "data": json.dumps(event_data)}
    rewritten = decision.rewritten_query or message
    query_result = None
    chunks: list[DocumentChunk] | None = None
    raw_chunks: Any = None
    # NOTE(review): the error exits below return without calling `tracer.end()`.
    # That is left unchanged here — confirm whether failed requests should also
    # close their trace.
    # ---- 2. Route ------------------------------------------------
    if intent == "structured_flow":
        try:
            # One memoizing reader per request: the same catalog is otherwise
            # re-fetched from the catalog DB 4-5x across the slow-path run. This
            # collapses those to one round-trip per source_hint and pins a single
            # consistent snapshot for plan + execution.
            from ..catalog.reader import MemoizingCatalogReader

            req_reader = MemoizingCatalogReader(self._get_catalog_reader())
            # #10: scope every catalog read — the Planner's AND the data-access
            # tools' own re-reads — to the analysis's bound sources, so binding
            # is a boundary, not just a planner hint (T-B). Fail-open (T-C).
            bound = await self._bound_source_ids(analysis_id)
            reader = _ScopedCatalogReader(req_reader, bound) if bound else req_reader
            catalog = await reader.read(user_id, "structured")
            if self._enable_slow_path:
                # The slow path emits its own sources/chunk/done events and ends
                # the trace itself on success.
                async for event in self._run_slow_path(
                    user_id, rewritten, catalog, tracer, reader, analysis_id
                ):
                    yield event
                return
            query_result = await self._get_query_service().run(
                user_id, rewritten, catalog
            )
        except Exception as e:
            logger.error(
                "structured route failed",
                user_id=user_id,
                error=str(e),
            )
            yield {"event": "error", "data": f"Structured query failed: {e}"}
            return
    elif intent == "unstructured_flow":
        try:
            raw_chunks = await self._get_document_retriever().retrieve(
                rewritten, user_id
            )
            chunks = _normalize_chunks(raw_chunks)
        except Exception as e:
            logger.error(
                "unstructured route failed", user_id=user_id, error=str(e)
            )
            yield {"event": "error", "data": f"Document retrieval failed: {e}"}
            return
    elif intent == "check":
        try:
            invoker = self._get_check_invoker(user_id)
            text = await run_check(rewritten, invoker)
        except Exception as e:
            logger.error("check route failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Lookup failed: {e}"}
            return
        yield {"event": "chunk", "data": text}
        # Close the request trace before `done`, exactly as the help and answer
        # paths do; previously this successful exit left the trace open.
        tracer.end()
        yield {"event": "done", "data": ""}
        return
    # problem_statement dispatch removed 2026-06-24 (skill unwired; intent no longer
    # emitted by the router). Branch kept commented for restorability.
    # elif intent == "problem_statement":
    #     try:
    #         text = await run_problem_statement(
    #             message,
    #             analysis_id,
    #             agent=self._get_ps_agent(),
    #             store=self._get_state_store(),
    #             history=history,
    #         )
    #     except Exception as e:
    #         logger.error("problem_statement route failed", user_id=user_id, error=str(e))
    #         yield {"event": "error", "data": f"Problem statement failed: {e}"}
    #         return
    #     yield {"event": "chunk", "data": text}
    #     yield {"event": "done", "data": ""}
    #     return
    elif intent == "help":
        try:
            state = analysis_state or await self._load_analysis_state(analysis_id)
        except Exception as e:
            logger.error("help route failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Help failed: {e}"}
            return
        # report_ready (seam #5): deterministic — validated goal + ≥1 recorded
        # analysis (mirrors the report API's own 409 gate). Never-throws (fails
        # closed to not-ready), so Help degrades safely. The consistency guard in
        # HelpAgent only offers `generate_report` when this says ready.
        from .report.readiness import is_report_ready

        report_ready = await is_report_ready(analysis_id, state)
        # The prompt sees chat history -> masked.
        hc = tracer.callbacks(masked=True)
        hkw = {"callbacks": hc} if hc else {}
        try:
            async for token in self._get_help_agent().astream(
                state,
                history=history,
                message=message,
                report_ready=report_ready,
                **hkw,
            ):
                yield {"event": "chunk", "data": token}
        except Exception as e:
            logger.error("help streaming failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Help generation failed: {e}"}
            return
        tracer.end()
        yield {"event": "done", "data": ""}
        return
    # else: chat path — no context
    # ---- 2b. Emit sources ---------------------------------------
    sources = _build_sources(intent, user_id, query_result, raw_chunks)
    logger.info(
        "built sources",
        intent=intent,
        sources_count=len(sources),
        raw_chunks_count=len(raw_chunks) if raw_chunks else 0,
    )
    yield {"event": "sources", "data": json.dumps(sources)}
    # ---- 3. Stream answer ----------------------------------------
    # masked: the answer call sees real query rows / doc chunks (possible PII).
    mc = tracer.callbacks(masked=True)
    akw = {"callbacks": mc} if mc else {}
    try:
        async for token in self._get_answer_agent().astream(
            message,
            history=history,
            query_result=query_result,
            chunks=chunks,
            **akw,
        ):
            yield {"event": "chunk", "data": token}
    except Exception as e:
        logger.error("answer streaming failed", user_id=user_id, error=str(e))
        yield {"event": "error", "data": f"Answer generation failed: {e}"}
        return
    tracer.end()
    yield {"event": "done", "data": ""}
# ------------------------------------------------------------------
# Slow analytical path (gated, off by default)
# ------------------------------------------------------------------
def _make_tracer(self, user_id: str, question: str) -> Any:
    """Create the tracer for one request.

    Returns `RequestTracer.start(user_id=..., question=...)` when tracing is
    enabled on this handler, otherwise a fresh `NullTracer`.
    """
    if self._enable_tracing:
        from ..observability.langfuse.tracing import RequestTracer

        return RequestTracer.start(user_id=user_id, question=question)

    from ..observability.langfuse.tracing import NullTracer

    return NullTracer()
def _get_slow_path_coordinator(
    self, user_id: str, tracer: Any = None, catalog_reader: CatalogReader | None = None
) -> SlowPathCoordinator:
    """Assemble the slow-path coordinator for a single request.

    This is the composition root: the only place where concrete tool
    implementations are named, so the slow-path agent code stays
    tool-agnostic (INV-7).

    Args:
        user_id: Authenticated user the data-access tools run as.
        tracer: Request tracer. When it has a truthy `active` attribute the
            tool invoker is wrapped in a `TracingToolInvoker`.
        catalog_reader: Reader handed to the data-access tools; falls back to
            the handler's own reader when falsy.

    Returns:
        The injected factory's result (called with `user_id` only — `tracer`
        and `catalog_reader` are not forwarded), or a freshly wired
        `SlowPathCoordinator`.
    """
    custom_factory = self._slow_path_factory
    if custom_factory is not None:
        return custom_factory(user_id)

    from ..tools.data_access import DataAccessToolInvoker
    from ..tools.invoker import AnalyticsToolInvoker, CompositeToolInvoker
    from .planner.registry import default_registry
    from .planner.service import PlannerService
    from .slow_path.assembler import Assembler
    from .slow_path.coordinator import SlowPathCoordinator
    from .slow_path.task_runner import TaskRunner

    # Tools are bound to this request's user and catalog view.
    reader = catalog_reader or self._get_catalog_reader()
    data_tools = DataAccessToolInvoker(user_id, reader)
    analytics_tools = AnalyticsToolInvoker()
    tool_invoker: Any = CompositeToolInvoker(data_tools, analytics_tools)

    tracing_on = tracer is not None and getattr(tracer, "active", False)
    if tracing_on:
        from ..observability.langfuse.tracing import TracingToolInvoker

        tool_invoker = TracingToolInvoker(tool_invoker, tracer)

    registry = default_registry()
    planner = PlannerService()
    runner = TaskRunner(tool_invoker, registry)
    assembler = Assembler()
    return SlowPathCoordinator(planner, runner, assembler, registry)
def _get_analysis_store(self) -> ReportInputStore:
    """Return the analysis-record store, building `PostgresReportInputStore()` on first use."""
    if self._analysis_store is not None:
        return self._analysis_store
    # Imported only when the default is actually needed.
    from .slow_path.store import PostgresReportInputStore

    self._analysis_store = PostgresReportInputStore()
    return self._analysis_store
async def _run_slow_path(
    self,
    user_id: str,
    query: str,
    catalog: Any,
    tracer: Any = None,
    catalog_reader: CatalogReader | None = None,
    analysis_id: str | None = None,
) -> AsyncIterator[dict[str, Any]]:
    """Run the slow path and stream its assembled answer as SSE events.

    Context comes from the `get_business_context` seam; the `analysis_record`
    is persisted via the `ReportInputStore` seam, stamped with the request's
    `user_id` + `analysis_id` so the report can group it. `chat_answer` is
    emitted as a single `chunk` (the Assembler returns the whole object —
    true token streaming is a later step).

    Args:
        user_id: Authenticated user the run and the saved record belong to.
        query: The (possibly rewritten) user question.
        catalog: Catalog snapshot passed to the coordinator and the prewarm.
        tracer: Request tracer; a `NullTracer` is used when `None`.
        catalog_reader: Reader forwarded to the coordinator wiring.
        analysis_id: Session id stamped on the persisted record.

    Yields:
        Zero or more `status` events (one per progress stage reported by the
        coordinator), then either `sources` (empty array), `chunk`, `done`,
        or a single terminal `error` event if the run raised.

    If the consumer stops iterating or the surrounding request is cancelled
    while the run is in flight, the run and the progress listener are
    cancelled rather than left running in the background.
    """
    from .planner.business_context import get_business_context
    from .planner.inputs import Constraints

    if tracer is None:
        from ..observability.langfuse.tracing import NullTracer

        tracer = NullTracer()
    coordinator = self._get_slow_path_coordinator(user_id, tracer, catalog_reader)
    context = await get_business_context(user_id)

    def _on_prewarm_done(task: asyncio.Task) -> None:
        # Retrieve the outcome so a failed warm-up is logged here instead of
        # surfacing later as asyncio's "exception was never retrieved" noise.
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.warning("db prewarm failed", user_id=user_id, error=str(exc))

    # DB3: warm the user's DB connection in parallel with planning so the
    # handshake overlaps the ~4s Planner call. Default path only — an injected
    # coordinator factory (tests / custom) may not use the real DbExecutor.
    # The task is bound to a local so a strong reference exists for the life of
    # this generator (asyncio itself only keeps weak references to tasks).
    prewarm_task: asyncio.Task | None = None
    if self._slow_path_factory is None:
        from ..query.executor.db import DbExecutor

        prewarm_task = asyncio.create_task(DbExecutor.prewarm(catalog, user_id))
        prewarm_task.add_done_callback(_on_prewarm_done)

    pc = tracer.callbacks()  # planner: PII-safe, full capture
    ac = tracer.callbacks(masked=True)  # assembler: sees real rows -> masked
    run_kw: dict[str, Any] = {}
    if pc:
        run_kw["planner_callbacks"] = pc
    if ac:
        run_kw["assembler_callbacks"] = ac
    # R4: bridge the coordinator's per-stage progress callback to SSE `status`
    # events so the stream isn't silent for ~12s (and proxies don't drop the
    # idle connection). Status events only appear if the coordinator calls back.
    progress_q: asyncio.Queue[str] = asyncio.Queue()

    async def _progress(stage: str) -> None:
        await progress_q.put(stage)

    run_task = asyncio.create_task(
        coordinator.run(
            context, catalog, query, Constraints(), progress=_progress, **run_kw
        )
    )
    getter: asyncio.Task = asyncio.create_task(progress_q.get())
    pending: set[asyncio.Task] = {run_task, getter}
    try:
        while True:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            if getter in done:
                yield {"event": "status", "data": getter.result()}
                getter = asyncio.create_task(progress_q.get())
                pending = pending | {getter}
            if run_task in done:
                # Cancel the listener BEFORE draining: the yields below hand
                # control back to the event loop, and a live getter could
                # otherwise swallow a queued stage.
                getter.cancel()
                while not progress_q.empty():
                    yield {"event": "status", "data": progress_q.get_nowait()}
                break
    finally:
        # Reached on normal completion, on consumer close (GeneratorExit at a
        # yield) and on cancellation of the wait. `asyncio.wait` never cancels
        # the tasks it waits on, so do it here to avoid orphaning the run.
        getter.cancel()
        if not run_task.done():
            run_task.cancel()
    try:
        result = run_task.result()
    except Exception as e:
        logger.error("slow path failed", user_id=user_id, error=str(e))
        yield {"event": "error", "data": f"Analysis failed: {e}"}
        return
    yield {"event": "sources", "data": json.dumps([])}  # TODO: derive from record
    yield {"event": "chunk", "data": result.chat_answer}
    try:
        # Stamp identity from the request scope: owner + the shared session id
        # (analysis_id == room_id). Without analysis_id the record is orphaned —
        # list_for_analysis can't find it, so the report + is_report_ready go
        # blind. The store is never-throw.
        record = result.analysis_record.model_copy(
            update={"user_id": user_id, "analysis_id": analysis_id}
        )
        await self._get_analysis_store().save(record)
    except Exception as e:  # persistence must never break the user's answer
        logger.error("analysis_record persist failed", user_id=user_id, error=str(e))
    tracer.end()  # output omitted (chat_answer may contain PII on Cloud)
    yield {"event": "done", "data": ""}
class _ScopedCatalogReader:
    """Catalog-reader wrapper that narrows `structured` reads to bound sources (#10).

    The filter lives in the reader itself so that every consumer handed this
    reader — the Planner and the data-access tools, which re-read the catalog
    on their own — sees the same scoped view.

    Fail-open: when the bound set is empty, or when none of the catalog's
    sources are in it, the full source list is kept, so a stale binding never
    empties the catalog. Reads with any other `source_hint` pass through
    untouched.
    """

    def __init__(self, inner: Any, bound: set[str]) -> None:
        """Wrap `inner` and remember the `bound` source ids to filter by."""
        self._bound = bound
        self._inner = inner

    async def read(self, user_id: str, source_hint: str) -> Any:
        """Read through the wrapped reader and scope the result when applicable.

        Args:
            user_id: Forwarded unchanged to the wrapped reader.
            source_hint: Forwarded unchanged; only `"structured"` is scoped.

        Returns:
            The wrapped reader's catalog as-is, or — for a structured read with
            a non-empty bound set — a `model_copy` whose `sources` holds the
            bound sources (or all sources when none match).
        """
        full_catalog = await self._inner.read(user_id, source_hint)
        if source_hint == "structured" and self._bound:
            visible = [
                source for source in full_catalog.sources if source.source_id in self._bound
            ]
            if not visible:
                # Binding matches nothing here: degrade to the whole catalog.
                visible = full_catalog.sources
            return full_catalog.model_copy(update={"sources": visible})
        return full_catalog
def _build_sources(
    intent: str,
    user_id: str,
    query_result: Any,
    raw_chunks: Any,
) -> list[dict[str, Any]]:
    """Assemble the payload for the SSE `sources` event.

    - `structured_flow`: a single entry naming the executed table, or nothing
      when there is no result, the result carries a truthy `error`, or it has
      no `table_name`.
    - `unstructured_flow`: one entry per distinct `(document_id, page_label)`
      pair, in first-seen order. Items that are neither a `RetrievalResult`
      nor a dict, and items missing both keys, are skipped.
    - any other intent: an empty list.
    """
    if intent == "structured_flow":
        usable = query_result is not None and not getattr(query_result, "error", None)
        table = (getattr(query_result, "table_name", "") or "") if usable else ""
        if table:
            return [{
                "document_id": f"{user_id}_{table}",
                "filename": table,
                "page_label": None,
            }]
        return []

    if intent != "unstructured_flow" or not raw_chunks:
        return []

    # Insertion-ordered dict doubles as the "seen" set and the output list.
    refs_by_key: dict[tuple[Any, Any], dict[str, Any]] = {}
    for entry in raw_chunks:
        if isinstance(entry, RetrievalResult):
            payload = entry.metadata.get("data", {})
        elif isinstance(entry, dict):
            payload = entry
        else:
            continue
        ref_key = (payload.get("document_id"), payload.get("page_label"))
        if ref_key in refs_by_key or ref_key == (None, None):
            continue
        refs_by_key[ref_key] = {
            "document_id": payload.get("document_id"),
            "filename": payload.get("filename", "Unknown"),
            "page_label": payload.get("page_label", "Unknown"),
        }
    return list(refs_by_key.values())
def _normalize_chunks(raw: Any) -> list[DocumentChunk]:
    """Convert whatever the retriever returns into `list[DocumentChunk]`.

    Accepted shapes, applied per item:

    - `DocumentChunk`: kept as-is.
    - `dict`: `content` / `filename` / `page_label` keys are read; a missing
      or `None` content becomes an empty string.
    - `RetrievalResult`: `content` plus `filename` / `page_label` taken from
      `metadata["data"]`; a non-`None` page label is converted to `str`.
    - `str`: used as the chunk content.

    Anything else is dropped. A single bare item (one string, dict,
    `DocumentChunk` or `RetrievalResult`) is treated as a one-element list
    rather than being iterated character-by-character or key-by-key.

    Args:
        raw: The retriever output; any falsy value yields an empty list.

    Returns:
        The normalized chunks. When `raw` is already a list made only of
        `DocumentChunk` objects, that same list object is returned.
    """
    if not raw:
        return []
    if isinstance(raw, (str, dict, DocumentChunk, RetrievalResult)):
        # Iterating a bare str/dict would yield characters / keys, producing
        # one bogus chunk per element — wrap it instead.
        raw = [raw]
    elif isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw):
        return raw
    chunks: list[DocumentChunk] = []
    for item in raw:
        if isinstance(item, DocumentChunk):
            chunks.append(item)
        elif isinstance(item, dict):
            content = item.get("content")
            chunks.append(
                DocumentChunk(
                    # `str(None)` would inject the literal text "None".
                    content="" if content is None else str(content),
                    filename=item.get("filename"),
                    page_label=item.get("page_label"),
                )
            )
        elif isinstance(item, RetrievalResult):
            data = item.metadata.get("data", {})
            page = data.get("page_label")
            chunks.append(DocumentChunk(
                content=item.content,
                filename=data.get("filename"),
                page_label=str(page) if page is not None else None,
            ))
        elif isinstance(item, str):
            chunks.append(DocumentChunk(content=item))
    return chunks