| import unittest |
|
|
| from sglang.srt.parser.reasoning_parser import ( |
| BaseReasoningFormatDetector, |
| DeepSeekR1Detector, |
| Glm45Detector, |
| KimiDetector, |
| KimiK2Detector, |
| Qwen3Detector, |
| ReasoningParser, |
| StreamingParseResult, |
| ) |
| from sglang.test.ci.ci_register import register_cpu_ci |
| from sglang.test.test_utils import CustomTestCase |
|
|
# Register this module with CI as a short CPU-only test (estimated ~5s runtime).
register_cpu_ci(est_time=5, suite="stage-a-cpu-only")
|
|
|
|
class TestStreamingParseResult(CustomTestCase):
    """Unit tests for the StreamingParseResult container."""

    def test_init_default(self):
        """A bare StreamingParseResult holds empty normal and reasoning text."""
        empty = StreamingParseResult()
        self.assertEqual(empty.normal_text, "")
        self.assertEqual(empty.reasoning_text, "")

    def test_init_with_values(self):
        """Positional arguments populate normal_text then reasoning_text."""
        populated = StreamingParseResult("normal", "reasoning")
        self.assertEqual(populated.normal_text, "normal")
        self.assertEqual(populated.reasoning_text, "reasoning")
|
|
|
|
class TestBaseReasoningFormatDetector(CustomTestCase):
    """Behavioral tests for the generic <think>...</think> format detector."""

    def setUp(self):
        # A fresh, non-forced detector with reasoning streaming enabled.
        self.detector = BaseReasoningFormatDetector(
            think_start_token="<think>",
            think_end_token="</think>",
            force_reasoning=False,
            stream_reasoning=True,
        )

    def test_init(self):
        """Constructor arguments are stored on the expected attributes."""
        det = self.detector
        self.assertEqual(det.think_start_token, "<think>")
        self.assertEqual(det.think_end_token, "</think>")
        self.assertFalse(det._in_reasoning)
        self.assertTrue(det.stream_reasoning)
        self.assertEqual(det._buffer, "")
        self.assertFalse(det.stripped_think_start)

    def test_detect_and_parse_normal_text(self):
        """Plain text with no think tokens passes through as normal output."""
        plain = "This is normal text"
        parsed = self.detector.detect_and_parse(plain)
        self.assertEqual(parsed.normal_text, plain)
        self.assertEqual(parsed.reasoning_text, "")

    def test_detect_and_parse_with_start_token(self):
        """An unterminated think block yields reasoning text only."""
        parsed = self.detector.detect_and_parse("<think>This is reasoning")
        self.assertEqual(parsed.reasoning_text, "This is reasoning")
        self.assertEqual(parsed.normal_text, "")

    def test_detect_and_parse_complete_reasoning(self):
        """A closed think block splits into reasoning and trailing normal text."""
        parsed = self.detector.detect_and_parse(
            "<think>This is reasoning</think>This is normal"
        )
        self.assertEqual(parsed.reasoning_text, "This is reasoning")
        self.assertEqual(parsed.normal_text, "This is normal")

    def test_detect_and_parse_force_reasoning(self):
        """With force_reasoning=True, untagged text is treated as reasoning."""
        forced = BaseReasoningFormatDetector(
            "<think>", "</think>", force_reasoning=True
        )
        parsed = forced.detect_and_parse("This should be reasoning")
        self.assertEqual(parsed.reasoning_text, "This should be reasoning")
        self.assertEqual(parsed.normal_text, "")

    def test_parse_streaming_increment_normal(self):
        """Streaming plain text is emitted immediately as normal output."""
        parsed = self.detector.parse_streaming_increment("Hello world")
        self.assertEqual(parsed.normal_text, "Hello world")
        self.assertEqual(parsed.reasoning_text, "")

    def test_parse_streaming_increment_partial_token(self):
        """A possible token prefix is buffered and nothing is emitted yet."""
        held = self.detector.parse_streaming_increment("<thi")
        self.assertEqual(held.normal_text, "")
        self.assertEqual(held.reasoning_text, "")

        # Same buffering applies to a partial end token while inside reasoning.
        mid = BaseReasoningFormatDetector("<think>", "</think>")
        mid._in_reasoning = True
        held = mid.parse_streaming_increment("</thi")
        self.assertEqual(held.normal_text, "")
        self.assertEqual(held.reasoning_text, "")

    def test_parse_streaming_increment_complete_start(self):
        """A full start token switches the detector into reasoning mode."""
        parsed = self.detector.parse_streaming_increment("<think>")
        self.assertEqual(parsed.normal_text, "")
        self.assertEqual(parsed.reasoning_text, "")
        self.assertTrue(self.detector._in_reasoning)
        self.assertTrue(self.detector.stripped_think_start)

    def test_parse_streaming_increment_reasoning_content(self):
        """Chunks inside a think block stream out as reasoning text."""
        self.detector.parse_streaming_increment("<think>")
        parsed = self.detector.parse_streaming_increment("reasoning content")
        self.assertEqual(parsed.reasoning_text, "reasoning content")
        self.assertEqual(parsed.normal_text, "")

    def test_parse_streaming_increment_end_token(self):
        """An end token exits reasoning mode and emits the trailing text."""
        self.detector.parse_streaming_increment("<think>")
        self.detector.parse_streaming_increment("reasoning")
        parsed = self.detector.parse_streaming_increment("</think>normal text")
        self.assertEqual(parsed.reasoning_text, "")
        self.assertEqual(parsed.normal_text, "normal text")
        self.assertFalse(self.detector._in_reasoning)

    def test_parse_streaming_increment_no_stream_reasoning(self):
        """With stream_reasoning=False, reasoning chunks are withheld."""
        silent = BaseReasoningFormatDetector(
            "<think>", "</think>", stream_reasoning=False
        )
        silent.parse_streaming_increment("<think>")
        parsed = silent.parse_streaming_increment("reasoning content")
        self.assertEqual(parsed.reasoning_text, "")
        self.assertEqual(parsed.normal_text, "")

    def test_parse_streaming_increment_mixed_content(self):
        """A single chunk containing a full block yields both outputs at once."""
        parsed = self.detector.parse_streaming_increment(
            "<think>reasoning</think>normal"
        )
        self.assertEqual(parsed.reasoning_text, "reasoning")
        self.assertEqual(parsed.normal_text, "normal")
|
|
|
|
class TestDeepSeekR1Detector(CustomTestCase):
    """Tests for the DeepSeek-R1 detector, which assumes reasoning-first output."""

    def setUp(self):
        self.detector = DeepSeekR1Detector()

    def test_init(self):
        """R1 starts inside a reasoning block and streams reasoning by default."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertTrue(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_init_no_stream_reasoning(self):
        """stream_reasoning=False is honored by the constructor."""
        quiet = DeepSeekR1Detector(stream_reasoning=False)
        self.assertFalse(quiet.stream_reasoning)

    def test_detect_and_parse_r1_format(self):
        """Bare text with no end token is treated entirely as reasoning."""
        parsed = self.detector.detect_and_parse(
            "I need to think about this. The answer is 42."
        )
        self.assertEqual(
            parsed.reasoning_text, "I need to think about this. The answer is 42."
        )
        self.assertEqual(parsed.normal_text, "")

    def test_detect_and_parse_with_end_token(self):
        """Reasoning-first text splits at the closing </think> token."""
        parsed = self.detector.detect_and_parse(
            "I think this is the answer</think>The final answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "I think this is the answer")
        self.assertEqual(parsed.normal_text, "The final answer is 42.")

    def test_detect_and_parse_with_start_token(self):
        """deepseek-ai/DeepSeek-R1-0528 emits an explicit <think> tag; handle it too."""
        parsed = self.detector.detect_and_parse(
            "<think>I need to think about this.</think>The answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "I need to think about this.")
        self.assertEqual(parsed.normal_text, "The answer is 42.")
|
|
|
|
class TestQwen3Detector(CustomTestCase):
    """Tests for the default (non-forced) Qwen3 reasoning detector."""

    def setUp(self):
        self.detector = Qwen3Detector()

    def test_init(self):
        """Qwen3 uses <think> tags and does not start inside reasoning."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_qwen3_format(self):
        """A complete think block splits into reasoning and answer."""
        parsed = self.detector.detect_and_parse(
            "<think>Let me think about this problem</think>The answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "Let me think about this problem")
        self.assertEqual(parsed.normal_text, "The answer is 42.")

    def test_detect_and_parse_without_thinking(self):
        """Untagged output (the enable_thinking=False case) is all normal text."""
        answer = "Direct answer without thinking."
        parsed = self.detector.detect_and_parse(answer)
        self.assertEqual(parsed.normal_text, answer)
        self.assertEqual(parsed.reasoning_text, "")
|
|
|
|
class TestQwen3ForcedReasoningDetector(CustomTestCase):
    """Tests for Qwen3Detector with force_reasoning=True (no start tag required)."""

    def setUp(self):
        self.detector = Qwen3Detector(force_reasoning=True)

    def test_init(self):
        """Forced mode starts inside reasoning immediately."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertTrue(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_qwen3_forced_reasoning_format(self):
        """Leading text without a <think> tag is still treated as reasoning."""
        parsed = self.detector.detect_and_parse(
            "I need to think about this step by step.</think>The answer is 42."
        )
        self.assertEqual(
            parsed.reasoning_text, "I need to think about this step by step."
        )
        self.assertEqual(parsed.normal_text, "The answer is 42.")

    def test_detect_and_parse_with_start_token(self):
        """An explicit <think> start tag is also accepted in forced mode."""
        parsed = self.detector.detect_and_parse(
            "<think>I need to think about this.</think>The answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "I need to think about this.")
        self.assertEqual(parsed.normal_text, "The answer is 42.")

    def test_streaming_qwen3_forced_reasoning_format(self):
        """Streamed chunks are reasoning until the end tag, then normal text."""
        first = self.detector.parse_streaming_increment("I need to")
        self.assertEqual(first.reasoning_text, "I need to")
        self.assertEqual(first.normal_text, "")

        second = self.detector.parse_streaming_increment(" think about this.")
        self.assertEqual(second.reasoning_text, " think about this.")
        self.assertEqual(second.normal_text, "")

        third = self.detector.parse_streaming_increment("</think>The answer is 42.")
        self.assertEqual(third.reasoning_text, "")
        self.assertEqual(third.normal_text, "The answer is 42.")
|
|
|
|
class TestKimiDetector(CustomTestCase):
    """Tests for the Kimi detector, which uses ◁think▷-style delimiters."""

    def setUp(self):
        self.detector = KimiDetector()

    def test_init(self):
        """Kimi's delimiters differ from the <think> family."""
        self.assertEqual(self.detector.think_start_token, "◁think▷")
        self.assertEqual(self.detector.think_end_token, "◁/think▷")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_kimi_format(self):
        """A complete Kimi think block splits into reasoning and answer."""
        parsed = self.detector.detect_and_parse(
            "◁think▷Let me consider this carefully◁/think▷The answer is 42."
        )
        self.assertEqual(parsed.reasoning_text, "Let me consider this carefully")
        self.assertEqual(parsed.normal_text, "The answer is 42.")

    def test_detect_and_parse_kimi_no_thinking(self):
        """Output without Kimi tokens is all normal text."""
        answer = "Direct answer without thinking tokens."
        parsed = self.detector.detect_and_parse(answer)
        self.assertEqual(parsed.normal_text, answer)
        self.assertEqual(parsed.reasoning_text, "")

    def test_streaming_kimi_format(self):
        """Streaming handles the start token split across chunk boundaries."""
        held = self.detector.parse_streaming_increment("◁thi")
        self.assertEqual(held.normal_text, "")
        self.assertEqual(held.reasoning_text, "")

        # Completing the start token flips into reasoning and emits the rest.
        opened = self.detector.parse_streaming_increment("nk▷Start")
        self.assertEqual(opened.normal_text, "")
        self.assertEqual(opened.reasoning_text, "Start")
        self.assertTrue(self.detector._in_reasoning)

        middle = self.detector.parse_streaming_increment("thinking...")
        self.assertEqual(middle.reasoning_text, "thinking...")
        self.assertEqual(middle.normal_text, "")

        closed = self.detector.parse_streaming_increment("◁/think▷answer")
        self.assertEqual(closed.reasoning_text, "")
        self.assertEqual(closed.normal_text, "answer")
|
|
|
|
class TestKimiK2Detector(CustomTestCase):
    """Tests for the Kimi-K2 detector, which supports tool-section interruption."""

    def setUp(self):
        self.detector = KimiK2Detector()

    def test_init(self):
        """Default configuration uses <think> tags plus a tool-section token."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertEqual(self.detector.tool_start_token, "<|tool_calls_section_begin|>")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_tool_interrupt(self):
        """A tool section may cut reasoning short without a closing </think>."""
        parsed = self.detector.detect_and_parse(
            "<think>thinking<|tool_calls_section_begin|><|tool_call_begin|>"
        )
        self.assertEqual(parsed.reasoning_text, "thinking")
        self.assertEqual(
            parsed.normal_text, "<|tool_calls_section_begin|><|tool_call_begin|>"
        )

    def test_streaming_tool_interrupt(self):
        """Streaming reasoning flips to normal output when the tool token arrives."""
        self.detector.parse_streaming_increment("<think>")
        first = self.detector.parse_streaming_increment("reasoning")
        self.assertEqual(first.reasoning_text, "reasoning")
        self.assertEqual(first.normal_text, "")

        second = self.detector.parse_streaming_increment(
            "<|tool_calls_section_begin|>"
        )
        self.assertEqual(second.reasoning_text, "")
        self.assertEqual(second.normal_text, "<|tool_calls_section_begin|>")

    def test_streaming_after_interrupt_is_normal(self):
        """Once interrupted by a tool section, later chunks are plain text."""
        self.detector.parse_streaming_increment("<think>")
        self.detector.parse_streaming_increment("reasoning<|tool_calls_section_begin|>")
        parsed = self.detector.parse_streaming_increment("<|tool_call_begin|>")
        self.assertEqual(parsed.reasoning_text, "")
        self.assertEqual(parsed.normal_text, "<|tool_call_begin|>")
|
|
|
|
class TestGlm45Detector(CustomTestCase):
    """Test cases for GLM45 detector with tool interruption support."""

    def setUp(self):
        self.detector = Glm45Detector()

    def test_init(self):
        """Default GLM45 configuration: <think> tags plus a <tool_call> token."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertEqual(self.detector.tool_start_token, "<tool_call>")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_normal_reasoning(self):
        """A normally closed reasoning block parses without tool interruption."""
        text = "<think>Let me think about this step by step</think>The answer is 42."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "Let me think about this step by step")
        self.assertEqual(result.normal_text, "The answer is 42.")

    def test_detect_and_parse_tool_interrupt(self):
        """
        Test parsing with tool interruption.

        GLM45 can interrupt reasoning with tool token (<tool_call>) without
        closing </think>. Should split at the first occurrence of
        tool_start_token using find().
        """
        text = "<think>I need to think<tool_call>tool call data"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "I need to think")
        self.assertEqual(result.normal_text, "<tool_call>tool call data")

    def test_detect_and_parse_multiple_tool_calls_find(self):
        """
        Test that find() finds the FIRST occurrence of tool_start_token.

        If multiple tool calls exist in buffer, should split at the first one.
        """
        text = "<think>thinking<tool_call>first tool<tool_call>second tool<tool_call>final tool"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "thinking")
        self.assertEqual(
            result.normal_text,
            "<tool_call>first tool<tool_call>second tool<tool_call>final tool",
        )

    def test_detect_and_parse_truncated_reasoning(self):
        """
        Test truncated reasoning without tool or end tag.

        Should return all content as reasoning_text.
        """
        text = "<think>This is incomplete"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This is incomplete")
        self.assertEqual(result.normal_text, "")

    def test_detect_and_parse_normal_text_only(self):
        """Text without any reasoning block is all normal output."""
        text = "Just the answer without any reasoning."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.normal_text, text)
        self.assertEqual(result.reasoning_text, "")

    def test_streaming_normal_flow(self):
        """Streaming with a normal start/content/end reasoning flow."""
        # Start token: nothing emitted, reasoning mode entered.
        result1 = self.detector.parse_streaming_increment("<think>")
        self.assertEqual(result1.normal_text, "")
        self.assertEqual(result1.reasoning_text, "")
        self.assertTrue(self.detector._in_reasoning)

        # Content streams out as reasoning.
        result2 = self.detector.parse_streaming_increment("thinking...")
        self.assertEqual(result2.normal_text, "")
        self.assertEqual(result2.reasoning_text, "thinking...")

        # End token exits reasoning mode and emits the trailing text.
        result3 = self.detector.parse_streaming_increment("</think>answer")
        self.assertEqual(result3.normal_text, "answer")
        self.assertEqual(result3.reasoning_text, "")
        self.assertFalse(self.detector._in_reasoning)

    def test_streaming_tool_interrupt_split_tokens(self):
        """
        Test streaming with tool interruption where tool token is split across chunks.

        This tests the buffer prefix logic that prevents partial emission of tool token.
        """
        self.detector.parse_streaming_increment("<think>")

        result1 = self.detector.parse_streaming_increment("thinking")
        self.assertEqual(result1.reasoning_text, "thinking")

        # The tool token ends reasoning and is emitted as normal text.
        result2 = self.detector.parse_streaming_increment("<tool_call>")
        self.assertEqual(result2.reasoning_text, "")
        self.assertEqual(result2.normal_text, "<tool_call>")
        self.assertFalse(self.detector._in_reasoning)

        # Subsequent chunks stay normal.
        result3 = self.detector.parse_streaming_increment("tool args")
        self.assertEqual(result3.reasoning_text, "")
        self.assertEqual(result3.normal_text, "tool args")

    def test_streaming_no_stream_reasoning(self):
        """Streaming without stream_reasoning: reasoning is withheld until the interrupt."""
        detector = Glm45Detector(stream_reasoning=False)

        detector.parse_streaming_increment("<think>")

        # Reasoning content is buffered, not emitted.
        result = detector.parse_streaming_increment("thinking")
        self.assertEqual(result.reasoning_text, "")
        self.assertEqual(result.normal_text, "")

        # On tool interruption the buffered reasoning is flushed at once.
        result = detector.parse_streaming_increment("<tool_call>tool call")
        self.assertEqual(result.reasoning_text, "<think>thinking")
        self.assertEqual(result.normal_text, "<tool_call>tool call")

    def test_streaming_empty_reasoning_with_tool(self):
        """Test empty reasoning block followed by tool call."""
        # The start-tag chunk emits nothing; its result is intentionally discarded
        # (previously bound to an unused local).
        self.detector.parse_streaming_increment("<think>")
        result = self.detector.parse_streaming_increment("<tool_call>tool call")
        self.assertEqual(result.reasoning_text, "")
        self.assertEqual(result.normal_text, "<tool_call>tool call")

    def test_forced_reasoning_mode(self):
        """Test GLM45 with force_reasoning=True."""
        detector = Glm45Detector(force_reasoning=True)

        # Untagged text is treated entirely as reasoning.
        text = "This is reasoning"
        result = detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This is reasoning")
        self.assertEqual(result.normal_text, "")

        # A tool token still interrupts forced reasoning.
        text = "More reasoning<tool_call>tool call"
        result = detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "More reasoning")
        self.assertEqual(result.normal_text, "<tool_call>tool call")
|
|
|
|
class TestReasoningParser(CustomTestCase):
    """Tests for the ReasoningParser facade that dispatches to detectors."""

    def test_init_valid_model(self):
        """Every supported model keyword maps to its detector class."""
        mapping = [
            ("deepseek-r1", DeepSeekR1Detector),
            ("qwen3", Qwen3Detector),
            ("kimi", KimiDetector),
            ("kimi_k2", KimiK2Detector),
            ("glm45", Glm45Detector),
        ]
        for model_type, detector_cls in mapping:
            self.assertIsInstance(ReasoningParser(model_type).detector, detector_cls)

    def test_init_invalid_model(self):
        """An unknown model type raises ValueError with a helpful message."""
        with self.assertRaises(ValueError) as context:
            ReasoningParser("invalid-model")
        self.assertIn("Unsupported model type", str(context.exception))

    def test_init_no_model(self):
        """Omitting the model type raises a dedicated ValueError."""
        with self.assertRaises(ValueError) as context:
            ReasoningParser(None)
        self.assertEqual(str(context.exception), "Model type must be specified")

    def test_parse_non_stream(self):
        """Non-streaming parse splits a complete think block in one call."""
        reasoning, normal = ReasoningParser("qwen3").parse_non_stream(
            "<think>Let me think</think>The answer is 42."
        )
        self.assertEqual(reasoning, "Let me think")
        self.assertEqual(normal, "The answer is 42.")

    def test_parse_stream_chunk(self):
        """Streaming chunks move through start token, reasoning, and end token."""
        parser = ReasoningParser("qwen3")

        reasoning, normal = parser.parse_stream_chunk("<think>")
        self.assertEqual(reasoning, "")
        self.assertEqual(normal, "")

        reasoning, normal = parser.parse_stream_chunk("thinking...")
        self.assertEqual(reasoning, "thinking...")
        self.assertEqual(normal, "")

        reasoning, normal = parser.parse_stream_chunk("</think>answer")
        self.assertEqual(reasoning, "")
        self.assertEqual(normal, "answer")

    def test_case_insensitive_model_type(self):
        """Model-type matching ignores letter case."""
        mixed = ReasoningParser("DeepSeek-R1")
        upper = ReasoningParser("QWEN3")
        title = ReasoningParser("Kimi")

        self.assertIsInstance(mixed.detector, DeepSeekR1Detector)
        self.assertIsInstance(upper.detector, Qwen3Detector)
        self.assertIsInstance(title.detector, KimiDetector)

    def test_stream_reasoning_parameter(self):
        """The stream_reasoning flag propagates down to the detector."""
        self.assertFalse(
            ReasoningParser("qwen3", stream_reasoning=False).detector.stream_reasoning
        )
        self.assertTrue(
            ReasoningParser("qwen3", stream_reasoning=True).detector.stream_reasoning
        )

    def test_glm45_tool_interruption(self):
        """GLM45 tool interruption works through the ReasoningParser API."""
        reasoning, normal = ReasoningParser("glm45").parse_non_stream(
            "<think>thinking<tool_call>tool call"
        )
        self.assertEqual(reasoning, "thinking")
        self.assertEqual(normal, "<tool_call>tool call")

        # Streaming variant: tool token and arguments come out as normal text.
        parser = ReasoningParser("glm45")
        all_reasoning = ""
        all_normal = ""
        for chunk in ("<think>", "reasoning", "<tool_call>", "tool args"):
            reasoning, normal = parser.parse_stream_chunk(chunk)
            if reasoning:
                all_reasoning += reasoning
            if normal:
                all_normal += normal

        self.assertEqual(all_reasoning, "reasoning")
        self.assertEqual(all_normal, "<tool_call>tool args")

    def test_kimik2_tool_interruption(self):
        """Kimi-K2 tool interruption works through the ReasoningParser API."""
        reasoning, normal = ReasoningParser("kimi_k2").parse_non_stream(
            "<think>thinking<|tool_calls_section_begin|><|tool_call_begin|>"
        )
        self.assertEqual(reasoning, "thinking")
        self.assertEqual(normal, "<|tool_calls_section_begin|><|tool_call_begin|>")

        # Streaming variant mirrors the non-streaming split.
        parser = ReasoningParser("kimi_k2")
        all_reasoning = ""
        all_normal = ""
        for chunk in (
            "<think>",
            "reasoning",
            "<|tool_calls_section_begin|>",
            "<|tool_call_begin|>",
        ):
            reasoning, normal = parser.parse_stream_chunk(chunk)
            if reasoning:
                all_reasoning += reasoning
            if normal:
                all_normal += normal

        self.assertEqual(all_reasoning, "reasoning")
        self.assertEqual(all_normal, "<|tool_calls_section_begin|><|tool_call_begin|>")
|
|
|
|
class TestIntegrationScenarios(CustomTestCase):
    """Integration tests for realistic multi-chunk usage scenarios."""

    @staticmethod
    def _drain(parser, chunks):
        """Feed chunks through the parser; return (reasoning, normal) concatenations."""
        reasoning_parts = []
        normal_parts = []
        for piece in chunks:
            reasoning, normal = parser.parse_stream_chunk(piece)
            reasoning_parts.append(reasoning)
            normal_parts.append(normal)
        return "".join(reasoning_parts), "".join(normal_parts)

    def test_deepseek_r1_complete_response(self):
        """A full DeepSeek-R1 answer splits at the closing think tag."""
        parser = ReasoningParser("deepseek-r1")
        response = "I need to solve this step by step. First, I'll analyze the problem. The given equation is x + 2 = 5. To solve for x, I subtract 2 from both sides: x = 5 - 2 = 3.</think>The answer is x = 3."

        reasoning, normal = parser.parse_non_stream(response)
        self.assertIn("step by step", reasoning)
        self.assertIn("= 3", reasoning)
        self.assertEqual(normal, "The answer is x = 3.")

    def test_qwen3_streaming_scenario(self):
        """Qwen3 streaming accumulates reasoning first, then normal text."""
        all_reasoning, all_normal = self._drain(
            ReasoningParser("qwen3"),
            [
                "<think>",
                "Let me analyze this problem.",
                " I need to consider multiple factors.",
                "</think>",
                "Based on my analysis, the solution is to use a different approach.",
            ],
        )
        self.assertIn("analyze", all_reasoning)
        self.assertIn("multiple factors", all_reasoning)
        self.assertIn("different approach", all_normal)

    def test_kimi_streaming_scenario(self):
        """Kimi streaming handles think tokens split across chunk boundaries."""
        all_reasoning, all_normal = self._drain(
            ReasoningParser("kimi"),
            [
                "◁thi",
                "nk▷",
                "Let me analyze this problem.",
                " I need to consider multiple factors.",
                "◁/th",
                "ink▷",
                "The answer is 42.",
            ],
        )
        self.assertIn("analyze", all_reasoning)
        self.assertIn("multiple factors", all_reasoning)
        self.assertIn("42", all_normal)

    def test_empty_reasoning_blocks(self):
        """An empty think block yields no reasoning and the full answer."""
        reasoning, normal = ReasoningParser("qwen3").parse_non_stream(
            "<think></think>Just the answer."
        )
        self.assertEqual(reasoning, "")
        self.assertEqual(normal, "Just the answer.")

    def test_qwen3_forced_reasoning_complete_response(self):
        """Forced-reasoning Qwen3 treats untagged leading text as reasoning."""
        parser = ReasoningParser("qwen3", force_reasoning=True)
        response = "Let me solve this step by step. The equation is x + 2 = 5. Subtracting 2 from both sides gives x = 3.</think>The solution is x = 3."

        reasoning, normal = parser.parse_non_stream(response)
        self.assertIn("step by step", reasoning)
        self.assertIn("x = 3", reasoning)
        self.assertEqual(normal, "The solution is x = 3.")

    def test_qwen3_forced_reasoning_streaming_scenario(self):
        """Forced-reasoning streaming flips to normal text at the end tag."""
        all_reasoning, all_normal = self._drain(
            ReasoningParser("qwen3", force_reasoning=True),
            [
                "I need to analyze",
                " this problem carefully.",
                " Let me break it down.",
                "</think>",
                "The final answer is 42.",
            ],
        )
        self.assertIn("analyze", all_reasoning)
        self.assertIn("break it down", all_reasoning)
        self.assertIn("final answer", all_normal)
|
|
|
|
class TestBufferLossBugFix(CustomTestCase):
    """Regression tests for the streaming buffer-loss fix in parse_streaming_increment."""

    def test_partial_end_tag_buffer_loss_bug(self):
        """
        A buffered partial end tag must not be dropped when normal text follows.

        Failure mode being guarded against:
        1. _in_reasoning is False and the chunk "</" (a possible end-tag
           prefix) is stored in the buffer; an empty result is returned.
        2. The next chunk "answer" arrives while _in_reasoning is still False.
        3. A buggy implementation cleared the buffer and returned only the
           new chunk, losing the "</" from the previous step.

        The fix (line 108 of the implementation) changed
        `return StreamingParseResult(normal_text=new_text)` to
        `return StreamingParseResult(normal_text=current_text)` so the
        buffered prefix is flushed together with the new text.
        """
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        # Step 1: the prefix is held back entirely.
        held = detector.parse_streaming_increment("</")
        self.assertEqual(held.normal_text, "")
        self.assertEqual(held.reasoning_text, "")

        # Step 2: the buffered prefix plus new text are flushed together.
        flushed = detector.parse_streaming_increment("answer")
        self.assertEqual(flushed.normal_text, "</answer")
        self.assertEqual(flushed.reasoning_text, "")

    def test_partial_start_tag_buffer_preservation(self):
        """A buffered partial start tag is re-emitted once it stops matching."""
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        held = detector.parse_streaming_increment("<th")
        self.assertEqual(held.normal_text, "")
        self.assertEqual(held.reasoning_text, "")

        flushed = detector.parse_streaming_increment("is is text")
        self.assertEqual(flushed.normal_text, "<this is text")
        self.assertEqual(flushed.reasoning_text, "")

    def test_partial_end_tag_in_reasoning_mode(self):
        """A split end tag inside reasoning mode still terminates cleanly."""
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        detector.parse_streaming_increment("<think>")
        detector.parse_streaming_increment("some reasoning")

        held = detector.parse_streaming_increment("</")
        self.assertEqual(held.normal_text, "")
        self.assertEqual(held.reasoning_text, "")

        done = detector.parse_streaming_increment("think>normal text")
        self.assertEqual(done.normal_text, "normal text")
        self.assertEqual(done.reasoning_text, "")

    def test_multiple_partial_fragments(self):
        """Several buffered fragments flush together once no token can match."""
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        for fragment in ("<", "/"):
            held = detector.parse_streaming_increment(fragment)
            self.assertEqual(held.normal_text, "")
            self.assertEqual(held.reasoning_text, "")

        flushed = detector.parse_streaming_increment("random>")
        self.assertEqual(flushed.normal_text, "</random>")
        self.assertEqual(flushed.reasoning_text, "")

    def test_edge_case_exact_token_match(self):
        """A start token assembled one character at a time is still recognized."""
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        for fragment in ("<", "t", "h", "i", "n"):
            detector.parse_streaming_increment(fragment)
        final = detector.parse_streaming_increment("k>")

        self.assertEqual(final.normal_text, "")
        self.assertEqual(final.reasoning_text, "")
        self.assertTrue(detector._in_reasoning)
        self.assertTrue(detector.stripped_think_start)
|
|
|
|
# Allow running this test module directly with `python <file>`.
if __name__ == "__main__":
    unittest.main()
|
|