# Hanrui/sglang/test/registered/parser/test_reasoning_parser.py
# Lekr0: Add files using upload-large-folder tool
# Commit a402b9b (verified)
import unittest
from sglang.srt.parser.reasoning_parser import (
BaseReasoningFormatDetector,
DeepSeekR1Detector,
Glm45Detector,
KimiDetector,
KimiK2Detector,
Qwen3Detector,
ReasoningParser,
StreamingParseResult,
)
from sglang.test.ci.ci_register import register_cpu_ci
from sglang.test.test_utils import CustomTestCase
# Register this test file for CI via register_cpu_ci, in the "stage-a-cpu-only"
# suite. est_time=5 is the declared time estimate (presumably seconds —
# confirm against sglang.test.ci.ci_register).
register_cpu_ci(est_time=5, suite="stage-a-cpu-only")
class TestStreamingParseResult(CustomTestCase):
    """Constructor checks for the StreamingParseResult container."""

    def test_init_default(self):
        """A result built with no arguments exposes two empty strings."""
        empty = StreamingParseResult()
        self.assertEqual(empty.normal_text, "")
        self.assertEqual(empty.reasoning_text, "")

    def test_init_with_values(self):
        """Positional arguments are stored as normal_text, then reasoning_text."""
        populated = StreamingParseResult("normal", "reasoning")
        self.assertEqual(populated.normal_text, "normal")
        self.assertEqual(populated.reasoning_text, "reasoning")
class TestBaseReasoningFormatDetector(CustomTestCase):
    """Checks for BaseReasoningFormatDetector using <think>/</think> tokens."""

    def setUp(self):
        # Every test starts from a fresh, non-forced, streaming detector.
        self.detector = BaseReasoningFormatDetector(
            think_start_token="<think>",
            think_end_token="</think>",
            force_reasoning=False,
            stream_reasoning=True,
        )

    def _expect(self, parsed, *, reasoning="", normal=""):
        """Assert both text fields of a parse result in one call."""
        self.assertEqual(parsed.reasoning_text, reasoning)
        self.assertEqual(parsed.normal_text, normal)

    def test_init(self):
        """The constructor stores its tokens and starts with clean state."""
        det = self.detector
        self.assertEqual(det.think_start_token, "<think>")
        self.assertEqual(det.think_end_token, "</think>")
        self.assertFalse(det._in_reasoning)
        self.assertTrue(det.stream_reasoning)
        self.assertEqual(det._buffer, "")
        self.assertFalse(det.stripped_think_start)

    def test_detect_and_parse_normal_text(self):
        """Text without any think token comes back untouched as normal text."""
        plain = "This is normal text"
        self._expect(self.detector.detect_and_parse(plain), normal=plain)

    def test_detect_and_parse_with_start_token(self):
        """An opening token with no closing token yields reasoning only."""
        parsed = self.detector.detect_and_parse("<think>This is reasoning")
        self._expect(parsed, reasoning="This is reasoning")

    def test_detect_and_parse_complete_reasoning(self):
        """A closed reasoning block is split from the trailing answer."""
        parsed = self.detector.detect_and_parse(
            "<think>This is reasoning</think>This is normal"
        )
        self._expect(parsed, reasoning="This is reasoning", normal="This is normal")

    def test_detect_and_parse_force_reasoning(self):
        """With force_reasoning=True, untagged text is classified as reasoning."""
        forced = BaseReasoningFormatDetector(
            "<think>", "</think>", force_reasoning=True
        )
        parsed = forced.detect_and_parse("This should be reasoning")
        self._expect(parsed, reasoning="This should be reasoning")

    def test_parse_streaming_increment_normal(self):
        """A plain streamed chunk is emitted directly as normal text."""
        parsed = self.detector.parse_streaming_increment("Hello world")
        self._expect(parsed, normal="Hello world")

    def test_parse_streaming_increment_partial_token(self):
        """Chunks that are a strict prefix of a think token emit nothing."""
        # Prefix of the start token on the default (non-reasoning) detector.
        self._expect(self.detector.parse_streaming_increment("<thi"))

        # Prefix of the end token on a second detector that is put into
        # reasoning mode by hand.
        reasoning_detector = BaseReasoningFormatDetector("<think>", "</think>")
        reasoning_detector._in_reasoning = True
        self._expect(reasoning_detector.parse_streaming_increment("</thi"))

    def test_parse_streaming_increment_complete_start(self):
        """A full start token emits no text but flips the detector state."""
        self._expect(self.detector.parse_streaming_increment("<think>"))
        self.assertTrue(self.detector._in_reasoning)
        self.assertTrue(self.detector.stripped_think_start)

    def test_parse_streaming_increment_reasoning_content(self):
        """Chunks that follow the start token are emitted as reasoning."""
        self.detector.parse_streaming_increment("<think>")
        parsed = self.detector.parse_streaming_increment("reasoning content")
        self._expect(parsed, reasoning="reasoning content")

    def test_parse_streaming_increment_end_token(self):
        """The closing chunk carries only the trailing normal text."""
        for chunk in ("<think>", "reasoning"):
            self.detector.parse_streaming_increment(chunk)

        # No reasoning text is expected in the chunk that closes the block.
        closing = self.detector.parse_streaming_increment("</think>normal text")
        self._expect(closing, reasoning="", normal="normal text")
        self.assertFalse(self.detector._in_reasoning)

    def test_parse_streaming_increment_no_stream_reasoning(self):
        """With stream_reasoning=False, reasoning chunks produce no output."""
        silent = BaseReasoningFormatDetector(
            "<think>", "</think>", stream_reasoning=False
        )
        silent.parse_streaming_increment("<think>")
        self._expect(silent.parse_streaming_increment("reasoning content"))

    def test_parse_streaming_increment_mixed_content(self):
        """One chunk holding a whole block plus answer is split in a single call."""
        parsed = self.detector.parse_streaming_increment(
            "<think>reasoning</think>normal"
        )
        self._expect(parsed, reasoning="reasoning", normal="normal")
class TestDeepSeekR1Detector(CustomTestCase):
    """Checks for DeepSeekR1Detector, which is expected to start in reasoning mode."""

    def setUp(self):
        self.detector = DeepSeekR1Detector()

    def test_init(self):
        """Default construction uses <think> tokens and begins inside reasoning."""
        det = self.detector
        self.assertEqual(det.think_start_token, "<think>")
        self.assertEqual(det.think_end_token, "</think>")
        # The detector is expected to behave as force_reasoning=True.
        self.assertTrue(det._in_reasoning)
        self.assertTrue(det.stream_reasoning)

    def test_init_no_stream_reasoning(self):
        """The stream_reasoning flag passed to the constructor is stored."""
        self.assertFalse(DeepSeekR1Detector(stream_reasoning=False).stream_reasoning)

    def test_detect_and_parse_r1_format(self):
        """Untagged output is classified entirely as reasoning."""
        parsed = self.detector.detect_and_parse(
            "I need to think about this. The answer is 42."
        )
        self.assertEqual(
            parsed.reasoning_text, "I need to think about this. The answer is 42."
        )
        self.assertEqual(parsed.normal_text, "")

    def test_detect_and_parse_with_end_token(self):
        """Output with only a closing token is split at that token."""
        parsed = self.detector.detect_and_parse(
            "I think this is the answer</think>The final answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "I think this is the answer")
        self.assertEqual(parsed.normal_text, "The final answer is 42.")

    def test_detect_and_parse_with_start_token(self):
        """Output that includes <think> (deepseek-ai/DeepSeek-R1-0528 style) also parses."""
        parsed = self.detector.detect_and_parse(
            "<think>I need to think about this.</think>The answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "I need to think about this.")
        self.assertEqual(parsed.normal_text, "The answer is 42.")
class TestQwen3Detector(CustomTestCase):
    """Checks for Qwen3Detector with its default (non-forced) configuration."""

    def setUp(self):
        self.detector = Qwen3Detector()

    def test_init(self):
        """Default construction uses <think> tokens and starts outside reasoning."""
        det = self.detector
        self.assertEqual(det.think_start_token, "<think>")
        self.assertEqual(det.think_end_token, "</think>")
        # Default construction is expected to leave force_reasoning off.
        self.assertFalse(det._in_reasoning)
        self.assertTrue(det.stream_reasoning)

    def test_detect_and_parse_qwen3_format(self):
        """A tagged reasoning block is separated from the answer."""
        parsed = self.detector.detect_and_parse(
            "<think>Let me think about this problem</think>The answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "Let me think about this problem")
        self.assertEqual(parsed.normal_text, "The answer is 42.")

    def test_detect_and_parse_without_thinking(self):
        """Untagged output (the enable_thinking=False case) is all normal text."""
        answer = "Direct answer without thinking."
        parsed = self.detector.detect_and_parse(answer)
        self.assertEqual(parsed.normal_text, answer)
        self.assertEqual(parsed.reasoning_text, "")
class TestQwen3ForcedReasoningDetector(CustomTestCase):
    """Checks for Qwen3Detector constructed with force_reasoning=True."""

    def setUp(self):
        self.detector = Qwen3Detector(force_reasoning=True)

    def test_init(self):
        """Forced mode keeps the <think> tokens but starts inside reasoning."""
        det = self.detector
        self.assertEqual(det.think_start_token, "<think>")
        self.assertEqual(det.think_end_token, "</think>")
        self.assertTrue(det._in_reasoning)  # consequence of force_reasoning=True
        self.assertTrue(det.stream_reasoning)

    def test_detect_and_parse_qwen3_forced_reasoning_format(self):
        """Output without an opening <think> is still split at </think>."""
        parsed = self.detector.detect_and_parse(
            "I need to think about this step by step.</think>The answer is 42."
        )
        self.assertEqual(
            parsed.reasoning_text, "I need to think about this step by step."
        )
        self.assertEqual(parsed.normal_text, "The answer is 42.")

    def test_detect_and_parse_with_start_token(self):
        """An explicit opening <think> is accepted in forced mode too."""
        parsed = self.detector.detect_and_parse(
            "<think>I need to think about this.</think>The answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "I need to think about this.")
        self.assertEqual(parsed.normal_text, "The answer is 42.")

    def test_streaming_qwen3_forced_reasoning_format(self):
        """Streamed chunks are reasoning until </think>, then normal text."""
        # (chunk fed in, expected reasoning_text, expected normal_text)
        steps = (
            ("I need to", "I need to", ""),
            (" think about this.", " think about this.", ""),
            # The closing chunk is expected to carry no reasoning text.
            ("</think>The answer is 42.", "", "The answer is 42."),
        )
        for chunk, want_reasoning, want_normal in steps:
            parsed = self.detector.parse_streaming_increment(chunk)
            self.assertEqual(parsed.reasoning_text, want_reasoning)
            self.assertEqual(parsed.normal_text, want_normal)
class TestKimiDetector(CustomTestCase):
    """Checks for KimiDetector, which uses the ◁think▷ / ◁/think▷ delimiters."""

    def setUp(self):
        self.detector = KimiDetector()

    def test_init(self):
        """Default construction uses the Kimi tokens and starts outside reasoning."""
        det = self.detector
        self.assertEqual(det.think_start_token, "◁think▷")
        self.assertEqual(det.think_end_token, "◁/think▷")
        self.assertFalse(det._in_reasoning)
        self.assertTrue(det.stream_reasoning)

    def test_detect_and_parse_kimi_format(self):
        """A Kimi-delimited reasoning block is separated from the answer."""
        parsed = self.detector.detect_and_parse(
            "◁think▷Let me consider this carefully◁/think▷The answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "Let me consider this carefully")
        self.assertEqual(parsed.normal_text, "The answer is 42.")

    def test_detect_and_parse_kimi_no_thinking(self):
        """Output without Kimi tokens is returned unchanged as normal text."""
        answer = "Direct answer without thinking tokens."
        parsed = self.detector.detect_and_parse(answer)
        self.assertEqual(parsed.normal_text, answer)
        self.assertEqual(parsed.reasoning_text, "")

    def test_streaming_kimi_format(self):
        """Streaming handles a start token that is split across two chunks."""
        feed = self.detector.parse_streaming_increment

        # A strict prefix of the start token produces no output yet.
        partial = feed("◁thi")
        self.assertEqual(partial.normal_text, "")
        self.assertEqual(partial.reasoning_text, "")

        # The rest of the token arrives together with the first reasoning text.
        opened = feed("nk▷Start")
        self.assertEqual(opened.normal_text, "")
        self.assertEqual(opened.reasoning_text, "Start")
        self.assertTrue(self.detector._in_reasoning)

        # Further chunks are emitted as reasoning.
        middle = feed("thinking...")
        self.assertEqual(middle.reasoning_text, "thinking...")
        self.assertEqual(middle.normal_text, "")

        # The closing chunk is expected to carry only the normal text.
        closed = feed("◁/think▷answer")
        self.assertEqual(closed.reasoning_text, "")
        self.assertEqual(closed.normal_text, "answer")
class TestKimiK2Detector(CustomTestCase):
    """Checks for KimiK2Detector, including interruption by a tool-call section."""

    def setUp(self):
        self.detector = KimiK2Detector()

    def test_init(self):
        """Default construction exposes think tokens and the tool start token."""
        det = self.detector
        self.assertEqual(det.think_start_token, "<think>")
        self.assertEqual(det.think_end_token, "</think>")
        self.assertEqual(det.tool_start_token, "<|tool_calls_section_begin|>")
        self.assertFalse(det._in_reasoning)
        self.assertTrue(det.stream_reasoning)

    def test_detect_and_parse_tool_interrupt(self):
        """An unclosed reasoning block is cut at the tool-section token."""
        parsed = self.detector.detect_and_parse(
            "<think>thinking<|tool_calls_section_begin|><|tool_call_begin|>"
        )
        self.assertEqual(parsed.reasoning_text, "thinking")
        self.assertEqual(
            parsed.normal_text, "<|tool_calls_section_begin|><|tool_call_begin|>"
        )

    def test_streaming_tool_interrupt(self):
        """In streaming, the tool-section chunk is emitted as normal text."""
        feed = self.detector.parse_streaming_increment
        feed("<think>")

        thinking = feed("reasoning")
        self.assertEqual(thinking.reasoning_text, "reasoning")
        self.assertEqual(thinking.normal_text, "")

        interrupted = feed("<|tool_calls_section_begin|>")
        self.assertEqual(interrupted.reasoning_text, "")
        self.assertEqual(interrupted.normal_text, "<|tool_calls_section_begin|>")

    def test_streaming_after_interrupt_is_normal(self):
        """Chunks that arrive after the interruption are normal text."""
        feed = self.detector.parse_streaming_increment
        for chunk in ("<think>", "reasoning<|tool_calls_section_begin|>"):
            feed(chunk)

        following = feed("<|tool_call_begin|>")
        self.assertEqual(following.reasoning_text, "")
        self.assertEqual(following.normal_text, "<|tool_call_begin|>")
class TestGlm45Detector(CustomTestCase):
    """Test cases for GLM45 detector with tool interruption support."""

    def setUp(self):
        self.detector = Glm45Detector()

    def test_init(self):
        """Test Glm45Detector initialization."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertEqual(self.detector.tool_start_token, "<tool_call>")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_normal_reasoning(self):
        """Test parsing normal reasoning block without tool interruption."""
        text = "<think>Let me think about this step by step</think>The answer is 42."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "Let me think about this step by step")
        self.assertEqual(result.normal_text, "The answer is 42.")

    def test_detect_and_parse_tool_interrupt(self):
        """
        Test parsing with tool interruption.

        GLM45 can interrupt reasoning with the tool token (<tool_call>) without
        closing </think>. The text is expected to be split at the first
        occurrence of the tool start token, which stays in normal_text.
        """
        text = "<think>I need to think<tool_call>tool call data"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "I need to think")
        self.assertEqual(result.normal_text, "<tool_call>tool call data")

    def test_detect_and_parse_multiple_tool_calls_find(self):
        """
        Test that the split happens at the FIRST tool start token.

        When several tool calls are present, everything from the first
        <tool_call> onwards is expected in normal_text.
        """
        text = "<think>thinking<tool_call>first tool<tool_call>second tool<tool_call>final tool"
        result = self.detector.detect_and_parse(text)
        # Should split at the first <tool_call>
        self.assertEqual(result.reasoning_text, "thinking")
        self.assertEqual(
            result.normal_text,
            "<tool_call>first tool<tool_call>second tool<tool_call>final tool",
        )

    def test_detect_and_parse_truncated_reasoning(self):
        """
        Test truncated reasoning without tool or end tag.

        All content after <think> should be returned as reasoning_text.
        """
        text = "<think>This is incomplete"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This is incomplete")
        self.assertEqual(result.normal_text, "")

    def test_detect_and_parse_normal_text_only(self):
        """Test parsing text without reasoning block."""
        text = "Just the answer without any reasoning."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.normal_text, text)
        self.assertEqual(result.reasoning_text, "")

    def test_streaming_normal_flow(self):
        """Test streaming with normal reasoning flow."""
        # Start reasoning
        result1 = self.detector.parse_streaming_increment("<think>")
        self.assertEqual(result1.normal_text, "")
        self.assertEqual(result1.reasoning_text, "")
        self.assertTrue(self.detector._in_reasoning)

        # Reasoning content
        result2 = self.detector.parse_streaming_increment("thinking...")
        self.assertEqual(result2.normal_text, "")
        self.assertEqual(result2.reasoning_text, "thinking...")

        # End reasoning
        result3 = self.detector.parse_streaming_increment("</think>answer")
        self.assertEqual(result3.normal_text, "answer")
        self.assertEqual(result3.reasoning_text, "")
        self.assertFalse(self.detector._in_reasoning)

    def test_streaming_tool_interrupt_split_tokens(self):
        """
        Test streaming where a tool token interrupts an open reasoning block.

        NOTE: despite the method name, the tool token is delivered here as one
        complete chunk. A token that is genuinely split across chunks
        (e.g. "<tool_" followed by "call>") is not exercised by this test.
        """
        # Start reasoning
        self.detector.parse_streaming_increment("<think>")

        # Add reasoning
        result1 = self.detector.parse_streaming_increment("thinking")
        self.assertEqual(result1.reasoning_text, "thinking")

        # Send the complete tool token in its own chunk
        result2 = self.detector.parse_streaming_increment("<tool_call>")
        # The tool token ends reasoning and is emitted as normal text
        self.assertEqual(result2.reasoning_text, "")
        self.assertEqual(result2.normal_text, "<tool_call>")
        self.assertFalse(self.detector._in_reasoning)

        # Send tool args
        result3 = self.detector.parse_streaming_increment("tool args")
        self.assertEqual(result3.reasoning_text, "")
        self.assertEqual(result3.normal_text, "tool args")

    def test_streaming_no_stream_reasoning(self):
        """Test streaming without stream_reasoning enabled."""
        detector = Glm45Detector(stream_reasoning=False)

        # Start reasoning
        detector.parse_streaming_increment("<think>")

        # Reasoning content is buffered and not returned yet
        result = detector.parse_streaming_increment("thinking")
        self.assertEqual(result.reasoning_text, "")
        self.assertEqual(result.normal_text, "")

        # Tool interruption should still work - flushes buffered reasoning
        # Note: buffer preserves original text including <think> tag
        result = detector.parse_streaming_increment("<tool_call>tool call")
        self.assertEqual(result.reasoning_text, "<think>thinking")
        self.assertEqual(result.normal_text, "<tool_call>tool call")

    def test_streaming_empty_reasoning_with_tool(self):
        """Test empty reasoning block followed by tool call."""
        # The result of the opening chunk is not inspected in this test, so it
        # is deliberately not bound to a name.
        self.detector.parse_streaming_increment("<think>")
        result = self.detector.parse_streaming_increment("<tool_call>tool call")
        self.assertEqual(result.reasoning_text, "")
        self.assertEqual(result.normal_text, "<tool_call>tool call")

    def test_forced_reasoning_mode(self):
        """Test GLM45 with force_reasoning=True."""
        detector = Glm45Detector(force_reasoning=True)

        # Without start token, should still be in reasoning mode
        text = "This is reasoning"
        result = detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This is reasoning")
        self.assertEqual(result.normal_text, "")

        # Tool interruption should work with forced reasoning
        text = "More reasoning<tool_call>tool call"
        result = detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "More reasoning")
        self.assertEqual(result.normal_text, "<tool_call>tool call")
class TestReasoningParser(CustomTestCase):
    """Checks for the ReasoningParser facade that selects a detector by model type."""

    @staticmethod
    def _accumulate(parser, chunks):
        """Stream chunks through the parser and join the non-empty outputs.

        Returns a (reasoning, normal) pair of concatenated strings.
        """
        reasoning_parts = []
        normal_parts = []
        for chunk in chunks:
            reasoning, normal = parser.parse_stream_chunk(chunk)
            if reasoning:
                reasoning_parts.append(reasoning)
            if normal:
                normal_parts.append(normal)
        return "".join(reasoning_parts), "".join(normal_parts)

    def test_init_valid_model(self):
        """Each supported model type maps to its detector class."""
        expected = (
            ("deepseek-r1", DeepSeekR1Detector),
            ("qwen3", Qwen3Detector),
            ("kimi", KimiDetector),
            ("kimi_k2", KimiK2Detector),
            ("glm45", Glm45Detector),
        )
        for model_type, detector_cls in expected:
            self.assertIsInstance(ReasoningParser(model_type).detector, detector_cls)

    def test_init_invalid_model(self):
        """An unknown model type raises ValueError mentioning it is unsupported."""
        with self.assertRaises(ValueError) as raised:
            ReasoningParser("invalid-model")
        self.assertIn("Unsupported model type", str(raised.exception))

    def test_init_no_model(self):
        """Passing None as the model type raises ValueError with a fixed message."""
        with self.assertRaises(ValueError) as raised:
            ReasoningParser(None)
        self.assertEqual(str(raised.exception), "Model type must be specified")

    def test_parse_non_stream(self):
        """parse_non_stream returns a (reasoning, normal) pair."""
        reasoning, normal = ReasoningParser("qwen3").parse_non_stream(
            "<think>Let me think</think>The answer is 42."
        )
        self.assertEqual(reasoning, "Let me think")
        self.assertEqual(normal, "The answer is 42.")

    def test_parse_stream_chunk(self):
        """parse_stream_chunk returns a (reasoning, normal) pair for each chunk."""
        parser = ReasoningParser("qwen3")
        # (chunk fed in, expected reasoning, expected normal)
        steps = (
            ("<think>", "", ""),
            ("thinking...", "thinking...", ""),
            # The closing chunk is expected to carry no reasoning text.
            ("</think>answer", "", "answer"),
        )
        for chunk, want_reasoning, want_normal in steps:
            reasoning, normal = parser.parse_stream_chunk(chunk)
            self.assertEqual(reasoning, want_reasoning)
            self.assertEqual(normal, want_normal)

    def test_case_insensitive_model_type(self):
        """Model type lookup ignores letter case."""
        cases = (
            ("DeepSeek-R1", DeepSeekR1Detector),
            ("QWEN3", Qwen3Detector),
            ("Kimi", KimiDetector),
        )
        # Build every parser before asserting on any of them.
        parsers = [ReasoningParser(model_type) for model_type, _ in cases]
        for parser, (_, detector_cls) in zip(parsers, cases):
            self.assertIsInstance(parser.detector, detector_cls)

    def test_stream_reasoning_parameter(self):
        """The stream_reasoning keyword is forwarded to the detector."""
        disabled = ReasoningParser("qwen3", stream_reasoning=False)
        self.assertFalse(disabled.detector.stream_reasoning)

        enabled = ReasoningParser("qwen3", stream_reasoning=True)
        self.assertTrue(enabled.detector.stream_reasoning)

    def test_glm45_tool_interruption(self):
        """GLM45 tool interruption works through both parser entry points."""
        # Non-streaming: tool interrupt
        reasoning, normal = ReasoningParser("glm45").parse_non_stream(
            "<think>thinking<tool_call>tool call"
        )
        self.assertEqual(reasoning, "thinking")
        self.assertEqual(normal, "<tool_call>tool call")

        # Streaming: tool interrupt, using a fresh parser
        all_reasoning, all_normal = self._accumulate(
            ReasoningParser("glm45"),
            ["<think>", "reasoning", "<tool_call>", "tool args"],
        )
        self.assertEqual(all_reasoning, "reasoning")
        self.assertEqual(all_normal, "<tool_call>tool args")

    def test_kimik2_tool_interruption(self):
        """Kimi-K2 tool interruption works through both parser entry points."""
        # Non-streaming: tool interrupt
        reasoning, normal = ReasoningParser("kimi_k2").parse_non_stream(
            "<think>thinking<|tool_calls_section_begin|><|tool_call_begin|>"
        )
        self.assertEqual(reasoning, "thinking")
        self.assertEqual(normal, "<|tool_calls_section_begin|><|tool_call_begin|>")

        # Streaming: tool interrupt, using a fresh parser
        all_reasoning, all_normal = self._accumulate(
            ReasoningParser("kimi_k2"),
            [
                "<think>",
                "reasoning",
                "<|tool_calls_section_begin|>",
                "<|tool_call_begin|>",
            ],
        )
        self.assertEqual(all_reasoning, "reasoning")
        self.assertEqual(all_normal, "<|tool_calls_section_begin|><|tool_call_begin|>")
class TestIntegrationScenarios(CustomTestCase):
    """End-to-end style scenarios driven through the ReasoningParser API."""

    @staticmethod
    def _stream_all(parser, chunks):
        """Feed every chunk to the parser and return the joined (reasoning, normal)."""
        reasoning_parts = []
        normal_parts = []
        for chunk in chunks:
            reasoning, normal = parser.parse_stream_chunk(chunk)
            reasoning_parts.append(reasoning)
            normal_parts.append(normal)
        return "".join(reasoning_parts), "".join(normal_parts)

    def test_deepseek_r1_complete_response(self):
        """A full DeepSeek-R1 response is split at its closing tag."""
        response = "I need to solve this step by step. First, I'll analyze the problem. The given equation is x + 2 = 5. To solve for x, I subtract 2 from both sides: x = 5 - 2 = 3.</think>The answer is x = 3."
        reasoning, normal = ReasoningParser("deepseek-r1").parse_non_stream(response)

        self.assertIn("step by step", reasoning)
        # "x = 5 - 2 = 3" in the reasoning part contains the substring "= 3".
        self.assertIn("= 3", reasoning)
        self.assertEqual(normal, "The answer is x = 3.")

    def test_qwen3_streaming_scenario(self):
        """Qwen3 streaming with the think tokens delivered as whole chunks."""
        all_reasoning, all_normal = self._stream_all(
            ReasoningParser("qwen3"),
            [
                "<think>",
                "Let me analyze this problem.",
                " I need to consider multiple factors.",
                "</think>",
                "Based on my analysis, the solution is to use a different approach.",
            ],
        )
        self.assertIn("analyze", all_reasoning)
        self.assertIn("multiple factors", all_reasoning)
        self.assertIn("different approach", all_normal)

    def test_kimi_streaming_scenario(self):
        """Kimi streaming with both think tokens split across two chunks."""
        all_reasoning, all_normal = self._stream_all(
            ReasoningParser("kimi"),
            [
                "◁thi",
                "nk▷",
                "Let me analyze this problem.",
                " I need to consider multiple factors.",
                "◁/th",
                "ink▷",
                "The answer is 42.",
            ],
        )
        self.assertIn("analyze", all_reasoning)
        self.assertIn("multiple factors", all_reasoning)
        self.assertIn("42", all_normal)

    def test_empty_reasoning_blocks(self):
        """An empty <think></think> pair yields empty reasoning and the answer."""
        reasoning, normal = ReasoningParser("qwen3").parse_non_stream(
            "<think></think>Just the answer."
        )
        self.assertEqual(reasoning, "")
        self.assertEqual(normal, "Just the answer.")

    def test_qwen3_forced_reasoning_complete_response(self):
        """A full forced-reasoning Qwen3 response is split at its closing tag."""
        response = "Let me solve this step by step. The equation is x + 2 = 5. Subtracting 2 from both sides gives x = 3.</think>The solution is x = 3."
        forced = ReasoningParser("qwen3", force_reasoning=True)
        reasoning, normal = forced.parse_non_stream(response)

        self.assertIn("step by step", reasoning)
        self.assertIn("x = 3", reasoning)
        self.assertEqual(normal, "The solution is x = 3.")

    def test_qwen3_forced_reasoning_streaming_scenario(self):
        """Forced-reasoning Qwen3 streaming without an opening <think> chunk."""
        all_reasoning, all_normal = self._stream_all(
            ReasoningParser("qwen3", force_reasoning=True),
            [
                "I need to analyze",
                " this problem carefully.",
                " Let me break it down.",
                "</think>",
                "The final answer is 42.",
            ],
        )
        self.assertIn("analyze", all_reasoning)
        self.assertIn("break it down", all_reasoning)
        self.assertIn("final answer", all_normal)
class TestBufferLossBugFix(CustomTestCase):
    """Regression tests for buffered fragments lost in parse_streaming_increment."""

    @staticmethod
    def _new_detector():
        """Return a base detector with <think>/</think> and default options."""
        return BaseReasoningFormatDetector("<think>", "</think>")

    def _feed(self, detector, chunk, *, normal="", reasoning=""):
        """Stream one chunk and assert the normal and reasoning text it yields."""
        parsed = detector.parse_streaming_increment(chunk)
        self.assertEqual(parsed.normal_text, normal)
        self.assertEqual(parsed.reasoning_text, reasoning)
        return parsed

    def test_partial_end_tag_buffer_loss_bug(self):
        """
        A buffered end-tag fragment must not be dropped by the next chunk.

        Bug scenario:
        1. The detector is not in reasoning mode.
        2. "</" arrives; it is a prefix of the closing tag, so it is buffered
           and an empty result is returned.
        3. "answer" arrives while still not in reasoning mode.
        4. Before the fix only "answer" was returned and the buffered "</"
           was lost.

        The fix returns the accumulated text (current_text) rather than only
        the newest chunk (new_text), so the expected output is "</answer".
        """
        detector = self._new_detector()
        # Step 1: the fragment could still become "</think>", so nothing is emitted.
        self._feed(detector, "</")
        # Step 2: the buffered fragment is flushed together with the new text.
        self._feed(detector, "answer", normal="</answer")

    def test_partial_start_tag_buffer_preservation(self):
        """A start-tag fragment that never completes is emitted as normal text."""
        detector = self._new_detector()
        self._feed(detector, "<th")
        # The follow-up text does not complete "<think>".
        self._feed(detector, "is is text", normal="<this is text")

    def test_partial_end_tag_in_reasoning_mode(self):
        """An end tag split across chunks still closes an open reasoning block."""
        detector = self._new_detector()
        for chunk in ("<think>", "some reasoning"):
            detector.parse_streaming_increment(chunk)

        # First half of the closing tag: nothing is emitted.
        self._feed(detector, "</")
        # Second half plus the answer: only normal text is expected, with no
        # reasoning text in this chunk.
        self._feed(detector, "think>normal text", normal="normal text")

    def test_multiple_partial_fragments(self):
        """Several buffered fragments that match no token are flushed together."""
        detector = self._new_detector()
        self._feed(detector, "<")
        self._feed(detector, "/")
        self._feed(detector, "random>", normal="</random>")

    def test_edge_case_exact_token_match(self):
        """A start token assembled piece by piece switches to reasoning mode."""
        detector = self._new_detector()
        # Feed "<thin" one character at a time; these results are not inspected.
        for fragment in ("<", "t", "h", "i", "n"):
            detector.parse_streaming_increment(fragment)

        # The final fragment completes "<think>".
        self._feed(detector, "k>")
        self.assertTrue(detector._in_reasoning)
        self.assertTrue(detector.stripped_think_start)
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()