Rifqi Hafizuddin Claude Opus 4.8 commited on
Commit
e4337a8
·
1 Parent(s): ac310de

[KM-626][AI] Slow-path agent: Assembler + Coordinator

Browse files

The write-up half of the slow path (AGENT_ARCHITECTURE_CONTEXT_new.md §7.5, §8.3).

- assembler.py + prompt.py + config/prompts/assembler.md: single LLM call ->
AssemblerNarrative; code merges it with the RunState to build the AnalysisRecord
(results_snapshot / tasks_run / metadata copied from RunState, never re-authored
by the model, so the record stays a faithful source of truth). chat_answer is the
first output field so it streams via SSE. Chain construction mirrors the planner
service.
- coordinator.py: SlowPathCoordinator wires Planner -> TaskRunner -> Assembler.
Built and tested, but NOT yet wired into the live ChatHandler.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

src/agents/slow_path/assembler.py ADDED
@@ -0,0 +1,136 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Assembler — single LLM call at the end of the slow path.
2
+
3
+ Reads the `RunState` (all `TaskResult`s) + `BusinessContext` and produces an
4
+ `AssembledOutput` { chat_answer, analysis_record }. Owns all language/output: prose,
5
+ markdown tables, citations, and merging structured + unstructured results.
6
+
7
+ The model authors only the *narrative* (`AssemblerNarrative`); this service copies
8
+ the structured pass-through (`results_snapshot`, `tasks_run`) and metadata from the
9
+ `RunState` so the record stays a faithful source of truth (§8.3, INV-4).
10
+
11
+ Chain construction mirrors `agents/planner/service.py`.
12
+
13
+ See AGENT_ARCHITECTURE_CONTEXT_new.md §7.5.
14
+ """
15
+
16
+ from __future__ import annotations
17
+
18
+ from datetime import UTC, datetime
19
+ from pathlib import Path
20
+
21
+ from langchain_core.messages import SystemMessage
22
+ from langchain_core.prompts import ChatPromptTemplate
23
+ from langchain_core.runnables import Runnable
24
+ from langchain_openai import AzureChatOpenAI
25
+
26
+ from src.middlewares.logging import get_logger
27
+
28
+ from ..planner.contracts import BusinessContext
29
+ from .errors import AssemblerError
30
+ from .prompt import build_assembler_prompt
31
+ from .schemas import (
32
+ AnalysisRecord,
33
+ AssembledOutput,
34
+ AssemblerNarrative,
35
+ RunState,
36
+ TaskSummary,
37
+ )
38
+
39
+ logger = get_logger("assembler")
40
+
41
+ _PROMPT_PATH = (
42
+ Path(__file__).resolve().parent.parent.parent / "config" / "prompts" / "assembler.md"
43
+ )
44
+
45
+
46
+ def _load_prompt_text() -> str:
47
+ return _PROMPT_PATH.read_text(encoding="utf-8")
48
+
49
+
50
+ def _build_default_chain() -> Runnable:
51
+ from src.config.settings import settings
52
+
53
+ llm = AzureChatOpenAI(
54
+ azure_deployment=settings.azureai_deployment_name_4o,
55
+ openai_api_version=settings.azureai_api_version_4o,
56
+ azure_endpoint=settings.azureai_endpoint_url_4o,
57
+ api_key=settings.azureai_api_key_4o,
58
+ temperature=0,
59
+ )
60
+ prompt = ChatPromptTemplate.from_messages(
61
+ [
62
+ SystemMessage(content=_load_prompt_text()),
63
+ ("human", "{human_content}"),
64
+ ]
65
+ )
66
+ return prompt | llm.with_structured_output(AssemblerNarrative)
67
+
68
+
69
+ _default_chain: Runnable | None = None
70
+
71
+
72
+ def _get_default_chain() -> Runnable:
73
+ global _default_chain
74
+ if _default_chain is None:
75
+ _default_chain = _build_default_chain()
76
+ return _default_chain
77
+
78
+
79
+ class Assembler:
80
+ """Wraps the single Assembler LLM call. Inject `structured_chain` for tests."""
81
+
82
+ def __init__(self, structured_chain: Runnable | None = None) -> None:
83
+ self._chain = structured_chain
84
+
85
+ def _ensure_chain(self) -> Runnable:
86
+ if self._chain is None:
87
+ self._chain = _get_default_chain()
88
+ return self._chain
89
+
90
+ async def assemble(
91
+ self,
92
+ run_state: RunState,
93
+ context: BusinessContext,
94
+ question: str | None = None,
95
+ ) -> AssembledOutput:
96
+ chain = self._ensure_chain()
97
+ human_content = build_assembler_prompt(run_state, context, question)
98
+ try:
99
+ narrative: AssemblerNarrative = await chain.ainvoke(
100
+ {"human_content": human_content}
101
+ )
102
+ except Exception as exc: # surface as a typed error for the caller
103
+ raise AssemblerError(f"assembler call failed: {exc}") from exc
104
+
105
+ record = _build_record(narrative, run_state)
106
+ logger.info(
107
+ "analysis assembled",
108
+ plan_id=run_state.plan_id,
109
+ business_context_id=run_state.business_context_id,
110
+ n_tasks=len(run_state.results),
111
+ )
112
+ return AssembledOutput(chat_answer=narrative.chat_answer, analysis_record=record)
113
+
114
+
115
+ def _build_record(narrative: AssemblerNarrative, run_state: RunState) -> AnalysisRecord:
116
+ tasks_run = [
117
+ TaskSummary(
118
+ task_id=task_id,
119
+ objective=result.objective,
120
+ status=result.status,
121
+ tools_used=[o.tool for o in result.outputs],
122
+ )
123
+ for task_id, result in run_state.results.items()
124
+ ]
125
+ return AnalysisRecord(
126
+ goal_restated=narrative.goal_restated,
127
+ findings=narrative.findings,
128
+ caveats=narrative.caveats,
129
+ data_used=narrative.data_used,
130
+ open_questions=narrative.open_questions,
131
+ tasks_run=tasks_run,
132
+ results_snapshot=run_state.results,
133
+ plan_id=run_state.plan_id,
134
+ business_context_id=run_state.business_context_id,
135
+ created_at=datetime.now(UTC),
136
+ )
src/agents/slow_path/coordinator.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """SlowPathCoordinator — wires the slow path: Planner -> TaskRunner -> Assembler.
2
+
3
+ A thin coordination object. This is the unit the (future) expanded Orchestrator /
4
+ ChatHandler will call on a `structured` analytical query. It is built and tested
5
+ here but **not yet wired into the live chat flow** — that step waits on the tool
6
+ team's real `ToolInvoker` and a real `BusinessContext` source.
7
+
8
+ See AGENT_ARCHITECTURE_CONTEXT_new.md §5.2 / §6.1.
9
+ """
10
+
11
+ from __future__ import annotations
12
+
13
+ from ...catalog.models import Catalog
14
+ from ..planner.contracts import BusinessContext, ToolRegistry
15
+ from ..planner.inputs import Constraints
16
+ from ..planner.service import PlannerService
17
+ from .assembler import Assembler
18
+ from .schemas import AssembledOutput
19
+ from .task_runner import TaskRunner
20
+
21
+
22
+ class SlowPathCoordinator:
23
+ def __init__(
24
+ self,
25
+ planner: PlannerService,
26
+ task_runner: TaskRunner,
27
+ assembler: Assembler,
28
+ registry: ToolRegistry,
29
+ ) -> None:
30
+ self._planner = planner
31
+ self._task_runner = task_runner
32
+ self._assembler = assembler
33
+ self._registry = registry
34
+
35
+ async def run(
36
+ self,
37
+ context: BusinessContext,
38
+ catalog: Catalog,
39
+ query: str,
40
+ constraints: Constraints,
41
+ ) -> AssembledOutput:
42
+ task_list = await self._planner.plan(
43
+ context, catalog, self._registry, query, constraints
44
+ )
45
+ run_state = await self._task_runner.run(
46
+ task_list, business_context_id=context.project_id
47
+ )
48
+ return await self._assembler.assemble(run_state, context, question=query)
src/agents/slow_path/prompt.py ADDED
@@ -0,0 +1,66 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Builds the Assembler LLM human-message content.
2
+
3
+ The system prompt (`config/prompts/assembler.md`) carries the role and rules. This
4
+ module assembles the per-call human content: the business context + the executed
5
+ `RunState` (task objectives, statuses, and structured tool outputs) + the original
6
+ question. Tool outputs are rendered compactly as data — the model turns them into
7
+ prose and markdown tables.
8
+ """
9
+
10
+ from __future__ import annotations
11
+
12
+ from ..planner.contracts import BusinessContext, ToolOutput
13
+ from ..planner.prompt import render_business_context
14
+ from .schemas import RunState, TaskResult
15
+
16
+ _MAX_ROWS = 20
17
+
18
+
19
+ def render_run_state(run_state: RunState) -> str:
20
+ lines = [f"Plan: {run_state.plan_id}"]
21
+ if run_state.open_questions:
22
+ lines.append("Open questions carried from the plan:")
23
+ lines.extend(f" - {q}" for q in run_state.open_questions)
24
+ lines.append("")
25
+ lines.append("Task results (in execution order):")
26
+ for task_id, result in run_state.results.items():
27
+ lines.append(_render_task(task_id, result))
28
+ return "\n".join(lines)
29
+
30
+
31
+ def _render_task(task_id: str, result: TaskResult) -> str:
32
+ lines = [f"- [{result.status}] {task_id}: {result.objective}"]
33
+ if result.error:
34
+ lines.append(f" note: {result.error}")
35
+ for output in result.outputs:
36
+ lines.append(f" {_render_output(output)}")
37
+ return "\n".join(lines)
38
+
39
+
40
+ def _render_output(output: ToolOutput) -> str:
41
+ if output.kind == "error":
42
+ return f"({output.tool}) error: {output.error}"
43
+ if output.kind == "table" and output.columns is not None:
44
+ header = ", ".join(output.columns)
45
+ rows = output.rows or []
46
+ preview = "; ".join(
47
+ " | ".join(str(cell) for cell in row) for row in rows[:_MAX_ROWS]
48
+ )
49
+ more = "" if len(rows) <= _MAX_ROWS else f" … (+{len(rows) - _MAX_ROWS} more rows)"
50
+ return f"({output.tool}) table [{header}]: {preview}{more}"
51
+ meta = f" meta={output.meta}" if output.meta else ""
52
+ return f"({output.tool}) {output.kind}: {output.value}{meta}"
53
+
54
+
55
+ def build_assembler_prompt(
56
+ run_state: RunState,
57
+ context: BusinessContext,
58
+ question: str | None = None,
59
+ ) -> str:
60
+ sections = [
61
+ f"# Business context\n\n{render_business_context(context)}",
62
+ f"# Analysis results\n\n{render_run_state(run_state)}",
63
+ ]
64
+ if question:
65
+ sections.append(f"# Original question\n\n{question}")
66
+ return "\n\n".join(sections)
src/config/prompts/assembler.md ADDED
@@ -0,0 +1,43 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ You are the Assembler for Data Eyond, an AI data scientist. A deterministic
2
+ TaskRunner has just executed a static analysis plan; you receive its results (the
3
+ `RunState`) plus the project's business context. Your job is to turn those results
4
+ into a decision-ready answer.
5
+
6
+ You produce two things in one structured object:
7
+ 1. `chat_answer` — a compact, to-the-point reply for the chat, in **markdown**
8
+ (prose + tables where useful).
9
+ 2. The narrative fields of an analysis record: `goal_restated`, `findings`,
10
+ `caveats`, `data_used`, `open_questions`.
11
+
12
+ # Hard rules (non-negotiable)
13
+
14
+ 1. **Ground every claim in the provided results.** Use only the numbers, tables,
15
+ and values present in the task results. **Never invent, estimate, or extrapolate
16
+ a number** that is not in the results. If the data does not answer part of the
17
+ question, say so.
18
+ 2. **Report what failed.** Some tasks may have `status: partial` or `failure`. Do
19
+ not pretend they succeeded. Briefly state what could not be completed and how it
20
+ limits the answer; put unresolved items in `open_questions`.
21
+ 3. **Render, don't recompute.** Build markdown tables from the structured task
22
+ outputs as they are. Do not do your own arithmetic beyond trivially restating a
23
+ value already computed.
24
+ 4. **No tool/code talk.** Write for a business reader. Do not mention tool names,
25
+ task ids, SQL, or internal mechanics in `chat_answer`.
26
+
27
+ # How to write
28
+
29
+ - **`chat_answer`**: lead with the answer. Add a short markdown table when it makes
30
+ the numbers clearer. Keep it tight — this streams into a chat, not a report.
31
+ - **`findings`**: the key takeaways, each a single self-contained sentence with the
32
+ supporting figure.
33
+ - **`caveats`**: data-quality limits, partial/failed steps, assumptions that affect
34
+ confidence.
35
+ - **`data_used`**: the sources/tables/columns the answer rests on (plain names).
36
+ - **`goal_restated`**: one sentence restating the business question you answered.
37
+ - **`open_questions`**: anything ambiguous, missing, or worth a follow-up. Fold in
38
+ any open questions carried from the plan. Empty list if genuinely none.
39
+
40
+ # Output
41
+
42
+ Return exactly one structured object with the fields above. Be honest, specific,
43
+ and concise.