# (Page header from the hosting site: "Spaces: Running")
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from openspace.utils.logging import Logger
# Module-level logger used by every function and method in this file.
logger = Logger.get_logger(__name__)
# Both cache files live in the project root directory (OpenSpace/).
#   __file__       = .../OpenSpace/openspace/grounding/backends/mcp/tool_cache.py
#   five levels up = .../OpenSpace/
# The path is resolved first so that a relative __file__ cannot make the
# parent chain collapse to "." (which would tie the cache to the current
# working directory instead of the project root).
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent.parent.parent

# Raw cache: tool metadata as recorded per server.
DEFAULT_CACHE_PATH = _PROJECT_ROOT / "mcp_tool_cache.json"

# Sanitized cache path (Claude API compatible JSON Schema).
DEFAULT_SANITIZED_CACHE_PATH = _PROJECT_ROOT / "mcp_tool_cache_sanitized.json"
class MCPToolCache:
    """Simple file-based cache for MCP tool metadata.

    Two JSON files are managed independently:

    * the raw cache at ``cache_path`` (``servers`` plus an optional
      ``failed_servers`` mapping), and
    * the sanitized cache at ``sanitized_cache_path`` (``servers`` only,
      flagged with ``"sanitized": true``).

    Each file is mirrored in memory after the first load. Disk access is
    best-effort: read and write failures are logged, never raised.
    """

    CACHE_VERSION = 1

    def __init__(self, cache_path: Optional[Path] = None, sanitized_cache_path: Optional[Path] = None):
        """
        Create a cache bound to the given files.

        Args:
            cache_path: Location of the raw cache file. Falls back to
                DEFAULT_CACHE_PATH when omitted.
            sanitized_cache_path: Location of the sanitized cache file. Falls
                back to DEFAULT_SANITIZED_CACHE_PATH when omitted.
        """
        # Path(...) lets callers pass plain strings as well as Path objects.
        self.cache_path = Path(cache_path) if cache_path else DEFAULT_CACHE_PATH
        self.sanitized_cache_path = (
            Path(sanitized_cache_path) if sanitized_cache_path else DEFAULT_SANITIZED_CACHE_PATH
        )
        # In-memory mirrors; None means "not loaded yet".
        self._cache: Optional[Dict[str, Any]] = None
        self._sanitized_cache: Optional[Dict[str, Any]] = None
        self._server_order: Optional[List[str]] = None

    def set_server_order(self, order: List[str]) -> None:
        """Set expected server order (from config). Applied by save_server()."""
        self._server_order = order

    def _reorder_servers(self, servers: Dict[str, List[Dict]]) -> Dict[str, List[Dict]]:
        """Return ``servers`` reordered according to ``_server_order``.

        Servers named in ``_server_order`` come first, in that order; any
        other servers follow in their existing order. When no order has been
        set, the input dict is returned unchanged.
        """
        if not self._server_order:
            return servers
        ordered = {name: servers[name] for name in self._server_order if name in servers}
        # Append servers that are cached but absent from the configured order.
        for name, tools in servers.items():
            ordered.setdefault(name, tools)
        return ordered

    def _ensure_dir(self) -> None:
        """Ensure the directories of both cache files exist."""
        for path in (self.cache_path, self.sanitized_cache_path):
            path.parent.mkdir(parents=True, exist_ok=True)

    def _empty_cache(self) -> Dict[str, Any]:
        """Return a fresh, empty cache structure."""
        return {"version": self.CACHE_VERSION, "servers": {}}

    def _load_file(self, path: Path, sanitized: bool) -> Dict[str, Any]:
        """Read one cache file, falling back to an empty cache on any problem.

        Args:
            path: File to read.
            sanitized: Selects the wording of the log messages only.

        Returns:
            The parsed JSON object, or an empty cache structure when the file
            is missing, unreadable, not valid JSON, or not shaped like a cache.
        """
        if not path.exists():
            return self._empty_cache()
        prefix = "sanitized " if sanitized else ""
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Reject valid JSON that is not a cache, so accessors such as
            # get_server_tools() never hit a non-dict later on.
            if not isinstance(data, dict):
                raise ValueError(f"expected a JSON object, got {type(data).__name__}")
            servers = data.get("servers", {})
            if not isinstance(servers, dict):
                raise ValueError(f"'servers' must be an object, got {type(servers).__name__}")
        except Exception as e:
            # The cache is an optimization; a broken file must never be fatal.
            logger.warning(f"Failed to load {prefix}cache: {e}")
            return self._empty_cache()
        logger.info(f"Loaded {prefix}MCP tool cache: {len(servers)} servers")
        return data

    @staticmethod
    def _write_json_atomic(path: Path, data: Dict[str, Any]) -> None:
        """Serialize ``data`` to ``path`` without ever leaving a partial file.

        The JSON is written to a sibling temporary file and then moved over
        the target with ``os.replace``. If serialization or writing fails, the
        previous file contents stay untouched and the temporary file is
        removed.

        Raises:
            Exception: Whatever directory creation, serialization or file I/O
                raised; callers decide how to report it.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        # The PID keeps temp names of concurrently running processes apart.
        tmp_path = path.with_name(f"{path.name}.{os.getpid()}.tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, path)
        except Exception:
            try:
                tmp_path.unlink()
            except OSError:
                # Nothing to clean up (never created) or cleanup impossible;
                # the original error below is the one worth reporting.
                pass
            raise

    def load(self) -> Dict[str, Any]:
        """Load cache from disk. Returns an empty cache if the file does not exist.

        The result is memoized, so the file is read at most once until
        clear() is called.
        """
        if self._cache is None:
            self._cache = self._load_file(self.cache_path, sanitized=False)
        return self._cache

    def save(self, servers: Dict[str, List[Dict]]) -> None:
        """
        Save tool metadata to disk (overwrites existing cache).

        On failure the error is logged and both the file and the in-memory
        cache keep their previous contents.

        Args:
            servers: Dict mapping server_name -> list of tool metadata dicts
                Each tool dict should have: name, description, parameters
        """
        cache_data = {
            "version": self.CACHE_VERSION,
            "updated_at": datetime.now().isoformat(),
            "servers": servers,
        }
        try:
            self._write_json_atomic(self.cache_path, cache_data)
        except Exception as e:
            logger.error(f"Failed to save cache: {e}")
            return
        self._cache = cache_data
        logger.info(f"Saved MCP tool cache: {len(servers)} servers")

    def save_server(self, server_name: str, tools: List[Dict]) -> None:
        """
        Save/update a single server's tools to cache (incremental append).

        The in-memory cache is updated first, so the new tools are visible to
        this instance even if the disk write fails (the failure is logged).

        Args:
            server_name: Name of the MCP server
            tools: List of tool metadata dicts for this server
        """
        cache = self.load()
        cache.setdefault("servers", {})[server_name] = tools
        cache["servers"] = self._reorder_servers(cache["servers"])
        cache["updated_at"] = datetime.now().isoformat()
        try:
            self._write_json_atomic(self.cache_path, cache)
        except Exception as e:
            logger.error(f"Failed to save cache for server '{server_name}': {e}")
            return
        self._cache = cache
        logger.debug(f"Saved {len(tools)} tools for server '{server_name}'")

    def get_server_tools(self, server_name: str) -> Optional[List[Dict]]:
        """Get cached tools for a specific server, or None if it is not cached."""
        return self.load().get("servers", {}).get(server_name)

    def get_all_tools(self) -> Dict[str, List[Dict]]:
        """Get all cached tools, grouped by server."""
        return self.load().get("servers", {})

    def has_cache(self) -> bool:
        """Check if cache exists and has data."""
        return bool(self.load().get("servers"))

    def clear(self) -> None:
        """Clear the cache (delete the file and forget the in-memory copy)."""
        try:
            self.cache_path.unlink()
        except FileNotFoundError:
            # Already gone; clearing is idempotent.
            pass
        self._cache = None
        logger.info("MCP tool cache cleared")

    def save_failed_server(self, server_name: str, error: str) -> None:
        """
        Record a failed server to cache.

        Args:
            server_name: Name of the failed MCP server
            error: Error message
        """
        cache = self.load()
        timestamp = datetime.now().isoformat()
        cache.setdefault("failed_servers", {})[server_name] = {
            "error": error,
            "failed_at": timestamp,
        }
        cache["updated_at"] = timestamp
        try:
            self._write_json_atomic(self.cache_path, cache)
        except Exception as e:
            logger.error(f"Failed to save failed server '{server_name}': {e}")
            return
        self._cache = cache

    def get_failed_servers(self) -> Dict[str, Dict]:
        """Get the failed servers recorded in the cache, keyed by server name."""
        return self.load().get("failed_servers", {})

    def load_sanitized(self) -> Dict[str, Any]:
        """Load sanitized cache from disk. Returns an empty cache if the file does not exist.

        The result is memoized until clear_sanitized() is called.
        """
        if self._sanitized_cache is None:
            self._sanitized_cache = self._load_file(self.sanitized_cache_path, sanitized=True)
        return self._sanitized_cache

    def save_sanitized(self, servers: Dict[str, List[Dict]]) -> None:
        """
        Save sanitized tool metadata to disk.

        On failure the error is logged and both the file and the in-memory
        sanitized cache keep their previous contents.

        Args:
            servers: Dict mapping server_name -> list of sanitized tool metadata dicts
        """
        cache_data = {
            "version": self.CACHE_VERSION,
            "updated_at": datetime.now().isoformat(),
            "sanitized": True,
            "servers": servers,
        }
        try:
            self._write_json_atomic(self.sanitized_cache_path, cache_data)
        except Exception as e:
            logger.error(f"Failed to save sanitized cache: {e}")
            return
        self._sanitized_cache = cache_data
        logger.info(f"Saved sanitized MCP tool cache: {len(servers)} servers")

    def get_all_sanitized_tools(self) -> Dict[str, List[Dict]]:
        """Get all sanitized cached tools, grouped by server."""
        return self.load_sanitized().get("servers", {})

    def has_sanitized_cache(self) -> bool:
        """Check if sanitized cache exists and has data."""
        return bool(self.load_sanitized().get("servers"))

    def clear_sanitized(self) -> None:
        """Clear the sanitized cache (delete the file and forget the in-memory copy)."""
        try:
            self.sanitized_cache_path.unlink()
        except FileNotFoundError:
            # Already gone; clearing is idempotent.
            pass
        self._sanitized_cache = None
        logger.info("Sanitized MCP tool cache cleared")
# Process-wide shared instance; stays None until get_tool_cache() is first called.
_tool_cache: Optional[MCPToolCache] = None


def get_tool_cache() -> MCPToolCache:
    """Return the shared MCPToolCache, creating it with default paths on first use."""
    global _tool_cache
    if _tool_cache is not None:
        return _tool_cache
    _tool_cache = MCPToolCache()
    return _tool_cache