Spaces:
Running
Running
| """Python function tools for the public ResearchHarness embedding API.""" | |
| from __future__ import annotations | |
| import inspect | |
| import re | |
| from collections.abc import Callable, Sequence as AbcSequence | |
| from types import UnionType | |
| from typing import Any, Literal, Sequence, Union, get_args, get_origin, get_type_hints | |
| from agent_base.tools.tooling import ToolBase | |
| TOOL_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_-]{0,63}$") | |
| CONTEXT_PARAMETER_NAMES = frozenset({"workspace_root", "runtime_deadline", "model_name"}) | |
| class FunctionTool(ToolBase): | |
| """ToolBase adapter for a validated Python function.""" | |
| def __init__(self, func: Callable[..., Any], *, name: str | None = None, description: str | None = None): | |
| self.func = func | |
| self._context_parameters: set[str] = set() | |
| self.name = _resolve_tool_name(func, name) | |
| self.description = _resolve_tool_description(func, description) | |
| self.parameters = _schema_from_signature(func, self._context_parameters) | |
| super().__init__() | |
| def call(self, params: str | dict[str, Any], **kwargs: Any) -> Any: | |
| parsed = self.parse_json_args(params) | |
| call_kwargs = dict(parsed) | |
| for name in self._context_parameters: | |
| if name in kwargs: | |
| call_kwargs[name] = kwargs[name] | |
| return self.func(**call_kwargs) | |
| def tool( | |
| func: Callable[..., Any] | None = None, | |
| *, | |
| name: str | None = None, | |
| description: str | None = None, | |
| ) -> Callable[..., Any]: | |
| """Mark a Python function as a ResearchHarness custom tool. | |
| The decorated function remains directly callable. ResearchHarness converts it | |
| into a ToolBase instance when passed to create_agent(tools=[...]). | |
| """ | |
| def decorate(inner: Callable[..., Any]) -> Callable[..., Any]: | |
| if not callable(inner): | |
| raise TypeError("@tool can only decorate a callable.") | |
| setattr(inner, "__researchharness_tool__", {"name": name, "description": description}) | |
| return inner | |
| if func is None: | |
| return decorate | |
| return decorate(func) | |
| def build_custom_tool_map(custom_tools: Sequence[Any] | None) -> dict[str, ToolBase]: | |
| """Validate and instantiate user-provided custom tools.""" | |
| resolved: dict[str, ToolBase] = {} | |
| for item in custom_tools or []: | |
| tool_obj = _coerce_custom_tool(item) | |
| if tool_obj.name in resolved: | |
| raise ValueError(f"Duplicate custom tool name: {tool_obj.name}") | |
| resolved[tool_obj.name] = tool_obj | |
| return resolved | |
| def _coerce_custom_tool(item: Any) -> ToolBase: | |
| if isinstance(item, ToolBase): | |
| return item | |
| if callable(item): | |
| metadata = getattr(item, "__researchharness_tool__", None) | |
| if not isinstance(metadata, dict): | |
| raise ValueError( | |
| f"Custom tool function {getattr(item, '__name__', item)!r} must be decorated with @researchharness.tool." | |
| ) | |
| return FunctionTool( | |
| item, | |
| name=metadata.get("name"), | |
| description=metadata.get("description"), | |
| ) | |
| raise ValueError(f"Custom tool must be a decorated function or ToolBase instance, got {type(item).__name__}.") | |
| def _resolve_tool_name(func: Callable[..., Any], override: str | None) -> str: | |
| name = str(override or getattr(func, "__name__", "")).strip() | |
| if not name: | |
| raise ValueError("Custom tool name must be non-empty.") | |
| if not TOOL_NAME_RE.fullmatch(name): | |
| raise ValueError( | |
| f"Invalid custom tool name {name!r}. Use 1-64 characters: letters, numbers, underscore, or hyphen; start with a letter or underscore." | |
| ) | |
| return name | |
| def _resolve_tool_description(func: Callable[..., Any], override: str | None) -> str: | |
| description = str(override or inspect.getdoc(func) or "").strip() | |
| if not description: | |
| raise ValueError(f"Custom tool {getattr(func, '__name__', '<callable>')!r} must have a docstring or description.") | |
| return description | |
| def _schema_from_signature(func: Callable[..., Any], context_parameters: set[str]) -> dict[str, Any]: | |
| signature = inspect.signature(func) | |
| try: | |
| hints = get_type_hints(func) | |
| except Exception as exc: | |
| raise ValueError(f"Could not resolve type hints for custom tool {func.__name__}: {exc}") from exc | |
| properties: dict[str, Any] = {} | |
| required: list[str] = [] | |
| for param in signature.parameters.values(): | |
| if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD): | |
| raise ValueError(f"Custom tool {func.__name__} may not use *args or **kwargs.") | |
| if param.kind == inspect.Parameter.POSITIONAL_ONLY: | |
| raise ValueError(f"Custom tool {func.__name__} may not use positional-only parameters.") | |
| if param.name in CONTEXT_PARAMETER_NAMES: | |
| if param.kind is not inspect.Parameter.KEYWORD_ONLY: | |
| raise ValueError(f"Context parameter {param.name!r} in custom tool {func.__name__} must be keyword-only.") | |
| context_parameters.add(param.name) | |
| continue | |
| if param.name not in hints: | |
| raise ValueError(f"Custom tool {func.__name__} parameter {param.name!r} must have a type annotation.") | |
| schema, nullable = _annotation_to_schema(hints[param.name], f"{func.__name__}.{param.name}") | |
| if param.default is inspect.Parameter.empty and not nullable: | |
| required.append(param.name) | |
| elif param.default is not inspect.Parameter.empty: | |
| schema["default"] = param.default | |
| properties[param.name] = schema | |
| return { | |
| "type": "object", | |
| "properties": properties, | |
| "required": required, | |
| "additionalProperties": False, | |
| } | |
| def _annotation_to_schema(annotation: Any, label: str) -> tuple[dict[str, Any], bool]: | |
| origin = get_origin(annotation) | |
| args = get_args(annotation) | |
| if annotation is Any: | |
| raise ValueError(f"Custom tool parameter {label} may not use Any; use a concrete JSON-compatible type.") | |
| if origin in (UnionType, Union): | |
| non_none = [arg for arg in args if arg is not type(None)] | |
| if len(non_none) == 1 and len(non_none) != len(args): | |
| schema, _ = _annotation_to_schema(non_none[0], label) | |
| return schema, True | |
| raise ValueError(f"Custom tool parameter {label} uses an unsupported union type.") | |
| if origin is Literal: | |
| values = list(args) | |
| if not values: | |
| raise ValueError(f"Custom tool parameter {label} uses an empty Literal.") | |
| value_types = {type(value) for value in values} | |
| if len(value_types) != 1 or next(iter(value_types)) not in {str, int, float, bool}: | |
| raise ValueError(f"Custom tool parameter {label} uses unsupported Literal values.") | |
| schema, _ = _annotation_to_schema(type(values[0]), label) | |
| schema["enum"] = values | |
| return schema, False | |
| if annotation is str: | |
| return {"type": "string"}, False | |
| if annotation is int: | |
| return {"type": "integer"}, False | |
| if annotation is float: | |
| return {"type": "number"}, False | |
| if annotation is bool: | |
| return {"type": "boolean"}, False | |
| if annotation is dict: | |
| return {"type": "object"}, False | |
| if annotation in (list, tuple): | |
| return {"type": "array"}, False | |
| if origin in (list, tuple, Sequence, AbcSequence): | |
| item_schema: dict[str, Any] = {} | |
| if args and args[0] is not Ellipsis: | |
| item_schema, _ = _annotation_to_schema(args[0], label) | |
| return {"type": "array", "items": item_schema}, False | |
| if origin is dict: | |
| key_type = args[0] if args else str | |
| if key_type is not str: | |
| raise ValueError(f"Custom tool parameter {label} dict keys must be str.") | |
| return {"type": "object"}, False | |
| raise ValueError(f"Custom tool parameter {label} has unsupported type annotation: {annotation!r}") | |