| from typing import Dict, Optional, Tuple, Type | |
| from sglang.srt.parser.harmony_parser import HarmonyParser | |
class StreamingParseResult:
    """Output of a single parsing step (one-shot or one streaming chunk).

    Both attributes are always strings: a missing or empty argument is
    stored as ``""`` so callers never have to handle ``None``.

    Args:
        normal_text: Text that belongs to the regular (non-reasoning) answer.
        reasoning_text: Text that belongs to the reasoning section.
    """

    def __init__(
        self,
        normal_text: Optional[str] = None,
        reasoning_text: Optional[str] = None,
    ):
        # Collapse any falsy input (None or "") to the empty string.
        self.normal_text = normal_text if normal_text else ""
        self.reasoning_text = reasoning_text if reasoning_text else ""
class BaseReasoningFormatDetector:
    """Base detector that separates reasoning text from normal text.

    Two interfaces are provided: ``detect_and_parse`` handles a complete
    response in one call, and ``parse_streaming_increment`` handles output
    that arrives chunk by chunk.

    Args:
        think_start_token: Marker that opens a reasoning block.
        think_end_token: Marker that closes a reasoning block.
        force_reasoning: If True, the output is treated as reasoning from the
            start, even when no start token has been seen.
        stream_reasoning: If True, streaming mode emits reasoning text as soon
            as it arrives; if False, it is held back until the end token.
    """

    def __init__(
        self,
        think_start_token: str,
        think_end_token: str,
        force_reasoning: bool = False,
        stream_reasoning: bool = True,
    ):
        self.think_start_token = think_start_token
        self.think_end_token = think_end_token
        # Whether the text currently being parsed is inside a reasoning block.
        self._in_reasoning = force_reasoning
        self.stream_reasoning = stream_reasoning
        # Text received by parse_streaming_increment but not yet emitted.
        self._buffer = ""
        # Set once the start token has been removed from the stream.
        self.stripped_think_start = False

    def detect_and_parse(self, text: str) -> StreamingParseResult:
        """Parse a complete response in one call.

        Args:
            text: The full model output.

        Returns:
            A result whose ``reasoning_text`` holds everything before the end
            token (start tokens removed) and whose ``normal_text`` holds the
            stripped remainder. If the text is not in a reasoning block it is
            returned unchanged as ``normal_text``.
        """
        in_reasoning = self._in_reasoning or self.think_start_token in text
        if not in_reasoning:
            return StreamingParseResult(normal_text=text)

        # The text is considered to be in a reasoning block.
        processed_text = text.replace(self.think_start_token, "").strip()

        if self.think_end_token not in processed_text:
            # Assume the reasoning was truncated before the end token.
            return StreamingParseResult(reasoning_text=processed_text)

        # Split on the first end token only; later ones stay in normal text.
        reasoning_text, _, normal_text = processed_text.partition(
            self.think_end_token
        )
        return StreamingParseResult(
            normal_text=normal_text.strip(), reasoning_text=reasoning_text
        )

    def parse_streaming_increment(self, new_text: str) -> StreamingParseResult:
        """Parse one streamed chunk and return what can be emitted now.

        If ``stream_reasoning`` is False, reasoning content is accumulated
        until the end token is found; if True, it is emitted as it arrives.

        NOTE: a token split across chunks is only recognised while the whole
        buffer is a proper prefix of the start or end token. With
        ``stream_reasoning=True``, a chunk such as ``"abc</thi"`` is emitted
        as reasoning text immediately.

        Args:
            new_text: The newly generated chunk.

        Returns:
            The reasoning and/or normal text that became available; both are
            empty while the parser is still buffering.
        """
        self._buffer += new_text
        current_text = self._buffer

        # The buffer may be the beginning of a think token: wait for more.
        if any(
            token.startswith(current_text) and token != current_text
            for token in (self.think_start_token, self.think_end_token)
        ):
            return StreamingParseResult()

        # Strip the start token the first time it shows up.
        if not self.stripped_think_start and self.think_start_token in current_text:
            current_text = current_text.replace(self.think_start_token, "")
            # Keep the buffer in sync with the stripped text. Otherwise, when
            # stream_reasoning is False, the start token would survive in the
            # buffer and reappear in the reasoning text once the end token
            # arrives.
            self._buffer = current_text
            self.stripped_think_start = True
            self._in_reasoning = True

        # Outside a reasoning block everything is normal text.
        if not self._in_reasoning:
            self._buffer = ""
            return StreamingParseResult(normal_text=current_text)

        # End of the reasoning block: emit both parts and leave reasoning mode.
        if self.think_end_token in current_text:
            reasoning_text, _, normal_text = current_text.partition(
                self.think_end_token
            )
            self._buffer = ""
            self._in_reasoning = False
            return StreamingParseResult(
                normal_text=normal_text, reasoning_text=reasoning_text.rstrip()
            )

        # Still inside the reasoning block.
        if self.stream_reasoning:
            # Emit the content immediately.
            self._buffer = ""
            return StreamingParseResult(reasoning_text=current_text)

        # Not streaming reasoning: keep accumulating in the buffer.
        return StreamingParseResult()
class DeepSeekR1Detector(BaseReasoningFormatDetector):
    """
    Reasoning detector for DeepSeek-R1 style output.

    Expected format:
        (<think>)*(.*)</think>

    Everything before the ``</think>`` tag is returned as `reasoning_text`;
    whatever follows it is returned as `normal_text`.

    Supported models:
        - DeepSeek-R1: always generates thinking content without the <think> start tag
        - DeepSeek-R1-0528: generates thinking content with the <think> start tag

    Format patterns:
        - DeepSeek-R1: "I need to think about this...</think>The answer is 42."
        - DeepSeek-R1-0528: "<think>I need to think about this...</think>The answer is 42."

    Args:
        stream_reasoning (bool): If False, accumulates reasoning content until the end tag.
            If True, streams reasoning content as it arrives.
        force_reasoning (bool): Accepted so the constructor matches the other
            detectors, but not forwarded; reasoning is always forced on.
    """

    def __init__(self, stream_reasoning: bool = True, force_reasoning: bool = True):
        # DeepSeek-R1 output is assumed to be reasoning until the `</think>`
        # token, so force_reasoning is pinned to True regardless of the
        # argument value.
        # https://github.com/sgl-project/sglang/pull/3202#discussion_r1950153599
        super().__init__(
            think_start_token="<think>",
            think_end_token="</think>",
            force_reasoning=True,
            stream_reasoning=stream_reasoning,
        )
class Qwen3Detector(BaseReasoningFormatDetector):
    """
    Reasoning detector for Qwen3 models (e.g., Qwen/Qwen3-235B-A22B).

    Expected format:
        (<think>)*(.*)</think>

    Qwen3 models released before 07/2025 support switching between thinking
    mode and normal mode through the `enable_thinking` request parameter:
        - enable_thinking=True: "<think>reasoning content</think>The answer is 42."
        - enable_thinking=False: "The answer is 42." (no thinking tokens)

    Args:
        stream_reasoning (bool): If False, accumulates reasoning content until the end tag.
            If True, streams reasoning content as it arrives.
        force_reasoning (bool): If True, output is treated as reasoning from
            the start even when no `<think>` tag has been seen.
    """

    def __init__(self, stream_reasoning: bool = True, force_reasoning: bool = False):
        # Both options are forwarded unchanged to the base detector.
        super().__init__(
            think_start_token="<think>",
            think_end_token="</think>",
            force_reasoning=force_reasoning,
            stream_reasoning=stream_reasoning,
        )
class KimiDetector(BaseReasoningFormatDetector):
    """
    Reasoning detector for the Kimi Thinking model.

    Expected format:
        ◁think▷*(.*)◁/think▷

    Everything before the ◁/think▷ tag is returned as `reasoning_text`;
    whatever follows it is returned as `normal_text`.

    Args:
        stream_reasoning (bool): If False, accumulates reasoning content until the end tag.
            If True, streams reasoning content as it arrives.
        force_reasoning (bool): Accepted so the constructor matches the other
            detectors, but not forwarded; reasoning is never forced on.
    """

    def __init__(self, stream_reasoning: bool = True, force_reasoning: bool = False):
        # force_reasoning is pinned to False regardless of the argument value,
        # so a reasoning block only starts once ◁think▷ appears.
        super().__init__(
            think_start_token="◁think▷",
            think_end_token="◁/think▷",
            force_reasoning=False,
            stream_reasoning=stream_reasoning,
        )
class GptOssDetector(BaseReasoningFormatDetector):
    """
    Detector for T4-style reasoning format (GPT-OSS), using the HarmonyParser.

    Both parsing methods are overridden and delegate to a ``HarmonyParser``
    instance. The think tokens and flags handed to the base class are stored
    but not consulted by these overrides.

    Args:
        stream_reasoning (bool): Forwarded to the base class.
        force_reasoning (bool): Forwarded to the base class.
    """

    def __init__(self, stream_reasoning: bool = True, force_reasoning: bool = True):
        super().__init__(
            think_start_token="<|channel|>analysis<|message|>",
            think_end_token="<|end|>",
            force_reasoning=force_reasoning,
            stream_reasoning=stream_reasoning,
        )
        self.parser = HarmonyParser()

    @staticmethod
    def _events_to_result(events) -> StreamingParseResult:
        """Fold parser events into reasoning text and normal text."""
        reasoning_chunks = []
        normal_chunks = []
        for event in events:
            kind = event.event_type
            if kind == "reasoning":
                reasoning_chunks.append(event.content)
            elif kind == "normal":
                normal_chunks.append(event.content)
            elif kind == "tool_call":
                # Prefer raw_text so the structural markers survive for the
                # function call detector; fall back to content if it is empty.
                normal_chunks.append(
                    event.raw_text if event.raw_text else event.content
                )
            # Any other event type contributes to neither output.
        return StreamingParseResult(
            normal_text="".join(normal_chunks),
            reasoning_text="".join(reasoning_chunks),
        )

    def detect_and_parse(self, text: str) -> StreamingParseResult:
        """One-time parsing of a complete response."""
        events = self.parser.parse(text)
        # Parse an empty string afterwards to flush the parser's buffer for
        # one-shot parsing.
        events += self.parser.parse("")
        return self._events_to_result(events)

    def parse_streaming_increment(self, new_text: str) -> StreamingParseResult:
        """Streaming parsing: feed one chunk and return the events it produced."""
        return self._events_to_result(self.parser.parse(new_text))
class ReasoningParser:
    """
    Front end that picks a reasoning detector by model type and exposes both
    non-streaming and streaming extraction of reasoning content.

    Args:
        model_type (str): Key into ``DetectorMap`` (matched case-insensitively).
        stream_reasoning (bool): If False, accumulates reasoning content until complete.
            If True, streams reasoning content as it arrives.
        force_reasoning (Optional[bool]): Passed to the detector only when it
            is not None; always True for "qwen3-thinking" and "gpt-oss".

    Raises:
        ValueError: If ``model_type`` is empty or has no registered detector.
    """

    DetectorMap: Dict[str, Type[BaseReasoningFormatDetector]] = {
        "deepseek-r1": DeepSeekR1Detector,
        "deepseek-v3": Qwen3Detector,
        "glm45": Qwen3Detector,
        "gpt-oss": GptOssDetector,
        "kimi": KimiDetector,
        "qwen3": Qwen3Detector,
        "qwen3-thinking": Qwen3Detector,
        "step3": DeepSeekR1Detector,
    }

    def __init__(
        self,
        model_type: Optional[str] = None,
        stream_reasoning: bool = True,
        force_reasoning: Optional[bool] = None,
    ):
        if not model_type:
            raise ValueError("Model type must be specified")

        model_key = model_type.lower()
        detector_class = self.DetectorMap.get(model_key)
        if not detector_class:
            raise ValueError(f"Unsupported model type: {model_type}")

        # These model types always reason, whatever the caller asked for.
        always_reasoning = model_key in {"qwen3-thinking", "gpt-oss"}
        effective_force = True if always_reasoning else force_reasoning

        # Forward force_reasoning only when it has a value, so each detector
        # keeps its own default otherwise.
        detector_kwargs = {"stream_reasoning": stream_reasoning}
        if effective_force is not None:
            detector_kwargs["force_reasoning"] = effective_force

        self.detector = detector_class(**detector_kwargs)

    def parse_non_stream(self, full_text: str) -> Tuple[Optional[str], Optional[str]]:
        """Non-streaming call: parse the full text once.

        Returns:
            A ``(reasoning_text, normal_text)`` tuple.
        """
        result = self.detector.detect_and_parse(full_text)
        return result.reasoning_text, result.normal_text

    def parse_stream_chunk(
        self, chunk_text: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """Streaming call: parse one incremental chunk.

        Returns:
            A ``(reasoning_text, normal_text)`` tuple for this chunk.
        """
        result = self.detector.parse_streaming_increment(chunk_text)
        return result.reasoning_text, result.normal_text
Xet Storage Details
- Size:
- 11.1 kB
- Xet hash:
- 9dab256c7c179aa70fe33d81a6ba6dbac727516e256f7f7187d0e152eb9d4564
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.