| """Assembler — single LLM call at the end of the slow path. |
| |
| Reads the `RunState` (all `TaskResult`s) + `BusinessContext` and produces an |
| `AssembledOutput` { chat_answer, analysis_record }. Owns all language/output: prose, |
| markdown tables, citations, and merging structured + unstructured results. |
| |
| The model authors only the *narrative* (`AssemblerNarrative`); this service copies |
| the structured pass-through (`results_snapshot`, `tasks_run`) and metadata from the |
| `RunState` so the record stays a faithful source of truth (§8.3, INV-4). |
| |
| Chain construction mirrors `agents/planner/service.py`. |
| |
| See AGENT_ARCHITECTURE_CONTEXT_new.md §7.5. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from datetime import UTC, datetime |
| from pathlib import Path |
|
|
| from langchain_core.messages import SystemMessage |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_core.runnables import Runnable |
| from langchain_openai import AzureChatOpenAI |
|
|
| from src.middlewares.logging import get_logger |
|
|
| from ..planner.contracts import BusinessContext |
| from .errors import AssemblerError |
| from .prompt import build_assembler_prompt |
| from .schemas import ( |
| AnalysisRecord, |
| AssembledOutput, |
| AssemblerNarrative, |
| RunState, |
| TaskResult, |
| TaskSummary, |
| ) |
|
|
# Logger for this service, obtained from the project's logging middleware
# (`src.middlewares.logging.get_logger`); used for the "analysis assembled" event.
logger = get_logger("assembler")
|
|
# System prompt for the Assembler: `config/prompts/assembler.md`, resolved relative
# to the directory three levels above this (symlink-resolved) source file.
_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent.joinpath(
    "config", "prompts", "assembler.md"
)
|
|
|
|
def _load_prompt_text() -> str:
    """Read and return the full Assembler system prompt stored at ``_PROMPT_PATH``.

    The file is decoded as UTF-8 and read on every call (no caching here).
    """
    with _PROMPT_PATH.open(encoding="utf-8") as prompt_file:
        return prompt_file.read()
|
|
|
|
def _build_default_chain() -> Runnable:
    """Build the production Assembler chain: prompt template -> Azure model -> narrative.

    The model is configured from the ``*_4o`` Azure settings with ``temperature=0``.
    Its output is bound to the ``AssemblerNarrative`` schema via
    ``with_structured_output``, so invoking the chain yields the structured narrative.
    The chain expects a single input variable, ``human_content``.
    """
    # Imported here rather than at module top; presumably so importing this module
    # (e.g. in tests that inject their own chain) does not require loading settings.
    from src.config.settings import settings

    chat_model = AzureChatOpenAI(
        azure_deployment=settings.azureai_deployment_name_4o,
        openai_api_version=settings.azureai_api_version_4o,
        azure_endpoint=settings.azureai_endpoint_url_4o,
        api_key=settings.azureai_api_key_4o,
        temperature=0,
    )

    # The system prompt is passed as a ready-made SystemMessage instead of a
    # ("system", text) tuple: message instances are used verbatim by
    # `from_messages`, so `{`/`}` in the markdown are not parsed as template variables.
    system_turn = SystemMessage(content=_load_prompt_text())
    message_specs = [system_turn, ("human", "{human_content}")]
    template = ChatPromptTemplate.from_messages(message_specs)

    return template | chat_model.with_structured_output(AssemblerNarrative)
|
|
|
|
# Module-level cache for the default chain; stays None until first requested.
_default_chain: Runnable | None = None


def _get_default_chain() -> Runnable:
    """Return the shared default Assembler chain, building it on the first call.

    Later calls reuse the cached instance. NOTE: there is no lock, so concurrent
    first calls from separate threads could each build a chain.
    """
    global _default_chain
    if _default_chain is not None:
        return _default_chain
    _default_chain = _build_default_chain()
    return _default_chain
|
|
|
|
class Assembler:
    """Wrap the single Assembler LLM call that turns a finished run into output.

    Inject ``structured_chain`` for tests: any object whose async ``ainvoke``
    returns an ``AssemblerNarrative``. When omitted, the process-wide default
    chain is built lazily on first use.
    """

    def __init__(self, structured_chain: Runnable | None = None) -> None:
        """Store an optional pre-built chain; ``None`` defers to the default chain."""
        self._chain = structured_chain

    def _ensure_chain(self) -> Runnable:
        """Return the injected chain, falling back to (and caching) the default one."""
        if self._chain is None:
            self._chain = _get_default_chain()
        return self._chain

    async def assemble(
        self,
        run_state: RunState,
        context: BusinessContext,
        question: str | None = None,
        callbacks: list | None = None,
    ) -> AssembledOutput:
        """Run the Assembler LLM call and package its narrative with the run's data.

        Args:
            run_state: Results of every task in the run; its structured data is
                copied into the analysis record.
            context: Business context forwarded to ``build_assembler_prompt``.
            question: Optional user question forwarded to ``build_assembler_prompt``.
            callbacks: Optional callback handlers passed to the chain as
                ``config={"callbacks": callbacks}``; omitted entirely when falsy.

        Returns:
            ``AssembledOutput`` holding the model's ``chat_answer`` and the
            ``AnalysisRecord`` built by ``_build_record``.

        Raises:
            AssemblerError: if the chain call raises, or if it returns ``None``
                instead of a narrative.
        """
        chain = self._ensure_chain()
        human_content = build_assembler_prompt(run_state, context, question)
        # Pass `config` only when there are callbacks, so the call shape matches
        # the plain `ainvoke(input)` form otherwise (keeps simple injected fakes working).
        invoke_kwargs = {"config": {"callbacks": callbacks}} if callbacks else {}
        try:
            narrative: AssemblerNarrative | None = await chain.ainvoke(
                {"human_content": human_content}, **invoke_kwargs
            )
        except Exception as exc:
            raise AssemblerError(f"assembler call failed: {exc}") from exc
        if narrative is None:
            # A structured-output parser can yield None (e.g. the model made no
            # tool call). Report it as this service's error rather than letting an
            # AttributeError escape from the attribute accesses below.
            raise AssemblerError("assembler call returned no structured output")

        record = _build_record(narrative, run_state)
        logger.info(
            "analysis assembled",
            plan_id=run_state.plan_id,
            business_context_id=run_state.business_context_id,
            n_tasks=len(run_state.results),
        )
        return AssembledOutput(chat_answer=narrative.chat_answer, analysis_record=record)
|
|
|
|
| |
| |
| |
| |
# Maximum number of rows kept per table output in the persisted results snapshot.
_SNAPSHOT_ROW_SAMPLE = 10


def _trim_for_snapshot(result: TaskResult) -> TaskResult:
    """Return ``result`` with oversized table outputs cut down to a row sample.

    Each output of kind ``"table"`` with more than ``_SNAPSHOT_ROW_SAMPLE`` rows is
    replaced by a ``model_copy`` keeping only the first ``_SNAPSHOT_ROW_SAMPLE``
    rows, with ``total_rows`` (the original row count) and ``rows_truncated=True``
    merged into a new ``meta`` dict. Every other output is reused as-is.

    If nothing needed trimming, the same ``result`` object is returned; otherwise
    a ``model_copy`` of it carrying the new ``outputs`` list.
    """

    def _sample(output):
        rows = output.rows
        if output.kind != "table" or rows is None or len(rows) <= _SNAPSHOT_ROW_SAMPLE:
            return output
        return output.model_copy(
            update={
                "rows": rows[:_SNAPSHOT_ROW_SAMPLE],
                "meta": {**output.meta, "total_rows": len(rows), "rows_truncated": True},
            }
        )

    sampled = [_sample(output) for output in result.outputs]
    # `_sample` hands back its argument untouched unless it trimmed it, so object
    # identity tells us whether any output changed.
    if all(new is old for new, old in zip(sampled, result.outputs)):
        return result
    return result.model_copy(update={"outputs": sampled})
|
|
|
|
| def _build_record(narrative: AssemblerNarrative, run_state: RunState) -> AnalysisRecord: |
| tasks_run = [ |
| TaskSummary( |
| task_id=task_id, |
| stage=result.stage, |
| objective=result.objective, |
| status=result.status, |
| tools_used=[o.tool for o in result.outputs], |
| ) |
| for task_id, result in run_state.results.items() |
| ] |
| results_snapshot = { |
| task_id: _trim_for_snapshot(result) for task_id, result in run_state.results.items() |
| } |
| return AnalysisRecord( |
| goal_restated=narrative.goal_restated, |
| findings=narrative.findings, |
| caveats=narrative.caveats, |
| data_used=narrative.data_used, |
| open_questions=narrative.open_questions, |
| tasks_run=tasks_run, |
| results_snapshot=results_snapshot, |
| plan_id=run_state.plan_id, |
| business_context_id=run_state.business_context_id, |
| created_at=datetime.now(UTC), |
| ) |
|
|