MCP-2 / app.py
cwadayi's picture
Update app.py
767b60b verified
#!/usr/bin/env python3
"""
Earthquake Data MCP Server for Hugging Face Spaces
Provides earthquake data querying and visualization capabilities via both Gradio UI and MCP protocol.
"""
import asyncio
import json
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import base64
import io
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import gradio as gr
import logging
import threading
import queue
import time
# Configure matplotlib for non-interactive backend
plt.switch_backend('Agg')
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EarthquakeDataService:
"""Core earthquake data service used by both Gradio and MCP interfaces."""
def __init__(self, db_path: str = 'earthquake_data.db'):
self.db_path = db_path
async def query_earthquakes_data(self,
start_date: str = "2024-01-01",
start_time: str = "00:00:00",
end_date: str = "2024-12-31",
end_time: str = "23:59:59",
lat_min: float = 21,
lat_max: float = 26,
lon_min: float = 119,
lon_max: float = 123,
depth_min: float = 0,
depth_max: float = 100,
ML_min: float = 4.5,
ML_max: float = 8,
limit: int = 1000) -> Dict[str, Any]:
"""Query earthquake data with filters."""
try:
# Combine date and time
start_datetime_str = f"{start_date.strip()} {start_time.strip()}"
end_datetime_str = f"{end_date.strip()} {end_time.strip()}"
# Connect to database
conn = sqlite3.connect(self.db_path)
# Build query
query = "SELECT * FROM earthquakes WHERE (date || ' ' || time) BETWEEN ? AND ?"
params = [start_datetime_str, end_datetime_str]
# Add filters
filters = {
"lat BETWEEN ? AND ?": (lat_min, lat_max),
"lon BETWEEN ? AND ?": (lon_min, lon_max),
"depth BETWEEN ? AND ?": (depth_min, depth_max),
"ML BETWEEN ? AND ?": (ML_min, ML_max),
}
for condition, values in filters.items():
if values[0] is not None and values[1] is not None:
query += f" AND {condition}"
params.extend(values)
query += f" LIMIT {limit}"
# Execute query
df = pd.read_sql_query(query, conn, params=tuple(params))
conn.close()
if df.empty:
return {
"success": True,
"message": "No data found for the selected filters",
"count": 0,
"data": [],
"dataframe": pd.DataFrame({"Message": ["No data found for the selected filters."]})
}
# Convert to list of dictionaries for MCP
data = df.to_dict('records')
return {
"success": True,
"count": len(data),
"data": data,
"dataframe": df,
"summary": {
"magnitude_range": [float(df['ML'].min()), float(df['ML'].max())],
"depth_range": [float(df['depth'].min()), float(df['depth'].max())],
"location_bounds": {
"lat_range": [float(df['lat'].min()), float(df['lat'].max())],
"lon_range": [float(df['lon'].min()), float(df['lon'].max())]
}
}
}
except Exception as e:
logger.error(f"Error querying earthquakes: {e}")
return {
"success": False,
"error": str(e),
"dataframe": pd.DataFrame({"Error": [str(e)]})
}
async def create_earthquake_map(self,
start_date: str = "2024-01-01",
start_time: str = "00:00:00",
end_date: str = "2024-12-31",
end_time: str = "23:59:59",
lat_min: float = 21,
lat_max: float = 26,
lon_min: float = 119,
lon_max: float = 123,
depth_min: float = 0,
depth_max: float = 100,
ML_min: float = 4.5,
ML_max: float = 8,
return_base64: bool = False) -> Dict[str, Any]:
"""Create earthquake distribution map."""
try:
# Get earthquake data first
query_result = await self.query_earthquakes_data(
start_date, start_time, end_date, end_time,
lat_min, lat_max, lon_min, lon_max,
depth_min, depth_max, ML_min, ML_max
)
if not query_result.get("success") or query_result.get("count", 0) == 0:
return {"success": False, "error": "No data available for mapping", "figure": None}
df = query_result["dataframe"]
# Create map
fig = plt.figure(figsize=(12, 10))
ax = fig.add_subplot(1, 1, 1, projection=ccrs.PlateCarree())
ax.set_extent([lon_min, lon_max, lat_min, lat_max], crs=ccrs.PlateCarree())
# Add map features
ax.add_feature(cfeature.LAND, edgecolor='black', alpha=0.8)
ax.add_feature(cfeature.OCEAN, alpha=0.6)
ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
ax.add_feature(cfeature.BORDERS, linestyle=':', alpha=0.7)
# Create scatter plot
scatter = ax.scatter(
df['lon'], df['lat'], c=df['ML'],
cmap='viridis', alpha=0.7, s=60,
transform=ccrs.PlateCarree(),
edgecolors='black', linewidth=0.5
)
# Add colorbar and title
plt.colorbar(scatter, ax=ax, orientation='vertical',
label='Magnitude (ML)', shrink=0.7, pad=0.05)
ax.set_title(f'Earthquake Distribution Map\n'
f'{start_date} to {end_date} ({len(df)} events)',
fontsize=14, pad=20)
# Add gridlines
ax.gridlines(draw_labels=True, alpha=0.5)
result = {
"success": True,
"earthquake_count": len(df),
"date_range": f"{start_date} to {end_date}",
"bounds": {
"lat_min": lat_min, "lat_max": lat_max,
"lon_min": lon_min, "lon_max": lon_max
},
"figure": fig
}
if return_base64:
# Convert to base64
buffer = io.BytesIO()
plt.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
buffer.seek(0)
image_base64 = base64.b64encode(buffer.getvalue()).decode()
result["map_image_base64"] = image_base64
buffer.close()
return result
except Exception as e:
logger.error(f"Error creating earthquake map: {e}")
return {"success": False, "error": str(e), "figure": None}
async def get_earthquake_stats(self,
start_date: str = "2024-01-01",
end_date: str = "2024-12-31",
lat_min: float = 21,
lat_max: float = 26,
lon_min: float = 119,
lon_max: float = 123) -> Dict[str, Any]:
"""Get statistical summary of earthquake data."""
try:
query_result = await self.query_earthquakes_data(
start_date=start_date, end_date=end_date,
lat_min=lat_min, lat_max=lat_max,
lon_min=lon_min, lon_max=lon_max
)
if not query_result.get("success") or query_result.get("count", 0) == 0:
return {"success": False, "error": "No data available for statistics"}
df = query_result["dataframe"]
stats = {
"success": True,
"total_earthquakes": len(df),
"date_range": {"start": start_date, "end": end_date},
"magnitude_stats": {
"min": float(df['ML'].min()),
"max": float(df['ML'].max()),
"mean": float(df['ML'].mean()),
"median": float(df['ML'].median()),
"std": float(df['ML'].std())
},
"depth_stats": {
"min": float(df['depth'].min()),
"max": float(df['depth'].max()),
"mean": float(df['depth'].mean()),
"median": float(df['depth'].median())
},
"magnitude_distribution": {
"4.0-4.9": len(df[(df['ML'] >= 4.0) & (df['ML'] < 5.0)]),
"5.0-5.9": len(df[(df['ML'] >= 5.0) & (df['ML'] < 6.0)]),
"6.0-6.9": len(df[(df['ML'] >= 6.0) & (df['ML'] < 7.0)]),
"7.0+": len(df[df['ML'] >= 7.0])
},
"depth_distribution": {
"shallow (0-10km)": len(df[(df['depth'] >= 0) & (df['depth'] <= 10)]),
"intermediate (10-70km)": len(df[(df['depth'] > 10) & (df['depth'] <= 70)]),
"deep (>70km)": len(df[df['depth'] > 70])
}
}
return stats
except Exception as e:
logger.error(f"Error getting earthquake stats: {e}")
return {"success": False, "error": str(e)}
class EarthquakeMCPServer:
"""MCP Server component for handling MCP protocol requests."""
def __init__(self, data_service: EarthquakeDataService):
self.data_service = data_service
self.tools = {
"query_earthquakes": {
"description": "Query earthquake data with various filters",
"parameters": {
"type": "object",
"properties": {
"start_date": {"type": "string", "description": "Start date (YYYY-MM-DD)", "default": "2024-01-01"},
"start_time": {"type": "string", "description": "Start time (HH:MM:SS)", "default": "00:00:00"},
"end_date": {"type": "string", "description": "End date (YYYY-MM-DD)", "default": "2024-12-31"},
"end_time": {"type": "string", "description": "End time (HH:MM:SS)", "default": "23:59:59"},
"lat_min": {"type": "number", "description": "Minimum latitude", "default": 21},
"lat_max": {"type": "number", "description": "Maximum latitude", "default": 26},
"lon_min": {"type": "number", "description": "Minimum longitude", "default": 119},
"lon_max": {"type": "number", "description": "Maximum longitude", "default": 123},
"depth_min": {"type": "number", "description": "Minimum depth (km)", "default": 0},
"depth_max": {"type": "number", "description": "Maximum depth (km)", "default": 100},
"ML_min": {"type": "number", "description": "Minimum magnitude", "default": 4.5},
"ML_max": {"type": "number", "description": "Maximum magnitude", "default": 8},
"limit": {"type": "integer", "description": "Maximum results", "default": 1000}
}
}
},
"create_earthquake_map": {
"description": "Create a map visualization of earthquake data",
"parameters": {
"type": "object",
"properties": {
"start_date": {"type": "string", "default": "2024-01-01"},
"start_time": {"type": "string", "default": "00:00:00"},
"end_date": {"type": "string", "default": "2024-12-31"},
"end_time": {"type": "string", "default": "23:59:59"},
"lat_min": {"type": "number", "default": 21},
"lat_max": {"type": "number", "default": 26},
"lon_min": {"type": "number", "default": 119},
"lon_max": {"type": "number", "default": 123},
"depth_min": {"type": "number", "default": 0},
"depth_max": {"type": "number", "default": 100},
"ML_min": {"type": "number", "default": 4.5},
"ML_max": {"type": "number", "default": 8},
"return_base64": {"type": "boolean", "default": True}
}
}
},
"get_earthquake_stats": {
"description": "Get statistical summary of earthquake data",
"parameters": {
"type": "object",
"properties": {
"start_date": {"type": "string", "default": "2024-01-01"},
"end_date": {"type": "string", "default": "2024-12-31"},
"lat_min": {"type": "number", "default": 21},
"lat_max": {"type": "number", "default": 26},
"lon_min": {"type": "number", "default": 119},
"lon_max": {"type": "number", "default": 123}
}
}
}
}
async def handle_mcp_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle MCP protocol requests."""
try:
method = request.get("method")
params = request.get("params", {})
if method == "initialize":
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {
"name": "earthquake-data-server",
"version": "1.0.0"
}
}
}
elif method == "tools/list":
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"tools": [
{"name": name, **tool_info}
for name, tool_info in self.tools.items()
]
}
}
elif method == "tools/call":
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "query_earthquakes":
result = await self.data_service.query_earthquakes_data(**arguments)
# Remove dataframe for JSON serialization
result.pop("dataframe", None)
elif tool_name == "create_earthquake_map":
result = await self.data_service.create_earthquake_map(**arguments)
# Remove figure for JSON serialization
result.pop("figure", None)
elif tool_name == "get_earthquake_stats":
result = await self.data_service.get_earthquake_stats(**arguments)
else:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}
}
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, indent=2, ensure_ascii=False)
}
]
}
}
else:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {"code": -32601, "message": f"Unknown method: {method}"}
}
except Exception as e:
logger.error(f"Error handling MCP request: {e}")
return {
"jsonrpc": "2.0",
"id": request.get("id", 0),
"error": {"code": -32603, "message": str(e)}
}
# Global data service instance
data_service = EarthquakeDataService()
mcp_server = EarthquakeMCPServer(data_service)
# --- Gradio Interface Functions ---
async def fetch_and_plot_data(
start_date, start_time, end_date, end_time,
lat_min, lat_max, lon_min, lon_max,
depth_min, depth_max, ML_min, ML_max
) -> Tuple[pd.DataFrame, Any]:
"""Gradio interface function for fetching and plotting data."""
try:
# Query data
query_result = await data_service.query_earthquakes_data(
start_date, start_time, end_date, end_time,
lat_min, lat_max, lon_min, lon_max,
depth_min, depth_max, ML_min, ML_max
)
df = query_result.get("dataframe", pd.DataFrame())
if query_result.get("count", 0) == 0:
return df, None
# Create map
map_result = await data_service.create_earthquake_map(
start_date, start_time, end_date, end_time,
lat_min, lat_max, lon_min, lon_max,
depth_min, depth_max, ML_min, ML_max
)
figure = map_result.get("figure")
return df, figure
except Exception as e:
logger.error(f"Error in fetch_and_plot_data: {e}")
return pd.DataFrame({"Error": [str(e)]}), None
def gradio_fetch_and_plot_data(*args):
"""Synchronous wrapper for Gradio."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(fetch_and_plot_data(*args))
finally:
loop.close()
async def handle_mcp_request_gradio(request_json: str) -> str:
"""Handle MCP requests through Gradio interface."""
try:
request = json.loads(request_json)
response = await mcp_server.handle_mcp_request(request)
return json.dumps(response, indent=2, ensure_ascii=False)
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32700, "message": f"Parse error: {str(e)}"}
}
return json.dumps(error_response, indent=2)
def gradio_handle_mcp_request(request_json: str) -> str:
"""Synchronous wrapper for MCP requests in Gradio."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(handle_mcp_request_gradio(request_json))
finally:
loop.close()
# --- Gradio Interface ---
with gr.Blocks(title="Earthquake Data MCP Server", theme=gr.themes.Soft()) as app:
gr.Markdown("# 🌍 Earthquake Data Explorer & MCP Server")
gr.Markdown("This application provides both a web interface and MCP server for earthquake data analysis.")
with gr.Tabs():
# Data Explorer Tab
with gr.TabItem("πŸ“Š Data Explorer"):
gr.Markdown("### Use the filters below to search the earthquake catalog and visualize the distribution.")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("#### Date & Time Range")
start_date_input = gr.Textbox(label="Start Date", value="2024-01-01")
start_time_input = gr.Textbox(label="Start Time (HH:MM:SS)", placeholder="00:00:00")
end_date_input = gr.Textbox(label="End Date", value="2024-12-31")
end_time_input = gr.Textbox(label="End Time (HH:MM:SS)", placeholder="23:59:59")
with gr.Column(scale=1):
gr.Markdown("#### Geographical & Physical Filters")
lon_min_input = gr.Number(label="Longitude From", value=119)
lon_max_input = gr.Number(label="To", value=123)
lat_min_input = gr.Number(label="Latitude From", value=21)
lat_max_input = gr.Number(label="To", value=26)
depth_min_input = gr.Number(label="Depth From", value=0)
depth_max_input = gr.Number(label="To", value=100)
ML_min_input = gr.Number(label="Magnitude From", value=4.5)
ML_max_input = gr.Number(label="To", value=8)
filter_button = gr.Button("πŸ” Filter and Plot Data", variant="primary", size="lg")
with gr.Row():
with gr.Column(scale=2):
output_plot = gr.Plot(label="Earthquake Distribution Map")
with gr.Column(scale=3):
output_df = gr.DataFrame(label="Filtered Results")
filter_button.click(
fn=gradio_fetch_and_plot_data,
inputs=[
start_date_input, start_time_input, end_date_input, end_time_input,
lat_min_input, lat_max_input, lon_min_input, lon_max_input,
depth_min_input, depth_max_input, ML_min_input, ML_max_input
],
outputs=[output_df, output_plot]
)
# MCP Interface Tab
with gr.TabItem("πŸ”Œ MCP Interface"):
gr.Markdown("### Model Context Protocol (MCP) Interface")
gr.Markdown("""
This tab allows you to interact with the MCP server directly. Send JSON-RPC requests to test the MCP functionality.
**Available Methods:**
- `initialize`: Initialize the MCP connection
- `tools/list`: List available tools
- `tools/call`: Call a specific tool
**Available Tools:**
- `query_earthquakes`: Query earthquake data
- `create_earthquake_map`: Create earthquake maps
- `get_earthquake_stats`: Get earthquake statistics
""")
with gr.Row():
with gr.Column():
mcp_request_input = gr.Code(
label="MCP Request (JSON-RPC)",
language="json",
value='{\n "jsonrpc": "2.0",\n "id": 1,\n "method": "tools/list",\n "params": {}\n}'
)
mcp_submit_button = gr.Button("πŸ“€ Send MCP Request", variant="primary")
with gr.Column():
mcp_response_output = gr.Code(
label="MCP Response",
language="json"
)
mcp_submit_button.click(
fn=gradio_handle_mcp_request,
inputs=[mcp_request_input],
outputs=[mcp_response_output]
)
# Example requests
gr.Markdown("#### Example Requests:")
example_requests = [
("Initialize Connection", '{\n "jsonrpc": "2.0",\n "id": 1,\n "method": "initialize",\n "params": {\n "protocolVersion": "2024-11-05",\n "capabilities": {},\n "clientInfo": {"name": "web-client", "version": "1.0.0"}\n }\n}'),
("List Tools", '{\n "jsonrpc": "2.0",\n "id": 2,\n "method": "tools/list",\n "params": {}\n}'),
("Query Earthquakes", '{\n "jsonrpc": "2.0",\n "id": 3,\n "method": "tools/call",\n "params": {\n "name": "query_earthquakes",\n "arguments": {\n "start_date": "2024-01-01",\n "end_date": "2024-01-31",\n "ML_min": 5.0,\n "limit": 10\n }\n }\n}'),
("Get Statistics", '{\n "jsonrpc": "2.0",\n "id": 4,\n "method": "tools/call",\n "params": {\n "name": "get_earthquake_stats",\n "arguments": {\n "start_date": "2024-01-01",\n "end_date": "2024-03-31"\n }\n }\n}')
]
for title, request in example_requests:
with gr.Accordion(title, open=False):
gr.Code(value=request, language="json")
# API Documentation Tab
with gr.TabItem("πŸ“š API Documentation"):
gr.Markdown("""
# API Documentation
## MCP Server Endpoint
**URL:** `https://your-space-name.hf.space` (when deployed)
## Available Tools
### 1. query_earthquakes
Query earthquake data with various filters.
**Parameters:**
- `start_date` (string): Start date in YYYY-MM-DD format
- `start_time` (string): Start time in HH:MM:SS format
- `end_date` (string): End date in YYYY-MM-DD format
- `end_time` (string): End time in HH:MM:SS format
- `lat_min`, `lat_max` (number): Latitude range
- `lon_min`, `lon_max` (number): Longitude range
- `depth_min`, `depth_max` (number): Depth range in km
- `ML_min`, `ML_max` (number): Magnitude range
- `limit` (integer): Maximum number of results
### 2. create_earthquake_map
Create a visualization map of earthquake data.
**Parameters:** Same as query_earthquakes plus:
- `return_base64` (boolean): Return map as base64 encoded image
### 3. get_earthquake_stats
Get statistical summary of earthquake data.
**Parameters:**
- `start_date`, `end_date` (string): Date range
- `lat_min`, `lat_max`, `lon_min`, `lon_max` (number): Geographic bounds
## Integration with AI Assistants
To use this MCP server with Claude Desktop or other MCP clients, add this configuration:
```json
{
"mcpServers": {
"earthquake-data": {
"command": "python",
"args": ["path/to/earthquake_mcp_server.py"],
"env": {
"EARTHQUAKE_DB_PATH": "earthquake_data.db"
}
}
}
}
```
For web-based integration, make HTTP requests to the deployed Hugging Face Space URL.
""")
# --- Main Application ---
if __name__ == "__main__":
# Check if running in Hugging Face Spaces
import os
port = int(os.environ.get("PORT", 7860))
logger.info("Starting Earthquake Data MCP Server...")
logger.info(f"Server will be available at: http://0.0.0.0:{port}")
app.launch(
server_name="0.0.0.0",
server_port=port,
share=True,
show_api=True,
mcp_server=True
)