| import logging | |
| from typing import Dict, List, Literal, Optional, Set, Tuple, Type, Union | |
| from sglang.srt.entrypoints.openai.protocol import ( | |
| LegacyStructuralTagResponseFormat, | |
| StructuresResponseFormat, | |
| Tool, | |
| ToolCallConstraint, | |
| ToolChoice, | |
| ) | |
| from sglang.srt.function_call.base_format_detector import BaseFormatDetector | |
| from sglang.srt.function_call.core_types import ToolCallItem | |
| from sglang.srt.function_call.deepseekv3_detector import DeepSeekV3Detector | |
| from sglang.srt.function_call.deepseekv31_detector import DeepSeekV31Detector | |
| from sglang.srt.function_call.glm4_moe_detector import Glm4MoeDetector | |
| from sglang.srt.function_call.gpt_oss_detector import GptOssDetector | |
| from sglang.srt.function_call.kimik2_detector import KimiK2Detector | |
| from sglang.srt.function_call.llama32_detector import Llama32Detector | |
| from sglang.srt.function_call.mistral_detector import MistralDetector | |
| from sglang.srt.function_call.pythonic_detector import PythonicDetector | |
| from sglang.srt.function_call.qwen3_coder_detector import Qwen3CoderDetector | |
| from sglang.srt.function_call.qwen25_detector import Qwen25Detector | |
| from sglang.srt.function_call.step3_detector import Step3Detector | |
| from sglang.srt.function_call.utils import get_json_schema_constraint | |
| logger = logging.getLogger(__name__) | |
| class FunctionCallParser: | |
| """ | |
| Parser for function/tool calls in model outputs. | |
| This class handles both streaming and non-streaming parsing of function calls using a detector. | |
| In streaming scenarios, each time new_text is received, it calls detector.parse_streaming_increment | |
| and returns the resulting normal_text and calls to the upper layer (or SSE). | |
| """ | |
| ToolCallParserEnum: Dict[str, Type[BaseFormatDetector]] = { | |
| "deepseekv3": DeepSeekV3Detector, | |
| "deepseekv31": DeepSeekV31Detector, | |
| "glm": Glm4MoeDetector, | |
| "glm45": Glm4MoeDetector, | |
| "gpt-oss": GptOssDetector, | |
| "kimi_k2": KimiK2Detector, | |
| "llama3": Llama32Detector, | |
| "mistral": MistralDetector, | |
| "pythonic": PythonicDetector, | |
| "qwen": Qwen25Detector, | |
| "qwen25": Qwen25Detector, | |
| "qwen3_coder": Qwen3CoderDetector, | |
| "step3": Step3Detector, | |
| } | |
| def __init__(self, tools: List[Tool], tool_call_parser: str): | |
| detector_class = self.ToolCallParserEnum.get(tool_call_parser) | |
| if detector_class: | |
| detector = detector_class() | |
| else: | |
| raise ValueError(f"Unsupported tool_call_parser: {tool_call_parser}") | |
| self.detector = detector | |
| self.tools = tools | |
| def has_tool_call(self, text: str) -> bool: | |
| """ | |
| Check if the given text contains a tool call in the format supported by this parser. | |
| This delegates to the detector's implementation. | |
| Args: | |
| text: The text to check for tool calls | |
| Returns: | |
| True if the text contains a tool call, False otherwise | |
| """ | |
| if not self.tools: | |
| return False | |
| return self.detector.has_tool_call(text) | |
| def parse_non_stream(self, full_text: str) -> Tuple[str, list[ToolCallItem]]: | |
| """ | |
| One-time parsing of the full text to extract tool calls. | |
| Args: | |
| full_text: The complete text to parse | |
| Returns: | |
| A tuple containing: | |
| - The remaining text after parsing that was not consumed by the detector (can be treated as normal text) | |
| - A list of tool calls parsed from the text | |
| """ | |
| if not self.tools: | |
| return full_text, [] | |
| parsed_result = self.detector.detect_and_parse(full_text, self.tools) | |
| tool_call_list = parsed_result.calls | |
| if tool_call_list: | |
| return parsed_result.normal_text, tool_call_list | |
| else: | |
| return full_text, [] | |
| def parse_stream_chunk(self, chunk_text: str) -> Tuple[str, list[ToolCallItem]]: | |
| """ | |
| Streaming incremental parsing of chunks of text as they arrive. | |
| Args: | |
| chunk_text: The new chunk of text to parse | |
| Returns: | |
| A tuple containing: | |
| - The normal text that should be displayed to the user | |
| - A list of tool calls parsed from the chunk | |
| """ | |
| if not self.tools: | |
| return chunk_text, [] | |
| final_normal_text = "" | |
| final_calls = [] | |
| sp_result = self.detector.parse_streaming_increment(chunk_text, self.tools) | |
| if sp_result.normal_text: | |
| final_normal_text = sp_result.normal_text | |
| if sp_result.calls: | |
| final_calls.extend(sp_result.calls) | |
| final_normal_text = sp_result.normal_text | |
| return final_normal_text, final_calls | |
| def get_structure_tag(self) -> LegacyStructuralTagResponseFormat: | |
| """ | |
| Generate a structural tag response format for all available tools. | |
| This creates the necessary structural tags that guide the model's output format. | |
| """ | |
| tool_structures: List[StructuresResponseFormat] = list() | |
| tool_trigger_set: Set[str] = set() | |
| get_structure_info = self.detector.structure_info() | |
| for tool in self.tools: | |
| function = tool.function | |
| name = function.name | |
| assert name is not None | |
| info = get_structure_info(name) | |
| # accept all if not strict, otherwise only accept the schema | |
| schema = function.parameters if function.strict else {} | |
| tool_structures.append( | |
| StructuresResponseFormat( | |
| begin=info.begin, | |
| schema=schema, # type: ignore | |
| end=info.end, | |
| ) | |
| ) | |
| tool_trigger_set.add(info.trigger) | |
| # TODO(dark): move this into new structural tag format | |
| # This requires all grammar backend support the new format | |
| return LegacyStructuralTagResponseFormat( | |
| type="structural_tag", | |
| structures=tool_structures, | |
| triggers=list(tool_trigger_set), | |
| ) | |
| def get_structure_constraint( | |
| self, tool_choice: Union[ToolChoice, Literal["auto", "required"]] | |
| ) -> Optional[ToolCallConstraint]: | |
| """ | |
| Returns the appropriate structure constraint for tool calls based on the tool_choice. | |
| The constraint is used to guide the model's output format. | |
| Args: | |
| tool_choice: The tool choice setting from the request | |
| Returns: | |
| A tuple of (constraint_type, constraint_value) to be added to sampling parameters, | |
| or None if no constraint applies. | |
| """ | |
| # NOTE: structural_tag only supports JSON-compatible content between the begin and end. | |
| # It cannot parse or validate function call Pythonic or XML-ish syntax. | |
| if ( | |
| self.detector.supports_structural_tag() | |
| and tool_choice == "auto" | |
| and any(tool.function.strict for tool in self.tools) | |
| ): | |
| tag = self.get_structure_tag() | |
| return ("structural_tag", tag) | |
| elif tool_choice == "required" or isinstance(tool_choice, ToolChoice): | |
| json_schema = get_json_schema_constraint(self.tools, tool_choice) | |
| return ("json_schema", json_schema) | |
| def get_ebnf( | |
| self, tool_choice: Union[ToolChoice, Literal["required"]] | |
| ) -> Optional[str]: | |
| """ | |
| Get the EBNF grammar for the specified tool choice. | |
| Args: | |
| tool_choice: The tool choice specification | |
| Returns: | |
| EBNF grammar string, or None if no valid tools found | |
| Note: | |
| If a specific function is requested but not found in available tools, | |
| logs a warning and falls back to using all available tools for backward compatibility. | |
| """ | |
| filtered_tools = [] | |
| if isinstance(tool_choice, ToolChoice): | |
| fn_name = tool_choice.function.name | |
| filtered_tools = [t for t in self.tools if t.function.name == fn_name] | |
| # Check if the requested function exists in available tools | |
| if not filtered_tools: | |
| available_functions = [t.function.name for t in self.tools] | |
| logger.warning( | |
| f"Function '{fn_name}' not found in available tools. " | |
| f"Available functions: {available_functions}. " | |
| f"Skipping tool choice." | |
| ) | |
| # TODO: Return a 400 error instead of warning when adapter supports proper error handling | |
| # For now, fall back to return None | |
| return None | |
| else: | |
| filtered_tools = self.tools | |
| return self.detector.build_ebnf(filtered_tools) | |
Xet Storage Details
- Size:
- 8.76 kB
- Xet hash:
- d19abd1fc1851e553fe7e1071d55a3303444cbb89f9cd1be3a6f436e757e3260
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.