| from unittest.mock import patch |
|
|
| from messaging.rendering.telegram_markdown import ( |
| escape_md_v2, |
| escape_md_v2_code, |
| mdv2_bold, |
| mdv2_code_inline, |
| render_markdown_to_mdv2, |
| ) |
| from messaging.transcript import RenderCtx, TranscriptBuffer |
|
|
|
|
| def _ctx() -> RenderCtx: |
| return RenderCtx( |
| bold=mdv2_bold, |
| code_inline=mdv2_code_inline, |
| escape_code=escape_md_v2_code, |
| escape_text=escape_md_v2, |
| render_markdown=render_markdown_to_mdv2, |
| thinking_tail_max=1000, |
| tool_input_tail_max=1200, |
| tool_output_tail_max=1600, |
| text_tail_max=2000, |
| ) |
|
|
|
|
| def test_transcript_order_thinking_tool_text(): |
| t = TranscriptBuffer() |
| t.apply({"type": "thinking_chunk", "text": "think1"}) |
| t.apply({"type": "tool_use", "id": "tool_1", "name": "ls", "input": {"path": "."}}) |
| t.apply({"type": "text_chunk", "text": "done"}) |
|
|
| out = t.render(_ctx(), limit_chars=3900, status=None) |
| assert out.find("think1") < out.find("Tool call:") < out.find("done") |
|
|
|
|
| def test_transcript_subagent_suppresses_thinking_and_text_inside(): |
| t = TranscriptBuffer() |
|
|
| |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_1", |
| "name": "Task", |
| "input": {"description": "Fix bug"}, |
| } |
| ) |
|
|
| |
| t.apply({"type": "thinking_delta", "index": -1, "text": "secret"}) |
| t.apply({"type": "text_chunk", "text": "visible?"}) |
|
|
| |
| t.apply({"type": "tool_use", "id": "tool_2", "name": "ls", "input": {"path": "."}}) |
| t.apply({"type": "tool_result", "tool_use_id": "tool_2", "content": "x"}) |
|
|
| |
| t.apply({"type": "tool_result", "tool_use_id": "task_1", "content": "done"}) |
|
|
| |
| t.apply({"type": "text_chunk", "text": "after"}) |
|
|
| out = t.render(_ctx(), limit_chars=3900, status=None) |
| assert "Subagent:" in out |
| assert "secret" not in out |
| assert "visible?" not in out |
| |
| assert out.count("Tool call:") == 1 |
| assert "\n π " in out or out.startswith(" π ") or " π " in out |
| assert "Tools used:" in out |
| assert "Tool calls:" in out |
| assert "after" in out |
|
|
|
|
| def test_transcript_subagent_closes_on_whitespace_tool_ids(): |
| t = TranscriptBuffer() |
|
|
| |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": " functions.Task:0", |
| "name": "Task", |
| "input": {"description": "Outer"}, |
| } |
| ) |
|
|
| |
| t.apply( |
| {"type": "tool_result", "tool_use_id": "functions.Task:0", "content": "done"} |
| ) |
|
|
| |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "functions.Task:1", |
| "name": "Task", |
| "input": {"description": "Next"}, |
| } |
| ) |
|
|
| out = t.render(_ctx(), limit_chars=3900, status=None) |
| assert out.count("Subagent:") == 2 |
| |
| assert "\n π€ *Subagent:* `Next`" not in out |
|
|
|
|
| def test_transcript_subagent_closes_on_task_result_id_suffix_match(): |
| t = TranscriptBuffer() |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_1", |
| "name": "Task", |
| "input": {"description": "Outer"}, |
| } |
| ) |
| t.apply({"type": "tool_result", "tool_use_id": "task_1_result", "content": "done"}) |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_2", |
| "name": "Task", |
| "input": {"description": "Next"}, |
| } |
| ) |
|
|
| out = t.render(_ctx(), limit_chars=3900, status=None) |
| assert out.count("Subagent:") == 2 |
| assert "\n π€ *Subagent:* `Next`" not in out |
|
|
|
|
| def test_transcript_unmatched_non_task_tool_result_does_not_pop_subagent(): |
| t = TranscriptBuffer() |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_1", |
| "name": "Task", |
| "input": {"description": "Outer"}, |
| } |
| ) |
| t.apply({"type": "tool_result", "tool_use_id": "totally_unrelated", "content": "x"}) |
|
|
| assert t._subagent_stack == ["task_1"] |
|
|
|
|
| def test_transcript_sequential_tasks_mismatched_results_no_depth_drift(): |
| t = TranscriptBuffer() |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_1", |
| "name": "Task", |
| "input": {"description": "A"}, |
| } |
| ) |
| t.apply({"type": "tool_result", "tool_use_id": "task_1_result", "content": "done"}) |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_2", |
| "name": "Task", |
| "input": {"description": "B"}, |
| } |
| ) |
| t.apply({"type": "tool_result", "tool_use_id": "task_2_result", "content": "done"}) |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_3", |
| "name": "Task", |
| "input": {"description": "C"}, |
| } |
| ) |
|
|
| out = t.render(_ctx(), limit_chars=3900, status=None) |
| assert "π€ *Subagent:* `A`\n π€ *Subagent:* `B`" not in out |
| assert "\n π€ *Subagent:* `C`" not in out |
| assert t._subagent_stack == ["task_3"] |
|
|
|
|
| def test_transcript_synthetic_task_start_closes_on_functions_task_result_id(): |
| t = TranscriptBuffer() |
| t.apply( |
| { |
| "type": "tool_use_start", |
| "index": 0, |
| "id": "", |
| "name": "Task", |
| "input": {"description": "Outer"}, |
| } |
| ) |
| t.apply({"type": "tool_result", "tool_use_id": "functions.Task:0", "content": "x"}) |
| t.apply( |
| { |
| "type": "tool_use_start", |
| "index": 1, |
| "id": "", |
| "name": "Task", |
| "input": {"description": "Next"}, |
| } |
| ) |
|
|
| out = t.render(_ctx(), limit_chars=3900, status=None) |
| assert out.count("Subagent:") == 2 |
| assert "\n π€ *Subagent:* `Next`" not in out |
|
|
|
|
| def test_transcript_synthetic_task_not_closed_by_unknown_non_task_result_id(): |
| t = TranscriptBuffer() |
| t.apply( |
| { |
| "type": "tool_use_start", |
| "index": 0, |
| "id": "", |
| "name": "Task", |
| "input": {"description": "Outer"}, |
| } |
| ) |
| t.apply({"type": "tool_result", "tool_use_id": "call_deadbeef", "content": "x"}) |
|
|
| assert t._subagent_stack == ["__task_1"] |
|
|
|
|
| def test_transcript_overlapping_tasks_are_flat_not_nested(): |
| t = TranscriptBuffer() |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_a", |
| "name": "Task", |
| "input": {"description": "A"}, |
| } |
| ) |
| t.apply( |
| { |
| "type": "tool_use", |
| "id": "task_b", |
| "name": "Task", |
| "input": {"description": "B"}, |
| } |
| ) |
| t.apply({"type": "tool_result", "tool_use_id": "task_b", "content": "done"}) |
| t.apply({"type": "tool_result", "tool_use_id": "task_a", "content": "done"}) |
|
|
| out = t.render(_ctx(), limit_chars=3900, status=None) |
| assert "π€ *Subagent:* `A`" in out |
| assert "π€ *Subagent:* `B`" in out |
| assert out.find("π€ *Subagent:* `A`") < out.find("π€ *Subagent:* `B`") |
| assert "\n π€ *Subagent:* `B`" not in out |
|
|
|
|
| def test_transcript_truncates_by_dropping_oldest_segments(): |
| t = TranscriptBuffer() |
|
|
| |
| for i in range(60): |
| t.apply({"type": "text_start", "index": i}) |
| t.apply( |
| {"type": "text_delta", "index": i, "text": f"segment_{i} " + ("x" * 120)} |
| ) |
| t.apply({"type": "block_stop", "index": i}) |
|
|
| out = t.render(_ctx(), limit_chars=600, status="status") |
| assert escape_md_v2("... (truncated)") in out |
| |
| assert escape_md_v2("segment_59") in out |
| assert escape_md_v2("segment_0") not in out |
|
|
|
|
| def test_transcript_render_many_segments_completes_quickly(): |
| """Render with 200+ segments exercises O(n) truncation (deque popleft).""" |
| t = TranscriptBuffer() |
| for i in range(200): |
| t.apply({"type": "text_start", "index": i}) |
| t.apply({"type": "text_delta", "index": i, "text": f"seg_{i} " + ("y" * 80)}) |
| t.apply({"type": "block_stop", "index": i}) |
|
|
| out = t.render(_ctx(), limit_chars=500, status="ok") |
| assert escape_md_v2("... (truncated)") in out |
| assert "199" in out |
| assert "seg_0 " not in out |
|
|
|
|
| def test_transcript_reused_index_closes_previous_open_block(): |
| t = TranscriptBuffer() |
| |
| t.apply({"type": "text_start", "index": 0}) |
| t.apply({"type": "text_delta", "index": 0, "text": "a"}) |
| |
| t.apply( |
| {"type": "tool_use_start", "index": 0, "id": "t1", "name": "ls", "input": {}} |
| ) |
| |
| assert 0 not in t._open_text_by_index |
| assert 0 in t._open_tools_by_index |
|
|
|
|
| def test_transcript_render_segment_exception_skipped(): |
| """When a segment's render() raises, that segment is skipped and rest is rendered.""" |
| t = TranscriptBuffer() |
| t.apply({"type": "thinking_chunk", "text": "before"}) |
| t.apply({"type": "text_chunk", "text": "middle"}) |
| t.apply({"type": "text_chunk", "text": "after"}) |
|
|
| bad_segment = t._segments[1] |
|
|
| def _raising_render(self, ctx): |
| raise ValueError("render failed") |
|
|
| with patch.object(bad_segment, "render", _raising_render): |
| out = t.render(_ctx(), limit_chars=3900, status=None) |
| assert "before" in out |
| assert "after" in out |
| assert "middle" not in out |
|
|
|
|
| def test_transcript_render_status_only_exceeds_limit(): |
| """When all segments dropped, status-only output; long status returned as-is.""" |
| t = TranscriptBuffer() |
| t.apply({"type": "text_chunk", "text": "x" * 5000}) |
|
|
| long_status = "A" * 500 |
| msg = t.render(_ctx(), limit_chars=100, status=long_status) |
| assert "... (truncated)" in msg or long_status in msg |
|
|
|
|
| def test_transcript_truncation_preserves_last_segment_tail(): |
| """When all segments exceed limit, preserve tail of last segment (not just marker+status).""" |
| t = TranscriptBuffer() |
| t.apply({"type": "thinking_chunk", "text": "Thinking..."}) |
| t.apply( |
| {"type": "text_chunk", "text": "The actual output content here" + "x" * 500} |
| ) |
|
|
| msg = t.render(_ctx(), limit_chars=100, status="β
*Complete*") |
| |
| assert escape_md_v2("... (truncated)") in msg |
| assert "β
*Complete*" in msg |
| assert "actual output" in msg or "content" in msg or "x" in msg |
|
|