| """Utilities for parsing model output into structured tool calls.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import re |
| import warnings |
| from typing import Any |
|
|
| from .models import ToolAction |
| from .tool_catalog import ( |
| KNOWN_TOOL_NAMES, |
| ToolValidationError, |
| canonicalize_tool_name, |
| validate_tool_arguments, |
| ) |
|
|
|
|
class ToolParseError(ValueError):
    """Base error for failures turning model output into a ToolAction.

    Derives from ``ValueError``, so callers that already catch ``ValueError``
    also catch this error and its subclasses.
    """
|
|
|
|
class ParseError(ToolParseError):
    """Error for tool-call JSON that cannot be extracted, decoded, or validated.

    Subclass of ``ToolParseError``; handlers written for the base class also
    receive this error.
    """
|
|
|
|
class ParseWarning(UserWarning):
    """Warning category issued when parsing resorts to a weaker extraction fallback.

    Being a ``UserWarning`` subclass, it can be filtered or escalated with the
    standard ``warnings`` filters.
    """
|
|
|
|
| def _format_parse_error(message: str, raw_output: str) -> str: |
| """Attach a compact raw-output preview to parser failures for debugging.""" |
|
|
| preview = raw_output.strip().replace("\n", "\\n") |
| if len(preview) > 240: |
| preview = preview[:237] + "..." |
| return f"{message} Raw output: {preview}" |
|
|
|
|
def _normalize_action_payload(payload: Any, raw_output: str) -> dict[str, Any]:
    """Reduce the accepted payload shapes to one flat tool-action mapping.

    Two shapes are accepted: a flat object that carries ``tool_name`` itself,
    and a wrapper object whose ``action`` value is the tool-action object. In
    the wrapper shape, a top-level ``reasoning`` is copied into the action
    unless the action already has its own; other top-level keys are dropped.

    Args:
        payload: The decoded JSON value.
        raw_output: Original model output, used only for the error preview.

    Returns:
        The mapping that contains ``tool_name``: either ``payload`` itself or
        the nested action (a shallow copy when ``reasoning`` was merged in).

    Raises:
        ParseError: If ``payload`` is not a dict, if ``action`` is present but
            is not a dict, or if the resulting mapping lacks ``tool_name``.
    """

    def failure(message: str) -> ParseError:
        # Every failure in this function carries the same raw-output preview.
        return ParseError(_format_parse_error(message, raw_output))

    if not isinstance(payload, dict):
        raise failure("Parsed JSON must be an object.")

    action = payload
    if "action" in payload:
        action = payload["action"]
        if not isinstance(action, dict):
            raise failure("Top-level 'action' must itself be a JSON object.")
        inherits_reasoning = "reasoning" in payload and "reasoning" not in action
        if inherits_reasoning:
            # Copy instead of mutating so the caller's nested dict is untouched.
            action = {**action, "reasoning": payload["reasoning"]}

    if "tool_name" in action:
        return action

    raise failure(
        "Tool-call JSON must contain either a top-level 'tool_name' or 'action' key."
    )
|
|
|
|
def _decode_json(candidate: str, raw_output: str) -> dict[str, Any]:
    """Parse one candidate string as JSON and normalize it to the action schema.

    Args:
        candidate: The text believed to hold a single JSON document.
        raw_output: Original model output, used only for the error preview.

    Returns:
        The normalized tool-action mapping from ``_normalize_action_payload``.

    Raises:
        ParseError: If ``candidate`` is not valid JSON (chained from the
            ``json.JSONDecodeError``), or if normalization rejects the value.
    """
    try:
        decoded = json.loads(candidate)
    except json.JSONDecodeError as exc:
        detail = f"Could not decode model output as JSON: {exc}"
        raise ParseError(_format_parse_error(detail, raw_output)) from exc
    else:
        return _normalize_action_payload(decoded, raw_output)
|
|
|
|
| def _iter_schema_objects(text: str) -> list[dict[str, Any]]: |
| """Scan raw text for standalone JSON objects with tool-action top-level keys.""" |
|
|
| decoder = json.JSONDecoder() |
| matches: list[dict[str, Any]] = [] |
| for index, character in enumerate(text): |
| if character != "{": |
| continue |
| try: |
| payload, _end_index = decoder.raw_decode(text[index:]) |
| except json.JSONDecodeError: |
| continue |
| if isinstance(payload, dict) and ("tool_name" in payload or "action" in payload): |
| matches.append(payload) |
| return matches |
|
|
|
|
def parse_with_fallback(llm_output: str, log_warnings: bool = True) -> dict[str, Any]:
    """Parse LLM output with a strict extraction hierarchy and visible fallbacks.

    Extraction tiers, tried in order:

    1. Fenced code blocks (a triple-backtick fence, optionally tagged ``json``).
       Blocks that fail to decode or normalize are skipped.
    2. JSON objects embedded in the text that carry a top-level ``tool_name``
       or ``action`` key. Candidates that fail normalization are skipped in
       favor of later ones; if every candidate fails, the first candidate's
       error is raised.
    3. The span from the first ``{`` to the last ``}``. A ``ParseWarning`` is
       issued before this tier is attempted unless ``log_warnings`` is false.

    Args:
        llm_output: Raw model output.
        log_warnings: Whether to emit ``ParseWarning`` when tier 3 is used.

    Returns:
        The normalized tool-action mapping (contains ``tool_name``).

    Raises:
        ParseError: If no tier yields a valid tool-action object.
    """
    candidate = llm_output.strip()

    # Tier 1: fenced blocks; the first one that decodes and normalizes wins.
    for block in re.findall(r"```(?:json)?\s*(.*?)\s*```", candidate, re.DOTALL):
        try:
            return _decode_json(block.strip(), llm_output)
        except ParseError:
            continue

    # Tier 2: schema-keyed objects. One malformed early candidate (for example
    # an "action" that is not an object) must not hide a valid tool call that
    # appears later, so failures are skipped just as in tier 1.
    first_schema_error: ParseError | None = None
    for payload in _iter_schema_objects(candidate):
        try:
            return _normalize_action_payload(payload, llm_output)
        except ParseError as exc:
            if first_schema_error is None:
                first_schema_error = exc
    if first_schema_error is not None:
        # Candidates existed but none were valid: report the earliest failure
        # instead of falling through to the broader extraction.
        raise first_schema_error

    # Tier 3: broad brace extraction, made visible through a warning.
    first = candidate.find("{")
    last = candidate.rfind("}")
    if first != -1 and last > first:
        if log_warnings:
            warnings.warn(
                "Parser fell back to broad brace extraction because no fenced block or schema-keyed JSON object was found.",
                ParseWarning,
                stacklevel=2,
            )
        return _decode_json(candidate[first : last + 1], llm_output)

    raise ParseError(
        _format_parse_error("No JSON object could be extracted from model output.", llm_output)
    )
|
|
|
|
def extract_json_object(text: str) -> dict[str, Any]:
    """Extract the tool-action object from ``text`` via ``parse_with_fallback``.

    Kept as a backward-compatible entry point; it delegates to
    ``parse_with_fallback`` with fallback warnings enabled.

    Args:
        text: Raw model output.

    Returns:
        Whatever ``parse_with_fallback`` returns for ``text``.
    """
    payload = parse_with_fallback(text, log_warnings=True)
    return payload
|
|
|
|
def parse_tool_action(
    text: str,
    *,
    allowed_tools: list[str] | None = None,
) -> ToolAction:
    """Parse raw model output into a validated ToolAction.

    Args:
        text: Raw model output expected to contain a tool-call JSON object.
        allowed_tools: Tool names permitted for this call. ``None`` (the
            default) means every name in ``KNOWN_TOOL_NAMES``. An explicit
            list, including an empty one, is passed through unchanged and is
            never widened to the full catalog.

    Returns:
        A ``ToolAction`` built from the canonical tool name, the arguments
        returned by ``validate_tool_arguments``, and the optional reasoning.

    Raises:
        ParseError: If no tool-call object can be extracted, if ``tool_name``
            is missing, not a string, or blank, if ``reasoning`` is present
            but not a string, or if ``validate_tool_arguments`` raises
            ``ToolValidationError``.
    """
    payload = parse_with_fallback(text, log_warnings=True)
    tool_name = payload.get("tool_name")
    arguments = payload.get("arguments", {})
    reasoning = payload.get("reasoning")

    if not isinstance(tool_name, str) or not tool_name.strip():
        raise ParseError(_format_parse_error("tool_name must be a non-empty string.", text))

    if reasoning is not None and not isinstance(reasoning, str):
        raise ParseError(_format_parse_error("reasoning must be a string when provided.", text))

    # Test against None rather than truthiness: `allowed_tools or ...` turned
    # an explicit empty allow-list into "every known tool".
    valid_tools = list(KNOWN_TOOL_NAMES) if allowed_tools is None else allowed_tools

    # NOTE(review): this call sits outside the try block, so any exception it
    # raises propagates unchanged rather than as ParseError. Confirm that is
    # the intended contract for unknown tool names.
    canonical_tool_name = canonicalize_tool_name(tool_name, allowed_tools=valid_tools)
    try:
        normalized_arguments = validate_tool_arguments(
            canonical_tool_name,
            arguments,
            allowed_tools=valid_tools,
        )
    except ToolValidationError as exc:
        raise ParseError(_format_parse_error(str(exc), text)) from exc

    return ToolAction(
        tool_name=canonical_tool_name,
        arguments=normalized_arguments,
        reasoning=reasoning,
    )
|
|