Codeseys's picture
Wave 21: close both Wave 20 debt items — chat-template alignment + structural is_error
6806cf7
Raw
History Blame Contribute Delete
13 kB
"""Tests for composer_replication.ingestion.trace_examples (Wave 19).
Pins the contract that:
1. ClaudeCodeIngester output → claude_states_to_trace_examples → list[TraceExample]
2. Tool errors in source JSONL (`is_error: true`) survive the ingester's
[TOOL_RESULT (ERROR)] tag → are detected by the adapter → mark the
subsequent assistant turn with tool_error
3. The default error classifier categorizes common error kinds
4. The output is a valid input to ComposerDataCollator with hint_generator
"""
from __future__ import annotations
from pathlib import Path
import pytest
from composer_replication.ingestion import (
ClaudeCodeIngester,
TOOL_ERROR_TAG,
claude_states_to_trace_examples,
default_classify_error,
)
# Directory containing this test module; fixture paths are resolved from it.
HERE = Path(__file__).resolve().parent
# The fixtures sit under the Spike 007 tree, three directories above HERE.
FIXTURE_DIR = HERE.parent.parent.parent.joinpath(
    "spikes", "007-real-trace-ingestion", "fixtures"
)
# Session fixture with an error tool result, and the plain session fixture.
ERROR_FIXTURE = FIXTURE_DIR.joinpath("synthetic_session_with_error.jsonl")
OK_FIXTURE = FIXTURE_DIR.joinpath("synthetic_session.jsonl")
# ----------------------------------------------------------------------
# Error classifier
# ----------------------------------------------------------------------
def test_classify_file_not_found():
    """Both missing-file phrasings are classified as ``file_not_found``."""
    missing_file_messages = (
        "Error: File does not exist: /etc/foo.yaml",
        "no such file or directory: /tmp/x",
    )
    for message in missing_file_messages:
        assert default_classify_error(message) == "file_not_found"
def test_classify_permission_denied():
    """A bare 'Permission denied' message is classified as ``permission_denied``."""
    kind = default_classify_error("Permission denied")
    assert kind == "permission_denied"
def test_classify_command_not_found():
    """A shell 'command not found' message is classified as ``command_not_found``."""
    kind = default_classify_error("bash: foo: command not found")
    assert kind == "command_not_found"
def test_classify_unknown_falls_back():
    """An unrecognised message falls back to the generic ``tool_error`` kind."""
    kind = default_classify_error("something weird went wrong")
    assert kind == "tool_error"
# ----------------------------------------------------------------------
# Adapter — happy path with error site
# ----------------------------------------------------------------------
def test_adapter_emits_one_example_per_state():
    """The adapter returns exactly as many examples as the ingester yields states."""
    reader = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    ingested = [*reader.ingest(ERROR_FIXTURE)]
    converted = claude_states_to_trace_examples(ingested)
    assert len(converted) == len(ingested)
def test_adapter_detects_tool_error_on_recovery_turn():
    """The assistant turn IMMEDIATELY AFTER a [TOOL_RESULT (ERROR)] user
    turn must be marked with tool_error.

    What this test pins on the error fixture:

    * at least one example contains a marked turn;
    * no example contains more than one marked turn (the fixture has a
      single error site);
    * every marked turn is an assistant turn, i.e. the marker lands on the
      recovery turn rather than on the user turn carrying the error.
    """
    ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    states = list(ingester.ingest(ERROR_FIXTURE))
    examples = claude_states_to_trace_examples(states)

    # Collect the marked turns once; both the counts and the role check
    # below are derived from this.
    marked_per_example = [
        [t for t in ex["turns"] if t.get("tool_error")]
        for ex in examples
    ]
    err_counts = [len(marked) for marked in marked_per_example]

    assert any(err_counts), (
        f"Expected ≥1 example with a tool_error turn; got none. "
        f"Per-example error turns: {[(ex.get('trace_id'), n) for ex, n in zip(examples, err_counts)]}"
    )
    # The error fixture has one error site; one of the late states should have exactly 1 error turn
    assert max(err_counts) == 1, (
        f"Expected exactly 1 error turn in some state; counts: {err_counts}"
    )
    # The marker belongs on the assistant's recovery turn, never on the
    # user/tool-result turn that reported the error.
    wrong_roles = [
        t.get("role")
        for marked in marked_per_example
        for t in marked
        if t.get("role") != "assistant"
    ]
    assert not wrong_roles, (
        f"tool_error must only be set on assistant turns; "
        f"found it on roles: {wrong_roles}"
    )
def test_adapter_classifies_file_not_found_in_fixture():
    """At least one marked turn from the error fixture has kind ``file_not_found``."""
    reader = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    trace_examples = claude_states_to_trace_examples(
        list(reader.ingest(ERROR_FIXTURE))
    )
    # Gather the error kind of every marked turn across all examples.
    kinds = []
    for example in trace_examples:
        for turn in example["turns"]:
            if turn.get("tool_error"):
                kinds.append(turn["tool_error"])
    assert "file_not_found" in kinds, (
        f"Expected 'file_not_found' classification on the fixture's "
        f"non-existent-config error; got: "
        f"{kinds}"
    )
def test_adapter_no_errors_on_clean_fixture():
    """The clean Spike 007 fixture must not produce any marked turns.

    That fixture contains no ``is_error: true`` rows, so the adapter has
    nothing to flag.
    """
    reader = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    trace_examples = claude_states_to_trace_examples(
        list(reader.ingest(OK_FIXTURE))
    )
    flagged_kinds = []
    for example in trace_examples:
        for turn in example["turns"]:
            if turn.get("tool_error"):
                flagged_kinds.append(turn["tool_error"])
    assert len(flagged_kinds) == 0, (
        f"Clean fixture should have 0 error turns; got "
        f"{len(flagged_kinds)}: {flagged_kinds}"
    )
def test_adapter_preserves_role_and_content():
    """Every emitted turn carries ``role`` and ``content``, with a known role."""
    allowed_roles = ("system", "user", "assistant", "tool")
    reader = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    trace_examples = claude_states_to_trace_examples(
        list(reader.ingest(ERROR_FIXTURE))
    )
    # Flatten lazily so the first offending turn fails the test immediately.
    every_turn = (
        turn for example in trace_examples for turn in example["turns"]
    )
    for turn in every_turn:
        assert "role" in turn
        assert "content" in turn
        assert turn["role"] in allowed_roles
def test_adapter_custom_error_kind_fn():
    """User-provided error_kind_fn should override default classification.

    Every marked turn from the error fixture must carry the value returned
    by the custom classifier.
    """
    ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    states = list(ingester.ingest(ERROR_FIXTURE))

    def custom_kind(content: str) -> str:
        # Constant classifier: any marked turn must therefore carry this value.
        return "custom_kind"

    examples = claude_states_to_trace_examples(states, error_kind_fn=custom_kind)
    error_turns = [t for ex in examples for t in ex["turns"] if t.get("tool_error")]
    # Guard against a vacuous pass: all() over an empty list is True, which
    # would hide an adapter that stopped detecting errors altogether.
    assert error_turns, (
        "Expected ≥1 tool_error turn on the error fixture; without one the "
        "custom error_kind_fn is never exercised."
    )
    assert all(t["tool_error"] == "custom_kind" for t in error_turns)
def test_adapter_threads_final_reward():
    """The ``final_reward`` passed to the adapter appears on every example."""
    ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    states = list(ingester.ingest(ERROR_FIXTURE))
    examples = claude_states_to_trace_examples(states, final_reward=0.5)
    # Guard against a vacuous pass: all() over an empty list is True.
    assert examples, (
        "Error fixture produced no examples; the final_reward check would "
        "be vacuous."
    )
    assert all(ex["final_reward"] == 0.5 for ex in examples)
# ----------------------------------------------------------------------
# Tool error tag constant
# ----------------------------------------------------------------------
def test_tool_error_tag_matches_ingester_output():
    """``TOOL_ERROR_TAG`` must appear in what the ingester renders for errors.

    The error fixture is ingested and the string contents of its user
    messages are scanned for the constant; at least one must contain it.
    """
    reader = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    ingested = list(reader.ingest(ERROR_FIXTURE))
    # Collect the content of every user message across all states.
    user_contents = []
    for state in ingested:
        for message in state["messages"]:
            if message.get("role") == "user":
                user_contents.append(message.get("content", ""))
    # Only string contents can be searched for the tag.
    tag_seen = False
    for content in user_contents:
        if isinstance(content, str) and TOOL_ERROR_TAG in content:
            tag_seen = True
            break
    assert tag_seen, (
        f"TOOL_ERROR_TAG {TOOL_ERROR_TAG!r} not found in any user content; "
        f"the constant has drifted from the ingester's output format."
    )
# ----------------------------------------------------------------------
# Structural error flag (Wave 20 — eliminate TOOL_ERROR_TAG coupling)
# ----------------------------------------------------------------------
def test_ingester_sets_structural_tool_error_flag():
    """The ingester exposes errors as a boolean ``tool_error: True`` on user messages.

    The flag must be present on the error fixture independently of the
    rendered string tag, and each flagged message must also contain the tag.
    """
    reader = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    ingested = list(reader.ingest(ERROR_FIXTURE))
    flagged_messages = []
    for state in ingested:
        flagged_messages.extend(
            message
            for message in state["messages"]
            # Identity check: only a literal True counts as the structural flag.
            if message.get("role") == "user" and message.get("tool_error") is True
        )
    assert flagged_messages, (
        "Expected ≥1 user message with structural tool_error=True flag; "
        "the ingester is not surfacing is_error structurally."
    )
    # The tag is kept for readability, so on this fixture the flag and the
    # tag are expected to co-occur.
    for message in flagged_messages:
        assert TOOL_ERROR_TAG in message["content"], (
            "Structural flag set but string tag missing — the two views "
            "of the same error have diverged within the ingester."
        )
def test_clean_fixture_has_no_structural_flag():
    """The clean fixture yields no user message with a truthy ``tool_error``."""
    reader = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    ingested = list(reader.ingest(OK_FIXTURE))
    flagged = []
    for state in ingested:
        for message in state["messages"]:
            if message.get("role") == "user" and message.get("tool_error"):
                flagged.append(message)
    assert not flagged, f"Clean fixture should have 0 structural flags; got {len(flagged)}"
def test_structural_flag_survives_tag_drift():
    """Drift resilience: the structural flag alone must mark the error site.

    The state is hand-built with ``tool_error: True`` on a user message
    whose content does NOT contain the rendered string tag, simulating a
    serialization change that strips or renames the tag. The adapter must
    still mark exactly one turn: the assistant turn right after that
    message.
    """
    conversation = [
        {"role": "system", "content": "sys"},
        {"role": "user", "content": "run the build"},
        {"role": "assistant", "content": "[TOOL_USE] name=Bash input={}"},
        # No tag in the content on purpose; the boolean flag is the only signal.
        {"role": "user", "content": "build failed: missing target",
         "tool_error": True},
        {"role": "assistant", "content": "Let me fix the target."},
    ]
    drifted_state = {"state_id": "drift-0", "messages": conversation}

    examples = claude_states_to_trace_examples([drifted_state])
    assert len(examples) == 1

    only_example = examples[0]
    marked = [turn for turn in only_example["turns"] if turn.get("tool_error")]
    assert len(marked) == 1, (
        "Structural flag present but adapter failed to detect the error "
        "site without the string tag — the coupling fix is broken."
    )
    # The marked turn is the assistant reply that follows the flagged user turn.
    recovery_turn = marked[0]
    assert recovery_turn["content"] == "Let me fix the target."
def test_structural_false_suppresses_tag_match():
    """An explicit ``tool_error: False`` wins over a tag found in the content.

    Inverse drift case: the user message quotes the tag string verbatim,
    but the producer has declared it is not an error, so no turn may be
    marked.
    """
    # Content that mentions the tag without being a live error site.
    quoted_tag_content = f"the docs mention {TOOL_ERROR_TAG} as an example"
    conversation = [
        {"role": "system", "content": "sys"},
        {"role": "user", "content": "look at this log"},
        {"role": "assistant", "content": "[TOOL_USE] name=Read input={}"},
        {"role": "user",
         "content": quoted_tag_content,
         "tool_error": False},
        {"role": "assistant", "content": "I see, that's just documentation."},
    ]
    examples = claude_states_to_trace_examples(
        [{"state_id": "false-0", "messages": conversation}]
    )
    marked = [turn for turn in examples[0]["turns"] if turn.get("tool_error")]
    assert not marked, (
        "tool_error=False should suppress detection even when the string "
        "tag is present in content; structural flag must take precedence."
    )
def test_string_tag_fallback_when_no_structural_flag():
    """Legacy traces with only the string tag are still detected.

    No message carries a ``tool_error`` key; the tag in the user content is
    the sole signal. Exactly one turn must be marked, and it must be
    classified as ``file_not_found``.
    """
    # Legacy serialization: the tag is rendered in the text, no structural key.
    legacy_error_content = f"{TOOL_ERROR_TAG} (id=x)\nno such file or directory"
    conversation = [
        {"role": "system", "content": "sys"},
        {"role": "user", "content": "run it"},
        {"role": "assistant", "content": "[TOOL_USE] name=Bash input={}"},
        {"role": "user",
         "content": legacy_error_content},
        {"role": "assistant", "content": "Creating the file first."},
    ]
    examples = claude_states_to_trace_examples(
        [{"state_id": "legacy-0", "messages": conversation}]
    )
    marked = [turn for turn in examples[0]["turns"] if turn.get("tool_error")]
    assert len(marked) == 1, (
        "Legacy trace without structural flag must fall back to the string "
        "tag match; backward compatibility broken."
    )
    assert marked[0]["tool_error"] == "file_not_found"
# ----------------------------------------------------------------------
# Empty input
# ----------------------------------------------------------------------
def test_adapter_empty_input():
    """An empty list of states yields an empty list of examples."""
    no_states = []
    assert claude_states_to_trace_examples(no_states) == []
def test_adapter_state_with_no_messages():
    """A state whose ``messages`` list is empty produces no example."""
    degenerate_state = {"state_id": "empty", "messages": []}
    result = claude_states_to_trace_examples([degenerate_state])
    assert result == []