File size: 7,938 Bytes
75ff73e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Python function tools for the public ResearchHarness embedding API."""

from __future__ import annotations

import inspect
import re
from collections.abc import Callable, Sequence as AbcSequence
from types import UnionType
from typing import Any, Literal, Sequence, Union, get_args, get_origin, get_type_hints

from agent_base.tools.tooling import ToolBase


# Accepted custom tool names: a letter or underscore followed by up to 63
# letters, digits, underscores, or hyphens (1-64 characters in total).
TOOL_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_-]{0,63}$")
# Parameter names treated as context parameters: _schema_from_signature keeps
# them out of the generated schema, and FunctionTool.call copies their values
# from its own keyword arguments when they are present.
CONTEXT_PARAMETER_NAMES = frozenset({"workspace_root", "runtime_deadline", "model_name"})


class FunctionTool(ToolBase):
    """Expose a validated Python function through the ToolBase interface.

    The tool name, description, and parameter schema are derived from the
    wrapped function (or from explicit overrides) before the base class
    initializer runs.
    """

    def __init__(self, func: Callable[..., Any], *, name: str | None = None, description: str | None = None):
        """Validate ``func`` and populate the attributes describing the tool.

        Args:
            func: The callable to wrap.
            name: Optional tool name; when omitted the function name is used.
            description: Optional description; when omitted the docstring is used.

        Raises:
            ValueError: Propagated from the name, description, or schema
                helpers when the function does not meet their requirements.
        """
        context_names: set[str] = set()
        self.func = func
        # The schema helper fills this set with the context parameters it finds.
        self._context_parameters = context_names
        self.name = _resolve_tool_name(func, name)
        self.description = _resolve_tool_description(func, description)
        self.parameters = _schema_from_signature(func, context_names)
        super().__init__()

    def call(self, params: str | dict[str, Any], **kwargs: Any) -> Any:
        """Invoke the wrapped function and return whatever it returns.

        ``params`` is parsed with ``self.parse_json_args`` (not defined in this
        class; presumably inherited from ToolBase). Values for the context
        parameters the function declared are then copied from ``kwargs`` when
        present, overriding any parsed value of the same name.
        """
        arguments = dict(self.parse_json_args(params))
        arguments.update(
            (key, kwargs[key]) for key in self._context_parameters if key in kwargs
        )
        return self.func(**arguments)


def tool(
    func: Callable[..., Any] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
) -> Callable[..., Any]:
    """Tag a Python function as a ResearchHarness custom tool.

    Usable bare (``@tool``) or with options
    (``@tool(name=..., description=...)``). The function is returned as-is,
    so it stays directly callable; the only change is a
    ``__researchharness_tool__`` attribute holding the given name and
    description. ResearchHarness converts it into a ToolBase instance when
    passed to create_agent(tools=[...]).

    Raises:
        TypeError: If the decorated object is not callable.
    """

    def attach_metadata(target: Callable[..., Any]) -> Callable[..., Any]:
        if callable(target):
            # A fresh dict per decorated function, so functions never share metadata.
            target.__researchharness_tool__ = {"name": name, "description": description}
            return target
        raise TypeError("@tool can only decorate a callable.")

    # Called with options only: hand back the decorator; otherwise apply it now.
    return attach_metadata if func is None else attach_metadata(func)


def build_custom_tool_map(custom_tools: Sequence[Any] | None) -> dict[str, ToolBase]:
    """Turn user-supplied custom tools into a name-to-tool mapping.

    Each entry is coerced with ``_coerce_custom_tool``. ``None`` or an empty
    sequence yields an empty mapping.

    Raises:
        ValueError: If two tools resolve to the same name, or if an entry
            cannot be coerced.
    """

    tools_by_name: dict[str, ToolBase] = {}
    if not custom_tools:
        return tools_by_name

    for candidate in custom_tools:
        coerced = _coerce_custom_tool(candidate)
        key = coerced.name
        if key in tools_by_name:
            raise ValueError(f"Duplicate custom tool name: {key}")
        tools_by_name[key] = coerced
    return tools_by_name


def _coerce_custom_tool(item: Any) -> ToolBase:
    """Return a ToolBase for one user-supplied custom tool entry.

    ToolBase instances are returned unchanged. A callable must carry the
    ``__researchharness_tool__`` metadata dict and is wrapped in a
    FunctionTool built from that metadata.

    Raises:
        ValueError: If ``item`` is a callable without the metadata dict, or is
            neither a ToolBase instance nor a callable.
    """
    if isinstance(item, ToolBase):
        return item
    if not callable(item):
        raise ValueError(f"Custom tool must be a decorated function or ToolBase instance, got {type(item).__name__}.")

    marker = getattr(item, "__researchharness_tool__", None)
    if isinstance(marker, dict):
        return FunctionTool(
            item,
            name=marker.get("name"),
            description=marker.get("description"),
        )
    raise ValueError(
        f"Custom tool function {getattr(item, '__name__', item)!r} must be decorated with @researchharness.tool."
    )


def _resolve_tool_name(func: Callable[..., Any], override: str | None) -> str:
    name = str(override or getattr(func, "__name__", "")).strip()
    if not name:
        raise ValueError("Custom tool name must be non-empty.")
    if not TOOL_NAME_RE.fullmatch(name):
        raise ValueError(
            f"Invalid custom tool name {name!r}. Use 1-64 characters: letters, numbers, underscore, or hyphen; start with a letter or underscore."
        )
    return name


def _resolve_tool_description(func: Callable[..., Any], override: str | None) -> str:
    description = str(override or inspect.getdoc(func) or "").strip()
    if not description:
        raise ValueError(f"Custom tool {getattr(func, '__name__', '<callable>')!r} must have a docstring or description.")
    return description


def _schema_from_signature(func: Callable[..., Any], context_parameters: set[str]) -> dict[str, Any]:
    signature = inspect.signature(func)
    try:
        hints = get_type_hints(func)
    except Exception as exc:
        raise ValueError(f"Could not resolve type hints for custom tool {func.__name__}: {exc}") from exc

    properties: dict[str, Any] = {}
    required: list[str] = []
    for param in signature.parameters.values():
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            raise ValueError(f"Custom tool {func.__name__} may not use *args or **kwargs.")
        if param.kind == inspect.Parameter.POSITIONAL_ONLY:
            raise ValueError(f"Custom tool {func.__name__} may not use positional-only parameters.")
        if param.name in CONTEXT_PARAMETER_NAMES:
            if param.kind is not inspect.Parameter.KEYWORD_ONLY:
                raise ValueError(f"Context parameter {param.name!r} in custom tool {func.__name__} must be keyword-only.")
            context_parameters.add(param.name)
            continue
        if param.name not in hints:
            raise ValueError(f"Custom tool {func.__name__} parameter {param.name!r} must have a type annotation.")
        schema, nullable = _annotation_to_schema(hints[param.name], f"{func.__name__}.{param.name}")
        if param.default is inspect.Parameter.empty and not nullable:
            required.append(param.name)
        elif param.default is not inspect.Parameter.empty:
            schema["default"] = param.default
        properties[param.name] = schema

    return {
        "type": "object",
        "properties": properties,
        "required": required,
        "additionalProperties": False,
    }


def _annotation_to_schema(annotation: Any, label: str) -> tuple[dict[str, Any], bool]:
    origin = get_origin(annotation)
    args = get_args(annotation)

    if annotation is Any:
        raise ValueError(f"Custom tool parameter {label} may not use Any; use a concrete JSON-compatible type.")
    if origin in (UnionType, Union):
        non_none = [arg for arg in args if arg is not type(None)]
        if len(non_none) == 1 and len(non_none) != len(args):
            schema, _ = _annotation_to_schema(non_none[0], label)
            return schema, True
        raise ValueError(f"Custom tool parameter {label} uses an unsupported union type.")
    if origin is Literal:
        values = list(args)
        if not values:
            raise ValueError(f"Custom tool parameter {label} uses an empty Literal.")
        value_types = {type(value) for value in values}
        if len(value_types) != 1 or next(iter(value_types)) not in {str, int, float, bool}:
            raise ValueError(f"Custom tool parameter {label} uses unsupported Literal values.")
        schema, _ = _annotation_to_schema(type(values[0]), label)
        schema["enum"] = values
        return schema, False
    if annotation is str:
        return {"type": "string"}, False
    if annotation is int:
        return {"type": "integer"}, False
    if annotation is float:
        return {"type": "number"}, False
    if annotation is bool:
        return {"type": "boolean"}, False
    if annotation is dict:
        return {"type": "object"}, False
    if annotation in (list, tuple):
        return {"type": "array"}, False
    if origin in (list, tuple, Sequence, AbcSequence):
        item_schema: dict[str, Any] = {}
        if args and args[0] is not Ellipsis:
            item_schema, _ = _annotation_to_schema(args[0], label)
        return {"type": "array", "items": item_schema}, False
    if origin is dict:
        key_type = args[0] if args else str
        if key_type is not str:
            raise ValueError(f"Custom tool parameter {label} dict keys must be str.")
        return {"type": "object"}, False
    raise ValueError(f"Custom tool parameter {label} has unsupported type annotation: {annotation!r}")