Aksel Joonas Reedi committed on
Commit
d408a51
·
unverified ·
1 Parent(s): 59b2038

Preserve streamed thinking metadata with live model tests (#150)

Browse files

* Add opt-in live thinking model tests

Add paid integration coverage for the concrete models requested for #143: Anthropic Opus 4.7 and OpenAI's current GPT-5.2 model. The tests load an explicit env file, run only behind ML_INTERN_LIVE_LLM_TESTS=1, and keep normal CI credential-free.

Constraint: Live provider calls require local credentials and should not run by default in CI.

Rejected: Make live provider tests unconditional | would fail wherever credentials are absent and spend tokens wherever they are present.

Confidence: high

Scope-risk: narrow

Tested: ML_INTERN_LIVE_LLM_TESTS=1 ML_INTERN_LIVE_ENV_FILE=/Users/akseljoonas/Documents/ml-intern/.env UV_CACHE_DIR=/tmp/uv-cache uv run --extra dev pytest tests/integration/test_live_thinking_models.py -q -rs

Tested: UV_CACHE_DIR=/tmp/uv-cache uv run --extra dev pytest tests/unit/test_thinking_history.py tests/integration/test_live_thinking_models.py -q

* Preserve streamed Opus thinking metadata

A live Opus 4.7 run exposed that LiteLLM surfaces streamed thinking blocks on deltas, while stream_chunk_builder can drop them. Capture Anthropic thinking metadata directly during streaming and keep the chunk rebuild path as a fallback.

Constraint: #150 live test must prove real thinking metadata is present, not pass on None metadata.

Rejected: Switch the live Opus test to non-streaming only | would sidestep the actual streaming replay gap instead of exercising it.

Confidence: high

Scope-risk: narrow

Directive: Keep replay of provider reasoning fields gated to anthropic/* models; OpenAI-compatible providers must not receive echoed reasoning_content.

Tested: UV_CACHE_DIR=/tmp/uv-cache uv run --extra dev pytest tests/unit/test_thinking_history.py -q

Tested: ML_INTERN_LIVE_LLM_TESTS=1 ML_INTERN_LIVE_ENV_FILE=/Users/akseljoonas/Documents/ml-intern/.env UV_CACHE_DIR=/tmp/uv-cache uv run --extra dev pytest tests/integration/test_live_thinking_models.py -q -rs

Tested: UV_CACHE_DIR=/tmp/uv-cache uv run --extra dev pytest tests/unit/test_thinking_history.py tests/integration/test_live_thinking_models.py -q

agent/core/agent_loop.py CHANGED
@@ -410,8 +410,20 @@ def _extract_thinking_state(
410
  message: Any,
411
  ) -> tuple[list[dict[str, Any]] | None, str | None]:
412
  """Return provider reasoning fields that must be replayed after tool calls."""
413
- thinking_blocks = getattr(message, "thinking_blocks", None) or None
414
- reasoning_content = getattr(message, "reasoning_content", None) or None
 
 
 
 
 
 
 
 
 
 
 
 
415
  return thinking_blocks, reasoning_content
416
 
417
 
@@ -492,6 +504,9 @@ async def _call_llm_streaming(session: Session, messages, tools, llm_params) ->
492
  finish_reason = None
493
  final_usage_chunk = None
494
  chunks = []
 
 
 
495
 
496
  async for chunk in response:
497
  chunks.append(chunk)
@@ -510,6 +525,13 @@ async def _call_llm_streaming(session: Session, messages, tools, llm_params) ->
510
  if choice.finish_reason:
511
  finish_reason = choice.finish_reason
512
 
 
 
 
 
 
 
 
513
  if delta.content:
514
  full_content += delta.content
515
  await session.send_event(
@@ -543,9 +565,9 @@ async def _call_llm_streaming(session: Session, messages, tools, llm_params) ->
543
  latency_ms=int((time.monotonic() - t_start) * 1000),
544
  finish_reason=finish_reason,
545
  )
546
- thinking_blocks = None
547
- reasoning_content = None
548
- if chunks and _should_replay_thinking_state(llm_params.get("model")):
549
  try:
550
  rebuilt = stream_chunk_builder(chunks, messages=messages)
551
  if rebuilt and getattr(rebuilt, "choices", None):
 
410
  message: Any,
411
  ) -> tuple[list[dict[str, Any]] | None, str | None]:
412
  """Return provider reasoning fields that must be replayed after tool calls."""
413
+ provider_fields = getattr(message, "provider_specific_fields", None)
414
+ if not isinstance(provider_fields, dict):
415
+ provider_fields = {}
416
+
417
+ thinking_blocks = (
418
+ getattr(message, "thinking_blocks", None)
419
+ or provider_fields.get("thinking_blocks")
420
+ or None
421
+ )
422
+ reasoning_content = (
423
+ getattr(message, "reasoning_content", None)
424
+ or provider_fields.get("reasoning_content")
425
+ or None
426
+ )
427
  return thinking_blocks, reasoning_content
428
 
429
 
 
504
  finish_reason = None
505
  final_usage_chunk = None
506
  chunks = []
507
+ should_replay_thinking = _should_replay_thinking_state(llm_params.get("model"))
508
+ collected_thinking_blocks: list[dict[str, Any]] = []
509
+ collected_reasoning_content: list[str] = []
510
 
511
  async for chunk in response:
512
  chunks.append(chunk)
 
525
  if choice.finish_reason:
526
  finish_reason = choice.finish_reason
527
 
528
+ if should_replay_thinking:
529
+ delta_thinking_blocks, delta_reasoning_content = _extract_thinking_state(delta)
530
+ if delta_thinking_blocks:
531
+ collected_thinking_blocks.extend(delta_thinking_blocks)
532
+ if delta_reasoning_content:
533
+ collected_reasoning_content.append(delta_reasoning_content)
534
+
535
  if delta.content:
536
  full_content += delta.content
537
  await session.send_event(
 
565
  latency_ms=int((time.monotonic() - t_start) * 1000),
566
  finish_reason=finish_reason,
567
  )
568
+ thinking_blocks = collected_thinking_blocks or None
569
+ reasoning_content = "".join(collected_reasoning_content) or None
570
+ if chunks and should_replay_thinking and not (thinking_blocks or reasoning_content):
571
  try:
572
  rebuilt = stream_chunk_builder(chunks, messages=messages)
573
  if rebuilt and getattr(rebuilt, "choices", None):
tests/integration/test_live_thinking_models.py ADDED
@@ -0,0 +1,151 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Opt-in live provider checks for thinking metadata replay.
2
+
3
+ These tests intentionally call paid model APIs and are skipped unless
4
+ ``ML_INTERN_LIVE_LLM_TESTS=1`` plus the relevant provider key are set.
5
+ They cover the concrete model families involved in #87 without making
6
+ default CI depend on external credentials or provider availability.
7
+ """
8
+
9
+ from __future__ import annotations
10
+
11
+ import os
12
+ from pathlib import Path
13
+ from types import SimpleNamespace
14
+
15
+ import pytest
16
+ from dotenv import load_dotenv
17
+ from litellm import Message
18
+
19
+ from agent.core.agent_loop import (
20
+ _assistant_message_from_result,
21
+ _call_llm_streaming,
22
+ )
23
+ from agent.core.llm_params import _resolve_llm_params
24
+
25
+
26
+ if env_file := os.environ.get("ML_INTERN_LIVE_ENV_FILE"):
27
+ load_dotenv(Path(env_file))
28
+
29
+ LIVE_TESTS_ENABLED = os.environ.get("ML_INTERN_LIVE_LLM_TESTS") == "1"
30
+ OPUS_47_MODEL = "anthropic/claude-opus-4-7"
31
+ LATEST_GPT_MODEL = "openai/gpt-5.2"
32
+ REPORT_RESULT_TOOL = [
33
+ {
34
+ "type": "function",
35
+ "function": {
36
+ "name": "report_result",
37
+ "description": "Report the final test result.",
38
+ "parameters": {
39
+ "type": "object",
40
+ "properties": {
41
+ "answer": {
42
+ "type": "string",
43
+ "description": "The exact marker requested by the test.",
44
+ }
45
+ },
46
+ "required": ["answer"],
47
+ },
48
+ },
49
+ }
50
+ ]
51
+
52
+
53
+ def _skip_without_live_flag() -> None:
54
+ if not LIVE_TESTS_ENABLED:
55
+ pytest.skip("set ML_INTERN_LIVE_LLM_TESTS=1 to run paid live LLM tests")
56
+
57
+
58
+ def _skip_without_env(name: str) -> None:
59
+ if not os.environ.get(name):
60
+ pytest.skip(f"set {name} to run this live provider test")
61
+
62
+
63
+ def _session(model_name: str):
64
+ events = []
65
+
66
+ async def send_event(event):
67
+ events.append(event)
68
+
69
+ return SimpleNamespace(
70
+ config=SimpleNamespace(model_name=model_name),
71
+ is_cancelled=False,
72
+ send_event=send_event,
73
+ events=events,
74
+ )
75
+
76
+
77
+ @pytest.mark.asyncio
78
+ async def test_live_opus_47_preserves_thinking_metadata_for_replay():
79
+ _skip_without_live_flag()
80
+ _skip_without_env("ANTHROPIC_API_KEY")
81
+
82
+ session = _session(OPUS_47_MODEL)
83
+ llm_params = _resolve_llm_params(
84
+ OPUS_47_MODEL,
85
+ reasoning_effort="high",
86
+ )
87
+
88
+ result = await _call_llm_streaming(
89
+ session,
90
+ messages=[
91
+ Message(
92
+ role="user",
93
+ content=(
94
+ "Use careful reasoning for this small check. "
95
+ "If 17 * 19 = 323, call report_result with answer OPUS_OK."
96
+ ),
97
+ )
98
+ ],
99
+ tools=REPORT_RESULT_TOOL,
100
+ llm_params=llm_params,
101
+ )
102
+
103
+ replay = _assistant_message_from_result(
104
+ result,
105
+ model_name=OPUS_47_MODEL,
106
+ )
107
+
108
+ assert result.content or result.tool_calls_acc
109
+ assert result.thinking_blocks, (
110
+ "Opus returned no thinking_blocks with reasoning_effort='high' - "
111
+ "check that adaptive thinking params are being forwarded correctly"
112
+ )
113
+ assert getattr(replay, "thinking_blocks", None) == result.thinking_blocks
114
+ assert getattr(replay, "reasoning_content", None) == result.reasoning_content
115
+
116
+
117
+ @pytest.mark.asyncio
118
+ async def test_live_latest_gpt_does_not_replay_reasoning_metadata():
119
+ _skip_without_live_flag()
120
+ _skip_without_env("OPENAI_API_KEY")
121
+
122
+ session = _session(LATEST_GPT_MODEL)
123
+ llm_params = _resolve_llm_params(
124
+ LATEST_GPT_MODEL,
125
+ reasoning_effort="low",
126
+ )
127
+
128
+ result = await _call_llm_streaming(
129
+ session,
130
+ messages=[
131
+ Message(
132
+ role="user",
133
+ content="Call report_result with answer GPT_OK.",
134
+ )
135
+ ],
136
+ tools=REPORT_RESULT_TOOL,
137
+ llm_params=llm_params,
138
+ )
139
+
140
+ # Even if a GPT-family response carries provider reasoning internally,
141
+ # OpenAI-compatible history must not echo it back on the next tool turn.
142
+ # Force the non-None strip path when the live model omits reasoning details.
143
+ result.reasoning_content = result.reasoning_content or "synthetic-reasoning"
144
+ replay = _assistant_message_from_result(
145
+ result,
146
+ model_name=LATEST_GPT_MODEL,
147
+ )
148
+
149
+ assert result.content or result.tool_calls_acc
150
+ assert getattr(replay, "thinking_blocks", None) is None
151
+ assert getattr(replay, "reasoning_content", None) is None
tests/unit/test_thinking_history.py CHANGED
@@ -26,6 +26,20 @@ def test_extract_thinking_state_from_litellm_message():
26
  assert reasoning_content == "reasoned"
27
 
28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  def test_assistant_message_from_result_preserves_thinking_with_tool_calls():
30
  tool_call = ChatCompletionMessageToolCall(
31
  id="call_1",
@@ -144,6 +158,60 @@ async def test_streaming_call_rebuilds_anthropic_thinking_state(monkeypatch):
144
  assert result.reasoning_content == "reasoned"
145
 
146
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  @pytest.mark.asyncio
148
  async def test_streaming_call_skips_chunk_rebuild_for_non_anthropic(monkeypatch):
149
  async def fake_stream():
 
26
  assert reasoning_content == "reasoned"
27
 
28
 
29
+ def test_extract_thinking_state_from_provider_fields():
30
+ message = SimpleNamespace(
31
+ provider_specific_fields={
32
+ "thinking_blocks": [{"type": "thinking", "thinking": "reasoned"}],
33
+ "reasoning_content": "reasoned",
34
+ },
35
+ )
36
+
37
+ thinking_blocks, reasoning_content = _extract_thinking_state(message)
38
+
39
+ assert thinking_blocks == [{"type": "thinking", "thinking": "reasoned"}]
40
+ assert reasoning_content == "reasoned"
41
+
42
+
43
  def test_assistant_message_from_result_preserves_thinking_with_tool_calls():
44
  tool_call = ChatCompletionMessageToolCall(
45
  id="call_1",
 
158
  assert result.reasoning_content == "reasoned"
159
 
160
 
161
+ @pytest.mark.asyncio
162
+ async def test_streaming_call_collects_anthropic_delta_thinking_state(monkeypatch):
163
+ async def fake_stream():
164
+ yield SimpleNamespace(
165
+ choices=[
166
+ SimpleNamespace(
167
+ delta=SimpleNamespace(
168
+ content=None,
169
+ tool_calls=None,
170
+ thinking_blocks=[{"type": "thinking", "thinking": "reasoned"}],
171
+ ),
172
+ finish_reason=None,
173
+ )
174
+ ],
175
+ )
176
+ yield SimpleNamespace(
177
+ choices=[
178
+ SimpleNamespace(
179
+ delta=SimpleNamespace(content="done", tool_calls=None),
180
+ finish_reason="stop",
181
+ )
182
+ ],
183
+ )
184
+ yield SimpleNamespace(choices=[], usage=SimpleNamespace(total_tokens=3))
185
+
186
+ async def fake_acompletion(**_kwargs):
187
+ return fake_stream()
188
+
189
+ def fail_chunk_builder(*_args, **_kwargs):
190
+ raise AssertionError("stream_chunk_builder should not run when deltas include thinking")
191
+
192
+ events = []
193
+ async def send_event(event):
194
+ events.append(event)
195
+
196
+ session = SimpleNamespace(
197
+ config=SimpleNamespace(model_name="anthropic/claude-opus-4-7"),
198
+ is_cancelled=False,
199
+ send_event=send_event,
200
+ )
201
+ monkeypatch.setattr(agent_loop, "acompletion", fake_acompletion)
202
+ monkeypatch.setattr(agent_loop, "stream_chunk_builder", fail_chunk_builder)
203
+
204
+ result = await _call_llm_streaming(
205
+ session,
206
+ messages=[Message(role="user", content="hi")],
207
+ tools=[],
208
+ llm_params={"model": "anthropic/claude-opus-4-7"},
209
+ )
210
+
211
+ assert result.content == "done"
212
+ assert result.thinking_blocks == [{"type": "thinking", "thinking": "reasoned"}]
213
+
214
+
215
  @pytest.mark.asyncio
216
  async def test_streaming_call_skips_chunk_rebuild_for_non_anthropic(monkeypatch):
217
  async def fake_stream():