Spaces:
Sleeping
Sleeping
| """Cloud storage MCP server.""" | |
| import logging | |
| from typing import Any, Dict | |
# ``mcp`` is an optional dependency: use the real ``Tool`` type when it is
# installed, otherwise fall back to a lightweight placeholder so this module
# can still be imported.
try:
    from mcp.types import Tool

    MCP_AVAILABLE = True
except ImportError:
    MCP_AVAILABLE = False

    class Tool:
        """Minimal placeholder used when ``mcp.types.Tool`` cannot be imported.

        Every keyword argument passed to the constructor (for example
        ``name``, ``description`` and ``inputSchema``) is stored as an
        instance attribute, so code that reads those fields keeps working
        without the ``mcp`` package.
        """

        def __init__(self, **kwargs):
            # Previously the arguments were silently discarded, which left
            # the placeholder without ``name``/``description``/``inputSchema``.
            for field_name, value in kwargs.items():
                setattr(self, field_name, value)

        def __repr__(self):
            fields = ", ".join(f"{k}={v!r}" for k, v in vars(self).items())
            return f"{type(self).__name__}({fields})"
| from src.mcp.mcp_server import BaseMCPServer | |
| from src.core.config import get_settings | |
| logger = logging.getLogger(__name__) | |
class CloudMCPServer(BaseMCPServer):
    """MCP server for cloud storage operations.

    Two tools are exposed, ``list_cloud_objects`` and ``get_cloud_object``.
    They are backed by AWS S3 when ``aws_access_key_id`` and
    ``aws_s3_bucket`` are set in the settings, otherwise by Google Cloud
    Storage when ``google_application_credentials`` and ``gcs_bucket_name``
    are set. When no backend could be initialised, no tools are registered
    and every tool call returns ``{"error": "Cloud storage not configured"}``.
    """

    def __init__(self):
        """Initialize cloud MCP server."""
        super().__init__("cloud_storage_server")
        self.settings = get_settings()
        self._init_cloud_client()
        self._register_tools()

    def _init_cloud_client(self):
        """Initialize cloud storage client.

        Sets ``self.client``, ``self.cloud_type`` (``"s3"``, ``"gcs"`` or
        ``None``) and ``self.bucket_name``. Initialisation is best-effort:
        any failure is logged and leaves the server unconfigured instead of
        raising. If the S3 settings are present but S3 initialisation fails,
        GCS is not attempted.
        """
        self.cloud_type = None
        self.client = None
        # Always define the attribute so it exists even when unconfigured.
        self.bucket_name = None

        settings = self.settings

        # Check for AWS S3
        if settings.aws_access_key_id and settings.aws_s3_bucket:
            try:
                # Imported lazily: boto3 is only needed for the S3 backend.
                import boto3

                self.client = boto3.client(
                    "s3",
                    aws_access_key_id=settings.aws_access_key_id,
                    aws_secret_access_key=settings.aws_secret_access_key,
                    region_name=settings.aws_region,
                )
                self.cloud_type = "s3"
                self.bucket_name = settings.aws_s3_bucket
            except Exception as e:
                logger.error("Error initializing S3: %s", e)
        # Check for GCS
        elif settings.google_application_credentials and settings.gcs_bucket_name:
            try:
                # Imported lazily: only needed for the GCS backend.
                import os

                from google.cloud import storage

                # NOTE: this mutates the process-wide environment so that
                # ``storage.Client()`` picks up the configured credentials.
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
                    settings.google_application_credentials
                )
                self.client = storage.Client()
                self.cloud_type = "gcs"
                self.bucket_name = settings.gcs_bucket_name
            except Exception as e:
                logger.error("Error initializing GCS: %s", e)

    def _register_tools(self):
        """Register cloud storage tools.

        Does nothing (apart from logging a warning) when no storage client
        was initialised.
        """
        if not self.client:
            logger.warning("No cloud storage configured, skipping tool registration")
            return

        # List objects tool
        list_tool = Tool(
            name="list_cloud_objects",
            description="List objects in cloud storage",
            inputSchema={
                "type": "object",
                "properties": {
                    "prefix": {
                        "type": "string",
                        "description": "Object key prefix to filter",
                    },
                    "max_keys": {
                        "type": "integer",
                        "description": "Maximum number of objects to return",
                        "default": 10,
                    },
                },
            },
        )
        self.register_tool(list_tool)

        # Get object tool
        get_tool = Tool(
            name="get_cloud_object",
            description="Get an object from cloud storage",
            inputSchema={
                "type": "object",
                "properties": {
                    "key": {
                        "type": "string",
                        "description": "Object key",
                    },
                },
                "required": ["key"],
            },
        )
        self.register_tool(get_tool)

    async def _execute_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Execute a cloud storage tool.

        Args:
            name: Tool name, ``"list_cloud_objects"`` or ``"get_cloud_object"``.
            arguments: Tool arguments (``prefix``/``max_keys`` for listing,
                ``key`` for fetching).

        Returns:
            A result dict on success, or ``{"error": <message>}`` when
            storage is not configured or the backend call fails.

        Raises:
            ValueError: If ``name`` is not a known tool (only checked once a
                client is configured).
        """
        if not self.client:
            return {"error": "Cloud storage not configured"}

        # Tolerate callers that pass no arguments at all.
        arguments = arguments or {}

        if name == "list_cloud_objects":
            return self._list_objects(
                arguments.get("prefix", ""),
                arguments.get("max_keys", 10),
            )
        if name == "get_cloud_object":
            return self._get_object(arguments.get("key"))
        raise ValueError(f"Unknown tool: {name}")

    def _list_objects(self, prefix: str, max_keys: int) -> Dict[str, Any]:
        """List up to ``max_keys`` objects whose key starts with ``prefix``.

        Returns ``{"objects": [{"key", "size"}, ...], "count": n}`` on
        success. Backend failures are returned as ``{"error": <message>}``,
        matching how ``get_cloud_object`` reports errors.
        """
        try:
            if self.cloud_type == "s3":
                response = self.client.list_objects_v2(
                    Bucket=self.bucket_name,
                    Prefix=prefix,
                    MaxKeys=max_keys,
                )
                # "Contents" is absent from the response when nothing matches.
                objects = [
                    {"key": obj["Key"], "size": obj["Size"]}
                    for obj in response.get("Contents", [])
                ]
            elif self.cloud_type == "gcs":
                bucket = self.client.bucket(self.bucket_name)
                objects = [
                    {"key": blob.name, "size": blob.size}
                    for blob in bucket.list_blobs(prefix=prefix, max_results=max_keys)
                ]
            else:
                return {"error": f"Unsupported cloud type: {self.cloud_type}"}
        except Exception as e:
            logger.error("Error listing cloud objects: %s", e)
            return {"error": str(e)}
        return {"objects": objects, "count": len(objects)}

    def _get_object(self, key: Any) -> Dict[str, Any]:
        """Fetch the object stored under ``key`` and return it as UTF-8 text.

        Returns ``{"key": key, "content": text}`` on success, or
        ``{"error": <message>}`` when ``key`` is missing/empty or the
        backend call (including text decoding) fails.
        """
        if not key:
            # Fail fast with a clear message instead of an SDK validation error.
            return {"error": "Missing required argument: key"}

        try:
            if self.cloud_type == "s3":
                response = self.client.get_object(Bucket=self.bucket_name, Key=key)
                content = response["Body"].read().decode("utf-8")
            elif self.cloud_type == "gcs":
                bucket = self.client.bucket(self.bucket_name)
                content = bucket.blob(key).download_as_text()
            else:
                return {"error": f"Unsupported cloud type: {self.cloud_type}"}
        except Exception as e:
            return {"error": str(e)}
        return {"key": key, "content": content}