File size: 5,520 Bytes
8bf4d58
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Cloud storage MCP server."""

import logging
from typing import Any, Dict

try:
    from mcp.types import Tool
    MCP_AVAILABLE = True
except ImportError:
    # The MCP SDK is optional: fall back to a tiny stand-in so this module
    # can still be imported (and its tool definitions built) without it.
    MCP_AVAILABLE = False

    class Tool:
        """Minimal stand-in for ``mcp.types.Tool`` when the SDK is missing.

        Every keyword argument passed to the constructor is kept as an
        instance attribute of the same name, so code that reads fields such
        as ``name``, ``description`` or ``inputSchema`` from a tool
        definition does not hit ``AttributeError`` on the stub.
        """

        def __init__(self, **kwargs):
            """Store each keyword argument as an attribute on the instance."""
            for field_name, field_value in kwargs.items():
                setattr(self, field_name, field_value)

from src.mcp.mcp_server import BaseMCPServer
from src.core.config import get_settings

logger = logging.getLogger(__name__)


class CloudMCPServer(BaseMCPServer):
    """MCP server exposing read-only cloud storage tools.

    Exactly one backend is selected at construction time from the
    application settings:

    * AWS S3 when ``aws_access_key_id`` and ``aws_s3_bucket`` are set;
    * otherwise Google Cloud Storage when ``google_application_credentials``
      and ``gcs_bucket_name`` are set.

    If neither is configured, or creating the client fails, ``client`` stays
    ``None``: no tools are registered and every tool call returns an
    ``{"error": ...}`` payload.

    Attributes:
        settings: Object returned by ``get_settings()``.
        client: The backend SDK client, or ``None``.
        cloud_type: ``"s3"``, ``"gcs"`` or ``None``.
        bucket_name: Name of the bucket the tools operate on, or ``None``.
    """

    # Page size used by ``list_cloud_objects`` when the caller supplies none;
    # kept in one place so the schema default and the runtime default agree.
    _DEFAULT_MAX_KEYS = 10

    def __init__(self):
        """Initialize cloud MCP server."""
        super().__init__("cloud_storage_server")
        self.settings = get_settings()
        self._init_cloud_client()
        self._register_tools()

    def _init_cloud_client(self) -> None:
        """Create the storage client for the first configured backend.

        S3 takes precedence over GCS. Failures (missing SDK, bad
        credentials, ...) are logged and leave the server unconfigured
        instead of propagating out of the constructor.
        """
        self.cloud_type = None
        self.client = None
        # Always define the attribute so later reads never raise
        # AttributeError when no backend ends up configured.
        self.bucket_name = None

        settings = self.settings

        # Check for AWS S3
        if settings.aws_access_key_id and settings.aws_s3_bucket:
            try:
                import boto3

                s3_client = boto3.client(
                    "s3",
                    aws_access_key_id=settings.aws_access_key_id,
                    aws_secret_access_key=settings.aws_secret_access_key,
                    region_name=settings.aws_region,
                )
            except Exception as e:
                logger.exception("Error initializing S3: %s", e)
            else:
                self.client = s3_client
                self.cloud_type = "s3"
                self.bucket_name = settings.aws_s3_bucket

        # Check for GCS
        elif settings.google_application_credentials and settings.gcs_bucket_name:
            try:
                from google.cloud import storage
                import os

                # storage.Client() is created without explicit credentials,
                # so the credentials path is exported through the
                # environment before the client is built.
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
                    settings.google_application_credentials
                )
                gcs_client = storage.Client()
            except Exception as e:
                logger.exception("Error initializing GCS: %s", e)
            else:
                self.client = gcs_client
                self.cloud_type = "gcs"
                self.bucket_name = settings.gcs_bucket_name

    def _register_tools(self) -> None:
        """Register cloud storage tools.

        Does nothing (beyond a warning) when no storage client is available.
        """
        if not self.client:
            logger.warning("No cloud storage configured, skipping tool registration")
            return

        # List objects tool
        list_tool = Tool(
            name="list_cloud_objects",
            description="List objects in cloud storage",
            inputSchema={
                "type": "object",
                "properties": {
                    "prefix": {
                        "type": "string",
                        "description": "Object key prefix to filter",
                    },
                    "max_keys": {
                        "type": "integer",
                        "description": "Maximum number of objects to return",
                        "default": self._DEFAULT_MAX_KEYS,
                    },
                },
            },
        )
        self.register_tool(list_tool)

        # Get object tool
        get_tool = Tool(
            name="get_cloud_object",
            description="Get an object from cloud storage",
            inputSchema={
                "type": "object",
                "properties": {
                    "key": {
                        "type": "string",
                        "description": "Object key",
                    },
                },
                "required": ["key"],
            },
        )
        self.register_tool(get_tool)

    async def _execute_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Execute a cloud storage tool.

        Args:
            name: Tool name, ``"list_cloud_objects"`` or ``"get_cloud_object"``.
            arguments: Tool arguments; ``None`` is treated as no arguments.

        Returns:
            The tool's result dict, or ``{"error": <message>}`` when storage
            is not configured or the backend call fails.

        Raises:
            ValueError: If ``name`` is not a known tool (and storage is
                configured).
        """
        if not self.client:
            return {"error": "Cloud storage not configured"}

        arguments = arguments or {}

        if name == "list_cloud_objects":
            return self._list_objects(arguments)
        if name == "get_cloud_object":
            return self._get_object(arguments)
        raise ValueError(f"Unknown tool: {name}")

    def _unsupported_backend(self) -> Dict[str, Any]:
        """Build the error payload for an unrecognised ``cloud_type``."""
        return {"error": f"Unsupported cloud storage type: {self.cloud_type}"}

    def _list_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """List objects under an optional prefix in the configured bucket.

        Returns ``{"objects": [{"key", "size"}, ...], "count": n}``, or an
        ``{"error": ...}`` payload if the backend call fails.
        """
        # An explicit null for either argument falls back to its default.
        prefix = arguments.get("prefix") or ""
        max_keys = arguments.get("max_keys")
        if max_keys is None:
            max_keys = self._DEFAULT_MAX_KEYS

        try:
            if self.cloud_type == "s3":
                response = self.client.list_objects_v2(
                    Bucket=self.bucket_name,
                    Prefix=prefix,
                    MaxKeys=max_keys,
                )
                # "Contents" is read defensively: an empty listing is
                # reported as zero objects rather than a KeyError.
                objects = [
                    {"key": obj["Key"], "size": obj["Size"]}
                    for obj in response.get("Contents", [])
                ]
            elif self.cloud_type == "gcs":
                bucket = self.client.bucket(self.bucket_name)
                objects = [
                    {"key": blob.name, "size": blob.size}
                    for blob in bucket.list_blobs(prefix=prefix, max_results=max_keys)
                ]
            else:
                return self._unsupported_backend()
        except Exception as e:
            # Same error contract as get_cloud_object: report, don't raise.
            return {"error": str(e)}

        return {"objects": objects, "count": len(objects)}

    def _get_object(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch one object and return its content decoded as text.

        Returns ``{"key": key, "content": text}``, or an ``{"error": ...}``
        payload if ``key`` is missing/empty or the backend call fails
        (including content that cannot be decoded as text).
        """
        key = arguments.get("key")
        if not isinstance(key, str) or not key:
            # Fail with a clear message instead of whatever the SDK raises
            # for a null/empty key.
            return {"error": "Missing required argument: key"}

        try:
            if self.cloud_type == "s3":
                response = self.client.get_object(Bucket=self.bucket_name, Key=key)
                body = response["Body"]
                try:
                    content = body.read().decode("utf-8")
                finally:
                    # Release the underlying HTTP connection even if the
                    # read or decode fails.
                    body.close()
            elif self.cloud_type == "gcs":
                bucket = self.client.bucket(self.bucket_name)
                content = bucket.blob(key).download_as_text()
            else:
                return self._unsupported_backend()
        except Exception as e:
            return {"error": str(e)}

        return {"key": key, "content": content}