Spaces:
Sleeping
Sleeping
| # coding=utf-8 | |
| # Copyright 2024 HuggingFace Inc. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import os | |
| import tempfile | |
| import unittest | |
| from pathlib import Path | |
| from textwrap import dedent | |
| from typing import Any, Dict, List, Optional, Tuple, Union | |
| from unittest.mock import MagicMock, patch | |
| import mcp | |
| import numpy as np | |
| import pytest | |
| import torch | |
| from transformers import is_torch_available, is_vision_available | |
| from transformers.testing_utils import get_tests_dir | |
| from smolagents.agent_types import _AGENT_TYPE_MAPPING, AgentAudio, AgentImage, AgentText | |
| from smolagents.tools import AUTHORIZED_TYPES, Tool, ToolCollection, tool | |
| if is_torch_available(): | |
| import torch | |
| if is_vision_available(): | |
| from PIL import Image | |
| def create_inputs(tool_inputs: Dict[str, Dict[Union[str, type], str]]): | |
| inputs = {} | |
| for input_name, input_desc in tool_inputs.items(): | |
| input_type = input_desc["type"] | |
| if input_type == "string": | |
| inputs[input_name] = "Text input" | |
| elif input_type == "image": | |
| inputs[input_name] = Image.open(Path(get_tests_dir("fixtures")) / "000000039769.png").resize((512, 512)) | |
| elif input_type == "audio": | |
| inputs[input_name] = np.ones(3000) | |
| else: | |
| raise ValueError(f"Invalid type requested: {input_type}") | |
| return inputs | |
| def output_type(output): | |
| if isinstance(output, (str, AgentText)): | |
| return "string" | |
| elif isinstance(output, (Image.Image, AgentImage)): | |
| return "image" | |
| elif isinstance(output, (torch.Tensor, AgentAudio)): | |
| return "audio" | |
| else: | |
| raise TypeError(f"Invalid output: {output}") | |
| class ToolTesterMixin: | |
| def test_inputs_output(self): | |
| self.assertTrue(hasattr(self.tool, "inputs")) | |
| self.assertTrue(hasattr(self.tool, "output_type")) | |
| inputs = self.tool.inputs | |
| self.assertTrue(isinstance(inputs, dict)) | |
| for _, input_spec in inputs.items(): | |
| self.assertTrue("type" in input_spec) | |
| self.assertTrue("description" in input_spec) | |
| self.assertTrue(input_spec["type"] in AUTHORIZED_TYPES) | |
| self.assertTrue(isinstance(input_spec["description"], str)) | |
| output_type = self.tool.output_type | |
| self.assertTrue(output_type in AUTHORIZED_TYPES) | |
| def test_common_attributes(self): | |
| self.assertTrue(hasattr(self.tool, "description")) | |
| self.assertTrue(hasattr(self.tool, "name")) | |
| self.assertTrue(hasattr(self.tool, "inputs")) | |
| self.assertTrue(hasattr(self.tool, "output_type")) | |
| def test_agent_type_output(self): | |
| inputs = create_inputs(self.tool.inputs) | |
| output = self.tool(**inputs, sanitize_inputs_outputs=True) | |
| if self.tool.output_type != "any": | |
| agent_type = _AGENT_TYPE_MAPPING[self.tool.output_type] | |
| self.assertTrue(isinstance(output, agent_type)) | |
| class ToolTests(unittest.TestCase): | |
| def test_tool_init_with_decorator(self): | |
| def coolfunc(a: str, b: int) -> float: | |
| """Cool function | |
| Args: | |
| a: The first argument | |
| b: The second one | |
| """ | |
| return b + 2, a | |
| assert coolfunc.output_type == "number" | |
| def test_tool_init_vanilla(self): | |
| class HFModelDownloadsTool(Tool): | |
| name = "model_download_counter" | |
| description = """ | |
| This is a tool that returns the most downloaded model of a given task on the Hugging Face Hub. | |
| It returns the name of the checkpoint.""" | |
| inputs = { | |
| "task": { | |
| "type": "string", | |
| "description": "the task category (such as text-classification, depth-estimation, etc)", | |
| } | |
| } | |
| output_type = "string" | |
| def forward(self, task: str) -> str: | |
| return "best model" | |
| tool = HFModelDownloadsTool() | |
| assert list(tool.inputs.keys())[0] == "task" | |
| def test_tool_init_decorator_raises_issues(self): | |
| with pytest.raises(Exception) as e: | |
| def coolfunc(a: str, b: int): | |
| """Cool function | |
| Args: | |
| a: The first argument | |
| b: The second one | |
| """ | |
| return a + b | |
| assert coolfunc.output_type == "number" | |
| assert "Tool return type not found" in str(e) | |
| with pytest.raises(Exception) as e: | |
| def coolfunc(a: str, b: int) -> int: | |
| """Cool function | |
| Args: | |
| a: The first argument | |
| """ | |
| return b + a | |
| assert coolfunc.output_type == "number" | |
| assert "docstring has no description for the argument" in str(e) | |
| def test_saving_tool_raises_error_imports_outside_function(self): | |
| with pytest.raises(Exception) as e: | |
| import numpy as np | |
| def get_current_time() -> str: | |
| """ | |
| Gets the current time. | |
| """ | |
| return str(np.random.random()) | |
| get_current_time.save("output") | |
| assert "np" in str(e) | |
| # Also test with classic definition | |
| with pytest.raises(Exception) as e: | |
| class GetCurrentTimeTool(Tool): | |
| name = "get_current_time_tool" | |
| description = "Gets the current time" | |
| inputs = {} | |
| output_type = "string" | |
| def forward(self): | |
| return str(np.random.random()) | |
| get_current_time = GetCurrentTimeTool() | |
| get_current_time.save("output") | |
| assert "np" in str(e) | |
| def test_tool_definition_raises_no_error_imports_in_function(self): | |
| def get_current_time() -> str: | |
| """ | |
| Gets the current time. | |
| """ | |
| from datetime import datetime | |
| return str(datetime.now()) | |
| class GetCurrentTimeTool(Tool): | |
| name = "get_current_time_tool" | |
| description = "Gets the current time" | |
| inputs = {} | |
| output_type = "string" | |
| def forward(self): | |
| from datetime import datetime | |
| return str(datetime.now()) | |
| def test_tool_to_dict_allows_no_arg_in_init(self): | |
| """Test that a tool cannot be saved with required args in init""" | |
| class FailTool(Tool): | |
| name = "specific" | |
| description = "test description" | |
| inputs = {"string_input": {"type": "string", "description": "input description"}} | |
| output_type = "string" | |
| def __init__(self, url): | |
| super().__init__(self) | |
| self.url = url | |
| def forward(self, string_input: str) -> str: | |
| return self.url + string_input | |
| fail_tool = FailTool("dummy_url") | |
| with pytest.raises(Exception) as e: | |
| fail_tool.to_dict() | |
| assert "Parameters in __init__ must have default values, found required parameters" in str(e) | |
| class PassTool(Tool): | |
| name = "specific" | |
| description = "test description" | |
| inputs = {"string_input": {"type": "string", "description": "input description"}} | |
| output_type = "string" | |
| def __init__(self, url: Optional[str] = "none"): | |
| super().__init__(self) | |
| self.url = url | |
| def forward(self, string_input: str) -> str: | |
| return self.url + string_input | |
| fail_tool = PassTool() | |
| fail_tool.to_dict() | |
| def test_saving_tool_allows_no_imports_from_outside_methods(self): | |
| # Test that using imports from outside functions fails | |
| import numpy as np | |
| class FailTool(Tool): | |
| name = "specific" | |
| description = "test description" | |
| inputs = {"string_input": {"type": "string", "description": "input description"}} | |
| output_type = "string" | |
| def useless_method(self): | |
| self.client = np.random.random() | |
| return "" | |
| def forward(self, string_input): | |
| return self.useless_method() + string_input | |
| fail_tool = FailTool() | |
| with pytest.raises(Exception) as e: | |
| fail_tool.save("output") | |
| assert "'np' is undefined" in str(e) | |
| # Test that putting these imports inside functions works | |
| class SuccessTool(Tool): | |
| name = "specific" | |
| description = "test description" | |
| inputs = {"string_input": {"type": "string", "description": "input description"}} | |
| output_type = "string" | |
| def useless_method(self): | |
| import numpy as np | |
| self.client = np.random.random() | |
| return "" | |
| def forward(self, string_input): | |
| return self.useless_method() + string_input | |
| success_tool = SuccessTool() | |
| success_tool.save("output") | |
| def test_tool_missing_class_attributes_raises_error(self): | |
| with pytest.raises(Exception) as e: | |
| class GetWeatherTool(Tool): | |
| name = "get_weather" | |
| description = "Get weather in the next days at given location." | |
| inputs = { | |
| "location": {"type": "string", "description": "the location"}, | |
| "celsius": { | |
| "type": "string", | |
| "description": "the temperature type", | |
| }, | |
| } | |
| def forward(self, location: str, celsius: Optional[bool] = False) -> str: | |
| return "The weather is UNGODLY with torrential rains and temperatures below -10°C" | |
| GetWeatherTool() | |
| assert "You must set an attribute output_type" in str(e) | |
| def test_tool_from_decorator_optional_args(self): | |
| def get_weather(location: str, celsius: Optional[bool] = False) -> str: | |
| """ | |
| Get weather in the next days at given location. | |
| Secretly this tool does not care about the location, it hates the weather everywhere. | |
| Args: | |
| location: the location | |
| celsius: the temperature type | |
| """ | |
| return "The weather is UNGODLY with torrential rains and temperatures below -10°C" | |
| assert "nullable" in get_weather.inputs["celsius"] | |
| assert get_weather.inputs["celsius"]["nullable"] | |
| assert "nullable" not in get_weather.inputs["location"] | |
| def test_tool_mismatching_nullable_args_raises_error(self): | |
| with pytest.raises(Exception) as e: | |
| class GetWeatherTool(Tool): | |
| name = "get_weather" | |
| description = "Get weather in the next days at given location." | |
| inputs = { | |
| "location": {"type": "string", "description": "the location"}, | |
| "celsius": { | |
| "type": "string", | |
| "description": "the temperature type", | |
| }, | |
| } | |
| output_type = "string" | |
| def forward(self, location: str, celsius: Optional[bool] = False) -> str: | |
| return "The weather is UNGODLY with torrential rains and temperatures below -10°C" | |
| GetWeatherTool() | |
| assert "Nullable" in str(e) | |
| with pytest.raises(Exception) as e: | |
| class GetWeatherTool2(Tool): | |
| name = "get_weather" | |
| description = "Get weather in the next days at given location." | |
| inputs = { | |
| "location": {"type": "string", "description": "the location"}, | |
| "celsius": { | |
| "type": "string", | |
| "description": "the temperature type", | |
| }, | |
| } | |
| output_type = "string" | |
| def forward(self, location: str, celsius: bool = False) -> str: | |
| return "The weather is UNGODLY with torrential rains and temperatures below -10°C" | |
| GetWeatherTool2() | |
| assert "Nullable" in str(e) | |
| with pytest.raises(Exception) as e: | |
| class GetWeatherTool3(Tool): | |
| name = "get_weather" | |
| description = "Get weather in the next days at given location." | |
| inputs = { | |
| "location": {"type": "string", "description": "the location"}, | |
| "celsius": { | |
| "type": "string", | |
| "description": "the temperature type", | |
| "nullable": True, | |
| }, | |
| } | |
| output_type = "string" | |
| def forward(self, location, celsius: str) -> str: | |
| return "The weather is UNGODLY with torrential rains and temperatures below -10°C" | |
| GetWeatherTool3() | |
| assert "Nullable" in str(e) | |
| def test_tool_default_parameters_is_nullable(self): | |
| def get_weather(location: str, celsius: bool = False) -> str: | |
| """ | |
| Get weather in the next days at given location. | |
| Args: | |
| location: The location to get the weather for. | |
| celsius: is the temperature given in celsius? | |
| """ | |
| return "The weather is UNGODLY with torrential rains and temperatures below -10°C" | |
| assert get_weather.inputs["celsius"]["nullable"] | |
| def test_tool_supports_any_none(self): | |
| def get_weather(location: Any) -> None: | |
| """ | |
| Get weather in the next days at given location. | |
| Args: | |
| location: The location to get the weather for. | |
| """ | |
| return | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| get_weather.save(tmp_dir) | |
| assert get_weather.inputs["location"]["type"] == "any" | |
| assert get_weather.output_type == "null" | |
| def test_tool_supports_array(self): | |
| def get_weather(locations: List[str], months: Optional[Tuple[str, str]] = None) -> Dict[str, float]: | |
| """ | |
| Get weather in the next days at given locations. | |
| Args: | |
| locations: The locations to get the weather for. | |
| months: The months to get the weather for | |
| """ | |
| return | |
| assert get_weather.inputs["locations"]["type"] == "array" | |
| assert get_weather.inputs["months"]["type"] == "array" | |
| def test_saving_tool_produces_valid_pyhon_code_with_multiline_description(self): | |
| def get_weather(location: Any) -> None: | |
| """ | |
| Get weather in the next days at given location. | |
| And works pretty well. | |
| Args: | |
| location: The location to get the weather for. | |
| """ | |
| return | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| get_weather.save(tmp_dir) | |
| with open(os.path.join(tmp_dir, "tool.py"), "r", encoding="utf-8") as f: | |
| source_code = f.read() | |
| compile(source_code, f.name, "exec") | |
| def test_saving_tool_produces_valid_python_code_with_complex_name(self): | |
| # Test one cannot save tool with additional args in init | |
| class FailTool(Tool): | |
| name = 'spe"\rcific' | |
| description = """test \n\r | |
| description""" | |
| inputs = {"string_input": {"type": "string", "description": "input description"}} | |
| output_type = "string" | |
| def __init__(self): | |
| super().__init__(self) | |
| def forward(self, string_input): | |
| return "foo" | |
| fail_tool = FailTool() | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| fail_tool.save(tmp_dir) | |
| with open(os.path.join(tmp_dir, "tool.py"), "r", encoding="utf-8") as f: | |
| source_code = f.read() | |
| compile(source_code, f.name, "exec") | |
| def mock_server_parameters(): | |
| return MagicMock() | |
| def mock_mcp_adapt(): | |
| with patch("mcpadapt.core.MCPAdapt") as mock: | |
| mock.return_value.__enter__.return_value = ["tool1", "tool2"] | |
| mock.return_value.__exit__.return_value = None | |
| yield mock | |
| def mock_smolagents_adapter(): | |
| with patch("mcpadapt.smolagents_adapter.SmolAgentsAdapter") as mock: | |
| yield mock | |
| class TestToolCollection: | |
| def test_from_mcp(self, mock_server_parameters, mock_mcp_adapt, mock_smolagents_adapter): | |
| with ToolCollection.from_mcp(mock_server_parameters) as tool_collection: | |
| assert isinstance(tool_collection, ToolCollection) | |
| assert len(tool_collection.tools) == 2 | |
| assert "tool1" in tool_collection.tools | |
| assert "tool2" in tool_collection.tools | |
| def test_integration_from_mcp(self): | |
| # define the most simple mcp server with one tool that echoes the input text | |
| mcp_server_script = dedent("""\ | |
| from mcp.server.fastmcp import FastMCP | |
| mcp = FastMCP("Echo Server") | |
| @mcp.tool() | |
| def echo_tool(text: str) -> str: | |
| return text | |
| mcp.run() | |
| """).strip() | |
| mcp_server_params = mcp.StdioServerParameters( | |
| command="python", | |
| args=["-c", mcp_server_script], | |
| ) | |
| with ToolCollection.from_mcp(mcp_server_params) as tool_collection: | |
| assert len(tool_collection.tools) == 1, "Expected 1 tool" | |
| assert tool_collection.tools[0].name == "echo_tool", "Expected tool name to be 'echo_tool'" | |
| assert tool_collection.tools[0](text="Hello") == "Hello", "Expected tool to echo the input text" | |