File size: 11,202 Bytes
ab07cb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dac021d
ab07cb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dac021d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab07cb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dac021d
ab07cb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dac021d
 
 
 
ab07cb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#!/usr/bin/env python3
"""
ERA5 MCP Server
===============

Model Context Protocol server for ERA5 climate data retrieval.

Usage:
    eurus-mcp                  # If installed as a package
    python -m eurus.server     # Direct execution

Configuration via environment variables:
    ARRAYLAKE_API_KEY    - Required for data access
    ERA5_DATA_DIR        - Data storage directory (default: ./data)
    ERA5_MEMORY_DIR      - Memory storage directory (default: ./.memory)
    ERA5_MAX_RETRIES     - Download retry attempts (default: 3)
    ERA5_LOG_LEVEL       - Logging level (default: INFO)
"""

from __future__ import annotations

import asyncio
import logging
import os
import sys
from typing import Any

from dotenv import load_dotenv

# Load environment variables from a .env file (if one is found) before any
# configuration is read, so the os.environ lookups further down
# (ERA5_LOG_LEVEL, ARRAYLAKE_API_KEY) can pick up values defined there.
load_dotenv()

# Configure logging.  ERA5_LOG_LEVEL takes a standard level name (DEBUG, INFO,
# WARNING, ERROR, CRITICAL); an unrecognized value falls back to INFO with a
# warning instead of raising AttributeError at import time.
log_level = os.environ.get("ERA5_LOG_LEVEL", "INFO").upper()


def _resolve_log_level(name: str) -> int | None:
    """Return the numeric ``logging`` level for *name*, or ``None`` if unknown.

    Matching is case-insensitive and ignores surrounding whitespace. Only
    integer level constants are accepted, so module attributes such as
    ``logging.BASIC_FORMAT`` (a string) are rejected rather than handed to
    ``basicConfig``.
    """
    level = getattr(logging, name.strip().upper(), None)
    if isinstance(level, int) and not isinstance(level, bool):
        return level
    return None


_resolved_log_level = _resolve_log_level(log_level)
logging.basicConfig(
    level=logging.INFO if _resolved_log_level is None else _resolved_log_level,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)

if _resolved_log_level is None:
    # Emitted only after basicConfig so the warning is actually visible.
    logger.warning("Unknown ERA5_LOG_LEVEL %r; falling back to INFO", log_level)

# Import MCP components
try:
    from mcp.server import Server
    from mcp.server.stdio import stdio_server
    from mcp.types import (
        CallToolResult,
        TextContent,
        Tool,
    )
except ImportError:
    logger.error("MCP library not found. Install with: pip install mcp")
    sys.exit(1)

# Import ERA5 components
from eurus.config import (
    list_available_variables,
)
from eurus.memory import get_memory
from eurus.tools.era5 import retrieve_era5_data, ERA5RetrievalArgs
from eurus.tools.analysis_guide import get_analysis_guide, ANALYSIS_GUIDES

# Import Maritime Routing tool
from eurus.tools.routing import (
    calculate_maritime_route,
    RouteArgs,
    HAS_ROUTING_DEPS,
)

# The single MCP server instance, registered as "era5-climate-data".
# ``app`` names the very same object and is kept for callers that import it
# under that name.
app = server = Server("era5-climate-data")


# ============================================================================
# TOOL DEFINITIONS
# ============================================================================

@server.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the tools this server exposes to MCP clients.

    Always lists ``retrieve_era5_data``, ``list_era5_variables``,
    ``list_cached_datasets`` and ``get_analysis_guide``.
    ``calculate_maritime_route`` is appended only when ``HAS_ROUTING_DEPS`` is
    true.

    Returns:
        The list of ``Tool`` definitions (name, description, JSON input schema).
    """
    tools = [
        Tool(
            name="retrieve_era5_data",
            description=(
                "Retrieve ERA5 climate reanalysis data from Earthmover's cloud archive.\n\n"
                "⚠️ QUERY TYPE is AUTO-DETECTED based on time/area:\n"
                "- 'temporal': time > 1 day AND region < 30°×30° (time series, small area)\n"
                "- 'spatial': time ≤ 1 day OR region ≥ 30°×30° (maps, snapshots, large area)\n\n"
                "VARIABLES: sst, t2, u10, v10, mslp, tcc, tp\n"
                "NOTE: swh (waves) is NOT available in this dataset!\n\n"
                "COORDINATES: Always specify lat/lon bounds explicitly.\n"
                # Fixed example: "-74°W" was a double negative (i.e. 74°E).
                "Longitude: Use 0-360 format (e.g., -74° = 74°W = 286°E)\n\n"
                "Returns file path. Load: xr.open_dataset('PATH', engine='zarr')"
            ),
            inputSchema=ERA5RetrievalArgs.model_json_schema()
        ),
        Tool(
            name="list_era5_variables",
            description=(
                "List all available ERA5 variables with their descriptions, units, "
                "and short names for use with retrieve_era5_data."
            ),
            inputSchema={
                "type": "object",
                "properties": {},
                "additionalProperties": False
            }
        ),
        Tool(
            name="list_cached_datasets",
            description=(
                "List all ERA5 datasets that have been downloaded and cached locally. "
                "Shows variable, date range, file path, and size."
            ),
            inputSchema={
                "type": "object",
                "properties": {},
                "additionalProperties": False
            }
        ),
        Tool(
            name="get_analysis_guide",
            # NOTE(review): the TOPICS text below is maintained by hand, while
            # the schema enum is derived from ANALYSIS_GUIDES; keep them in sync.
            description=(
                "Get methodological guidance for climate data analysis and visualization.\n\n"
                "Returns workflow steps, quality checklists, common pitfalls, and best practices.\n\n"
                "TOPICS:\n"
                "- Data: load_data, spatial_subset, temporal_subset\n"
                "- Statistics: anomalies, zscore, trend_analysis, eof_analysis\n"
                "- Advanced: correlation_analysis, composite_analysis, diurnal_cycle, "
                "seasonal_decomposition, spectral_analysis, spatial_statistics, "
                "multi_variable, climatology_normals\n"
                "- Climate: climate_indices, extremes, drought_analysis, heatwave_detection, "
                "atmospheric_rivers, blocking_events\n"
                "- Domain: energy_budget, wind_energy, moisture_budget, convective_potential, snow_cover\n"
                "- Visualization: visualization_spatial, visualization_timeseries, "
                "visualization_anomaly_map, visualization_wind, visualization_comparison, "
                "visualization_profile, visualization_distribution, visualization_animation, "
                "visualization_dashboard, visualization_contour, visualization_correlation_map\n"
                "- Maritime: maritime_route, maritime_visualization\n\n"
                "CALL THIS BEFORE writing analysis/plotting code!"
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "topic": {
                        "type": "string",
                        "description": "Analysis topic to get guidance for",
                        # Sorted so the advertised schema is stable across calls.
                        "enum": sorted(ANALYSIS_GUIDES.keys())
                    }
                },
                "required": ["topic"]
            }
        ),
    ]

    # ========== MARITIME ROUTING TOOL (if dependencies available) ==========
    if HAS_ROUTING_DEPS:
        tools.append(
            Tool(
                name="calculate_maritime_route",
                description=(
                    "Calculate a realistic maritime shipping route between two ports. "
                    "Uses global shipping lane graph to avoid land and find optimal path.\n\n"
                    "RETURNS: Waypoint coordinates, bounding box, and INSTRUCTIONS for "
                    "climatological risk assessment protocol.\n\n"
                    "DOES NOT: Check weather itself. The Agent must follow the returned "
                    "protocol to assess route safety using ERA5 data.\n\n"
                    "WORKFLOW:\n"
                    "1. Call this tool → get waypoints + instructions\n"
                    "2. Download ERA5 wind data (u10, v10) for the region\n"
                    "3. Call get_analysis_guide(topic='maritime_visualization')\n"
                    "4. Execute analysis in python_repl"
                ),
                inputSchema=RouteArgs.model_json_schema()
            )
        )

    return tools


# ============================================================================
# TOOL HANDLERS
# ============================================================================

# Required argument names per tool. They are checked up front so a missing key
# yields a clear error instead of a bare "Error: 'variable_id'" raised from
# inside the worker thread.
_ERA5_REQUIRED_ARGS = (
    "variable_id",
    "start_date",
    "end_date",
    "min_latitude",
    "max_latitude",
    "min_longitude",
    "max_longitude",
)
_ROUTE_REQUIRED_ARGS = ("origin_lat", "origin_lon", "dest_lat", "dest_lon", "month")


def _text_result(text: Any, *, is_error: bool = False) -> CallToolResult:
    """Wrap *text* as the single ``TextContent`` item of a ``CallToolResult``.

    ``str()`` guards against a tool helper returning a non-string value.
    """
    return CallToolResult(
        content=[TextContent(type="text", text=str(text))],
        isError=is_error,
    )


def _missing_args_result(
    name: str, arguments: dict[str, Any], required: tuple[str, ...]
) -> CallToolResult | None:
    """Return an error result listing absent *required* keys, or ``None`` if all present."""
    missing = [key for key in required if key not in arguments]
    if not missing:
        return None
    return _text_result(
        f"Error: missing required argument(s) for {name}: {', '.join(missing)}",
        is_error=True,
    )


async def _run_blocking(func: Any, /, **kwargs: Any) -> Any:
    """Call the synchronous *func* with *kwargs* in the default thread pool.

    This keeps blocking work off the event loop so the stdio transport stays
    responsive while the call runs.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: func(**kwargs))


@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> CallToolResult:
    """Dispatch an MCP tool call by *name* and wrap its output as text.

    Args:
        name: Tool name as advertised by ``list_tools``.
        arguments: JSON arguments from the client; ``None`` is treated as ``{}``.

    Returns:
        A ``CallToolResult`` with one ``TextContent`` item. Unknown tools,
        missing required arguments, and any exception raised by a tool are
        reported with ``isError=True`` rather than propagated.
    """
    # NOTE(review): some versions of the ``mcp`` SDK's low-level ``call_tool``
    # decorator expect an iterable of content objects rather than a
    # ``CallToolResult``; confirm against the pinned SDK version.
    arguments = arguments or {}

    try:
        if name == "retrieve_era5_data":
            error = _missing_args_result(name, arguments, _ERA5_REQUIRED_ARGS)
            if error is not None:
                return error
            # query_type is auto-detected by retrieve_era5_data itself, so only
            # the variable, date range and bounding box are forwarded.
            result = await _run_blocking(
                retrieve_era5_data,
                **{key: arguments[key] for key in _ERA5_REQUIRED_ARGS},
            )
            return _text_result(result)

        if name == "list_era5_variables":
            return _text_result(list_available_variables())

        if name == "list_cached_datasets":
            return _text_result(get_memory().list_datasets())

        if name == "get_analysis_guide":
            error = _missing_args_result(name, arguments, ("topic",))
            if error is not None:
                return error
            return _text_result(get_analysis_guide(arguments["topic"]))

        # ========== MARITIME ROUTING HANDLER ==========
        if name == "calculate_maritime_route":
            if not HAS_ROUTING_DEPS:
                return _text_result(
                    "Error: Maritime routing dependencies not installed.\n"
                    "Install with: pip install scgraph geopy",
                    is_error=True,
                )
            error = _missing_args_result(name, arguments, _ROUTE_REQUIRED_ARGS)
            if error is not None:
                return error
            result = await _run_blocking(
                calculate_maritime_route,
                origin_lat=arguments["origin_lat"],
                origin_lon=arguments["origin_lon"],
                dest_lat=arguments["dest_lat"],
                dest_lon=arguments["dest_lon"],
                month=arguments["month"],
                year=arguments.get("year"),
                speed_knots=arguments.get("speed_knots", 14.0),
            )
            return _text_result(result)

        return _text_result(f"Unknown tool: {name}", is_error=True)

    except Exception as e:
        # Report failures to the client as an error result. The full
        # traceback goes to the server log only.
        logger.exception("Error executing tool %s", name)
        return _text_result(f"Error: {e}", is_error=True)


# ============================================================================
# SERVER STARTUP
# ============================================================================

async def run_server() -> None:
    """Serve MCP requests over stdin/stdout until the transport closes.

    If ``ARRAYLAKE_API_KEY`` is unset or empty, a warning is logged, but the
    server still starts.
    """
    logger.info("Starting ERA5 MCP Server...")

    api_key = os.environ.get("ARRAYLAKE_API_KEY")
    if not api_key:
        logger.warning(
            "ARRAYLAKE_API_KEY not set. Data retrieval will fail. "
            "Set it via environment variable or .env file."
        )

    async with stdio_server() as streams:
        reader, writer = streams
        init_options = server.create_initialization_options()
        await server.run(reader, writer, init_options)


def main() -> None:
    """Process entry point: run the stdio server until it stops.

    Ctrl+C is logged as a normal shutdown request. Any other uncaught
    exception is logged with its traceback, and the process exits with
    status 1.
    """
    try:
        asyncio.run(run_server())
    except KeyboardInterrupt:
        # Interrupting a stdio server is the usual way to stop it, not an error.
        logger.info("Server shutdown requested")
    except Exception as exc:
        logger.exception(f"Server error: {exc}")
        raise SystemExit(1)


# Start the server when this module is executed directly, e.g. via
# ``python -m eurus.server``.
if __name__ == "__main__":
    main()