Rifqi Hafizuddin Claude Opus 4.8 commited on
Commit
ac310de
·
1 Parent(s): f346114

[KM-626][AI] Slow-path agent: seam contracts + TaskRunner

Browse files

The execution half of the slow path (AGENT_ARCHITECTURE_CONTEXT_new.md §7.4,
§8.2/§8.4). Deterministic, 0 LLM, tool-agnostic (INV-7).

- schemas.py: TaskResult, RunState (the blackboard); TaskSummary, AnalysisRecord,
AssembledOutput, AssemblerNarrative. Reuses planner ToolOutput.
- invoker.py: ToolInvoker Protocol only -- the runtime seam the runner calls; the
tool team owns the implementation (KM-418).
- errors.py: SlowPathError, AssemblerError.
- task_runner.py: wave-based dependency execution (asyncio.gather), ${t<id>}
placeholder resolution, internal validate_args, never-throw invoke, status
labeling, degrade-and-continue. No replanning, no mid-run LLM.

Lives in agents/slow_path/ -- NOT "orchestrator" (that name is the entry
dispatcher in agents/orchestration.py).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

src/agents/slow_path/__init__.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ """Slow-path workers: TaskRunner (deterministic) + Assembler (1 LLM call) + Coordinator.
2
+
3
+ These are driven *by* the Orchestrator (the intent-router/dispatcher in
4
+ `agents/orchestration.py`); this package is deliberately NOT named "orchestrator"
5
+ to keep the dispatcher and the workers from sharing a name. It executes the
6
+ Planner's static `TaskList` and assembles the two outputs (`chat_answer` +
7
+ `AnalysisRecord`). See AGENT_ARCHITECTURE_CONTEXT_new.md §7.2 / §7.4 / §7.5 /
8
+ §8.2–8.4. Tool-agnostic: depends only on the `ToolInvoker` protocol and the
9
+ `ToolOutput` envelope, never on a specific tool (INV-7).
10
+ """
src/agents/slow_path/errors.py ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Typed errors for the slow-path layer."""
2
+
3
+ from __future__ import annotations
4
+
5
+
6
+ class SlowPathError(Exception):
7
+ """Base error for the slow-path layer."""
8
+
9
+
10
+ class AssemblerError(SlowPathError):
11
+ """The Assembler LLM call could not produce a valid `AssembledOutput`."""
src/agents/slow_path/invoker.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """The tool invocation seam (§8.4) — the one interface the TaskRunner calls.
2
+
3
+ The agent layer stays tool-agnostic (INV-7) by invoking every tool through this
4
+ protocol, never importing a tool module directly. The **tool team owns the
5
+ implementation** (KM-418); this file defines only the contract the TaskRunner
6
+ depends on.
7
+
8
+ Frozen guarantees the implementation must hold:
9
+ 1. **Never throws.** A tool failure returns `ToolOutput(kind="error", error=...)`,
10
+ not an exception — the TaskRunner's degrade-and-continue (§7.4) relies on this.
11
+ (The TaskRunner still wraps calls defensively, as a backstop.)
12
+ 2. **Returns the `ToolOutput` envelope** (§8.1) — structured data only, never
13
+ rendered tables or prose (that is the Assembler's job).
14
+ 3. **`tool_name` comes from the registry** (§9.2); unknown names return an error
15
+ envelope rather than throwing.
16
+ """
17
+
18
+ from __future__ import annotations
19
+
20
+ from typing import Any, Protocol, runtime_checkable
21
+
22
+ from ..planner.contracts import ToolOutput
23
+
24
+
25
+ @runtime_checkable
26
+ class ToolInvoker(Protocol):
27
+ async def invoke(self, tool_name: str, args: dict[str, Any]) -> ToolOutput: ...
src/agents/slow_path/schemas.py ADDED
@@ -0,0 +1,99 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Slow-path execution + output contracts.
2
+
3
+ The seams between the three slow-path stages:
4
+ - TaskRunner writes `RunState` (a blackboard of `TaskResult`s) — §8.2.
5
+ - Assembler reads `RunState` + `BusinessContext` and produces `AssembledOutput`
6
+ (`chat_answer` + `AnalysisRecord`) — §8.3.
7
+
8
+ `ToolOutput` (the tool -> agent envelope) is reused from the planner contracts so
9
+ there is exactly one definition across the layer.
10
+
11
+ Note on authorship (§8.3): the Assembler LLM authors only the *narrative* fields
12
+ (`AssemblerNarrative`). The `AnalysisRecord`'s structured pass-through fields
13
+ (`results_snapshot`, `tasks_run`) and metadata are copied from `RunState` by code,
14
+ never re-authored by the model — that is the source of truth the report generator
15
+ renders from (INV-4).
16
+
17
+ See AGENT_ARCHITECTURE_CONTEXT_new.md §8.2 / §8.3.
18
+ """
19
+
20
+ from __future__ import annotations
21
+
22
+ from datetime import datetime
23
+ from typing import Literal
24
+
25
+ from pydantic import BaseModel, Field
26
+
27
+ from ..planner.contracts import ToolOutput
28
+
29
+ TaskStatus = Literal["success", "partial", "failure"]
30
+
31
+
32
+ # --------------------------------------------------------------------------- #
33
+ # Execution state (TaskRunner -> Assembler) — §8.2
34
+ # --------------------------------------------------------------------------- #
35
+
36
+
37
+ class TaskResult(BaseModel):
38
+ task_id: str
39
+ status: TaskStatus
40
+ objective: str
41
+ outputs: list[ToolOutput] = Field(default_factory=list) # one per tool_call
42
+ note: str | None = None
43
+ error: str | None = None
44
+
45
+
46
+ class RunState(BaseModel):
47
+ plan_id: str
48
+ business_context_id: str
49
+ results: dict[str, TaskResult] = Field(default_factory=dict) # task_id -> result
50
+ open_questions: list[str] = Field(default_factory=list)
51
+
52
+
53
+ # --------------------------------------------------------------------------- #
54
+ # Assembled output (Assembler -> Orchestrator / memory) — §8.3
55
+ # --------------------------------------------------------------------------- #
56
+
57
+
58
+ class TaskSummary(BaseModel):
59
+ task_id: str
60
+ objective: str
61
+ status: TaskStatus
62
+ tools_used: list[str] = Field(default_factory=list)
63
+
64
+
65
+ class AnalysisRecord(BaseModel):
66
+ # Narrative fields — authored by the Assembler LLM.
67
+ goal_restated: str
68
+ findings: list[str] = Field(default_factory=list)
69
+ caveats: list[str] = Field(default_factory=list)
70
+ data_used: list[str] = Field(default_factory=list)
71
+ open_questions: list[str] = Field(default_factory=list)
72
+ # Structured pass-through — NOT re-authored; copied from RunState.
73
+ tasks_run: list[TaskSummary] = Field(default_factory=list)
74
+ results_snapshot: dict[str, TaskResult] = Field(default_factory=dict)
75
+ # Metadata.
76
+ plan_id: str
77
+ business_context_id: str
78
+ created_at: datetime
79
+
80
+
81
+ class AssembledOutput(BaseModel):
82
+ chat_answer: str # FIRST field — streams via SSE; markdown prose + tables
83
+ analysis_record: AnalysisRecord
84
+
85
+
86
+ class AssemblerNarrative(BaseModel):
87
+ """The subset of `AnalysisRecord` the Assembler LLM actually authors.
88
+
89
+ Kept separate from `AssembledOutput` so the model never emits the structured
90
+ pass-through fields (which would invite hallucinated numbers); `Assembler`
91
+ code merges this with the real `RunState` to build the final record.
92
+ """
93
+
94
+ chat_answer: str
95
+ goal_restated: str
96
+ findings: list[str] = Field(default_factory=list)
97
+ caveats: list[str] = Field(default_factory=list)
98
+ data_used: list[str] = Field(default_factory=list)
99
+ open_questions: list[str] = Field(default_factory=list)
src/agents/slow_path/task_runner.py ADDED
@@ -0,0 +1,168 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """TaskRunner — deterministic execution of a static `TaskList`. Zero LLM.
2
+
3
+ Executes tasks in dependency order, parallelizing each ready "wave" with
4
+ `asyncio.gather`. For each task it resolves `${t<id>}` placeholders from upstream
5
+ results, does an internal `validate_args`, invokes each tool via the `ToolInvoker`
6
+ seam, and records a `TaskResult`. On failure it **degrades and continues**: the
7
+ task is marked failed, its dependents are skipped, independent branches keep
8
+ running. There is no replanning and no mid-run LLM (INV-6).
9
+
10
+ `success_criteria` is *not* machine-evaluated here (it is free text); task status
11
+ is derived from tool execution outcomes and carried to the Assembler to report.
12
+
13
+ See AGENT_ARCHITECTURE_CONTEXT_new.md §7.4.
14
+ """
15
+
16
+ from __future__ import annotations
17
+
18
+ import asyncio
19
+ import re
20
+ from typing import Any
21
+
22
+ from src.middlewares.logging import get_logger
23
+
24
+ from ..planner.contracts import ToolOutput, ToolRegistry
25
+ from ..planner.schemas import Task
26
+ from ..planner.schemas import TaskList as PlanTaskList
27
+ from .invoker import ToolInvoker
28
+ from .schemas import RunState, TaskResult, TaskStatus
29
+
30
+ logger = get_logger("task_runner")
31
+
32
+ # Mirrors planner/validator.py:28 `_PLACEHOLDER_RE` — keep the two in sync.
33
+ _PLACEHOLDER_RE = re.compile(r"\$\{(t[^}]+)\}")
34
+
35
+
36
+ class TaskRunner:
37
+ """Runs a `TaskList` against a `ToolInvoker`, producing a `RunState`."""
38
+
39
+ def __init__(self, invoker: ToolInvoker, registry: ToolRegistry) -> None:
40
+ self._invoker = invoker
41
+ self._registry = registry
42
+
43
+ async def run(self, task_list: PlanTaskList, business_context_id: str) -> RunState:
44
+ tasks_by_id: dict[str, Task] = {t.id: t for t in task_list.tasks}
45
+ results: dict[str, TaskResult] = {}
46
+ remaining: set[str] = set(tasks_by_id)
47
+
48
+ while remaining:
49
+ ready = [
50
+ tid
51
+ for tid in remaining
52
+ if all(dep in results for dep in tasks_by_id[tid].depends_on)
53
+ ]
54
+ if not ready:
55
+ # A dependency points outside the plan (or a cycle slipped past the
56
+ # planner validator): nothing more can run. Fail the rest honestly.
57
+ for tid in list(remaining):
58
+ results[tid] = TaskResult(
59
+ task_id=tid,
60
+ status="failure",
61
+ objective=tasks_by_id[tid].objective,
62
+ error="unresolved dependency; task could not run",
63
+ )
64
+ remaining.discard(tid)
65
+ break
66
+
67
+ # Skip any ready task whose dependency failed (degrade-and-continue).
68
+ to_run: list[Task] = []
69
+ for tid in ready:
70
+ task = tasks_by_id[tid]
71
+ failed = [d for d in task.depends_on if results[d].status == "failure"]
72
+ if failed:
73
+ results[tid] = TaskResult(
74
+ task_id=tid,
75
+ status="failure",
76
+ objective=task.objective,
77
+ error=f"skipped: upstream {failed} did not succeed",
78
+ )
79
+ remaining.discard(tid)
80
+ else:
81
+ to_run.append(task)
82
+
83
+ if not to_run:
84
+ continue # remaining dependents will be re-evaluated (and skipped)
85
+
86
+ wave = await asyncio.gather(
87
+ *(self._run_task(task, results) for task in to_run)
88
+ )
89
+ for tr in wave:
90
+ results[tr.task_id] = tr
91
+ remaining.discard(tr.task_id)
92
+
93
+ return RunState(
94
+ plan_id=task_list.plan_id,
95
+ business_context_id=business_context_id,
96
+ results=results,
97
+ open_questions=list(task_list.open_questions),
98
+ )
99
+
100
+ async def _run_task(self, task: Task, results: dict[str, TaskResult]) -> TaskResult:
101
+ outputs: list[ToolOutput] = []
102
+ for call in task.tool_calls:
103
+ resolved = self._resolve_args(call.args, results)
104
+ arg_error = self._validate_args(call.tool, resolved)
105
+ if arg_error is not None:
106
+ outputs.append(ToolOutput(tool=call.tool, kind="error", error=arg_error))
107
+ continue
108
+ outputs.append(await self._safe_invoke(call.tool, resolved))
109
+
110
+ status = _label(outputs)
111
+ error: str | None = None
112
+ if status == "failure":
113
+ errs = [o.error for o in outputs if o.kind == "error" and o.error]
114
+ error = errs[0] if errs else "all tool calls failed"
115
+ return TaskResult(
116
+ task_id=task.id,
117
+ status=status,
118
+ objective=task.objective,
119
+ outputs=outputs,
120
+ error=error,
121
+ )
122
+
123
+ def _resolve_args(
124
+ self, args: dict[str, Any], results: dict[str, TaskResult]
125
+ ) -> dict[str, Any]:
126
+ return {k: self._resolve_value(v, results) for k, v in args.items()}
127
+
128
+ @staticmethod
129
+ def _resolve_value(value: Any, results: dict[str, TaskResult]) -> Any:
130
+ # A data arg is exactly a "${t<id>}" placeholder (Pattern A); resolve it to
131
+ # the referenced task's representative output (its last ToolOutput).
132
+ # Materializing that envelope into a DataFrame is the invoker's job.
133
+ if isinstance(value, str):
134
+ match = _PLACEHOLDER_RE.fullmatch(value.strip())
135
+ if match:
136
+ upstream = results.get(match.group(1))
137
+ if upstream is None or not upstream.outputs:
138
+ return None
139
+ return upstream.outputs[-1]
140
+ return value
141
+
142
+ def _validate_args(self, tool: str, resolved: dict[str, Any]) -> str | None:
143
+ spec = self._registry.get(tool)
144
+ if spec is None:
145
+ return f"tool {tool!r} not in registry"
146
+ required = spec.input_schema.get("required", [])
147
+ missing = [r for r in required if resolved.get(r) is None]
148
+ if missing:
149
+ return f"missing required arg(s): {sorted(missing)}"
150
+ return None
151
+
152
+ async def _safe_invoke(self, tool: str, args: dict[str, Any]) -> ToolOutput:
153
+ try:
154
+ return await self._invoker.invoke(tool, args)
155
+ except Exception as exc: # noqa: BLE001 — backstop; the invoker is never-throw (§8.4)
156
+ logger.warning("tool invoker raised", tool=tool, error=str(exc))
157
+ return ToolOutput(tool=tool, kind="error", error=f"invoker raised: {exc}")
158
+
159
+
160
+ def _label(outputs: list[ToolOutput]) -> TaskStatus:
161
+ if not outputs:
162
+ return "failure"
163
+ errors = sum(1 for o in outputs if o.kind == "error")
164
+ if errors == 0:
165
+ return "success"
166
+ if errors == len(outputs):
167
+ return "failure"
168
+ return "partial"