File size: 9,453 Bytes
399b80c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import json
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List, Optional

from openspace.utils.logging import Logger

# Module-level logger, obtained from the project's Logger helper and keyed by
# this module's import name; used by every log call in this file.
logger = Logger.get_logger(__name__)

# Both cache files live in the project root directory (OpenSpace/).
# Given __file__ = .../OpenSpace/openspace/grounding/backends/mcp/tool_cache.py,
# walking up five parents (mcp -> backends -> grounding -> openspace -> OpenSpace)
# lands on that root.
_PROJECT_ROOT = Path(__file__).parent.parent.parent.parent.parent

# Raw tool metadata cache.
DEFAULT_CACHE_PATH = _PROJECT_ROOT / "mcp_tool_cache.json"
# Sanitized tool metadata cache (Claude API compatible JSON Schema).
DEFAULT_SANITIZED_CACHE_PATH = _PROJECT_ROOT / "mcp_tool_cache_sanitized.json"


class MCPToolCache:
    """Simple file-based cache for MCP tool metadata.

    Two JSON files are managed independently: the raw cache at ``cache_path``
    and a sanitized copy at ``sanitized_cache_path``. The parsed content of
    each file is memoized on the instance after the first load or a
    successful full save.

    Reading and writing the JSON files is best-effort: failures are logged
    and the methods return normally. Files are written atomically (temp file
    + rename), so a failed write never leaves a truncated cache behind.
    """

    CACHE_VERSION = 1

    def __init__(self, cache_path: Optional[Path] = None, sanitized_cache_path: Optional[Path] = None):
        """
        Args:
            cache_path: Location of the raw cache file; falls back to
                        DEFAULT_CACHE_PATH when omitted.
            sanitized_cache_path: Location of the sanitized cache file; falls
                                  back to DEFAULT_SANITIZED_CACHE_PATH when omitted.
        """
        # Path(...) coercion lets callers pass plain strings as well as Path objects.
        self.cache_path = Path(cache_path) if cache_path else DEFAULT_CACHE_PATH
        self.sanitized_cache_path = (
            Path(sanitized_cache_path) if sanitized_cache_path else DEFAULT_SANITIZED_CACHE_PATH
        )
        # In-memory copies of the two files; None means "not loaded yet".
        self._cache: Optional[Dict] = None
        self._sanitized_cache: Optional[Dict] = None
        self._server_order: Optional[List[str]] = None

    def set_server_order(self, order: List[str]) -> None:
        """Set expected server order (from config). Applied by save_server()."""
        self._server_order = order

    def _reorder_servers(self, servers: Dict[str, List[Dict]]) -> Dict[str, List[Dict]]:
        """Return ``servers`` reordered according to ``_server_order``.

        Servers named in the configured order come first (in that order);
        any others follow in their existing order. When no order is set the
        input dict itself is returned.
        """
        if not self._server_order:
            return servers

        # First the servers in config order ...
        ordered = {name: servers[name] for name in self._server_order if name in servers}
        # ... then any remaining servers (not in config).
        for name, tools in servers.items():
            ordered.setdefault(name, tools)
        return ordered

    def _ensure_dir(self, path: Optional[Path] = None) -> None:
        """Ensure the parent directory of ``path`` exists.

        Args:
            path: File whose parent directory is needed. Defaults to
                  ``cache_path`` so existing no-argument calls keep working.
        """
        target = self.cache_path if path is None else path
        target.parent.mkdir(parents=True, exist_ok=True)

    def _empty_cache(self) -> Dict[str, Any]:
        """Return a fresh cache structure with no servers."""
        return {"version": self.CACHE_VERSION, "servers": {}}

    @staticmethod
    def _read_json(path: Path) -> Dict[str, Any]:
        """Read and validate a cache file.

        Raises:
            OSError / ValueError: if the file cannot be read, is not valid
                JSON, or does not have the expected dict-of-dicts layout.
        """
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError("cache root is not a JSON object")
        # The accessors call .get()/item-assign on these sections, so anything
        # other than a dict would blow up later, outside any error handling.
        for section in ("servers", "failed_servers"):
            if not isinstance(data.get(section, {}), dict):
                raise ValueError(f"cache section '{section}' is not a JSON object")
        return data

    @staticmethod
    def _write_json(path: Path, payload: Dict[str, Any]) -> None:
        """Atomically serialize ``payload`` as JSON to ``path``.

        The data is written to a sibling temp file and then renamed over the
        target, so a serialization or I/O error leaves any existing file
        untouched. Errors are re-raised for the caller to handle.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2, ensure_ascii=False)
            tmp_path.replace(path)
        except BaseException:
            # Drop the partial temp file; the original error is what matters.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    def load(self) -> Dict[str, Any]:
        """Load cache from disk (memoized).

        Returns:
            The cache dict. An empty cache structure
            (``{"version": ..., "servers": {}}``) is returned when the file
            does not exist, cannot be read, or has an unexpected layout.
        """
        if self._cache is not None:
            return self._cache

        if not self.cache_path.exists():
            self._cache = self._empty_cache()
            return self._cache

        try:
            self._cache = self._read_json(self.cache_path)
            logger.info(f"Loaded MCP tool cache: {len(self._cache.get('servers', {}))} servers")
        except Exception as e:
            logger.warning(f"Failed to load cache: {e}")
            self._cache = self._empty_cache()
        return self._cache

    def save(self, servers: Dict[str, List[Dict]]):
        """
        Save tool metadata to disk (overwrites existing cache).

        The in-memory cache is replaced only if the write succeeds.

        Args:
            servers: Dict mapping server_name -> list of tool metadata dicts
                     Each tool dict should have: name, description, parameters
        """
        self._ensure_dir()

        cache_data = {
            "version": self.CACHE_VERSION,
            "updated_at": datetime.now().isoformat(),
            "servers": servers,
        }

        try:
            self._write_json(self.cache_path, cache_data)
        except Exception as e:
            logger.error(f"Failed to save cache: {e}")
            return
        self._cache = cache_data
        logger.info(f"Saved MCP tool cache: {len(servers)} servers")

    def save_server(self, server_name: str, tools: List[Dict]):
        """
        Save/update a single server's tools to cache (incremental append).

        The in-memory cache is updated first, so the entry is available to
        readers in this process even if persisting it to disk fails.

        Args:
            server_name: Name of the MCP server
            tools: List of tool metadata dicts for this server
        """
        self._ensure_dir()

        # load() returns the memoized dict, so the edits below update the
        # in-memory cache in place.
        cache = self.load()
        servers = cache.setdefault("servers", {})
        servers[server_name] = tools
        cache["servers"] = self._reorder_servers(servers)
        cache["updated_at"] = datetime.now().isoformat()

        try:
            self._write_json(self.cache_path, cache)
        except Exception as e:
            logger.error(f"Failed to save cache for server '{server_name}': {e}")
            return
        logger.debug(f"Saved {len(tools)} tools for server '{server_name}'")

    def get_server_tools(self, server_name: str) -> Optional[List[Dict]]:
        """Get cached tools for a specific server, or None if it is not cached."""
        cache = self.load()
        return cache.get("servers", {}).get(server_name)

    def get_all_tools(self) -> Dict[str, List[Dict]]:
        """Get all cached tools, grouped by server."""
        cache = self.load()
        return cache.get("servers", {})

    def has_cache(self) -> bool:
        """Check if cache exists and has data (at least one server entry)."""
        cache = self.load()
        return bool(cache.get("servers"))

    def clear(self):
        """Clear the cache: delete the file (if any) and drop the in-memory copy."""
        # Unlink directly rather than exists()-then-unlink, which is race-prone.
        try:
            self.cache_path.unlink()
        except FileNotFoundError:
            pass
        self._cache = None
        logger.info("MCP tool cache cleared")

    def save_failed_server(self, server_name: str, error: str):
        """
        Record a failed server to cache.

        The record is stored under the ``failed_servers`` key of the raw
        cache. The in-memory cache is updated even if the disk write fails.

        Args:
            server_name: Name of the failed MCP server
            error: Error message
        """
        self._ensure_dir()

        # load() returns the memoized dict, so this mutates the in-memory cache.
        cache = self.load()
        now = datetime.now().isoformat()
        cache.setdefault("failed_servers", {})[server_name] = {
            "error": error,
            "failed_at": now,
        }
        cache["updated_at"] = now

        try:
            self._write_json(self.cache_path, cache)
        except Exception as e:
            logger.error(f"Failed to save failed server '{server_name}': {e}")

    def get_failed_servers(self) -> Dict[str, Dict]:
        """Get failed servers from cache as a server_name -> failure-record mapping."""
        cache = self.load()
        return cache.get("failed_servers", {})

    def load_sanitized(self) -> Dict[str, Any]:
        """Load sanitized cache from disk (memoized).

        Returns:
            The sanitized cache dict. An empty cache structure is returned
            when the file does not exist, cannot be read, or has an
            unexpected layout.
        """
        if self._sanitized_cache is not None:
            return self._sanitized_cache

        if not self.sanitized_cache_path.exists():
            self._sanitized_cache = self._empty_cache()
            return self._sanitized_cache

        try:
            self._sanitized_cache = self._read_json(self.sanitized_cache_path)
            logger.info(f"Loaded sanitized MCP tool cache: {len(self._sanitized_cache.get('servers', {}))} servers")
        except Exception as e:
            logger.warning(f"Failed to load sanitized cache: {e}")
            self._sanitized_cache = self._empty_cache()
        return self._sanitized_cache

    def save_sanitized(self, servers: Dict[str, List[Dict]]):
        """
        Save sanitized tool metadata to disk (overwrites the sanitized cache).

        The in-memory sanitized cache is replaced only if the write succeeds.

        Args:
            servers: Dict mapping server_name -> list of sanitized tool metadata dicts
        """
        # The sanitized file may live in a different directory than the raw cache.
        self._ensure_dir(self.sanitized_cache_path)

        cache_data = {
            "version": self.CACHE_VERSION,
            "updated_at": datetime.now().isoformat(),
            "sanitized": True,
            "servers": servers,
        }

        try:
            self._write_json(self.sanitized_cache_path, cache_data)
        except Exception as e:
            logger.error(f"Failed to save sanitized cache: {e}")
            return
        self._sanitized_cache = cache_data
        logger.info(f"Saved sanitized MCP tool cache: {len(servers)} servers")

    def get_all_sanitized_tools(self) -> Dict[str, List[Dict]]:
        """Get all sanitized cached tools, grouped by server."""
        cache = self.load_sanitized()
        return cache.get("servers", {})

    def has_sanitized_cache(self) -> bool:
        """Check if sanitized cache exists and has data (at least one server entry)."""
        cache = self.load_sanitized()
        return bool(cache.get("servers"))

    def clear_sanitized(self):
        """Clear the sanitized cache: delete the file (if any) and drop the in-memory copy."""
        try:
            self.sanitized_cache_path.unlink()
        except FileNotFoundError:
            pass
        self._sanitized_cache = None
        logger.info("Sanitized MCP tool cache cleared")


# Global (module-wide) MCPToolCache instance. Starts as None and is created
# on the first call to get_tool_cache().
_tool_cache: Optional[MCPToolCache] = None


def get_tool_cache() -> MCPToolCache:
    """Return the shared MCPToolCache, constructing it on first use."""
    global _tool_cache
    if _tool_cache is not None:
        return _tool_cache
    # First call: build the instance with default paths and remember it.
    _tool_cache = MCPToolCache()
    return _tool_cache