rajkumarrawal's picture
Initial commit
2ec0d39
"""
Sample MCP Weather Server
Demonstrates integration with external APIs and real-world MCP server implementation
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import aiohttp
import structlog
# Configure structured logging
logging.basicConfig(level=logging.INFO)
logger = structlog.get_logger()
class WeatherAPI:
"""Mock weather API for demonstration purposes."""
def __init__(self):
# Mock weather data for different locations
self.mock_data = {
"New York": {
"temperature": 22,
"humidity": 65,
"conditions": "Partly Cloudy",
"wind_speed": 8.5,
"wind_direction": "NW",
"pressure": 1013.2,
"visibility": 10.0,
"uv_index": 6,
"forecast": [
{"date": "2024-11-30", "high": 24, "low": 18, "conditions": "Sunny"},
{"date": "2024-12-01", "high": 20, "low": 15, "conditions": "Cloudy"},
{"date": "2024-12-02", "high": 23, "low": 19, "conditions": "Partly Cloudy"}
]
},
"London": {
"temperature": 15,
"humidity": 78,
"conditions": "Rainy",
"wind_speed": 12.3,
"wind_direction": "SW",
"pressure": 1008.5,
"visibility": 8.0,
"uv_index": 2,
"forecast": [
{"date": "2024-11-30", "high": 17, "low": 12, "conditions": "Rainy"},
{"date": "2024-12-01", "high": 14, "low": 10, "conditions": "Overcast"},
{"date": "2024-12-02", "high": 16, "low": 11, "conditions": "Drizzle"}
]
},
"Tokyo": {
"temperature": 18,
"humidity": 72,
"conditions": "Clear",
"wind_speed": 5.2,
"wind_direction": "NE",
"pressure": 1016.8,
"visibility": 12.0,
"uv_index": 4,
"forecast": [
{"date": "2024-11-30", "high": 21, "low": 16, "conditions": "Clear"},
{"date": "2024-12-01", "high": 19, "low": 14, "conditions": "Partly Cloudy"},
{"date": "2024-12-02", "high": 22, "low": 17, "conditions": "Sunny"}
]
}
}
async def get_current_weather(self, location: str) -> Dict[str, Any]:
"""Get current weather for a location."""
# Simulate API delay
await asyncio.sleep(0.1)
location = location.strip().title()
if location in self.mock_data:
data = self.mock_data[location].copy()
data["location"] = location
data["timestamp"] = datetime.utcnow().isoformat()
return data
else:
raise ValueError(f"Weather data not available for location: {location}")
async def get_forecast(self, location: str, days: int = 3) -> Dict[str, Any]:
"""Get weather forecast for a location."""
await asyncio.sleep(0.15)
location = location.strip().title()
if location in self.mock_data:
forecast_data = self.mock_data[location]["forecast"][:days]
return {
"location": location,
"forecast": forecast_data,
"generated_at": datetime.utcnow().isoformat()
}
else:
raise ValueError(f"Forecast data not available for location: {location}")
async def search_locations(self, query: str) -> List[str]:
"""Search for available locations."""
await asyncio.sleep(0.05)
query = query.lower()
locations = []
for location in self.mock_data.keys():
if query in location.lower():
locations.append(location)
return locations
class WeatherMCPServer:
"""Model Context Protocol Weather Server implementation."""
def __init__(self, port: int = 8001):
self.port = port
self.api = WeatherAPI()
self.tools = {
"get_current_weather": {
"name": "get_current_weather",
"description": "Get current weather conditions for a specified location",
"inputSchema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City or location name (e.g., 'New York', 'London', 'Tokyo')",
"minLength": 1,
"maxLength": 100
}
},
"required": ["location"]
},
"outputSchema": {
"type": "object",
"properties": {
"location": {"type": "string"},
"temperature": {"type": "number", "description": "Temperature in Celsius"},
"humidity": {"type": "number", "description": "Humidity percentage"},
"conditions": {"type": "string", "description": "Weather conditions"},
"wind_speed": {"type": "number", "description": "Wind speed in km/h"},
"wind_direction": {"type": "string", "description": "Wind direction"},
"pressure": {"type": "number", "description": "Atmospheric pressure in hPa"},
"visibility": {"type": "number", "description": "Visibility in km"},
"uv_index": {"type": "number", "description": "UV index"},
"timestamp": {"type": "string", "description": "ISO timestamp"}
}
},
"examples": [
{"location": "New York"},
{"location": "London"},
{"location": "Tokyo"}
],
"tags": ["weather", "current", "conditions"]
},
"get_weather_forecast": {
"name": "get_weather_forecast",
"description": "Get weather forecast for a specified location and number of days",
"inputSchema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City or location name",
"minLength": 1,
"maxLength": 100
},
"days": {
"type": "integer",
"description": "Number of forecast days (1-7)",
"minimum": 1,
"maximum": 7,
"default": 3
}
},
"required": ["location"]
},
"outputSchema": {
"type": "object",
"properties": {
"location": {"type": "string"},
"forecast": {
"type": "array",
"items": {
"type": "object",
"properties": {
"date": {"type": "string"},
"high": {"type": "number", "description": "High temperature"},
"low": {"type": "number", "description": "Low temperature"},
"conditions": {"type": "string"}
}
}
},
"generated_at": {"type": "string"}
}
},
"examples": [
{"location": "New York", "days": 5},
{"location": "London", "days": 3}
],
"tags": ["weather", "forecast", "planning"]
},
"search_weather_locations": {
"name": "search_weather_locations",
"description": "Search for available weather locations",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for location names",
"minLength": 1,
"maxLength": 50
}
},
"required": ["query"]
},
"outputSchema": {
"type": "object",
"properties": {
"locations": {
"type": "array",
"items": {"type": "string"},
"description": "List of matching locations"
}
}
},
"examples": [
{"query": "New"},
{"query": "London"}
],
"tags": ["search", "locations", "discovery"]
}
}
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle incoming MCP requests."""
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
try:
if method == "tools/list":
return await self._handle_tools_list(request_id)
elif method == "tools/call":
return await self._handle_tools_call(request_id, params)
elif method == "initialize":
return await self._handle_initialize(request_id, params)
else:
return self._error_response(request_id, -32601, "Method not found")
except Exception as e:
logger.error("Request handling failed", method=method, error=str(e))
return self._error_response(request_id, -32603, f"Internal error: {str(e)}")
async def _handle_tools_list(self, request_id: str) -> Dict[str, Any]:
"""Handle tools/list request."""
tools_list = []
for tool_name, tool_info in self.tools.items():
tools_list.append({
"name": tool_info["name"],
"description": tool_info["description"],
"inputSchema": tool_info["inputSchema"],
"outputSchema": tool_info.get("outputSchema"),
"examples": tool_info.get("examples", []),
"tags": tool_info.get("tags", [])
})
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": tools_list
}
}
async def _handle_tools_call(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tools/call request."""
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name not in self.tools:
return self._error_response(request_id, -32601, f"Tool '{tool_name}' not found")
# Execute the tool
try:
if tool_name == "get_current_weather":
result = await self._get_current_weather(arguments)
elif tool_name == "get_weather_forecast":
result = await self._get_weather_forecast(arguments)
elif tool_name == "search_weather_locations":
result = await self._search_weather_locations(arguments)
else:
return self._error_response(request_id, -32601, f"Tool '{tool_name}' not implemented")
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, indent=2)
}
]
}
}
except Exception as e:
logger.error("Tool execution failed", tool=tool_name, error=str(e))
return self._error_response(request_id, -32603, f"Tool execution failed: {str(e)}")
async def _handle_initialize(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle initialize request."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "Weather MCP Server",
"version": "1.0.0",
"description": "Provides weather information and forecasts"
}
}
}
async def _get_current_weather(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute get_current_weather tool."""
location = arguments.get("location")
if not location:
raise ValueError("Location is required")
return await self.api.get_current_weather(location)
async def _get_weather_forecast(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute get_weather_forecast tool."""
location = arguments.get("location")
days = arguments.get("days", 3)
if not location:
raise ValueError("Location is required")
if not isinstance(days, int) or days < 1 or days > 7:
raise ValueError("Days must be an integer between 1 and 7")
return await self.api.get_forecast(location, days)
async def _search_weather_locations(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute search_weather_locations tool."""
query = arguments.get("query")
if not query:
raise ValueError("Query is required")
locations = await self.api.search_locations(query)
return {"locations": locations}
def _error_response(self, request_id: str, code: int, message: str) -> Dict[str, Any]:
"""Create error response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": code,
"message": message
}
}
async def start_server(self):
"""Start the MCP server."""
from aiohttp import web, ClientTimeout
import aiohttp_cors
# Create aiohttp application
app = web.Application()
# Add CORS support
cors = aiohttp_cors.setup(app, defaults={
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
allow_methods="*"
)
})
# Add MCP endpoint
async def mcp_handler(request):
try:
data = await request.json()
result = await self.handle_request(data)
return web.json_response(result)
except json.JSONDecodeError:
return web.json_response({
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32700, "message": "Parse error"}
}, status=400)
except Exception as e:
logger.error("Request processing failed", error=str(e))
return web.json_response({
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32603, "message": "Internal error"}
}, status=500)
# Add routes
app.router.add_post('/mcp', mcp_handler)
app.router.add_get('/health', self._health_check)
# Configure CORS for all routes
for route in list(app.router.routes()):
cors.add(route)
# Start server
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', self.port)
await site.start()
logger.info(f"Weather MCP Server started on port {self.port}")
return runner
async def _health_check(self, request):
"""Health check endpoint."""
return web.json_response({
"status": "healthy",
"service": "Weather MCP Server",
"version": "1.0.0",
"timestamp": datetime.utcnow().isoformat()
})
async def stop_server(self, runner):
"""Stop the MCP server."""
await runner.cleanup()
async def main():
"""Main entry point for the weather server."""
server = WeatherMCPServer()
runner = await server.start_server()
try:
# Keep server running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down weather server...")
await server.stop_server(runner)
if __name__ == "__main__":
asyncio.run(main())