"""Python function tools for the public ResearchHarness embedding API."""

from __future__ import annotations

import inspect
import re
from collections.abc import Callable, Sequence as AbcSequence
from types import UnionType
from typing import Any, Literal, Sequence, Union, get_args, get_origin, get_type_hints

from agent_base.tools.tooling import ToolBase

# A tool name is 1-64 characters: a letter or underscore followed by letters,
# digits, underscores, or hyphens.
TOOL_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_-]{0,63}$")

# Parameters with these names are filled from the keyword arguments given to
# FunctionTool.call() and are left out of the generated JSON schema.
CONTEXT_PARAMETER_NAMES = frozenset({"workspace_root", "runtime_deadline", "model_name"})


class FunctionTool(ToolBase):
    """ToolBase adapter for a validated Python function.

    On construction the function's name, description, and signature are
    validated and turned into the ``name``, ``description``, and
    ``parameters`` (JSON-schema dict) attributes.
    """

    def __init__(self, func: Callable[..., Any], *, name: str | None = None, description: str | None = None):
        """Validate *func* and derive the tool metadata from it.

        Args:
            func: The callable to expose as a tool.
            name: Optional tool name; defaults to ``func.__name__``.
            description: Optional description; defaults to the docstring.

        Raises:
            ValueError: If the name, description, or signature is invalid.
        """
        self.func = func
        # Names of context parameters (see CONTEXT_PARAMETER_NAMES) that func declares.
        self._context_parameters: set[str] = set()
        # Names of ``T | None`` parameters without a default. The schema does
        # not list them as required, so call() must supply None when omitted.
        self._implicit_none_parameters: set[str] = set()
        self.name = _resolve_tool_name(func, name)
        self.description = _resolve_tool_description(func, description)
        self.parameters = _schema_from_signature(
            func,
            self._context_parameters,
            self._implicit_none_parameters,
        )
        # The attributes above are assigned before the base initializer runs.
        super().__init__()

    def call(self, params: str | dict[str, Any], **kwargs: Any) -> Any:
        """Invoke the wrapped function and return its result.

        Args:
            params: Tool arguments, handed to ``self.parse_json_args``
                (not defined in this module; presumably provided by ToolBase).
            **kwargs: Runtime context. Only entries whose name is a context
                parameter declared by the function are forwarded.

        Returns:
            Whatever the wrapped function returns.
        """
        parsed = self.parse_json_args(params)
        call_kwargs = dict(parsed)
        # NOTE(review): a context value supplied by the runtime overrides one of
        # the same name in params; if the runtime omits it, a value present in
        # params is passed through unchanged.
        for name in self._context_parameters:
            if name in kwargs:
                call_kwargs[name] = kwargs[name]
        # Nullable parameters without a default are optional in the schema;
        # fill omitted ones with None so the call does not raise TypeError.
        for name in self._implicit_none_parameters:
            call_kwargs.setdefault(name, None)
        return self.func(**call_kwargs)


def tool(
    func: Callable[..., Any] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
) -> Callable[..., Any]:
    """Mark a Python function as a ResearchHarness custom tool.

    The decorated function remains directly callable. ResearchHarness converts it
    into a ToolBase instance when passed to create_agent(tools=[...]).

    Usable both as ``@tool`` and as ``@tool(name=..., description=...)``.

    Raises:
        TypeError: If the decorated object is not callable.
    """

    def decorate(inner: Callable[..., Any]) -> Callable[..., Any]:
        if not callable(inner):
            raise TypeError("@tool can only decorate a callable.")
        # The marker is read back by _coerce_custom_tool().
        setattr(inner, "__researchharness_tool__", {"name": name, "description": description})
        return inner

    if func is None:
        # Called with arguments: return the real decorator.
        return decorate
    return decorate(func)


def build_custom_tool_map(custom_tools: Sequence[Any] | None) -> dict[str, ToolBase]:
    """Validate and instantiate user-provided custom tools.

    Args:
        custom_tools: ToolBase instances and/or ``@tool``-decorated callables;
            ``None`` is treated as an empty sequence.

    Returns:
        A mapping from tool name to tool instance, in input order.

    Raises:
        ValueError: If an item cannot be coerced or two tools share a name.
    """
    resolved: dict[str, ToolBase] = {}
    for item in custom_tools or []:
        tool_obj = _coerce_custom_tool(item)
        if tool_obj.name in resolved:
            raise ValueError(f"Duplicate custom tool name: {tool_obj.name}")
        resolved[tool_obj.name] = tool_obj
    return resolved


def _coerce_custom_tool(item: Any) -> ToolBase:
    """Return *item* as a ToolBase, wrapping decorated callables in FunctionTool.

    Raises:
        ValueError: If *item* is an undecorated callable or any other object.
    """
    if isinstance(item, ToolBase):
        return item
    if callable(item):
        metadata = getattr(item, "__researchharness_tool__", None)
        if not isinstance(metadata, dict):
            raise ValueError(
                f"Custom tool function {getattr(item, '__name__', item)!r} must be decorated with @researchharness.tool."
            )
        return FunctionTool(
            item,
            name=metadata.get("name"),
            description=metadata.get("description"),
        )
    raise ValueError(f"Custom tool must be a decorated function or ToolBase instance, got {type(item).__name__}.")


def _callable_label(func: Callable[..., Any]) -> str:
    """Return a printable name for *func*, even if it has no ``__name__``.

    Objects such as ``functools.partial`` instances lack ``__name__``; falling
    back to ``repr`` keeps error reporting from raising AttributeError.
    """
    name = getattr(func, "__name__", None)
    if isinstance(name, str) and name:
        return name
    return repr(func)


def _resolve_tool_name(func: Callable[..., Any], override: str | None) -> str:
    """Return the validated tool name: *override* if truthy, else ``func.__name__``.

    Raises:
        ValueError: If the name is empty or does not match TOOL_NAME_RE.
    """
    name = str(override or getattr(func, "__name__", "")).strip()
    if not name:
        raise ValueError("Custom tool name must be non-empty.")
    if not TOOL_NAME_RE.fullmatch(name):
        raise ValueError(
            f"Invalid custom tool name {name!r}. Use 1-64 characters: letters, numbers, underscore, or hyphen; start with a letter or underscore."
        )
    return name


def _resolve_tool_description(func: Callable[..., Any], override: str | None) -> str:
    """Return the tool description: *override* if truthy, else the docstring.

    Raises:
        ValueError: If neither source yields non-blank text.
    """
    description = str(override or inspect.getdoc(func) or "").strip()
    if not description:
        raise ValueError(f"Custom tool {getattr(func, '__name__', '')!r} must have a docstring or description.")
    return description


def _schema_from_signature(
    func: Callable[..., Any],
    context_parameters: set[str],
    implicit_none_parameters: set[str] | None = None,
) -> dict[str, Any]:
    """Build a JSON-schema ``object`` description from *func*'s signature.

    Args:
        func: The callable to inspect.
        context_parameters: Output set; receives the names of parameters that
            appear in CONTEXT_PARAMETER_NAMES. Those are excluded from the schema.
        implicit_none_parameters: Optional output set; receives the names of
            nullable parameters that have no default. They are not listed as
            required, so the caller must pass ``None`` when they are omitted.

    Returns:
        A dict with ``type``, ``properties``, ``required``, and
        ``additionalProperties`` keys.

    Raises:
        ValueError: On unresolved type hints, ``*args``/``**kwargs``,
            positional-only parameters, context parameters that are not
            keyword-only, missing annotations, or unsupported annotations.
    """
    label = _callable_label(func)
    signature = inspect.signature(func)
    try:
        hints = get_type_hints(func)
    except Exception as exc:
        raise ValueError(f"Could not resolve type hints for custom tool {label}: {exc}") from exc

    properties: dict[str, Any] = {}
    required: list[str] = []
    for param in signature.parameters.values():
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            raise ValueError(f"Custom tool {label} may not use *args or **kwargs.")
        if param.kind == inspect.Parameter.POSITIONAL_ONLY:
            # call() invokes the function with keyword arguments only.
            raise ValueError(f"Custom tool {label} may not use positional-only parameters.")
        if param.name in CONTEXT_PARAMETER_NAMES:
            if param.kind is not inspect.Parameter.KEYWORD_ONLY:
                raise ValueError(f"Context parameter {param.name!r} in custom tool {label} must be keyword-only.")
            context_parameters.add(param.name)
            continue
        if param.name not in hints:
            raise ValueError(f"Custom tool {label} parameter {param.name!r} must have a type annotation.")

        schema, nullable = _annotation_to_schema(hints[param.name], f"{label}.{param.name}")
        if param.default is not inspect.Parameter.empty:
            schema["default"] = param.default
        elif nullable:
            # Optional in the schema although Python requires a value.
            if implicit_none_parameters is not None:
                implicit_none_parameters.add(param.name)
        else:
            required.append(param.name)
        properties[param.name] = schema

    return {
        "type": "object",
        "properties": properties,
        "required": required,
        "additionalProperties": False,
    }


def _annotation_to_schema(annotation: Any, label: str) -> tuple[dict[str, Any], bool]:
    """Translate a resolved type annotation into a JSON-schema fragment.

    Args:
        annotation: The resolved type hint.
        label: ``"<function>.<parameter>"`` text used in error messages.

    Returns:
        ``(schema, nullable)``. ``nullable`` is True only for a top-level
        ``T | None``; the schema itself describes ``T``.

    Raises:
        ValueError: For ``Any``, unions other than ``T | None``, empty,
            mixed-type, or non-primitive ``Literal`` values, dicts with
            non-``str`` keys, and any other unsupported annotation.
    """
    origin = get_origin(annotation)
    args = get_args(annotation)

    if annotation is Any:
        raise ValueError(f"Custom tool parameter {label} may not use Any; use a concrete JSON-compatible type.")

    if origin in (UnionType, Union):
        non_none = [arg for arg in args if arg is not type(None)]
        # Only "exactly one real type plus None" is supported.
        if len(non_none) == 1 and len(non_none) != len(args):
            schema, _ = _annotation_to_schema(non_none[0], label)
            return schema, True
        raise ValueError(f"Custom tool parameter {label} uses an unsupported union type.")

    if origin is Literal:
        values = list(args)
        if not values:
            raise ValueError(f"Custom tool parameter {label} uses an empty Literal.")
        value_types = {type(value) for value in values}
        # All values must share one primitive type, which becomes the enum's type.
        if len(value_types) != 1 or next(iter(value_types)) not in {str, int, float, bool}:
            raise ValueError(f"Custom tool parameter {label} uses unsupported Literal values.")
        schema, _ = _annotation_to_schema(type(values[0]), label)
        schema["enum"] = values
        return schema, False

    if annotation is str:
        return {"type": "string"}, False
    if annotation is int:
        return {"type": "integer"}, False
    if annotation is float:
        return {"type": "number"}, False
    if annotation is bool:
        return {"type": "boolean"}, False
    if annotation is dict:
        return {"type": "object"}, False
    if annotation in (list, tuple):
        return {"type": "array"}, False

    if origin in (list, tuple, Sequence, AbcSequence):
        item_schema: dict[str, Any] = {}
        # Only the first type argument describes the items; any further tuple
        # element types are ignored.
        if args and args[0] is not Ellipsis:
            item_schema, _ = _annotation_to_schema(args[0], label)
        return {"type": "array", "items": item_schema}, False

    if origin is dict:
        key_type = args[0] if args else str
        if key_type is not str:
            raise ValueError(f"Custom tool parameter {label} dict keys must be str.")
        # The value type is not reflected in the schema.
        return {"type": "object"}, False

    raise ValueError(f"Custom tool parameter {label} has unsupported type annotation: {annotation!r}")