Reinforcement Learning
Transformers
English
post-training
distillation
agentic-coding
composer-2.5
cursor
kimi-k2
grpo
dapo
diloco
openenv
trl
verl
research
methodology
Instructions to use Codeseys/composer-replication-framework with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use Codeseys/composer-replication-framework with Transformers:
```python
# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("Codeseys/composer-replication-framework", dtype="auto")
```
- Notebooks
- Google Colab
- Kaggle
composer-replication-framework / composer_replication /ingestion /tests /test_trace_examples_adapter.py
| """Tests for composer_replication.ingestion.trace_examples (Wave 19). | |
| Pins the contract that: | |
| 1. ClaudeCodeIngester output → claude_states_to_trace_examples → list[TraceExample] | |
| 2. Tool errors in source JSONL (`is_error: true`) survive the ingester's | |
| [TOOL_RESULT (ERROR)] tag → are detected by the adapter → mark the | |
| subsequent assistant turn with tool_error | |
| 3. The default error classifier categorizes common error kinds | |
| 4. The output is a valid input to ComposerDataCollator with hint_generator | |
| """ | |
| from __future__ import annotations | |
| from pathlib import Path | |
| import pytest | |
| from composer_replication.ingestion import ( | |
| ClaudeCodeIngester, | |
| TOOL_ERROR_TAG, | |
| claude_states_to_trace_examples, | |
| default_classify_error, | |
| ) | |
# Directory that holds this test module; every fixture path derives from it.
HERE = Path(__file__).resolve().parent

# The fixtures directory is reached by climbing three levels above HERE and
# descending into the spike's own fixtures folder.
FIXTURE_DIR = HERE.parent.parent.parent.joinpath(
    "spikes", "007-real-trace-ingestion", "fixtures"
)

# Session fixture used by the error-detection tests in this module.
ERROR_FIXTURE = FIXTURE_DIR.joinpath("synthetic_session_with_error.jsonl")

# Session fixture used by the "clean" (no error turn expected) tests.
OK_FIXTURE = FIXTURE_DIR.joinpath("synthetic_session.jsonl")
| # ---------------------------------------------------------------------- | |
| # Error classifier | |
| # ---------------------------------------------------------------------- | |
def test_classify_file_not_found():
    """Both missing-path phrasings classify as ``file_not_found``."""
    missing_path_messages = (
        "Error: File does not exist: /etc/foo.yaml",
        "no such file or directory: /tmp/x",
    )
    for message in missing_path_messages:
        assert default_classify_error(message) == "file_not_found"
def test_classify_permission_denied():
    """A bare "Permission denied" message classifies as ``permission_denied``."""
    kind = default_classify_error("Permission denied")
    assert kind == "permission_denied"
def test_classify_command_not_found():
    """A shell-style "command not found" line classifies as ``command_not_found``."""
    kind = default_classify_error("bash: foo: command not found")
    assert kind == "command_not_found"
def test_classify_unknown_falls_back():
    """An unrecognised message falls back to the generic ``tool_error`` kind."""
    kind = default_classify_error("something weird went wrong")
    assert kind == "tool_error"
| # ---------------------------------------------------------------------- | |
| # Adapter — happy path with error site | |
| # ---------------------------------------------------------------------- | |
def test_adapter_emits_one_example_per_state():
    """The adapter returns as many examples as the ingester yielded states."""
    state_list = list(
        ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True).ingest(ERROR_FIXTURE)
    )
    trace_examples = claude_states_to_trace_examples(state_list)
    assert len(trace_examples) == len(state_list)
def test_adapter_detects_tool_error_on_recovery_turn():
    """The error fixture yields exactly one flagged turn, on an assistant turn.

    Intended contract: the assistant turn IMMEDIATELY AFTER a
    [TOOL_RESULT (ERROR)] user turn is marked with ``tool_error``; earlier
    assistant turns and assistant turns separated from the error by a
    successful tool result are NOT marked.

    What this test pins on the fixture:
      * at least one example carries a flagged turn;
      * no example carries more than one flagged turn;
      * every flagged turn has role ``assistant`` (the mark belongs to the
        recovery turn, not to the user turn that reported the error).

    The exact position of the flagged turn is not checked here.
    """
    ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    states = list(ingester.ingest(ERROR_FIXTURE))
    examples = claude_states_to_trace_examples(states)

    # Number of flagged turns per example, in example order.
    err_counts = [
        sum(1 for t in ex["turns"] if t.get("tool_error"))
        for ex in examples
    ]
    per_example = [(ex["trace_id"], n) for ex, n in zip(examples, err_counts)]
    assert any(err_counts), (
        f"Expected ≥1 example with a tool_error turn; got 0. "
        f"Per-example error turns: {per_example}"
    )

    # The error fixture has one error site; one of the late states should
    # have exactly 1 error turn and none should have more.
    assert max(err_counts) == 1, (
        f"Expected exactly 1 error turn in some state; counts: {err_counts}"
    )

    # The mark must sit on the assistant's recovery turn, never on the
    # user/tool turn that carried the error text.
    flagged_roles = {
        t["role"]
        for ex in examples
        for t in ex["turns"]
        if t.get("tool_error")
    }
    assert flagged_roles == {"assistant"}, (
        f"tool_error must only be set on assistant turns; "
        f"flagged roles: {sorted(flagged_roles)}"
    )
def test_adapter_classifies_file_not_found_in_fixture():
    """At least one flagged turn from the error fixture is ``file_not_found``."""
    states = list(
        ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True).ingest(ERROR_FIXTURE)
    )
    examples = claude_states_to_trace_examples(states)

    # Flatten every flagged turn across all examples.
    error_turns = []
    for example in examples:
        for turn in example["turns"]:
            if turn.get("tool_error"):
                error_turns.append(turn)

    found = False
    for turn in error_turns:
        if turn["tool_error"] == "file_not_found":
            found = True
            break
    assert found, (
        f"Expected 'file_not_found' classification on the fixture's "
        f"non-existent-config error; got: "
        f"{[t['tool_error'] for t in error_turns]}"
    )
def test_adapter_no_errors_on_clean_fixture():
    """The clean Spike 007 fixture must produce zero flagged turns.

    That fixture has no ``is_error: true`` rows, so the adapter has nothing
    to mark.
    """
    states = list(
        ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True).ingest(OK_FIXTURE)
    )
    examples = claude_states_to_trace_examples(states)

    err_turns = []
    for example in examples:
        for turn in example["turns"]:
            if turn.get("tool_error"):
                err_turns.append(turn)

    assert len(err_turns) == 0, (
        f"Clean fixture should have 0 error turns; got "
        f"{len(err_turns)}: {[t['tool_error'] for t in err_turns]}"
    )
def test_adapter_preserves_role_and_content():
    """Every output turn carries ``role`` and ``content`` with a known role.

    Checks key presence and the role vocabulary only; it does not compare
    the values against the input messages. The test also fails if the
    adapter produced no examples or no turns, so the per-turn assertions
    can never pass vacuously.
    """
    ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    states = list(ingester.ingest(ERROR_FIXTURE))
    examples = claude_states_to_trace_examples(states)
    assert examples, "Adapter produced no examples for the error fixture."

    allowed_roles = ("system", "user", "assistant", "tool")
    checked = 0
    for ex in examples:
        for turn in ex["turns"]:
            assert "role" in turn
            assert "content" in turn
            assert turn["role"] in allowed_roles
            checked += 1
    assert checked > 0, "Adapter produced examples but none of them had turns."
def test_adapter_custom_error_kind_fn():
    """A user-provided ``error_kind_fn`` overrides the default classification.

    The error fixture is expected to yield at least one flagged turn; that
    is asserted explicitly because ``all(...)`` over an empty list is
    vacuously true and would hide an adapter that never flags anything.
    """
    ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    states = list(ingester.ingest(ERROR_FIXTURE))

    def custom_kind(content: str) -> str:
        # Constant classifier: any flagged turn must carry exactly this value.
        return "custom_kind"

    examples = claude_states_to_trace_examples(states, error_kind_fn=custom_kind)
    error_turns = [t for ex in examples for t in ex["turns"] if t.get("tool_error")]
    assert error_turns, (
        "Expected ≥1 flagged turn on the error fixture; cannot verify that "
        "error_kind_fn was applied."
    )
    assert all(t["tool_error"] == "custom_kind" for t in error_turns), (
        f"error_kind_fn override not applied; got: "
        f"{[t['tool_error'] for t in error_turns]}"
    )
def test_adapter_threads_final_reward():
    """``final_reward`` passed to the adapter appears on every example.

    Non-emptiness is asserted first: ``all(...)`` over zero examples would
    pass without checking anything.
    """
    ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    states = list(ingester.ingest(ERROR_FIXTURE))
    examples = claude_states_to_trace_examples(states, final_reward=0.5)
    assert examples, "Adapter produced no examples for the error fixture."
    assert all(ex["final_reward"] == 0.5 for ex in examples), (
        f"final_reward not threaded to every example; got: "
        f"{[ex['final_reward'] for ex in examples]}"
    )
| # ---------------------------------------------------------------------- | |
| # Tool error tag constant | |
| # ---------------------------------------------------------------------- | |
def test_tool_error_tag_matches_ingester_output():
    """``TOOL_ERROR_TAG`` must occur in some user message of the error fixture.

    Guards against the constant drifting away from what ClaudeCodeIngester
    actually writes for ``is_error: true`` records.
    """
    states = list(
        ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True).ingest(ERROR_FIXTURE)
    )

    # Gather the string content of every user message; non-string content
    # is ignored for the substring search.
    user_texts = []
    for state in states:
        for message in state["messages"]:
            if message.get("role") != "user":
                continue
            body = message.get("content", "")
            if isinstance(body, str):
                user_texts.append(body)

    tag_seen = any(TOOL_ERROR_TAG in text for text in user_texts)
    assert tag_seen, (
        f"TOOL_ERROR_TAG {TOOL_ERROR_TAG!r} not found in any user content; "
        f"the constant has drifted from the ingester's output format."
    )
| # ---------------------------------------------------------------------- | |
| # Structural error flag (Wave 20 — eliminate TOOL_ERROR_TAG coupling) | |
| # ---------------------------------------------------------------------- | |
def test_ingester_sets_structural_tool_error_flag():
    """The ingester exposes ``is_error: true`` as a boolean ``tool_error`` flag.

    User messages whose source JSONL row had ``is_error: true`` must carry
    a structural ``tool_error: True``, independent of the rendered string
    tag. On this fixture the tag is also expected in the same message.
    """
    states = list(
        ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True).ingest(ERROR_FIXTURE)
    )

    flagged = []
    for state in states:
        for message in state["messages"]:
            # Identity check on True: a merely truthy value does not count.
            if message.get("role") == "user" and message.get("tool_error") is True:
                flagged.append(message)

    assert flagged, (
        "Expected ≥1 user message with structural tool_error=True flag; "
        "the ingester is not surfacing is_error structurally."
    )

    # The tag is kept for readability, so both views of the error should
    # co-occur on every flagged message of the fixture.
    for message in flagged:
        assert TOOL_ERROR_TAG in message["content"], (
            "Structural flag set but string tag missing — the two views "
            "of the same error have diverged within the ingester."
        )
def test_clean_fixture_has_no_structural_flag():
    """No user message from the clean fixture carries a truthy ``tool_error``."""
    states = list(
        ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True).ingest(OK_FIXTURE)
    )

    flagged = []
    for state in states:
        for message in state["messages"]:
            if message.get("role") == "user" and message.get("tool_error"):
                flagged.append(message)

    assert len(flagged) == 0, f"Clean fixture should have 0 structural flags; got {len(flagged)}"
def test_structural_flag_survives_tag_drift():
    """Detection works from the structural flag alone when the tag is absent.

    Drift-resilience guarantee of the Wave 20 change: if the rendered string
    tag is stripped or renamed by a future serialization change but
    ``tool_error: True`` is present, the adapter must still find the error
    site. Detection must not depend on grepping a human-readable string.
    """
    # Hand-built conversation simulating ingester serialization drift: the
    # failing user turn has NO tag in its content, only the structural flag.
    drifted_messages = [
        {"role": "system", "content": "sys"},
        {"role": "user", "content": "run the build"},
        {"role": "assistant", "content": "[TOOL_USE] name=Bash input={}"},
        {
            "role": "user",
            "content": "build failed: missing target",
            "tool_error": True,
        },
        {"role": "assistant", "content": "Let me fix the target."},
    ]
    states = [{"state_id": "drift-0", "messages": drifted_messages}]

    examples = claude_states_to_trace_examples(states)
    assert len(examples) == 1

    err_turns = []
    for turn in examples[0]["turns"]:
        if turn.get("tool_error"):
            err_turns.append(turn)
    assert len(err_turns) == 1, (
        "Structural flag present but adapter failed to detect the error "
        "site without the string tag — the coupling fix is broken."
    )

    # The recovery turn is the assistant turn right after the flagged user turn.
    recovery_turn = err_turns[0]
    assert recovery_turn["content"] == "Let me fix the target."
def test_structural_false_suppresses_tag_match():
    """An explicit ``tool_error: False`` wins over a tag found in content.

    Inverse drift case: a producer sets ``tool_error: False`` to assert
    "this is NOT an error" even though the rendered content happens to
    contain the tag string. The structural flag must take precedence.
    """
    states = [{
        "state_id": "false-0",
        "messages": [
            {"role": "system", "content": "sys"},
            {"role": "user", "content": "look at this log"},
            {"role": "assistant", "content": "[TOOL_USE] name=Read input={}"},
            # Content contains the tag verbatim (e.g. quoting a prior log)
            # but the producer asserts it's not a live error site.
            {"role": "user",
             "content": f"the docs mention {TOOL_ERROR_TAG} as an example",
             "tool_error": False},
            {"role": "assistant", "content": "I see, that's just documentation."},
        ],
    }]
    examples = claude_states_to_trace_examples(states)
    # One input state with messages must give one example; checking this
    # first gives a clear failure instead of an IndexError on examples[0].
    assert len(examples) == 1
    err_turns = [t for t in examples[0]["turns"] if t.get("tool_error")]
    assert not err_turns, (
        "tool_error=False should suppress detection even when the string "
        "tag is present in content; structural flag must take precedence."
    )
def test_string_tag_fallback_when_no_structural_flag():
    """Legacy traces with only the string tag are still detected.

    Backward compatibility: an OLD trace with no ``tool_error`` key anywhere
    but with the tag in content must still be detected via the string
    fallback path and classified from the message text.
    """
    states = [{
        "state_id": "legacy-0",
        "messages": [
            {"role": "system", "content": "sys"},
            {"role": "user", "content": "run it"},
            {"role": "assistant", "content": "[TOOL_USE] name=Bash input={}"},
            # No tool_error key at all — pure legacy serialization.
            {"role": "user",
             "content": f"{TOOL_ERROR_TAG} (id=x)\nno such file or directory"},
            {"role": "assistant", "content": "Creating the file first."},
        ],
    }]
    examples = claude_states_to_trace_examples(states)
    # One input state with messages must give one example; checking this
    # first gives a clear failure instead of an IndexError on examples[0].
    assert len(examples) == 1
    err_turns = [t for t in examples[0]["turns"] if t.get("tool_error")]
    assert len(err_turns) == 1, (
        "Legacy trace without structural flag must fall back to the string "
        "tag match; backward compatibility broken."
    )
    assert err_turns[0]["tool_error"] == "file_not_found"
| # ---------------------------------------------------------------------- | |
| # Empty input | |
| # ---------------------------------------------------------------------- | |
def test_adapter_empty_input():
    """An empty list of states yields an empty list of examples."""
    result = claude_states_to_trace_examples([])
    assert result == []
def test_adapter_state_with_no_messages():
    """A degenerate state whose ``messages`` list is empty produces no example."""
    degenerate_state = {"state_id": "empty", "messages": []}
    result = claude_states_to_trace_examples([degenerate_state])
    assert result == []