File size: 4,903 Bytes
"""Shared test utilities for FastMCP tool access across versions.
FastMCP 3.0 (released 2026-02-19) changed internal APIs — the
``_tool_manager._tools`` dict no longer exists. This module provides
version-agnostic helpers **and** an import-time monkey-patch that puts
``mcp._tool_manager._tools`` back in so existing tests work unmodified.
"""
import types
import pytest
# ── Version-agnostic tool access helpers ──────────────────────────────────────
def _find_tools_dict(mcp) -> dict | None:
"""Probe FastMCP internals to locate the canonical tool registry.
Returns the raw dict[str, ToolObj] if found, else ``None``.
"""
# FastMCP 2.x path
if hasattr(mcp, "_tool_manager"):
tm = mcp._tool_manager
if hasattr(tm, "_tools") and isinstance(tm._tools, dict):
return tm._tools
if hasattr(tm, "tools") and isinstance(tm.tools, dict):
return tm.tools
# FastMCP 3.x: tools stored directly on mcp
if hasattr(mcp, "_tools") and isinstance(mcp._tools, dict):
return mcp._tools
return None
def _extract_fn(tool_obj):
    """Return the callable behind a tool entry.

    If ``tool_obj`` has an ``fn`` attribute, that attribute's value is
    returned as-is (even if it is ``None``).  Otherwise ``tool_obj``
    itself is returned when it is callable, and ``None`` when it is not.
    """
    absent = object()  # sentinel: distinguishes "no attribute" from ``fn=None``
    inner = getattr(tool_obj, "fn", absent)
    if inner is not absent:
        return inner
    return tool_obj if callable(tool_obj) else None
def get_tool_fn(mcp, name):
    """Look up a registered tool by name and return its raw callable.

    Works on whatever registry ``_find_tools_dict`` can locate (FastMCP
    2.x or 3.x layouts), so tests can invoke the function directly.
    Returns ``None`` when no registry is found, when ``name`` is not in
    it, or when ``_extract_fn`` cannot obtain a callable from the entry.
    """
    registry = _find_tools_dict(mcp)
    if registry is None:
        return None

    entry = registry.get(name)
    if entry is None:
        return None

    return _extract_fn(entry)
def get_tool_names(mcp) -> set:
    """Return the names of all registered tools as a set.

    An empty set is returned when no registry can be located or when
    the registry is empty.
    """
    registry = _find_tools_dict(mcp)
    if not registry:
        return set()
    return set(registry.keys())
def get_tool_count(mcp) -> int:
    """Count the tools registered on ``mcp`` (``0`` when none are found)."""
    registered = get_tool_names(mcp)
    return len(registered)
class ToolWrapper:
    """Minimal stand-in for a FastMCP 2.x ``Tool``: it exposes only ``.fn``."""

    def __init__(self, fn):
        # Keep the wrapped callable under the attribute name that
        # 2.x-style test code reads (``tool.fn``).
        self.fn = fn
def get_tool_obj(mcp, name):
    """Return the named tool wrapped so it has a ``.fn`` attribute.

    Provides the FastMCP 2.x access shape: the result is a
    ``ToolWrapper`` around the tool's callable, or ``None`` when
    ``get_tool_fn`` finds nothing for ``name``.
    """
    target = get_tool_fn(mcp, name)
    if target is None:
        return None
    return ToolWrapper(target)
def get_tools_dict(mcp) -> dict:
    """Return a dict mapping tool names to ``ToolWrapper`` objects.

    Drop-in replacement for ``mcp._tool_manager._tools``.

    The registry is probed once and walked in its own iteration order,
    so the returned dict has a stable order (the previous implementation
    iterated a ``set`` of names, whose order varies between runs).
    Entries that are ``None`` or from which no callable can be extracted
    are skipped.  Returns an empty dict when no registry is found.
    """
    registry = _find_tools_dict(mcp)
    if not registry:
        return {}

    result = {}
    for name, tool in registry.items():
        # Mirror get_tool_fn(): a ``None`` entry counts as "not registered".
        fn = _extract_fn(tool) if tool is not None else None
        if fn is not None:
            result[name] = ToolWrapper(fn)
    return result
# ── Autouse fixtures ──────────────────────────────────────────────────────────
# Monkey-patch FastMCP so that mcp._tool_manager._tools works on 3.x.
# This is done via a module-level patch applied when conftest is imported.
def _patch_fastmcp():
    """Ensure FastMCP instances expose ``_tool_manager._tools`` on 3.x.

    This is a no-op when:

    * ``fastmcp`` cannot be imported,
    * ``FastMCP`` has no ``tool`` attribute, or
    * a freshly constructed instance already has a dict at
      ``_tool_manager._tools`` (the 2.x layout, or a class this function
      has already patched).

    Otherwise ``FastMCP.__init__`` and ``FastMCP.tool`` are replaced on
    the class with wrappers that mirror every tool registered through
    ``tool`` into ``self._tool_manager._tools`` as a ``ToolWrapper``.

    The mirror key is the explicit tool name when one is supplied
    (``name=...`` or a leading string argument), otherwise the
    function's ``__name__``.  Both decorator-factory use
    (``@mcp.tool()`` / ``@mcp.tool("x")``) and direct use
    (``@mcp.tool`` / ``mcp.tool(fn)``) are handled.
    NOTE(review): the supported call shapes follow FastMCP's documented
    ``tool`` signature (first parameter ``name_or_fn``) — confirm against
    the installed version.
    """
    import functools  # local import: only this shim needs it

    try:
        from fastmcp import FastMCP
    except ImportError:
        return  # fastmcp not installed — nothing to patch

    original_tool = getattr(FastMCP, "tool", None)
    if original_tool is None:
        return

    # Check if _tool_manager._tools already works (FastMCP 2.x).
    test_mcp = FastMCP("__patch_test__")
    probe_manager = getattr(test_mcp, "_tool_manager", None)
    if isinstance(getattr(probe_manager, "_tools", None), dict):
        return  # Already compatible, no patch needed

    # FastMCP 3.x: install a compatibility shim.  Each instance gets its
    # own registry dict in ``__init__``; ``tool`` also writes into it.
    original_init = FastMCP.__init__

    @functools.wraps(original_init)
    def patched_init(self, *args, **kwargs):
        original_init(self, *args, **kwargs)
        # Add compat _tool_manager._tools without clobbering anything
        # the real constructor may already have set.
        if not hasattr(self, "_tool_manager"):
            self._tool_manager = types.SimpleNamespace()
        if not hasattr(self._tool_manager, "_tools"):
            self._tool_manager._tools = {}

    def _mirror(server, fn, explicit_name):
        """Record ``fn`` in the server's compat registry, if it has one."""
        manager = getattr(server, "_tool_manager", None)
        registry = getattr(manager, "_tools", None)
        if registry is None:
            return
        key = explicit_name or getattr(fn, "__name__", None)
        if key is not None:
            registry[key] = ToolWrapper(fn)

    @functools.wraps(original_tool)
    def patched_tool(self, *args, **kwargs):
        outcome = original_tool(self, *args, **kwargs)

        first = args[0] if args else kwargs.get("name_or_fn")
        explicit_name = kwargs.get("name")
        if isinstance(first, str):
            explicit_name = first  # @mcp.tool("custom_name")

        if callable(first):
            # Direct use (@mcp.tool / mcp.tool(fn, ...)): the original
            # call has already registered the function, so ``outcome``
            # is the registration result, not a decorator.
            _mirror(self, first, explicit_name)
            return outcome

        def wrapper(fn):
            result = outcome(fn)
            # Also register in our compat dict, only after the real
            # registration succeeded.
            _mirror(self, fn, explicit_name)
            return result

        return wrapper

    FastMCP.__init__ = patched_init
    FastMCP.tool = patched_tool
# Apply the patch at import time (before any tests run): importing this
# module is what installs the FastMCP compatibility shim.
_patch_fastmcp()
|