"""Unit tests for the reasoning-format detectors and the ReasoningParser facade.

NOTE(review): this module was recovered from a copy in which all whitespace had
been collapsed and every angle-bracket tag literal (``<think>``, ``</think>``,
``<tool_call>``) had been stripped. The literals were restored from the
surrounding assertions and comments. Places where whole statements had to be
reconstructed carry their own ``NOTE(review)`` marker; confirm those against
the upstream file.
"""

import unittest

from sglang.srt.parser.reasoning_parser import (
    BaseReasoningFormatDetector,
    DeepSeekR1Detector,
    Glm45Detector,
    KimiDetector,
    KimiK2Detector,
    Qwen3Detector,
    ReasoningParser,
    StreamingParseResult,
)
from sglang.test.ci.ci_register import register_cpu_ci
from sglang.test.test_utils import CustomTestCase

register_cpu_ci(est_time=5, suite="stage-a-cpu-only")


class TestStreamingParseResult(CustomTestCase):
    """Tests for the StreamingParseResult value container."""

    def test_init_default(self):
        """Test default initialization of StreamingParseResult."""
        result = StreamingParseResult()
        self.assertEqual(result.normal_text, "")
        self.assertEqual(result.reasoning_text, "")

    def test_init_with_values(self):
        """Test initialization with specific values."""
        result = StreamingParseResult("normal", "reasoning")
        self.assertEqual(result.normal_text, "normal")
        self.assertEqual(result.reasoning_text, "reasoning")


class TestBaseReasoningFormatDetector(CustomTestCase):
    """Tests for BaseReasoningFormatDetector with <think>/</think> tokens."""

    def setUp(self):
        self.detector = BaseReasoningFormatDetector(
            think_start_token="<think>",
            think_end_token="</think>",
            force_reasoning=False,
            stream_reasoning=True,
        )

    def test_init(self):
        """Test initialization of BaseReasoningFormatDetector."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)
        self.assertEqual(self.detector._buffer, "")
        self.assertFalse(self.detector.stripped_think_start)

    def test_detect_and_parse_normal_text(self):
        """Test parsing normal text without reasoning."""
        text = "This is normal text"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.normal_text, text)
        self.assertEqual(result.reasoning_text, "")

    def test_detect_and_parse_with_start_token(self):
        """Test parsing text starting with think token."""
        text = "<think>This is reasoning"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This is reasoning")
        self.assertEqual(result.normal_text, "")

    def test_detect_and_parse_complete_reasoning(self):
        """Test parsing complete reasoning block."""
        text = "<think>This is reasoning</think>This is normal"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This is reasoning")
        self.assertEqual(result.normal_text, "This is normal")

    def test_detect_and_parse_force_reasoning(self):
        """Test forced reasoning mode."""
        detector = BaseReasoningFormatDetector(
            "<think>", "</think>", force_reasoning=True
        )
        text = "This should be reasoning"
        result = detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This should be reasoning")
        self.assertEqual(result.normal_text, "")

    def test_parse_streaming_increment_normal(self):
        """Test streaming parse of normal text."""
        result = self.detector.parse_streaming_increment("Hello world")
        self.assertEqual(result.normal_text, "Hello world")
        self.assertEqual(result.reasoning_text, "")

    def test_parse_streaming_increment_partial_token(self):
        """Test streaming parse with partial token."""
        # NOTE(review): the chunk literals and the first pair of assertions in
        # this test were reconstructed after tag stripping -- confirm upstream.
        # Test partial start token
        result = self.detector.parse_streaming_increment("<thi")
        self.assertEqual(result.normal_text, "")
        self.assertEqual(result.reasoning_text, "")

        # Fresh detector: test partial end token while already in reasoning mode
        detector = BaseReasoningFormatDetector("<think>", "</think>")
        detector._in_reasoning = True
        result = detector.parse_streaming_increment("</thi")
        self.assertEqual(result.normal_text, "")
        self.assertEqual(result.reasoning_text, "")

    def test_parse_streaming_increment_complete_start(self):
        """Test streaming parse with complete start token."""
        # NOTE(review): the method name, docstring and first statement were
        # reconstructed; only the assertions survived in the recovered copy.
        result = self.detector.parse_streaming_increment("<think>")
        self.assertEqual(result.normal_text, "")
        self.assertEqual(result.reasoning_text, "")
        self.assertTrue(self.detector._in_reasoning)
        self.assertTrue(self.detector.stripped_think_start)

    def test_parse_streaming_increment_reasoning_content(self):
        """Test streaming parse of reasoning content."""
        # First add start token
        self.detector.parse_streaming_increment("<think>")

        # Then add reasoning content
        result = self.detector.parse_streaming_increment("reasoning content")
        self.assertEqual(result.reasoning_text, "reasoning content")
        self.assertEqual(result.normal_text, "")

    def test_parse_streaming_increment_end_token(self):
        """Test streaming parse with end token."""
        # Start reasoning mode
        self.detector.parse_streaming_increment("<think>")
        self.detector.parse_streaming_increment("reasoning")

        # End reasoning - the reasoning content accumulated in previous calls
        # is cleared when end token is found
        result = self.detector.parse_streaming_increment("</think>normal text")
        self.assertEqual(result.reasoning_text, "")  # Buffer cleared, returns empty
        self.assertEqual(result.normal_text, "normal text")
        self.assertFalse(self.detector._in_reasoning)

    def test_parse_streaming_increment_no_stream_reasoning(self):
        """Test streaming parse without streaming reasoning."""
        detector = BaseReasoningFormatDetector(
            "<think>", "</think>", stream_reasoning=False
        )

        # Start reasoning mode
        detector.parse_streaming_increment("<think>")

        # Add reasoning content - should not return content
        result = detector.parse_streaming_increment("reasoning content")
        self.assertEqual(result.reasoning_text, "")
        self.assertEqual(result.normal_text, "")

    def test_parse_streaming_increment_mixed_content(self):
        """Test streaming parse with mixed content in one chunk."""
        result = self.detector.parse_streaming_increment(
            "<think>reasoning</think>normal"
        )
        self.assertEqual(result.reasoning_text, "reasoning")
        self.assertEqual(result.normal_text, "normal")


class TestDeepSeekR1Detector(CustomTestCase):
    """Tests for DeepSeekR1Detector (reasoning is forced on by default)."""

    def setUp(self):
        self.detector = DeepSeekR1Detector()

    def test_init(self):
        """Test DeepSeekR1Detector initialization."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertTrue(self.detector._in_reasoning)  # force_reasoning=True
        self.assertTrue(self.detector.stream_reasoning)

    def test_init_no_stream_reasoning(self):
        """Test DeepSeekR1Detector with stream_reasoning=False."""
        detector = DeepSeekR1Detector(stream_reasoning=False)
        self.assertFalse(detector.stream_reasoning)

    def test_detect_and_parse_r1_format(self):
        """Test parsing DeepSeek-R1 format."""
        text = "I need to think about this. The answer is 42."
        result = self.detector.detect_and_parse(text)
        # Should be treated as reasoning because force_reasoning=True
        self.assertEqual(
            result.reasoning_text, "I need to think about this. The answer is 42."
        )
        self.assertEqual(result.normal_text, "")

    def test_detect_and_parse_with_end_token(self):
        """Test parsing with end token."""
        text = "I think this is the answer</think>The final answer is 42."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "I think this is the answer")
        self.assertEqual(result.normal_text, "The final answer is 42.")

    def test_detect_and_parse_with_start_token(self):
        """Test parsing deepseek-ai/DeepSeek-R1-0528 format, which generates the <think> token."""
        text = "<think>I need to think about this.</think>The answer is 42."
        result = self.detector.detect_and_parse(text)
        # Should be treated as reasoning because force_reasoning=True
        self.assertEqual(result.reasoning_text, "I need to think about this.")
        self.assertEqual(result.normal_text, "The answer is 42.")


class TestQwen3Detector(CustomTestCase):
    """Tests for Qwen3Detector in its default (non-forced) mode."""

    def setUp(self):
        self.detector = Qwen3Detector()

    def test_init(self):
        """Test Qwen3Detector initialization."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertFalse(self.detector._in_reasoning)  # force_reasoning=False
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_qwen3_format(self):
        """Test parsing Qwen3 format."""
        text = "<think>Let me think about this problem</think>The answer is 42."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "Let me think about this problem")
        self.assertEqual(result.normal_text, "The answer is 42.")

    def test_detect_and_parse_without_thinking(self):
        """Test parsing without thinking (enable_thinking=False case)."""
        text = "Direct answer without thinking."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.normal_text, text)
        self.assertEqual(result.reasoning_text, "")


class TestQwen3ForcedReasoningDetector(CustomTestCase):
    """Tests for Qwen3Detector constructed with force_reasoning=True."""

    def setUp(self):
        self.detector = Qwen3Detector(force_reasoning=True)

    def test_init(self):
        """Test Qwen3ForcedReasoningDetector initialization."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertTrue(self.detector._in_reasoning)  # force_reasoning=True
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_qwen3_forced_reasoning_format(self):
        """Test parsing Qwen3-ForcedReasoning format (no <think> start tag)."""
        text = "I need to think about this step by step.</think>The answer is 42."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(
            result.reasoning_text, "I need to think about this step by step."
        )
        self.assertEqual(result.normal_text, "The answer is 42.")

    def test_detect_and_parse_with_start_token(self):
        """Test parsing Qwen3-ForcedReasoning with optional <think> start tag."""
        text = "<think>I need to think about this.</think>The answer is 42."
        result = self.detector.detect_and_parse(text)
        # Should work because base class logic handles both
        # force_reasoning=True OR start token
        self.assertEqual(result.reasoning_text, "I need to think about this.")
        self.assertEqual(result.normal_text, "The answer is 42.")

    def test_streaming_qwen3_forced_reasoning_format(self):
        """Test streaming parse of Qwen3-ForcedReasoning format."""
        # First chunk without <think> start
        result = self.detector.parse_streaming_increment("I need to")
        self.assertEqual(result.reasoning_text, "I need to")
        self.assertEqual(result.normal_text, "")

        # More reasoning content
        result = self.detector.parse_streaming_increment(" think about this.")
        self.assertEqual(result.reasoning_text, " think about this.")
        self.assertEqual(result.normal_text, "")

        # End token with normal text
        result = self.detector.parse_streaming_increment("</think>The answer is 42.")
        self.assertEqual(result.reasoning_text, "")  # Buffer cleared
        self.assertEqual(result.normal_text, "The answer is 42.")


class TestKimiDetector(CustomTestCase):
    """Tests for KimiDetector and its ◁think▷ / ◁/think▷ tokens."""

    def setUp(self):
        self.detector = KimiDetector()

    def test_init(self):
        """Test KimiDetector initialization."""
        self.assertEqual(self.detector.think_start_token, "◁think▷")
        self.assertEqual(self.detector.think_end_token, "◁/think▷")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_kimi_format(self):
        """Test parsing Kimi format."""
        text = "◁think▷Let me consider this carefully◁/think▷The answer is 42."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "Let me consider this carefully")
        self.assertEqual(result.normal_text, "The answer is 42.")

    def test_detect_and_parse_kimi_no_thinking(self):
        """Test parsing Kimi format without thinking."""
        text = "Direct answer without thinking tokens."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.normal_text, text)
        self.assertEqual(result.reasoning_text, "")

    def test_streaming_kimi_format(self):
        """Test streaming parse of Kimi format."""
        # Test partial token
        result = self.detector.parse_streaming_increment("◁thi")
        self.assertEqual(result.normal_text, "")
        self.assertEqual(result.reasoning_text, "")

        # Complete start token
        result = self.detector.parse_streaming_increment("nk▷Start")
        self.assertEqual(result.normal_text, "")
        self.assertEqual(result.reasoning_text, "Start")
        self.assertTrue(self.detector._in_reasoning)

        # Add reasoning content
        result = self.detector.parse_streaming_increment("thinking...")
        self.assertEqual(result.reasoning_text, "thinking...")
        self.assertEqual(result.normal_text, "")

        # End token - reasoning content is cleared when end token is processed
        result = self.detector.parse_streaming_increment("◁/think▷answer")
        self.assertEqual(result.reasoning_text, "")  # Buffer cleared
        self.assertEqual(result.normal_text, "answer")


class TestKimiK2Detector(CustomTestCase):
    """Test cases for KimiK2 detector with tool interruption support."""

    def setUp(self):
        self.detector = KimiK2Detector()

    def test_init(self):
        """Test KimiK2Detector initialization."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertEqual(self.detector.tool_start_token, "<|tool_calls_section_begin|>")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_tool_interrupt(self):
        """Test parsing with Kimi-K2 tool-section interruption."""
        text = "<think>thinking<|tool_calls_section_begin|><|tool_call_begin|>"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "thinking")
        self.assertEqual(
            result.normal_text, "<|tool_calls_section_begin|><|tool_call_begin|>"
        )

    def test_streaming_tool_interrupt(self):
        """Test streaming parse interrupted by tool section."""
        self.detector.parse_streaming_increment("<think>")

        result1 = self.detector.parse_streaming_increment("reasoning")
        self.assertEqual(result1.reasoning_text, "reasoning")
        self.assertEqual(result1.normal_text, "")

        result2 = self.detector.parse_streaming_increment(
            "<|tool_calls_section_begin|>"
        )
        self.assertEqual(result2.reasoning_text, "")
        self.assertEqual(result2.normal_text, "<|tool_calls_section_begin|>")

    def test_streaming_after_interrupt_is_normal(self):
        """After interruption, subsequent chunks should be normal text."""
        self.detector.parse_streaming_increment("<think>")
        self.detector.parse_streaming_increment("reasoning<|tool_calls_section_begin|>")

        result = self.detector.parse_streaming_increment("<|tool_call_begin|>")
        self.assertEqual(result.reasoning_text, "")
        self.assertEqual(result.normal_text, "<|tool_call_begin|>")


class TestGlm45Detector(CustomTestCase):
    """Test cases for GLM45 detector with tool interruption support."""

    def setUp(self):
        self.detector = Glm45Detector()

    def test_init(self):
        """Test Glm45Detector initialization."""
        self.assertEqual(self.detector.think_start_token, "<think>")
        self.assertEqual(self.detector.think_end_token, "</think>")
        self.assertEqual(self.detector.tool_start_token, "<tool_call>")
        self.assertFalse(self.detector._in_reasoning)
        self.assertTrue(self.detector.stream_reasoning)

    def test_detect_and_parse_normal_reasoning(self):
        """Test parsing normal reasoning block without tool interruption."""
        text = "<think>Let me think about this step by step</think>The answer is 42."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "Let me think about this step by step")
        self.assertEqual(result.normal_text, "The answer is 42.")

    def test_detect_and_parse_tool_interrupt(self):
        """
        Test parsing with tool interruption.

        GLM45 can interrupt reasoning with tool token (<tool_call>) without
        closing </think>. Should split at the first occurrence of
        tool_start_token using find().
        """
        text = "<think>I need to think<tool_call>tool call data"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "I need to think")
        # The tool token itself belongs to the normal text.
        self.assertEqual(result.normal_text, "<tool_call>tool call data")

    def test_detect_and_parse_multiple_tool_calls_find(self):
        """
        Test that find() finds the FIRST occurrence of tool_start_token.

        If multiple tool calls exist in buffer, should split at the first one.
        """
        # NOTE(review): the positions of the three <tool_call> tokens were
        # inferred from the surviving text fragments -- confirm upstream.
        text = (
            "<think>thinking<tool_call>first tool"
            "<tool_call>second tool<tool_call>final tool"
        )
        result = self.detector.detect_and_parse(text)
        # Should split at the first <tool_call>
        self.assertEqual(result.reasoning_text, "thinking")
        self.assertEqual(
            result.normal_text,
            "<tool_call>first tool<tool_call>second tool<tool_call>final tool",
        )

    def test_detect_and_parse_truncated_reasoning(self):
        """
        Test truncated reasoning without tool or end tag.

        Should return all content as reasoning_text.
        """
        text = "<think>This is incomplete"
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This is incomplete")
        self.assertEqual(result.normal_text, "")

    def test_detect_and_parse_normal_text_only(self):
        """Test parsing text without reasoning block."""
        text = "Just the answer without any reasoning."
        result = self.detector.detect_and_parse(text)
        self.assertEqual(result.normal_text, text)
        self.assertEqual(result.reasoning_text, "")

    def test_streaming_normal_flow(self):
        """Test streaming with normal reasoning flow."""
        # Start reasoning
        result1 = self.detector.parse_streaming_increment("<think>")
        self.assertEqual(result1.normal_text, "")
        self.assertEqual(result1.reasoning_text, "")
        self.assertTrue(self.detector._in_reasoning)

        # Reasoning content
        result2 = self.detector.parse_streaming_increment("thinking...")
        self.assertEqual(result2.normal_text, "")
        self.assertEqual(result2.reasoning_text, "thinking...")

        # End reasoning
        result3 = self.detector.parse_streaming_increment("</think>answer")
        self.assertEqual(result3.normal_text, "answer")
        self.assertEqual(result3.reasoning_text, "")
        self.assertFalse(self.detector._in_reasoning)

    def test_streaming_tool_interrupt_split_tokens(self):
        """
        Test streaming with tool interruption where tool token is split across chunks.

        This tests the buffer prefix logic that prevents partial emission of tool token.
        """
        # Start reasoning
        self.detector.parse_streaming_increment("<think>")

        # Add reasoning
        result1 = self.detector.parse_streaming_increment("thinking")
        self.assertEqual(result1.reasoning_text, "thinking")

        # Send the tool token as its own chunk
        result2 = self.detector.parse_streaming_increment("<tool_call>")
        # Tool token is in buffer, causing switch to normal mode
        self.assertEqual(result2.reasoning_text, "")
        self.assertEqual(result2.normal_text, "<tool_call>")
        self.assertFalse(self.detector._in_reasoning)

        # Send tool args
        result3 = self.detector.parse_streaming_increment("tool args")
        self.assertEqual(result3.reasoning_text, "")
        self.assertEqual(result3.normal_text, "tool args")

    def test_streaming_no_stream_reasoning(self):
        """Test streaming without stream_reasoning enabled."""
        detector = Glm45Detector(stream_reasoning=False)

        # Start reasoning
        detector.parse_streaming_increment("<think>")

        # Reasoning content is buffered and not returned yet
        result = detector.parse_streaming_increment("thinking")
        self.assertEqual(result.reasoning_text, "")
        self.assertEqual(result.normal_text, "")

        # Tool interruption should still work - flushes buffered reasoning
        # Note: buffer preserves original text including <think> tag
        result = detector.parse_streaming_increment("<tool_call>tool call")
        self.assertEqual(result.reasoning_text, "<think>thinking")
        self.assertEqual(result.normal_text, "<tool_call>tool call")

    def test_streaming_empty_reasoning_with_tool(self):
        """Test empty reasoning block followed by tool call."""
        # NOTE(review): the first chunk was fully stripped in the recovered
        # copy; "<think></think>" is inferred from the docstring.
        self.detector.parse_streaming_increment("<think></think>")

        result2 = self.detector.parse_streaming_increment("<tool_call>tool call")
        self.assertEqual(result2.reasoning_text, "")
        self.assertEqual(result2.normal_text, "<tool_call>tool call")

    def test_forced_reasoning_mode(self):
        """Test GLM45 with force_reasoning=True."""
        detector = Glm45Detector(force_reasoning=True)

        # Without start token, should still be in reasoning mode
        text = "This is reasoning"
        result = detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "This is reasoning")
        self.assertEqual(result.normal_text, "")

        # Tool interruption should work with forced reasoning
        text = "More reasoning<tool_call>tool call"
        result = detector.detect_and_parse(text)
        self.assertEqual(result.reasoning_text, "More reasoning")
        self.assertEqual(result.normal_text, "<tool_call>tool call")


class TestReasoningParser(CustomTestCase):
    """Tests for the ReasoningParser facade that selects a detector by model type."""

    def test_init_valid_model(self):
        """Test initialization with valid model types."""
        parser = ReasoningParser("deepseek-r1")
        self.assertIsInstance(parser.detector, DeepSeekR1Detector)

        parser = ReasoningParser("qwen3")
        self.assertIsInstance(parser.detector, Qwen3Detector)

        parser = ReasoningParser("kimi")
        self.assertIsInstance(parser.detector, KimiDetector)

        parser = ReasoningParser("kimi_k2")
        self.assertIsInstance(parser.detector, KimiK2Detector)

        parser = ReasoningParser("glm45")
        self.assertIsInstance(parser.detector, Glm45Detector)

    def test_init_invalid_model(self):
        """Test initialization with invalid model type."""
        with self.assertRaises(ValueError) as context:
            ReasoningParser("invalid-model")
        self.assertIn("Unsupported model type", str(context.exception))

    def test_init_no_model(self):
        """Test initialization without model type."""
        with self.assertRaises(ValueError) as context:
            ReasoningParser(None)
        self.assertEqual(str(context.exception), "Model type must be specified")

    def test_parse_non_stream(self):
        """Test non-streaming parsing."""
        parser = ReasoningParser("qwen3")
        reasoning, normal = parser.parse_non_stream(
            "<think>Let me think</think>The answer is 42."
        )
        self.assertEqual(reasoning, "Let me think")
        self.assertEqual(normal, "The answer is 42.")

    def test_parse_stream_chunk(self):
        """Test streaming chunk parsing."""
        parser = ReasoningParser("qwen3")

        # First chunk with start token
        reasoning, normal = parser.parse_stream_chunk("<think>")
        self.assertEqual(reasoning, "")
        self.assertEqual(normal, "")

        # Second chunk with reasoning content
        reasoning, normal = parser.parse_stream_chunk("thinking...")
        self.assertEqual(reasoning, "thinking...")
        self.assertEqual(normal, "")

        # Third chunk with end token and normal text
        reasoning, normal = parser.parse_stream_chunk("</think>answer")
        self.assertEqual(reasoning, "")  # Buffer cleared when end token processed
        self.assertEqual(normal, "answer")

    def test_case_insensitive_model_type(self):
        """Test case insensitive model type matching."""
        parser1 = ReasoningParser("DeepSeek-R1")
        parser2 = ReasoningParser("QWEN3")
        parser3 = ReasoningParser("Kimi")

        self.assertIsInstance(parser1.detector, DeepSeekR1Detector)
        self.assertIsInstance(parser2.detector, Qwen3Detector)
        self.assertIsInstance(parser3.detector, KimiDetector)

    def test_stream_reasoning_parameter(self):
        """Test stream_reasoning parameter is passed correctly."""
        parser = ReasoningParser("qwen3", stream_reasoning=False)
        self.assertFalse(parser.detector.stream_reasoning)

        parser = ReasoningParser("qwen3", stream_reasoning=True)
        self.assertTrue(parser.detector.stream_reasoning)

    def test_glm45_tool_interruption(self):
        """Test GLM45 tool interruption through ReasoningParser API."""
        parser = ReasoningParser("glm45")

        # Non-streaming: tool interrupt
        reasoning, normal = parser.parse_non_stream(
            "<think>thinking<tool_call>tool call"
        )
        self.assertEqual(reasoning, "thinking")
        self.assertEqual(normal, "<tool_call>tool call")

        # Streaming: tool interrupt
        parser = ReasoningParser("glm45")
        chunks = ["<think>", "reasoning", "<tool_call>", "tool args"]
        all_reasoning = ""
        all_normal = ""
        for chunk in chunks:
            reasoning, normal = parser.parse_stream_chunk(chunk)
            if reasoning:
                all_reasoning += reasoning
            if normal:
                all_normal += normal

        self.assertEqual(all_reasoning, "reasoning")
        self.assertEqual(all_normal, "<tool_call>tool args")

    def test_kimik2_tool_interruption(self):
        """Test Kimi-K2 tool interruption through ReasoningParser API."""
        parser = ReasoningParser("kimi_k2")

        # Non-streaming: tool interrupt
        reasoning, normal = parser.parse_non_stream(
            "<think>thinking<|tool_calls_section_begin|><|tool_call_begin|>"
        )
        self.assertEqual(reasoning, "thinking")
        self.assertEqual(normal, "<|tool_calls_section_begin|><|tool_call_begin|>")

        # Streaming: tool interrupt
        parser = ReasoningParser("kimi_k2")
        chunks = [
            "<think>",
            "reasoning",
            "<|tool_calls_section_begin|>",
            "<|tool_call_begin|>",
        ]
        all_reasoning = ""
        all_normal = ""
        for chunk in chunks:
            reasoning, normal = parser.parse_stream_chunk(chunk)
            if reasoning:
                all_reasoning += reasoning
            if normal:
                all_normal += normal

        self.assertEqual(all_reasoning, "reasoning")
        self.assertEqual(all_normal, "<|tool_calls_section_begin|><|tool_call_begin|>")


class TestIntegrationScenarios(CustomTestCase):
    """Integration tests for realistic usage scenarios."""

    def test_deepseek_r1_complete_response(self):
        """Test complete DeepSeek-R1 response parsing."""
        parser = ReasoningParser("deepseek-r1")
        text = "I need to solve this step by step. First, I'll analyze the problem. The given equation is x + 2 = 5. To solve for x, I subtract 2 from both sides: x = 5 - 2 = 3.</think>The answer is x = 3."

        reasoning, normal = parser.parse_non_stream(text)
        self.assertIn("step by step", reasoning)
        self.assertIn(
            "= 3", reasoning
        )  # The reasoning contains "x = 5 - 2 = 3" which has "= 3"
        self.assertEqual(normal, "The answer is x = 3.")

    def test_qwen3_streaming_scenario(self):
        """Test Qwen3 streaming scenario."""
        parser = ReasoningParser("qwen3")
        chunks = [
            "<think>",
            "Let me analyze this problem.",
            " I need to consider multiple factors.",
            "</think>",
            "Based on my analysis, the solution is to use a different approach.",
        ]

        all_reasoning = ""
        all_normal = ""
        for chunk in chunks:
            reasoning, normal = parser.parse_stream_chunk(chunk)
            all_reasoning += reasoning
            all_normal += normal

        self.assertIn("analyze", all_reasoning)
        self.assertIn("multiple factors", all_reasoning)
        self.assertIn("different approach", all_normal)

    def test_kimi_streaming_scenario(self):
        """Test Kimi streaming scenario."""
        parser = ReasoningParser("kimi")
        chunks = [
            "◁thi",
            "nk▷",
            "Let me analyze this problem.",
            " I need to consider multiple factors.",
            "◁/th",
            "ink▷",
            "The answer is 42.",
        ]

        all_reasoning = ""
        all_normal = ""
        for chunk in chunks:
            reasoning, normal = parser.parse_stream_chunk(chunk)
            all_reasoning += reasoning
            all_normal += normal

        self.assertIn("analyze", all_reasoning)
        self.assertIn("multiple factors", all_reasoning)
        self.assertIn("42", all_normal)

    def test_empty_reasoning_blocks(self):
        """Test handling of empty reasoning blocks."""
        parser = ReasoningParser("qwen3")
        text = "<think></think>Just the answer."

        reasoning, normal = parser.parse_non_stream(text)
        self.assertEqual(reasoning, "")
        self.assertEqual(normal, "Just the answer.")

    def test_qwen3_forced_reasoning_complete_response(self):
        """Test complete Qwen3-ForcedReasoning response parsing."""
        parser = ReasoningParser("qwen3", force_reasoning=True)
        text = "Let me solve this step by step. The equation is x + 2 = 5. Subtracting 2 from both sides gives x = 3.</think>The solution is x = 3."

        reasoning, normal = parser.parse_non_stream(text)
        self.assertIn("step by step", reasoning)
        self.assertIn("x = 3", reasoning)
        self.assertEqual(normal, "The solution is x = 3.")

    def test_qwen3_forced_reasoning_streaming_scenario(self):
        """Test Qwen3-ForcedReasoning streaming scenario."""
        parser = ReasoningParser("qwen3", force_reasoning=True)
        chunks = [
            "I need to analyze",
            " this problem carefully.",
            " Let me break it down.",
            "</think>",
            "The final answer is 42.",
        ]

        all_reasoning = ""
        all_normal = ""
        for chunk in chunks:
            reasoning, normal = parser.parse_stream_chunk(chunk)
            all_reasoning += reasoning
            all_normal += normal

        self.assertIn("analyze", all_reasoning)
        self.assertIn("break it down", all_reasoning)
        self.assertIn("final answer", all_normal)


class TestBufferLossBugFix(CustomTestCase):
    """Test cases for the buffer loss bug fix in parse_streaming_increment."""

    def test_partial_end_tag_buffer_loss_bug(self):
        """
        Test the bug where partial end tag fragments are lost when followed by normal text.

        Bug scenario:
        1. _in_reasoning is False
        2. new_text is "</" (the beginning of the closing think tag)
        3. The fragment is held in the buffer and nothing is returned
        4. The next chunk does not complete the tag
        5. The buffered "</" must be emitted together with that chunk, not dropped
        """
        # NOTE(review): steps 2-5 of the docstring, the chunk literals and the
        # step-2 assertions were reconstructed after tag stripping -- confirm
        # against the upstream file.
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        # Step 1: Send partial end tag when not in reasoning mode
        # This should be buffered since it could be start of "</think>"
        result1 = detector.parse_streaming_increment("</")
        self.assertEqual(result1.normal_text, "")
        self.assertEqual(result1.reasoning_text, "")

        # Step 2: Send normal text that does not complete the end tag; the
        # buffered fragment must be returned together with it.
        result2 = detector.parse_streaming_increment("answer")
        self.assertEqual(result2.normal_text, "</answer")
        self.assertEqual(result2.reasoning_text, "")

    def test_partial_start_tag_buffer_preservation(self):
        """Test that partial start tag fragments are preserved."""
        # NOTE(review): method name, docstring, chunk literals and assertions
        # were reconstructed; only the "Send partial start tag" comment and
        # the detector construction survived in the recovered copy.
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        # Send partial start tag
        result1 = detector.parse_streaming_increment("<th")
        self.assertEqual(result1.normal_text, "")
        self.assertEqual(result1.reasoning_text, "")

        # Continue with text that does not complete the start tag
        result2 = detector.parse_streaming_increment("is is text")
        self.assertEqual(result2.normal_text, "<this is text")
        self.assertEqual(result2.reasoning_text, "")

    def test_partial_end_tag_in_reasoning_mode(self):
        """Test partial end tag handling when already in reasoning mode."""
        # NOTE(review): method name, docstring and the split of the end tag
        # across the two chunks were reconstructed -- confirm upstream.
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        # Enter reasoning mode
        detector.parse_streaming_increment("<think>")
        detector.parse_streaming_increment("some reasoning")

        # Send partial end tag
        result1 = detector.parse_streaming_increment("</")
        self.assertEqual(result1.normal_text, "")
        self.assertEqual(result1.reasoning_text, "")

        # Complete the end tag and follow it with normal text
        result2 = detector.parse_streaming_increment("think>normal text")
        self.assertEqual(result2.normal_text, "normal text")
        # The reasoning text should be empty since buffer was cleared when end
        # tag was processed
        self.assertEqual(result2.reasoning_text, "")

    def test_multiple_partial_fragments(self):
        """
        Test handling of multiple partial fragments that don't match any tokens.
        """
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        # Send multiple partial fragments
        result1 = detector.parse_streaming_increment("<")
        self.assertEqual(result1.normal_text, "")
        self.assertEqual(result1.reasoning_text, "")

        result2 = detector.parse_streaming_increment("/")
        self.assertEqual(result2.normal_text, "")
        self.assertEqual(result2.reasoning_text, "")

        # "</random>" matches no token, so the whole buffer is released as
        # normal text.
        result3 = detector.parse_streaming_increment("random>")
        self.assertEqual(result3.normal_text, "</random>")
        self.assertEqual(result3.reasoning_text, "")

    def test_edge_case_exact_token_match(self):
        """
        Test edge case where buffer content exactly matches a token.
        """
        detector = BaseReasoningFormatDetector("<think>", "</think>")

        # Build up the exact start token character by character
        detector.parse_streaming_increment("<")
        detector.parse_streaming_increment("t")
        detector.parse_streaming_increment("h")
        detector.parse_streaming_increment("i")
        detector.parse_streaming_increment("n")
        result = detector.parse_streaming_increment("k>")

        # Should enter reasoning mode
        self.assertEqual(result.normal_text, "")
        self.assertEqual(result.reasoning_text, "")
        self.assertTrue(detector._in_reasoning)
        self.assertTrue(detector.stripped_think_start)


if __name__ == "__main__":
    unittest.main()