|
|
"""ThinkingStreamProcessor 单元测试 |
|
|
|
|
|
覆盖 <thinking> 标签在流式分片中被拆分的场景,避免思维链泄露到 text 输出。 |
|
|
""" |
|
|
|
|
|
from pathlib import Path |
|
|
import sys |
|
|
|
|
|
import pytest |
|
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
|
|
|
from kiro_proxy.handlers.anthropic import ThinkingStreamProcessor |
|
|
|
|
|
|
|
|
def _collect_events(chunks: list[str]) -> list[dict]: |
|
|
processor = ThinkingStreamProcessor(thinking_enabled=True) |
|
|
events: list[dict] = [] |
|
|
for chunk in chunks: |
|
|
events.extend(processor.process_content(chunk)) |
|
|
events.extend(processor.finalize()) |
|
|
return events |
|
|
|
|
|
|
|
|
def _extract_text(events: list[dict]) -> str: |
|
|
return "".join( |
|
|
e["delta"]["text"] |
|
|
for e in events |
|
|
if e.get("type") == "content_block_delta" |
|
|
and e.get("delta", {}).get("type") == "text_delta" |
|
|
) |
|
|
|
|
|
|
|
|
def _extract_thinking(events: list[dict]) -> str: |
|
|
return "".join( |
|
|
e["delta"]["thinking"] |
|
|
for e in events |
|
|
if e.get("type") == "content_block_delta" |
|
|
and e.get("delta", {}).get("type") == "thinking_delta" |
|
|
) |
|
|
|
|
|
|
|
|
@pytest.mark.parametrize( |
|
|
"chunks,expected_thinking,expected_text", |
|
|
[ |
|
|
|
|
|
(["<thi", "nking>AAA</thinking>BBB"], "AAA", "BBB"), |
|
|
|
|
|
(["<thinking>AAA</think", "ing>BBB"], "AAA", "BBB"), |
|
|
|
|
|
(["<thi", "nking>AAA</thi", "nking>BBB"], "AAA", "BBB"), |
|
|
|
|
|
(["Hello <thi", " there"], "", "Hello <thi there"), |
|
|
|
|
|
(["<thinking>AAA"], "AAA", ""), |
|
|
], |
|
|
) |
|
|
def test_thinking_stream_processor_chunk_splitting(chunks, expected_thinking, expected_text): |
|
|
events = _collect_events(chunks) |
|
|
assert _extract_thinking(events) == expected_thinking |
|
|
assert _extract_text(events) == expected_text |
|
|
|
|
|
|
|
|
text = _extract_text(events) |
|
|
assert "<thinking>" not in text |
|
|
assert "</thinking>" not in text |
|
|
|
|
|
|