| import re |
| import uuid |
| from enum import Enum |
| from typing import Any |
|
|
| from loguru import logger |
|
|
| |
| |
| |
| _CONTROL_TOKEN_RE = re.compile(r"<\|[^|>]{1,80}\|>") |
| _CONTROL_TOKEN_START = "<|" |
| _CONTROL_TOKEN_END = "|>" |
|
|
|
|
| class ParserState(Enum): |
| TEXT = 1 |
| MATCHING_FUNCTION = 2 |
| PARSING_PARAMETERS = 3 |
|
|
|
|
| class HeuristicToolParser: |
| """ |
| Stateful parser that detects raw text tool calls in the format: |
| β <function=Name><parameter=key>value</parameter>... |
| |
| This is used as a fallback for models that emit tool calls as text |
| instead of using the structured API. |
| """ |
|
|
| |
| _FUNC_START_PATTERN = re.compile(r"β\s*<function=([^>]+)>") |
| _PARAM_PATTERN = re.compile( |
| r"<parameter=([^>]+)>(.*?)(?:</parameter>|$)", re.DOTALL |
| ) |
|
|
| def __init__(self): |
| self._state = ParserState.TEXT |
| self._buffer = "" |
| self._current_tool_id = None |
| self._current_function_name = None |
| self._current_parameters = {} |
|
|
| def _strip_control_tokens(self, text: str) -> str: |
| |
| |
| return _CONTROL_TOKEN_RE.sub("", text) |
|
|
| def _split_incomplete_control_token_tail(self) -> str: |
| """ |
| If the buffer ends with an incomplete "<|...|>" sentinel token, keep that |
| fragment in the buffer and return the safe-to-emit prefix. |
| |
| This prevents leaking raw sentinel fragments to the user when streaming. |
| """ |
| start = self._buffer.rfind(_CONTROL_TOKEN_START) |
| if start == -1: |
| return "" |
| end = self._buffer.find(_CONTROL_TOKEN_END, start) |
| if end != -1: |
| return "" |
|
|
| prefix = self._buffer[:start] |
| self._buffer = self._buffer[start:] |
| return prefix |
|
|
| def feed(self, text: str) -> tuple[str, list[dict[str, Any]]]: |
| """ |
| Feed text into the parser. |
| Returns a tuple of (filtered_text, detected_tool_calls). |
| |
| filtered_text: Text that should be passed through as normal message content. |
| detected_tools: List of Anthropic-format tool_use blocks. |
| """ |
| self._buffer += text |
| self._buffer = self._strip_control_tokens(self._buffer) |
| detected_tools = [] |
| filtered_output_parts: list[str] = [] |
|
|
| while True: |
| if self._state == ParserState.TEXT: |
| |
| if "β" in self._buffer: |
| idx = self._buffer.find("β") |
| filtered_output_parts.append(self._buffer[:idx]) |
| self._buffer = self._buffer[idx:] |
| self._state = ParserState.MATCHING_FUNCTION |
| else: |
| |
| |
| safe_prefix = self._split_incomplete_control_token_tail() |
| if safe_prefix: |
| filtered_output_parts.append(safe_prefix) |
| break |
|
|
| filtered_output_parts.append(self._buffer) |
| self._buffer = "" |
| break |
|
|
| if self._state == ParserState.MATCHING_FUNCTION: |
| |
| |
| match = self._FUNC_START_PATTERN.search(self._buffer) |
| if match: |
| self._current_function_name = match.group(1).strip() |
| self._current_tool_id = f"toolu_heuristic_{uuid.uuid4().hex[:8]}" |
| self._current_parameters = {} |
|
|
| |
| self._buffer = self._buffer[match.end() :] |
| self._state = ParserState.PARSING_PARAMETERS |
| logger.debug( |
| "Heuristic bypass: Detected start of tool call '{}'", |
| self._current_function_name, |
| ) |
| else: |
| |
| |
| if len(self._buffer) > 100: |
| |
| filtered_output_parts.append(self._buffer[0]) |
| self._buffer = self._buffer[1:] |
| self._state = ParserState.TEXT |
| else: |
| break |
|
|
| if self._state == ParserState.PARSING_PARAMETERS: |
| |
| |
|
|
| |
| |
|
|
| finished_tool_call = False |
|
|
| |
| while True: |
| param_match = self._PARAM_PATTERN.search(self._buffer) |
| if param_match and "</parameter>" in param_match.group(0): |
| |
| pre_match_text = self._buffer[: param_match.start()] |
| if pre_match_text: |
| filtered_output_parts.append(pre_match_text) |
|
|
| key = param_match.group(1).strip() |
| val = param_match.group(2).strip() |
| self._current_parameters[key] = val |
| self._buffer = self._buffer[param_match.end() :] |
| else: |
| break |
|
|
| |
| |
| |
| |
|
|
| if "β" in self._buffer: |
| |
| |
| idx = self._buffer.find("β") |
| if idx > 0: |
| filtered_output_parts.append(self._buffer[:idx]) |
| self._buffer = self._buffer[idx:] |
| finished_tool_call = True |
| elif len(self._buffer) > 0 and not self._buffer.strip().startswith("<"): |
| |
| |
| if "<parameter=" not in self._buffer: |
| |
| |
| |
| filtered_output_parts.append(self._buffer) |
| self._buffer = "" |
| finished_tool_call = True |
|
|
| if finished_tool_call: |
| |
| detected_tools.append( |
| { |
| "type": "tool_use", |
| "id": self._current_tool_id, |
| "name": self._current_function_name, |
| "input": self._current_parameters, |
| } |
| ) |
| logger.debug( |
| "Heuristic bypass: Emitting tool call '{}' with {} params", |
| self._current_function_name, |
| len(self._current_parameters), |
| ) |
| self._state = ParserState.TEXT |
| |
| else: |
| break |
|
|
| return "".join(filtered_output_parts), detected_tools |
|
|
| def flush(self) -> list[dict[str, Any]]: |
| """ |
| Flush any remaining tool calls in the buffer. |
| """ |
| self._buffer = self._strip_control_tokens(self._buffer) |
| detected_tools = [] |
| if self._state == ParserState.PARSING_PARAMETERS: |
| |
| |
| partial_matches = re.finditer( |
| r"<parameter=([^>]+)>(.*)$", self._buffer, re.DOTALL |
| ) |
| for m in partial_matches: |
| key = m.group(1).strip() |
| val = m.group(2).strip() |
| self._current_parameters[key] = val |
|
|
| detected_tools.append( |
| { |
| "type": "tool_use", |
| "id": self._current_tool_id, |
| "name": self._current_function_name, |
| "input": self._current_parameters, |
| } |
| ) |
| self._state = ParserState.TEXT |
| self._buffer = "" |
|
|
| return detected_tools |
|
|