"""Tests for composer_replication.ingestion.trace_examples (Wave 19). Pins the contract that: 1. ClaudeCodeIngester output → claude_states_to_trace_examples → list[TraceExample] 2. Tool errors in source JSONL (`is_error: true`) survive the ingester's [TOOL_RESULT (ERROR)] tag → are detected by the adapter → mark the subsequent assistant turn with tool_error 3. The default error classifier categorizes common error kinds 4. The output is a valid input to ComposerDataCollator with hint_generator """ from __future__ import annotations from pathlib import Path import pytest from composer_replication.ingestion import ( ClaudeCodeIngester, TOOL_ERROR_TAG, claude_states_to_trace_examples, default_classify_error, ) HERE = Path(__file__).resolve().parent FIXTURE_DIR = HERE.parent.parent.parent / "spikes" / "007-real-trace-ingestion" / "fixtures" ERROR_FIXTURE = FIXTURE_DIR / "synthetic_session_with_error.jsonl" OK_FIXTURE = FIXTURE_DIR / "synthetic_session.jsonl" # ---------------------------------------------------------------------- # Error classifier # ---------------------------------------------------------------------- def test_classify_file_not_found(): assert default_classify_error( "Error: File does not exist: /etc/foo.yaml" ) == "file_not_found" assert default_classify_error( "no such file or directory: /tmp/x" ) == "file_not_found" def test_classify_permission_denied(): assert default_classify_error("Permission denied") == "permission_denied" def test_classify_command_not_found(): assert default_classify_error("bash: foo: command not found") == "command_not_found" def test_classify_unknown_falls_back(): assert default_classify_error("something weird went wrong") == "tool_error" # ---------------------------------------------------------------------- # Adapter — happy path with error site # ---------------------------------------------------------------------- def test_adapter_emits_one_example_per_state(): ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(ERROR_FIXTURE)) examples = claude_states_to_trace_examples(states) assert len(examples) == len(states) def test_adapter_detects_tool_error_on_recovery_turn(): """The assistant turn IMMEDIATELY AFTER a [TOOL_RESULT (ERROR)] user turn must be marked with tool_error. Earlier assistant turns (before any error) and assistant turns separated from the error by a successful tool result must NOT be marked.""" ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(ERROR_FIXTURE)) examples = claude_states_to_trace_examples(states) # Find the example with at least one error turn error_examples = [ ex for ex in examples if any(t.get("tool_error") for t in ex["turns"]) ] assert error_examples, ( f"Expected ≥1 example with a tool_error turn; got {len(error_examples)}. " f"Per-example error turns: {[(ex['trace_id'], sum(1 for t in ex['turns'] if t.get('tool_error'))) for ex in examples]}" ) # The error fixture has one error site; one of the late states should have exactly 1 error turn err_counts = [ sum(1 for t in ex["turns"] if t.get("tool_error")) for ex in examples ] assert max(err_counts) == 1, ( f"Expected exactly 1 error turn in some state; counts: {err_counts}" ) def test_adapter_classifies_file_not_found_in_fixture(): ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(ERROR_FIXTURE)) examples = claude_states_to_trace_examples(states) error_turns = [t for ex in examples for t in ex["turns"] if t.get("tool_error")] assert any(t["tool_error"] == "file_not_found" for t in error_turns), ( f"Expected 'file_not_found' classification on the fixture's " f"non-existent-config error; got: " f"{[t['tool_error'] for t in error_turns]}" ) def test_adapter_no_errors_on_clean_fixture(): """The original Spike 007 fixture has no is_error: true rows, so no error turns should be detected.""" ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(OK_FIXTURE)) examples = claude_states_to_trace_examples(states) err_turns = [t for ex in examples for t in ex["turns"] if t.get("tool_error")] assert not err_turns, ( f"Clean fixture should have 0 error turns; got " f"{len(err_turns)}: {[t['tool_error'] for t in err_turns]}" ) def test_adapter_preserves_role_and_content(): """Every output turn should have role + content from the input messages.""" ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(ERROR_FIXTURE)) examples = claude_states_to_trace_examples(states) for ex in examples: for turn in ex["turns"]: assert "role" in turn assert "content" in turn assert turn["role"] in ("system", "user", "assistant", "tool") def test_adapter_custom_error_kind_fn(): """User-provided error_kind_fn should override default classification.""" ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(ERROR_FIXTURE)) def custom_kind(content: str) -> str: return "custom_kind" examples = claude_states_to_trace_examples(states, error_kind_fn=custom_kind) error_turns = [t for ex in examples for t in ex["turns"] if t.get("tool_error")] assert all(t["tool_error"] == "custom_kind" for t in error_turns) def test_adapter_threads_final_reward(): ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(ERROR_FIXTURE)) examples = claude_states_to_trace_examples(states, final_reward=0.5) assert all(ex["final_reward"] == 0.5 for ex in examples) # ---------------------------------------------------------------------- # Tool error tag constant # ---------------------------------------------------------------------- def test_tool_error_tag_matches_ingester_output(): """The TOOL_ERROR_TAG constant must match what ClaudeCodeIngester actually writes for is_error: true records.""" ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(ERROR_FIXTURE)) # Find a user-message containing an error tool_result contents = [ m.get("content", "") for s in states for m in s["messages"] if m.get("role") == "user" ] assert any(TOOL_ERROR_TAG in c for c in contents if isinstance(c, str)), ( f"TOOL_ERROR_TAG {TOOL_ERROR_TAG!r} not found in any user content; " f"the constant has drifted from the ingester's output format." ) # ---------------------------------------------------------------------- # Structural error flag (Wave 20 — eliminate TOOL_ERROR_TAG coupling) # ---------------------------------------------------------------------- def test_ingester_sets_structural_tool_error_flag(): """The ingester must set a STRUCTURAL `tool_error: True` boolean on user messages whose source JSONL had `is_error: true`, independent of the rendered string tag.""" ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(ERROR_FIXTURE)) flagged = [ m for s in states for m in s["messages"] if m.get("role") == "user" and m.get("tool_error") is True ] assert flagged, ( "Expected ≥1 user message with structural tool_error=True flag; " "the ingester is not surfacing is_error structurally." ) # And every structurally-flagged message must also render the tag # (the tag is kept for readability — both should co-occur on the fixture). for m in flagged: assert TOOL_ERROR_TAG in m["content"], ( "Structural flag set but string tag missing — the two views " "of the same error have diverged within the ingester." ) def test_clean_fixture_has_no_structural_flag(): """No user message on the clean fixture should carry tool_error=True.""" ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True) states = list(ingester.ingest(OK_FIXTURE)) flagged = [ m for s in states for m in s["messages"] if m.get("role") == "user" and m.get("tool_error") ] assert not flagged, f"Clean fixture should have 0 structural flags; got {len(flagged)}" def test_structural_flag_survives_tag_drift(): """THE drift-resilience guarantee: if the rendered string tag drifts (e.g. a future serialization change strips or renames it) but the structural `tool_error: True` flag is present, the adapter MUST still detect the error site. This is the entire point of the Wave 20 fix — detection no longer depends on grepping a human-readable string.""" # Hand-build a state where the tag is ABSENT from content but the # structural flag is set — simulating ingester serialization drift. states = [{ "state_id": "drift-0", "messages": [ {"role": "system", "content": "sys"}, {"role": "user", "content": "run the build"}, {"role": "assistant", "content": "[TOOL_USE] name=Bash input={}"}, # Tag DELIBERATELY absent from content; only the structural flag. {"role": "user", "content": "build failed: missing target", "tool_error": True}, {"role": "assistant", "content": "Let me fix the target."}, ], }] examples = claude_states_to_trace_examples(states) assert len(examples) == 1 err_turns = [t for t in examples[0]["turns"] if t.get("tool_error")] assert len(err_turns) == 1, ( "Structural flag present but adapter failed to detect the error " "site without the string tag — the coupling fix is broken." ) # The recovery turn is the assistant immediately after the flagged user turn. assert err_turns[0]["content"] == "Let me fix the target." def test_structural_false_suppresses_tag_match(): """Inverse drift case: a producer sets `tool_error: False` to assert 'this is NOT an error' even though the rendered content happens to contain the tag string. The structural flag must WIN over the string.""" states = [{ "state_id": "false-0", "messages": [ {"role": "system", "content": "sys"}, {"role": "user", "content": "look at this log"}, {"role": "assistant", "content": "[TOOL_USE] name=Read input={}"}, # Content contains the tag verbatim (e.g. quoting a prior log) # but the producer asserts it's not a live error site. {"role": "user", "content": f"the docs mention {TOOL_ERROR_TAG} as an example", "tool_error": False}, {"role": "assistant", "content": "I see, that's just documentation."}, ], }] examples = claude_states_to_trace_examples(states) err_turns = [t for t in examples[0]["turns"] if t.get("tool_error")] assert not err_turns, ( "tool_error=False should suppress detection even when the string " "tag is present in content; structural flag must take precedence." ) def test_string_tag_fallback_when_no_structural_flag(): """Backward-compat: an OLD trace (no structural flag anywhere) with the tag in content must STILL be detected via the string fallback path.""" states = [{ "state_id": "legacy-0", "messages": [ {"role": "system", "content": "sys"}, {"role": "user", "content": "run it"}, {"role": "assistant", "content": "[TOOL_USE] name=Bash input={}"}, # No tool_error key at all — pure legacy serialization. {"role": "user", "content": f"{TOOL_ERROR_TAG} (id=x)\nno such file or directory"}, {"role": "assistant", "content": "Creating the file first."}, ], }] examples = claude_states_to_trace_examples(states) err_turns = [t for t in examples[0]["turns"] if t.get("tool_error")] assert len(err_turns) == 1, ( "Legacy trace without structural flag must fall back to the string " "tag match; backward compatibility broken." ) assert err_turns[0]["tool_error"] == "file_not_found" # ---------------------------------------------------------------------- # Empty input # ---------------------------------------------------------------------- def test_adapter_empty_input(): assert claude_states_to_trace_examples([]) == [] def test_adapter_state_with_no_messages(): """A degenerate state with empty messages should be skipped silently.""" examples = claude_states_to_trace_examples([{"state_id": "empty", "messages": []}]) assert examples == []