File size: 12,087 Bytes
7302343
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
"""Orchestrator loop tests — happy path, early-stop, budget exits, failure isolation."""

from __future__ import annotations

from typing import Literal

import pytest

from orchestrator.loop import Orchestrator
from orchestrator.strategy import StrategyDecision
from orchestrator.tools import ToolContext, ToolName, ToolRegistry, ToolResult

# === Test doubles ======================================================


class _ScriptedTool:
    """Test double for a `Tool`: replays one canned outcome and counts invocations."""

    def __init__(  # noqa: PLR0913 — test-only kwarg surface, intentional
        self,
        name: ToolName,
        *,
        status: Literal["success", "failure", "skipped", "timeout"] = "success",
        summary: str = "ok",
        latency_ms: int = 10,
        signal: str | None = None,
        raise_exc: Exception | None = None,
    ) -> None:
        """Store the canned outcome.

        Args:
            name: Tool name exposed through the `name` property and stamped
                on the returned `ToolResult`.
            status: Status placed on the returned result.
            summary: Summary text placed on the returned result.
            latency_ms: Latency value placed on the returned result.
            signal: When truthy, copied into the result's `detail["signal"]`.
            raise_exc: When not `None`, `run()` raises it instead of returning.
        """
        # Public so tests can assert how many times the tool was invoked.
        self.calls: int = 0
        self._tool_name = name
        self._canned_status = status
        self._canned_summary = summary
        self._canned_latency_ms = latency_ms
        self._signal_level = signal
        self._scripted_error = raise_exc

    @property
    def name(self) -> ToolName:
        """The tool name given at construction."""
        return self._tool_name

    async def run(self, context: ToolContext) -> ToolResult:
        """Count the call, then raise the scripted exception or return the canned result.

        `context` is accepted but not read.
        """
        # Counted before the raise so failing invocations are recorded too.
        self.calls += 1
        scripted_error = self._scripted_error
        if scripted_error is not None:
            raise scripted_error
        # An empty-string signal is falsy and therefore produces no "signal" key.
        detail: dict[str, object] = {"signal": self._signal_level} if self._signal_level else {}
        return ToolResult(
            tool=self._tool_name,
            status=self._canned_status,
            summary=self._canned_summary,
            latency_ms=self._canned_latency_ms,
            detail=detail,
        )


def _decision(
    tier: Literal["FAST", "STANDARD", "DEEP"] = "STANDARD",
    *,
    tool_budget: int | None = None,
    time_budget_ms: int | None = None,
) -> StrategyDecision:
    """Build a `StrategyDecision` for `tier`, optionally overriding two budgets.

    Each tier maps to (tool budget, time budget in ms, cost budget in USD,
    reasoner required). `tool_budget` and `time_budget_ms` replace the tier
    value only when they are not `None`; the cost budget and reasoner flag
    always come from the tier table. An unknown `tier` raises `KeyError`.
    """
    per_tier = {
        "FAST": (2, 800, 0.003, False),
        "STANDARD": (4, 3_000, 0.012, True),
        "DEEP": (6, 6_000, 0.030, True),
    }
    default_tools, default_time_ms, cost_usd, needs_reasoner = per_tier[tier]

    # `is None` (not truthiness) so an explicit 0 override is honoured.
    if tool_budget is None:
        tool_budget = default_tools
    if time_budget_ms is None:
        time_budget_ms = default_time_ms

    return StrategyDecision(
        tier=tier,
        tool_budget=tool_budget,
        time_budget_ms=time_budget_ms,
        cost_budget_usd=cost_usd,
        reasoner_required=needs_reasoner,
        rationale="test",
    )


def _ctx() -> ToolContext:
    """Return a fresh `ToolContext` with fixed ids, targeting a post."""
    fixed_context = ToolContext(
        subreddit_id="t5_test",
        correlation_id="inv-test-1",
        target_kind="post",
        target_id="t3_x",
    )
    return fixed_context


class _FakeClock:
    """Deterministic clock: every read first moves time forward by `step` seconds."""

    def __init__(self, step: float = 0.001) -> None:
        """Start at 0.0 and remember the per-read increment `step`."""
        self._tick = step
        self._now = 0.0

    def __call__(self) -> float:
        """Advance by one step and return the new time (the first read returns `step`)."""
        ticked = self._now + self._tick
        self._now = ticked
        return ticked

    def advance(self, seconds: float) -> None:
        """Jump the clock by `seconds` without producing a reading."""
        self._now += seconds


# === Happy path ========================================================


@pytest.mark.asyncio
async def test_happy_path_runs_full_plan() -> None:
    """STANDARD tier with four signal-free tools runs the whole default plan in order."""
    expected_order: list[ToolName] = [
        "policy_match",
        "report_velocity",
        "user_history",
        "prior_actions",
    ]
    registry = ToolRegistry()
    for tool_name in expected_order:
        registry.register(_ScriptedTool(tool_name))

    orchestrator = Orchestrator(registry, clock=_FakeClock())
    outcome = await orchestrator.run(decision=_decision("STANDARD"), context=_ctx())

    assert outcome.tier == "STANDARD"
    assert outcome.tools_run == 4
    assert outcome.early_stopped is False
    assert outcome.stop_reason == "plan_complete"

    # Evidence is recorded in plan order with sequential ids.
    recorded = outcome.accumulator.entries()
    assert [entry.tool for entry in recorded] == expected_order
    assert [entry.id for entry in recorded] == ["ev-1", "ev-2", "ev-3", "ev-4"]


@pytest.mark.asyncio
async def test_records_started_and_completed_timestamps() -> None:
    """A run's `started_at` must not come after its `completed_at`."""
    tools = ToolRegistry()
    tools.register(_ScriptedTool("policy_match"))

    orchestrator = Orchestrator(tools, clock=_FakeClock())
    outcome = await orchestrator.run(
        decision=_decision("FAST"),
        context=_ctx(),
        plan=["policy_match"],
    )

    assert outcome.started_at <= outcome.completed_at


# === Early stop: convergence ===========================================


@pytest.mark.asyncio
async def test_standard_converges_after_two_strong_signals() -> None:
    """STANDARD tier stops as "converged" once two tools have reported signal=high."""
    scripted_tools = [
        _ScriptedTool("policy_match", signal="high"),
        _ScriptedTool("report_velocity", signal="high"),
        _ScriptedTool("user_history"),
        _ScriptedTool("prior_actions"),
    ]
    registry = ToolRegistry()
    for scripted in scripted_tools:
        registry.register(scripted)

    orchestrator = Orchestrator(registry, clock=_FakeClock())
    outcome = await orchestrator.run(decision=_decision("STANDARD"), context=_ctx())

    assert outcome.tools_run == 2
    assert outcome.early_stopped is True
    assert outcome.stop_reason == "converged"

    # user_history is third in the plan, so it must never have been invoked.
    third_tool = registry.get("user_history")
    assert isinstance(third_tool, _ScriptedTool)
    assert third_tool.calls == 0


@pytest.mark.asyncio
async def test_fast_converges_after_one_strong_signal() -> None:
    """On the FAST tier a single signal=high result is expected to trigger convergence."""
    tools = ToolRegistry()
    tools.register(_ScriptedTool("policy_match", signal="high"))
    tools.register(_ScriptedTool("report_velocity"))

    orchestrator = Orchestrator(tools, clock=_FakeClock())
    outcome = await orchestrator.run(decision=_decision("FAST"), context=_ctx())

    # Only the first tool ran before the loop stopped.
    assert outcome.tools_run == 1
    assert outcome.stop_reason == "converged"


@pytest.mark.asyncio
async def test_no_convergence_when_no_strong_signal() -> None:
    """With no tool reporting signal=high, the plan runs to completion without early stop."""
    silent_tools: tuple[ToolName, ...] = (
        "policy_match",
        "report_velocity",
        "user_history",
        "prior_actions",
    )
    registry = ToolRegistry()
    for tool_name in silent_tools:
        registry.register(_ScriptedTool(tool_name))

    orchestrator = Orchestrator(registry, clock=_FakeClock())
    outcome = await orchestrator.run(decision=_decision("STANDARD"), context=_ctx())

    assert outcome.stop_reason == "plan_complete"
    assert outcome.early_stopped is False


# === Early stop: budgets ==============================================


@pytest.mark.asyncio
async def test_tool_budget_exit() -> None:
    """A tool_budget of 2 stops the four-tool STANDARD plan after two tools."""
    registered: tuple[ToolName, ...] = (
        "policy_match",
        "report_velocity",
        "user_history",
        "prior_actions",
    )
    registry = ToolRegistry()
    for tool_name in registered:
        registry.register(_ScriptedTool(tool_name))

    capped_decision = _decision("STANDARD", tool_budget=2)
    orchestrator = Orchestrator(registry, clock=_FakeClock())
    outcome = await orchestrator.run(decision=capped_decision, context=_ctx())

    assert outcome.tools_run == 2
    assert outcome.early_stopped is True
    assert outcome.stop_reason == "budget_tool"


@pytest.mark.asyncio
async def test_time_budget_exit() -> None:
    """A 1500ms time budget halts the STANDARD plan early with `budget_time`.

    The fake clock jumps 500ms on every read, so simulated time is expected to
    outrun the budget before all four registered tools have run. How many
    tools complete depends on how often the orchestrator reads the clock,
    which is not visible in this file, so only "fewer than 4" is pinned
    rather than an exact count.
    """

    clock = _FakeClock(step=0.5)  # every clock read advances simulated time by 500ms

    registry = ToolRegistry()
    registry.register(_ScriptedTool("policy_match"))
    registry.register(_ScriptedTool("report_velocity"))
    registry.register(_ScriptedTool("user_history"))
    registry.register(_ScriptedTool("prior_actions"))

    orch = Orchestrator(registry, clock=clock)
    result = await orch.run(
        decision=_decision("STANDARD", time_budget_ms=1_500),
        context=_ctx(),
    )

    # NOTE(review): the budget exit is presumably a pre-check before each tool
    # (confirm in orchestrator.loop). Only the upper bound is asserted here;
    # a lower bound on tools_run is not checked.
    assert result.early_stopped is True
    assert result.stop_reason == "budget_time"
    assert result.tools_run < 4


# === Failure isolation =================================================


@pytest.mark.asyncio
async def test_single_tool_exception_does_not_abort_investigation() -> None:
    """An exception raised by one tool is recorded as a failure; later tools still run."""
    scripted_tools = [
        _ScriptedTool("policy_match"),
        _ScriptedTool("report_velocity", raise_exc=RuntimeError("db down")),
        _ScriptedTool("user_history"),
        _ScriptedTool("prior_actions"),
    ]
    registry = ToolRegistry()
    for scripted in scripted_tools:
        registry.register(scripted)

    orchestrator = Orchestrator(registry, clock=_FakeClock())
    outcome = await orchestrator.run(decision=_decision("STANDARD"), context=_ctx())

    assert outcome.tools_run == 4
    assert outcome.stop_reason == "plan_complete"

    evidence = outcome.accumulator

    # The raising tool is second in the plan, so its record is ev-2.
    broken = evidence.by_id("ev-2")
    assert broken is not None
    assert broken.tool == "report_velocity"
    assert broken.status == "failure"
    assert broken.error == "db down"
    assert "RuntimeError" in broken.summary

    # Tools after the failure still produced evidence.
    assert evidence.by_id("ev-3") is not None
    assert evidence.by_id("ev-4") is not None


@pytest.mark.asyncio
async def test_failure_excluded_from_successful_entries() -> None:
    """A failed tool is left out of `successful_entries()`, the cite-able set (ADR-0003)."""
    tools = ToolRegistry()
    tools.register(_ScriptedTool("policy_match"))
    tools.register(_ScriptedTool("report_velocity", raise_exc=ValueError("x")))

    orchestrator = Orchestrator(tools, clock=_FakeClock())
    outcome = await orchestrator.run(decision=_decision("FAST"), context=_ctx())

    citeable_tools = [entry.tool for entry in outcome.accumulator.successful_entries()]
    assert citeable_tools == ["policy_match"]


# === Plan edge cases ===================================================


@pytest.mark.asyncio
async def test_unregistered_tool_in_plan_becomes_skipped() -> None:
    """A planned tool missing from the registry is recorded as `status=skipped`."""
    # Only policy_match is registered; report_velocity is deliberately absent.
    tools = ToolRegistry()
    tools.register(_ScriptedTool("policy_match"))

    orchestrator = Orchestrator(tools, clock=_FakeClock())
    outcome = await orchestrator.run(
        decision=_decision("FAST"),
        context=_ctx(),
        plan=["policy_match", "report_velocity"],
    )

    # The missing tool still counts towards tools_run and gets an evidence id.
    assert outcome.tools_run == 2
    missing = outcome.accumulator.by_id("ev-2")
    assert missing is not None
    assert missing.status == "skipped"
    assert "not registered" in missing.summary


@pytest.mark.asyncio
async def test_custom_plan_overrides_tier_default() -> None:
    """An explicit `plan` argument is used instead of the tier's default plan."""
    # Kept separate from the list passed to run() so the comparison is never
    # against the very object the orchestrator received.
    expected_plan = ["user_history", "prior_actions"]

    tools = ToolRegistry()
    tools.register(_ScriptedTool("user_history"))
    tools.register(_ScriptedTool("prior_actions"))

    orchestrator = Orchestrator(tools, clock=_FakeClock())
    outcome = await orchestrator.run(
        decision=_decision("STANDARD"),
        context=_ctx(),
        # Leaves out policy_match and report_velocity.
        plan=["user_history", "prior_actions"],
    )

    executed = [entry.tool for entry in outcome.accumulator.entries()]
    assert executed == expected_plan
    assert outcome.plan == expected_plan


def test_default_plan_per_tier() -> None:
    """Spot-check the default plan for each of the three tiers."""
    orchestrator = Orchestrator(ToolRegistry())

    fast_plan = orchestrator.default_plan("FAST")
    assert fast_plan == ["policy_match", "report_velocity"]

    standard_plan = orchestrator.default_plan("STANDARD")
    assert len(standard_plan) == 4

    deep_plan = orchestrator.default_plan("DEEP")
    assert "thread_context" in deep_plan


def test_default_plan_unknown_tier_raises() -> None:
    """Requesting the default plan of an unrecognised tier raises `ValueError`."""
    orchestrator = Orchestrator(ToolRegistry())
    expect_no_plan_error = pytest.raises(ValueError, match="no default plan")
    with expect_no_plan_error:
        orchestrator.default_plan("FAKE_TIER")


# === Reuse safety ======================================================


@pytest.mark.asyncio
async def test_orchestrator_reusable_across_investigations() -> None:
    """Two `run()` calls on one orchestrator each get their own accumulator and ev-1."""
    tools = ToolRegistry()
    tools.register(_ScriptedTool("policy_match"))
    orchestrator = Orchestrator(tools, clock=_FakeClock())

    first = await orchestrator.run(
        decision=_decision("FAST"),
        context=_ctx(),
        plan=["policy_match"],
    )
    second = await orchestrator.run(
        decision=_decision("FAST"),
        context=_ctx(),
        plan=["policy_match"],
    )

    first_evidence = first.accumulator
    second_evidence = second.accumulator

    assert len(first_evidence) == 1
    assert len(second_evidence) == 1
    assert first_evidence is not second_evidence
    # Evidence ids restart for every run rather than continuing from the last one.
    assert first_evidence.entries()[0].id == "ev-1"
    assert second_evidence.entries()[0].id == "ev-1"