Rifqi Hafizuddin
[KM-644] feat(report): button-triggered report generator + API
1570c5d
Raw
History Blame
5.81 kB
"""Assembler — single LLM call at the end of the slow path.
Reads the `RunState` (all `TaskResult`s) + `BusinessContext` and produces an
`AssembledOutput` { chat_answer, analysis_record }. Owns all language/output: prose,
markdown tables, citations, and merging structured + unstructured results.
The model authors only the *narrative* (`AssemblerNarrative`); this service copies
the structured pass-through (`results_snapshot`, `tasks_run`) and metadata from the
`RunState` so the record stays a faithful source of truth (§8.3, INV-4).
Chain construction mirrors `agents/planner/service.py`.
See AGENT_ARCHITECTURE_CONTEXT_new.md §7.5.
"""
from __future__ import annotations
from datetime import UTC, datetime
from pathlib import Path
from langchain_core.messages import SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable
from langchain_openai import AzureChatOpenAI
from src.middlewares.logging import get_logger
from ..planner.contracts import BusinessContext
from .errors import AssemblerError
from .prompt import build_assembler_prompt
from .schemas import (
AnalysisRecord,
AssembledOutput,
AssemblerNarrative,
RunState,
TaskResult,
TaskSummary,
)
logger = get_logger("assembler")

# Assembler system prompt: <grandparent of this module's directory>/config/prompts/assembler.md
_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent.joinpath(
    "config", "prompts", "assembler.md"
)
def _load_prompt_text() -> str:
    """Read the Assembler system prompt from ``_PROMPT_PATH`` as UTF-8 text."""
    with _PROMPT_PATH.open(encoding="utf-8") as prompt_file:
        return prompt_file.read()
def _build_default_chain() -> Runnable:
    """Build the production chain: system + human prompt -> Azure OpenAI (4o) -> AssemblerNarrative.

    The system turn is a ready-made ``SystemMessage`` holding the text of the
    prompt file; the human turn is the ``{human_content}`` template variable.
    The model's reply is parsed into ``AssemblerNarrative`` via
    ``with_structured_output``.
    """
    # Deferred import: settings are only touched when the default chain is built.
    from src.config.settings import settings

    model = AzureChatOpenAI(
        azure_deployment=settings.azureai_deployment_name_4o,
        openai_api_version=settings.azureai_api_version_4o,
        azure_endpoint=settings.azureai_endpoint_url_4o,
        api_key=settings.azureai_api_key_4o,
        temperature=0,  # minimise sampling variability in the narrative
    )

    # NOTE(review): the system prompt is passed as a message object rather than a
    # template string — presumably so braces in assembler.md are not treated as
    # template variables; confirm before changing.
    messages = [
        SystemMessage(content=_load_prompt_text()),
        ("human", "{human_content}"),
    ]
    prompt = ChatPromptTemplate.from_messages(messages)

    structured_model = model.with_structured_output(AssemblerNarrative)
    return prompt | structured_model
# Lazily built production chain, shared by every Assembler that was not given one.
_default_chain: Runnable | None = None


def _get_default_chain() -> Runnable:
    """Return the shared default chain, building it on the first call."""
    global _default_chain
    if _default_chain is not None:
        return _default_chain
    _default_chain = _build_default_chain()
    return _default_chain
class Assembler:
    """Wraps the single Assembler LLM call. Inject `structured_chain` for tests.

    When no chain is injected, the module's shared default chain is fetched
    lazily on the first ``assemble`` call.
    """

    def __init__(self, structured_chain: Runnable | None = None) -> None:
        # Injected chain, or None until `_ensure_chain` fills in the default.
        self._chain = structured_chain

    def _ensure_chain(self) -> Runnable:
        """Return the injected chain, falling back to the shared default chain."""
        if self._chain is None:
            self._chain = _get_default_chain()
        return self._chain

    async def assemble(
        self,
        run_state: RunState,
        context: BusinessContext,
        question: str | None = None,
        callbacks: list | None = None,
    ) -> AssembledOutput:
        """Run the Assembler LLM call and package its narrative with the run's data.

        Args:
            run_state: the run's task results plus plan / business-context ids.
            context: business context, forwarded to the prompt builder.
            question: optional user question, forwarded to the prompt builder.
            callbacks: optional LangChain callbacks; passed through ``config``
                only when non-empty.

        Returns:
            ``AssembledOutput`` holding the model's ``chat_answer`` and an
            ``AnalysisRecord`` built from the narrative and ``run_state``.

        Raises:
            AssemblerError: if the chain invocation raises, or if it yields no
                narrative at all.
        """
        chain = self._ensure_chain()
        human_content = build_assembler_prompt(run_state, context, question)
        inputs = {"human_content": human_content}
        try:
            if callbacks:
                narrative: AssemblerNarrative | None = await chain.ainvoke(
                    inputs, config={"callbacks": callbacks}
                )
            else:
                # No `config` argument at all when there are no callbacks.
                narrative = await chain.ainvoke(inputs)
        except Exception as exc:  # surface as a typed error for the caller
            raise AssemblerError(f"assembler call failed: {exc}") from exc
        # Structured-output parsers can return None (e.g. when the model makes no
        # tool call). Without this guard, `_build_record` would fail with an
        # untyped AttributeError instead of the AssemblerError callers expect.
        if narrative is None:
            raise AssemblerError(
                "assembler call failed: model returned no structured narrative"
            )
        record = _build_record(narrative, run_state)
        logger.info(
            "analysis assembled",
            plan_id=run_state.plan_id,
            business_context_id=run_state.business_context_id,
            n_tasks=len(run_state.results),
        )
        return AssembledOutput(chat_answer=narrative.chat_answer, analysis_record=record)
# Cap on raw rows persisted per `table` output in a record's results snapshot.
# `analyze_*` outputs (scalar/stats/series) are small and are the basis a future
# report/chart renders from, so they are stored in full. Data-access tools
# (retrieve_data can return up to the 10k LIMIT) can produce huge tables. The
# report never renders raw rows, so only a short sample is kept, to avoid
# bloating every record's jsonb.
_SNAPSHOT_ROW_SAMPLE = 10


def _trim_for_snapshot(result: TaskResult) -> TaskResult:
    """Return ``result`` with oversized ``table`` outputs cut down to a row sample.

    An output is trimmed when its ``kind`` is ``"table"`` and it holds more than
    ``_SNAPSHOT_ROW_SAMPLE`` rows. The replacement copy keeps the first
    ``_SNAPSHOT_ROW_SAMPLE`` rows, and its ``meta`` gains ``total_rows`` (the
    original count) and ``rows_truncated=True``. Other outputs are reused as-is.

    If nothing needs trimming, the very same ``result`` object is returned.
    Otherwise a copy with a new ``outputs`` list is returned.
    """
    limit = _SNAPSHOT_ROW_SAMPLE

    def _oversized(out) -> bool:
        return out.kind == "table" and out.rows is not None and len(out.rows) > limit

    def _sampled(out):
        return out.model_copy(
            update={
                "rows": out.rows[:limit],
                "meta": {**out.meta, "total_rows": len(out.rows), "rows_truncated": True},
            }
        )

    if not any(_oversized(out) for out in result.outputs):
        return result

    new_outputs = [_sampled(out) if _oversized(out) else out for out in result.outputs]
    return result.model_copy(update={"outputs": new_outputs})
def _build_record(narrative: AssemblerNarrative, run_state: RunState) -> AnalysisRecord:
    """Combine the model's narrative with run data into an ``AnalysisRecord``.

    Narrative fields (goal, findings, caveats, data used, open questions) come
    from ``narrative``. Everything else comes from ``run_state``:
    - ``tasks_run``: one ``TaskSummary`` per task, listing the tool of each output.
    - ``results_snapshot``: each task result passed through ``_trim_for_snapshot``.
    - ``plan_id`` and ``business_context_id``.
    ``created_at`` is the current time in UTC.
    """
    summaries = []
    snapshot = {}
    for task_id, task_result in run_state.results.items():
        summaries.append(
            TaskSummary(
                task_id=task_id,
                stage=task_result.stage,
                objective=task_result.objective,
                status=task_result.status,
                tools_used=[output.tool for output in task_result.outputs],
            )
        )
        snapshot[task_id] = _trim_for_snapshot(task_result)

    return AnalysisRecord(
        goal_restated=narrative.goal_restated,
        findings=narrative.findings,
        caveats=narrative.caveats,
        data_used=narrative.data_used,
        open_questions=narrative.open_questions,
        tasks_run=summaries,
        results_snapshot=snapshot,
        plan_id=run_state.plan_id,
        business_context_id=run_state.business_context_id,
        created_at=datetime.now(UTC),
    )