Spaces:
Running
Running
File size: 7,938 Bytes
"""Python function tools for the public ResearchHarness embedding API."""
from __future__ import annotations
import inspect
import re
from collections.abc import Callable, Sequence as AbcSequence
from types import UnionType
from typing import Any, Literal, Sequence, Union, get_args, get_origin, get_type_hints
from agent_base.tools.tooling import ToolBase
# Accepted tool names: 1-64 characters, starting with a letter or underscore and
# continuing with letters, digits, underscores, or hyphens.
TOOL_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_-]{0,63}$")

# Parameter names that are kept out of the generated JSON schema and filled from
# the keyword arguments given to FunctionTool.call instead of from model input.
CONTEXT_PARAMETER_NAMES = frozenset(("workspace_root", "runtime_deadline", "model_name"))
class FunctionTool(ToolBase):
    """ToolBase adapter for a validated Python function.

    The tool's ``name``, ``description`` and JSON-schema ``parameters`` are derived
    from the wrapped function at construction time. Parameters whose names appear
    in ``CONTEXT_PARAMETER_NAMES`` are kept out of the schema and are filled from
    the keyword arguments passed to :meth:`call` instead.
    """

    def __init__(self, func: Callable[..., Any], *, name: str | None = None, description: str | None = None):
        """Wrap *func* as a tool.

        Args:
            func: The callable to expose.
            name: Optional tool name; defaults to the function's ``__name__``.
            description: Optional description; defaults to the function's docstring.

        Raises:
            ValueError: If the name, description, or signature is rejected by the
                module's validation helpers.
        """
        self.func = func
        # Filled in by _schema_from_signature with the context parameters func declares.
        self._context_parameters: set[str] = set()
        # NOTE(review): these attributes are assigned before ToolBase.__init__ runs,
        # presumably because the base initializer reads them - keep this order.
        self.name = _resolve_tool_name(func, name)
        self.description = _resolve_tool_description(func, description)
        self.parameters = _schema_from_signature(func, self._context_parameters)
        super().__init__()

    def call(self, params: str | dict[str, Any], **kwargs: Any) -> Any:
        """Invoke the wrapped function with model-supplied arguments.

        Args:
            params: Tool arguments, decoded through ``self.parse_json_args``.
            **kwargs: Runtime-provided values; only those matching a context
                parameter declared by the wrapped function are forwarded.

        Returns:
            Whatever the wrapped function returns.
        """
        parsed = self.parse_json_args(params)
        reserved = self._context_parameters
        # Context parameters are owned by the runtime and are absent from the schema,
        # so any value the model supplied under one of those names is discarded
        # rather than being passed through to the function.
        call_kwargs = {key: value for key, value in dict(parsed).items() if key not in reserved}
        call_kwargs.update((key, kwargs[key]) for key in reserved if key in kwargs)
        return self.func(**call_kwargs)
def tool(
func: Callable[..., Any] | None = None,
*,
name: str | None = None,
description: str | None = None,
) -> Callable[..., Any]:
"""Mark a Python function as a ResearchHarness custom tool.
The decorated function remains directly callable. ResearchHarness converts it
into a ToolBase instance when passed to create_agent(tools=[...]).
"""
def decorate(inner: Callable[..., Any]) -> Callable[..., Any]:
if not callable(inner):
raise TypeError("@tool can only decorate a callable.")
setattr(inner, "__researchharness_tool__", {"name": name, "description": description})
return inner
if func is None:
return decorate
return decorate(func)
def build_custom_tool_map(custom_tools: Sequence[Any] | None) -> dict[str, ToolBase]:
    """Turn user-supplied custom tools into a name -> tool mapping.

    Each entry is passed through ``_coerce_custom_tool``; a missing or empty input
    yields an empty mapping.

    Raises:
        ValueError: If two tools resolve to the same name, or if an entry is
            rejected during coercion.
    """
    by_name: dict[str, ToolBase] = {}
    if not custom_tools:
        return by_name
    for candidate in custom_tools:
        instance = _coerce_custom_tool(candidate)
        key = instance.name
        if key in by_name:
            raise ValueError(f"Duplicate custom tool name: {key}")
        by_name[key] = instance
    return by_name
def _coerce_custom_tool(item: Any) -> ToolBase:
    """Return *item* as a ToolBase instance.

    ToolBase instances pass through untouched. Callables must carry the
    ``__researchharness_tool__`` metadata dict and are wrapped in a FunctionTool.

    Raises:
        ValueError: If *item* is neither a ToolBase instance nor a callable, or
            is a callable without the metadata dict.
    """
    if isinstance(item, ToolBase):
        return item
    if not callable(item):
        raise ValueError(f"Custom tool must be a decorated function or ToolBase instance, got {type(item).__name__}.")
    marker = getattr(item, "__researchharness_tool__", None)
    if isinstance(marker, dict):
        return FunctionTool(item, name=marker.get("name"), description=marker.get("description"))
    raise ValueError(
        f"Custom tool function {getattr(item, '__name__', item)!r} must be decorated with @researchharness.tool."
    )
def _resolve_tool_name(func: Callable[..., Any], override: str | None) -> str:
name = str(override or getattr(func, "__name__", "")).strip()
if not name:
raise ValueError("Custom tool name must be non-empty.")
if not TOOL_NAME_RE.fullmatch(name):
raise ValueError(
f"Invalid custom tool name {name!r}. Use 1-64 characters: letters, numbers, underscore, or hyphen; start with a letter or underscore."
)
return name
def _resolve_tool_description(func: Callable[..., Any], override: str | None) -> str:
description = str(override or inspect.getdoc(func) or "").strip()
if not description:
raise ValueError(f"Custom tool {getattr(func, '__name__', '<callable>')!r} must have a docstring or description.")
return description
def _schema_from_signature(func: Callable[..., Any], context_parameters: set[str]) -> dict[str, Any]:
    """Build the JSON-schema ``parameters`` object for *func*.

    Args:
        func: The callable whose signature is inspected.
        context_parameters: Output set; the name of every parameter found in
            ``CONTEXT_PARAMETER_NAMES`` is added to it. Those parameters are left
            out of the schema.

    Returns:
        A dict with ``type``, ``properties``, ``required`` and
        ``additionalProperties`` keys describing the remaining parameters.

    Raises:
        ValueError: If type hints cannot be resolved, if the function uses
            ``*args``/``**kwargs`` or positional-only parameters, if a context
            parameter is not keyword-only, if a parameter lacks an annotation, or
            if an annotation cannot be mapped to a schema.
    """
    # functools.partial objects and instances defining __call__ have no __name__.
    # Resolve a printable label once so the error paths below raise the intended
    # ValueError instead of an AttributeError.
    label = getattr(func, "__name__", None) or repr(func)
    signature = inspect.signature(func)
    try:
        hints = get_type_hints(func)
    except Exception as exc:
        raise ValueError(f"Could not resolve type hints for custom tool {label}: {exc}") from exc
    properties: dict[str, Any] = {}
    required: list[str] = []
    for param in signature.parameters.values():
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            raise ValueError(f"Custom tool {label} may not use *args or **kwargs.")
        if param.kind == inspect.Parameter.POSITIONAL_ONLY:
            raise ValueError(f"Custom tool {label} may not use positional-only parameters.")
        if param.name in CONTEXT_PARAMETER_NAMES:
            if param.kind is not inspect.Parameter.KEYWORD_ONLY:
                raise ValueError(f"Context parameter {param.name!r} in custom tool {label} must be keyword-only.")
            # Context parameters are recorded for the caller and never exposed in the schema.
            context_parameters.add(param.name)
            continue
        if param.name not in hints:
            raise ValueError(f"Custom tool {label} parameter {param.name!r} must have a type annotation.")
        schema, nullable = _annotation_to_schema(hints[param.name], f"{label}.{param.name}")
        # NOTE(review): a nullable parameter with no default is left out of
        # ``required`` although the function still needs the argument when called;
        # confirm this is intended.
        if param.default is inspect.Parameter.empty and not nullable:
            required.append(param.name)
        elif param.default is not inspect.Parameter.empty:
            schema["default"] = param.default
        properties[param.name] = schema
    return {
        "type": "object",
        "properties": properties,
        "required": required,
        "additionalProperties": False,
    }
def _annotation_to_schema(annotation: Any, label: str) -> tuple[dict[str, Any], bool]:
origin = get_origin(annotation)
args = get_args(annotation)
if annotation is Any:
raise ValueError(f"Custom tool parameter {label} may not use Any; use a concrete JSON-compatible type.")
if origin in (UnionType, Union):
non_none = [arg for arg in args if arg is not type(None)]
if len(non_none) == 1 and len(non_none) != len(args):
schema, _ = _annotation_to_schema(non_none[0], label)
return schema, True
raise ValueError(f"Custom tool parameter {label} uses an unsupported union type.")
if origin is Literal:
values = list(args)
if not values:
raise ValueError(f"Custom tool parameter {label} uses an empty Literal.")
value_types = {type(value) for value in values}
if len(value_types) != 1 or next(iter(value_types)) not in {str, int, float, bool}:
raise ValueError(f"Custom tool parameter {label} uses unsupported Literal values.")
schema, _ = _annotation_to_schema(type(values[0]), label)
schema["enum"] = values
return schema, False
if annotation is str:
return {"type": "string"}, False
if annotation is int:
return {"type": "integer"}, False
if annotation is float:
return {"type": "number"}, False
if annotation is bool:
return {"type": "boolean"}, False
if annotation is dict:
return {"type": "object"}, False
if annotation in (list, tuple):
return {"type": "array"}, False
if origin in (list, tuple, Sequence, AbcSequence):
item_schema: dict[str, Any] = {}
if args and args[0] is not Ellipsis:
item_schema, _ = _annotation_to_schema(args[0], label)
return {"type": "array", "items": item_schema}, False
if origin is dict:
key_type = args[0] if args else str
if key_type is not str:
raise ValueError(f"Custom tool parameter {label} dict keys must be str.")
return {"type": "object"}, False
raise ValueError(f"Custom tool parameter {label} has unsupported type annotation: {annotation!r}")
|