Spaces:
Sleeping
Sleeping
File size: 5,520 Bytes
8bf4d58 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
"""Cloud storage MCP server."""
import logging
from typing import Any, Dict
# ``mcp`` is an optional dependency: fall back to a small stand-in for
# ``Tool`` so this module can still be imported when it is not installed.
try:
    from mcp.types import Tool

    MCP_AVAILABLE = True
except ImportError:
    MCP_AVAILABLE = False

    class Tool:
        """Minimal stand-in for ``mcp.types.Tool`` when ``mcp`` is missing.

        Every keyword argument is kept as an instance attribute, so code
        that reads e.g. ``tool.name`` or ``tool.inputSchema`` still works
        with the stand-in instead of failing with ``AttributeError``.
        """

        def __init__(self, **kwargs):
            """Store each keyword argument as an attribute of the same name."""
            for attr_name, attr_value in kwargs.items():
                setattr(self, attr_name, attr_value)

        def __repr__(self):
            """Return a debug representation listing the stored fields."""
            fields = ", ".join(
                f"{key}={value!r}" for key, value in vars(self).items()
            )
            return f"{type(self).__name__}({fields})"
from src.mcp.mcp_server import BaseMCPServer
from src.core.config import get_settings
logger = logging.getLogger(__name__)
class CloudMCPServer(BaseMCPServer):
    """MCP server exposing read-only cloud storage tools.

    On construction the server picks a storage backend from the application
    settings (AWS S3 first, otherwise Google Cloud Storage) and registers two
    tools, ``list_cloud_objects`` and ``get_cloud_object``. When no backend
    could be initialised, no tools are registered and every tool call returns
    an error payload.
    """

    # Fallback for ``max_keys``; mirrors the default advertised in the
    # ``list_cloud_objects`` input schema.
    _DEFAULT_MAX_KEYS = 10

    def __init__(self):
        """Initialize cloud MCP server."""
        super().__init__("cloud_storage_server")
        self.settings = get_settings()
        self._init_cloud_client()
        self._register_tools()

    def _init_cloud_client(self):
        """Select and initialise the storage backend from the settings.

        Sets ``self.client``, ``self.cloud_type`` (``"s3"``, ``"gcs"`` or
        ``None``) and ``self.bucket_name``. S3 takes precedence: when the S3
        settings are present, GCS is not attempted even if S3 initialisation
        fails. Initialisation errors are logged and leave the server
        unconfigured rather than raising.
        """
        self.cloud_type = None
        self.client = None
        # Always defined, so reading it on an unconfigured server cannot
        # raise AttributeError.
        self.bucket_name = None

        settings = self.settings

        # Check for AWS S3
        if settings.aws_access_key_id and settings.aws_s3_bucket:
            try:
                # Imported lazily so boto3 is only required when S3 is used.
                import boto3

                self.client = boto3.client(
                    "s3",
                    aws_access_key_id=settings.aws_access_key_id,
                    aws_secret_access_key=settings.aws_secret_access_key,
                    region_name=settings.aws_region,
                )
                self.cloud_type = "s3"
                self.bucket_name = settings.aws_s3_bucket
            except Exception as e:
                logger.error(f"Error initializing S3: {e}")
        # Check for GCS
        elif settings.google_application_credentials and settings.gcs_bucket_name:
            try:
                # Imported lazily so the GCS SDK is only required when used.
                from google.cloud import storage
                import os

                # storage.Client() is created without explicit credentials
                # here; the credentials path is handed over via this
                # environment variable.
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
                    settings.google_application_credentials
                )
                self.client = storage.Client()
                self.cloud_type = "gcs"
                self.bucket_name = settings.gcs_bucket_name
            except Exception as e:
                logger.error(f"Error initializing GCS: {e}")

    def _register_tools(self):
        """Register the cloud storage tools.

        Does nothing (apart from logging a warning) when no storage client
        was initialised.
        """
        if not self.client:
            logger.warning("No cloud storage configured, skipping tool registration")
            return

        # List objects tool
        list_tool = Tool(
            name="list_cloud_objects",
            description="List objects in cloud storage",
            inputSchema={
                "type": "object",
                "properties": {
                    "prefix": {
                        "type": "string",
                        "description": "Object key prefix to filter",
                    },
                    "max_keys": {
                        "type": "integer",
                        "description": "Maximum number of objects to return",
                        "default": 10,
                    },
                },
            },
        )
        self.register_tool(list_tool)

        # Get object tool
        get_tool = Tool(
            name="get_cloud_object",
            description="Get an object from cloud storage",
            inputSchema={
                "type": "object",
                "properties": {
                    "key": {
                        "type": "string",
                        "description": "Object key",
                    },
                },
                "required": ["key"],
            },
        )
        self.register_tool(get_tool)

    async def _execute_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Execute a cloud storage tool.

        Args:
            name: Tool name, ``"list_cloud_objects"`` or ``"get_cloud_object"``.
            arguments: Tool arguments. ``list_cloud_objects`` reads the
                optional ``prefix`` and ``max_keys``; ``get_cloud_object``
                requires ``key``.

        Returns:
            ``{"objects": [...], "count": n}`` for a listing,
            ``{"key": ..., "content": ...}`` for a fetched object, or
            ``{"error": message}`` when storage is not configured, a
            required argument is missing, or the storage call fails.

        Raises:
            ValueError: If ``name`` is not a known tool (only checked once
                storage is configured).
        """
        if not self.client:
            return {"error": "Cloud storage not configured"}

        arguments = arguments or {}

        # NOTE(review): the SDK calls made by the helpers below are
        # synchronous, so they run on the event loop thread.
        if name == "list_cloud_objects":
            # Treat an explicit null like an omitted argument.
            prefix = arguments.get("prefix") or ""
            max_keys = arguments.get("max_keys")
            if max_keys is None:
                max_keys = self._DEFAULT_MAX_KEYS
            return self._list_cloud_objects(prefix, max_keys)

        if name == "get_cloud_object":
            key = arguments.get("key")
            if not key:
                # Fail early with a clear message instead of handing None
                # to the storage SDK.
                return {"error": "Missing required argument: key"}
            return self._get_cloud_object(key)

        raise ValueError(f"Unknown tool: {name}")

    def _list_cloud_objects(self, prefix: str, max_keys: int) -> Dict[str, Any]:
        """List up to ``max_keys`` objects whose key starts with ``prefix``.

        Storage errors are returned as ``{"error": message}``, matching how
        object fetches report failures.
        """
        try:
            if self.cloud_type == "s3":
                response = self.client.list_objects_v2(
                    Bucket=self.bucket_name,
                    Prefix=prefix,
                    MaxKeys=max_keys,
                )
                # "Contents" is read with a default so an empty listing
                # yields an empty result.
                objects = [
                    {"key": obj["Key"], "size": obj["Size"]}
                    for obj in response.get("Contents", [])
                ]
            elif self.cloud_type == "gcs":
                bucket = self.client.bucket(self.bucket_name)
                blobs = bucket.list_blobs(prefix=prefix, max_results=max_keys)
                objects = [{"key": blob.name, "size": blob.size} for blob in blobs]
            else:
                return {"error": f"Unsupported cloud type: {self.cloud_type}"}
        except Exception as e:
            return {"error": str(e)}
        return {"objects": objects, "count": len(objects)}

    def _get_cloud_object(self, key: str) -> Dict[str, Any]:
        """Fetch the object stored under ``key`` and return it as text.

        The S3 payload is decoded as UTF-8. Storage and decoding errors are
        returned as ``{"error": message}``.
        """
        try:
            if self.cloud_type == "s3":
                response = self.client.get_object(Bucket=self.bucket_name, Key=key)
                content = response["Body"].read().decode("utf-8")
            elif self.cloud_type == "gcs":
                bucket = self.client.bucket(self.bucket_name)
                content = bucket.blob(key).download_as_text()
            else:
                return {"error": f"Unsupported cloud type: {self.cloud_type}"}
        except Exception as e:
            return {"error": str(e)}
        return {"key": key, "content": content}
|