File size: 4,903 Bytes
02f4a63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""Shared test utilities for FastMCP tool access across versions.

FastMCP 3.0 (released 2026-02-19) changed internal APIs — the
``_tool_manager._tools`` dict no longer exists.  This module provides
version-agnostic helpers **and** an import-time monkey-patch that restores
``mcp._tool_manager._tools`` so existing tests work unmodified.
"""

import types
import pytest


# ── Version-agnostic tool access helpers ──────────────────────────────────────


def _find_tools_dict(mcp) -> dict | None:
    """Probe FastMCP internals to locate the canonical tool registry.

    Returns the raw dict[str, ToolObj] if found, else ``None``.
    """
    # FastMCP 2.x path
    if hasattr(mcp, "_tool_manager"):
        tm = mcp._tool_manager
        if hasattr(tm, "_tools") and isinstance(tm._tools, dict):
            return tm._tools
        if hasattr(tm, "tools") and isinstance(tm.tools, dict):
            return tm.tools

    # FastMCP 3.x: tools stored directly on mcp
    if hasattr(mcp, "_tools") and isinstance(mcp._tools, dict):
        return mcp._tools

    return None


def _extract_fn(tool_obj):
    """Return the plain callable behind *tool_obj*.

    If the object has a ``fn`` attribute, that attribute's value is returned
    as-is.  Otherwise the object itself is returned when it is callable, and
    ``None`` when it is not.
    """
    try:
        return tool_obj.fn
    except AttributeError:
        # No ``fn`` attribute: fall back to treating the object as the callable.
        pass
    return tool_obj if callable(tool_obj) else None


def get_tool_fn(mcp, name):
    """Look up the tool called *name* on *mcp* and return its raw function.

    Works with FastMCP 2.x and 3.x.  The returned callable can be invoked
    directly from a test.  ``None`` is returned when no tool registry can be
    located, when *name* is not registered, or when no callable can be
    extracted from the registered entry.
    """
    registry = _find_tools_dict(mcp)
    if registry is None:
        return None

    entry = registry.get(name)
    return None if entry is None else _extract_fn(entry)


def get_tool_names(mcp) -> set:
    """Collect the names of all tools registered on *mcp* into a set.

    An empty set is returned when the registry is missing or empty.
    """
    registry = _find_tools_dict(mcp)
    if not registry:
        return set()
    return set(registry.keys())


def get_tool_count(mcp) -> int:
    """Count the tools registered on *mcp* (``0`` when none are found)."""
    registered = get_tool_names(mcp)
    return len(registered)


class ToolWrapper:
    """Compatibility wrapper matching FastMCP 2.x Tool interface.

    Holds a single attribute, ``fn``, so that code written against objects
    exposing ``.fn`` can reach a bare function through the same access
    pattern.
    """

    def __init__(self, fn):
        # Stored unchanged: no validation, copying, or wrapping happens here.
        self.fn = fn


def get_tool_obj(mcp, name):
    """Return the tool *name* wrapped so it exposes ``.fn`` (FastMCP 2.x compat).

    Yields ``None`` when the tool's function cannot be resolved.
    """
    target = get_tool_fn(mcp, name)
    if target is None:
        return None
    return ToolWrapper(target)


def get_tools_dict(mcp) -> dict:
    """Return dict mapping tool names → ToolWrapper objects.

    Drop-in replacement for ``mcp._tool_manager._tools``.

    The registry is walked once, in its own iteration order, so the result
    keeps the same key order as the underlying dict (going through a ``set``
    of names would make the order hash-dependent).  Entries from which no
    callable can be extracted are left out.  An empty dict is returned when
    no registry is found.
    """
    registry = _find_tools_dict(mcp)
    if not registry:
        return {}

    wrapped = {}
    for name, tool in registry.items():
        # _extract_fn yields None for entries that are None or not callable.
        fn = _extract_fn(tool)
        if fn is not None:
            wrapped[name] = ToolWrapper(fn)
    return wrapped


# ── Autouse fixtures ──────────────────────────────────────────────────────────

# Monkey-patch FastMCP so that mcp._tool_manager._tools works on 3.x
# This is done via a module-level patch applied when conftest is imported.


def _patch_fastmcp():
    """Ensure FastMCP instances expose ``_tool_manager._tools`` on 3.x.

    Nothing is changed when ``fastmcp`` cannot be imported, when ``FastMCP``
    has no ``tool`` attribute, when a freshly built instance already carries
    a dict at ``_tool_manager._tools`` (FastMCP 2.x), or when this patch has
    already been installed.  Otherwise two attributes are replaced on the
    ``FastMCP`` class:

    * ``__init__`` runs the original initialiser, then attaches an empty
      ``_tool_manager._tools`` dict if the instance lacks one.
    * ``tool`` delegates to the original and additionally records every
      registered function in that dict as a ``ToolWrapper``.

    The recorded key is the explicit tool name when one is supplied
    (``name=...`` or a leading string argument) and the function's
    ``__name__`` otherwise.  Both the bare form (``@mcp.tool`` /
    ``mcp.tool(fn)``) and the factory form (``@mcp.tool(...)``) are handled.

    Returns ``None`` in every case.
    """
    try:
        from fastmcp import FastMCP
    except ImportError:
        return  # fastmcp not installed — nothing to patch

    import functools

    # Installing twice would wrap the wrappers (e.g. when this module is
    # imported under two different names), so bail out if already applied.
    if getattr(FastMCP, "_compat_tools_patched", False):
        return

    original_tool = getattr(FastMCP, "tool", None)
    if original_tool is None:
        return

    # Check if _tool_manager._tools already works (FastMCP 2.x)
    probe = FastMCP("__patch_test__")
    probe_manager = getattr(probe, "_tool_manager", None)
    if isinstance(getattr(probe_manager, "_tools", None), dict):
        return  # Already compatible, no patch needed

    # FastMCP 3.x: install a compatibility shim that mirrors every tool
    # registration into a per-instance ``_tool_manager._tools`` dict.
    original_init = FastMCP.__init__

    def record(server, fn, explicit_name):
        """Mirror one registration into the instance's compat dict, if any."""
        registry = getattr(getattr(server, "_tool_manager", None), "_tools", None)
        if registry is None:
            return  # instance was created before the patch; nothing to update
        key = explicit_name or getattr(fn, "__name__", None)
        if key is not None:
            registry[key] = ToolWrapper(fn)

    @functools.wraps(original_init)
    def patched_init(self, *args, **kwargs):
        original_init(self, *args, **kwargs)
        # Add compat _tool_manager._tools
        if not hasattr(self, "_tool_manager"):
            self._tool_manager = types.SimpleNamespace()
        if not hasattr(self._tool_manager, "_tools"):
            self._tool_manager._tools = {}

    @functools.wraps(original_tool)
    def patched_tool(self, *args, **kwargs):
        outcome = original_tool(self, *args, **kwargs)
        explicit_name = kwargs.get("name")

        # Bare form: the function itself was passed in, so the original has
        # already registered it and ``outcome`` is the final result.
        if args and callable(args[0]):
            record(self, args[0], explicit_name)
            return outcome

        # Factory form: a leading string argument is the tool name.
        if explicit_name is None and args and isinstance(args[0], str):
            explicit_name = args[0]

        def wrapper(fn):
            result = outcome(fn)
            # Also register in our compat dict
            record(self, fn, explicit_name)
            return result

        return wrapper

    FastMCP.__init__ = patched_init
    FastMCP.tool = patched_tool
    FastMCP._compat_tools_patched = True


# Apply patch at import time (before any tests run).
# NOTE: this is a module-level side effect — importing this module is what
# installs the shim.  The call returns without patching when fastmcp is not
# installed or already exposes a ``_tool_manager._tools`` dict.
_patch_fastmcp()