AgenticAI-RAG / src /mcp /cloud_server.py
GreymanT's picture
Upload 80 files
8bf4d58 verified
"""Cloud storage MCP server."""
import logging
from typing import Any, Dict
try:
from mcp.types import Tool
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
class Tool:
def __init__(self, **kwargs):
pass
from src.mcp.mcp_server import BaseMCPServer
from src.core.config import get_settings
logger = logging.getLogger(__name__)
class CloudMCPServer(BaseMCPServer):
"""MCP server for cloud storage operations."""
def __init__(self):
"""Initialize cloud MCP server."""
super().__init__("cloud_storage_server")
self.settings = get_settings()
self._init_cloud_client()
self._register_tools()
def _init_cloud_client(self):
"""Initialize cloud storage client."""
self.cloud_type = None
self.client = None
# Check for AWS S3
if self.settings.aws_access_key_id and self.settings.aws_s3_bucket:
try:
import boto3
self.client = boto3.client(
"s3",
aws_access_key_id=self.settings.aws_access_key_id,
aws_secret_access_key=self.settings.aws_secret_access_key,
region_name=self.settings.aws_region,
)
self.cloud_type = "s3"
self.bucket_name = self.settings.aws_s3_bucket
except Exception as e:
logger.error(f"Error initializing S3: {e}")
# Check for GCS
elif self.settings.google_application_credentials and self.settings.gcs_bucket_name:
try:
from google.cloud import storage
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.settings.google_application_credentials
self.client = storage.Client()
self.cloud_type = "gcs"
self.bucket_name = self.settings.gcs_bucket_name
except Exception as e:
logger.error(f"Error initializing GCS: {e}")
def _register_tools(self):
"""Register cloud storage tools."""
if not self.client:
logger.warning("No cloud storage configured, skipping tool registration")
return
# List objects tool
list_tool = Tool(
name="list_cloud_objects",
description="List objects in cloud storage",
inputSchema={
"type": "object",
"properties": {
"prefix": {
"type": "string",
"description": "Object key prefix to filter",
},
"max_keys": {
"type": "integer",
"description": "Maximum number of objects to return",
"default": 10,
},
},
},
)
self.register_tool(list_tool)
# Get object tool
get_tool = Tool(
name="get_cloud_object",
description="Get an object from cloud storage",
inputSchema={
"type": "object",
"properties": {
"key": {
"type": "string",
"description": "Object key",
},
},
"required": ["key"],
},
)
self.register_tool(get_tool)
async def _execute_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
"""Execute a cloud storage tool."""
if not self.client:
return {"error": "Cloud storage not configured"}
if name == "list_cloud_objects":
prefix = arguments.get("prefix", "")
max_keys = arguments.get("max_keys", 10)
if self.cloud_type == "s3":
response = self.client.list_objects_v2(
Bucket=self.bucket_name,
Prefix=prefix,
MaxKeys=max_keys,
)
objects = [
{"key": obj["Key"], "size": obj["Size"]}
for obj in response.get("Contents", [])
]
return {"objects": objects, "count": len(objects)}
elif self.cloud_type == "gcs":
bucket = self.client.bucket(self.bucket_name)
blobs = list(bucket.list_blobs(prefix=prefix, max_results=max_keys))
objects = [{"key": blob.name, "size": blob.size} for blob in blobs]
return {"objects": objects, "count": len(objects)}
elif name == "get_cloud_object":
key = arguments.get("key")
if self.cloud_type == "s3":
try:
response = self.client.get_object(Bucket=self.bucket_name, Key=key)
content = response["Body"].read().decode("utf-8")
return {"key": key, "content": content}
except Exception as e:
return {"error": str(e)}
elif self.cloud_type == "gcs":
try:
bucket = self.client.bucket(self.bucket_name)
blob = bucket.blob(key)
content = blob.download_as_text()
return {"key": key, "content": content}
except Exception as e:
return {"error": str(e)}
else:
raise ValueError(f"Unknown tool: {name}")