MCP-3 / app.py
cwadayi's picture
Update app.py
6b8fbeb verified
#!/usr/bin/env python3
"""
Earthquake Data MCP Server for Hugging Face Spaces
Provides earthquake data querying and visualization capabilities via both Gradio UI and MCP protocol.
"""
import asyncio
import json
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import base64
import io
from typing import Any, Dict, Tuple
import gradio as gr
import logging
# Configure matplotlib for non-interactive backend
plt.switch_backend('Agg')
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EarthquakeDataService:
"""Core earthquake data service used by both Gradio and MCP interfaces."""
def __init__(self, db_path: str = 'earthquake_data.db'):
self.db_path = db_path
async def query_earthquakes_data(self,
start_date: str = "2024-01-01",
end_date: str = "2024-12-31",
minimum_magnitude: float = 4.0,
limit: int = 1000) -> Dict[str, Any]:
"""Query earthquake data with simplified filters."""
try:
# Combine date with hardcoded time to span the full days
start_datetime_str = f"{start_date.strip()} 00:00:00"
end_datetime_str = f"{end_date.strip()} 23:59:59"
# Connect to database
conn = sqlite3.connect(self.db_path)
# Build query with simplified filters
query = "SELECT * FROM earthquakes WHERE (date || ' ' || time) BETWEEN ? AND ? AND ML >= ?"
params = [start_datetime_str, end_datetime_str, minimum_magnitude]
query += f" LIMIT {limit}"
# Execute query
df = pd.read_sql_query(query, conn, params=tuple(params))
conn.close()
if df.empty:
return {
"success": True,
"message": "No data found for the selected filters",
"count": 0,
"data": [],
"dataframe": pd.DataFrame({"Message": ["No data found for the selected filters."]})
}
# Convert to list of dictionaries for MCP
data = df.to_dict('records')
return {
"success": True,
"count": len(data),
"data": data,
"dataframe": df,
"summary": {
"magnitude_range": [float(df['ML'].min()), float(df['ML'].max())],
"depth_range": [float(df['depth'].min()), float(df['depth'].max())],
"location_bounds": {
"lat_range": [float(df['lat'].min()), float(df['lat'].max())],
"lon_range": [float(df['lon'].min()), float(df['lon'].max())]
}
}
}
except Exception as e:
logger.error(f"Error querying earthquakes: {e}")
return {
"success": False,
"error": str(e),
"dataframe": pd.DataFrame({"Error": [str(e)]})
}
async def create_earthquake_map(self,
start_date: str = "2024-01-01",
end_date: str = "2024-12-31",
minimum_magnitude: float = 4.0,
return_base64: bool = False) -> Dict[str, Any]:
"""Create earthquake distribution map."""
try:
# Get earthquake data first
query_result = await self.query_earthquakes_data(
start_date=start_date,
end_date=end_date,
minimum_magnitude=minimum_magnitude
)
if not query_result.get("success") or query_result.get("count", 0) == 0:
return {"success": False, "error": "No data available for mapping", "figure": None}
df = query_result["dataframe"]
# Define a fixed extent for the map (Taiwan region)
lon_min, lon_max, lat_min, lat_max = 119, 123, 21, 26
# Create map
fig = plt.figure(figsize=(12, 10))
ax = fig.add_subplot(1, 1, 1, projection=ccrs.PlateCarree())
ax.set_extent([lon_min, lon_max, lat_min, lat_max], crs=ccrs.PlateCarree())
# Add map features
ax.add_feature(cfeature.LAND, edgecolor='black', alpha=0.8)
ax.add_feature(cfeature.OCEAN, alpha=0.6)
ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
ax.add_feature(cfeature.BORDERS, linestyle=':', alpha=0.7)
# Create scatter plot
scatter = ax.scatter(
df['lon'], df['lat'], c=df['ML'],
cmap='viridis', alpha=0.7, s=60,
transform=ccrs.PlateCarree(),
edgecolors='black', linewidth=0.5
)
# Add colorbar and title
plt.colorbar(scatter, ax=ax, orientation='vertical',
label='Magnitude (ML)', shrink=0.7, pad=0.05)
ax.set_title(f'Earthquake Distribution Map\n'
f'{start_date} to {end_date} ({len(df)} events)',
fontsize=14, pad=20)
# Add gridlines
ax.gridlines(draw_labels=True, alpha=0.5)
result = {
"success": True,
"earthquake_count": len(df),
"date_range": f"{start_date} to {end_date}",
"bounds": {
"lat_min": lat_min, "lat_max": lat_max,
"lon_min": lon_min, "lon_max": lon_max
},
"figure": fig
}
if return_base64:
# Convert to base64
buffer = io.BytesIO()
plt.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
buffer.seek(0)
image_base64 = base64.b64encode(buffer.getvalue()).decode()
result["map_image_base64"] = image_base64
buffer.close()
plt.close(fig) # Close the figure to free memory
return result
except Exception as e:
logger.error(f"Error creating earthquake map: {e}")
return {"success": False, "error": str(e), "figure": None}
async def get_earthquake_stats(self,
start_date: str = "2024-01-01",
end_date: str = "2024-12-31",
minimum_magnitude: float = 4.0) -> Dict[str, Any]:
"""Get statistical summary of earthquake data."""
try:
query_result = await self.query_earthquakes_data(
start_date=start_date,
end_date=end_date,
minimum_magnitude=minimum_magnitude
)
if not query_result.get("success") or query_result.get("count", 0) == 0:
return {"success": False, "error": "No data available for statistics"}
df = query_result["dataframe"]
stats = {
"success": True,
"total_earthquakes": len(df),
"date_range": {"start": start_date, "end": end_date},
"magnitude_stats": {
"min": float(df['ML'].min()),
"max": float(df['ML'].max()),
"mean": float(df['ML'].mean()),
"median": float(df['ML'].median()),
"std": float(df['ML'].std())
},
"depth_stats": {
"min": float(df['depth'].min()),
"max": float(df['depth'].max()),
"mean": float(df['depth'].mean()),
"median": float(df['depth'].median())
},
"magnitude_distribution": {
"4.0-4.9": len(df[(df['ML'] >= 4.0) & (df['ML'] < 5.0)]),
"5.0-5.9": len(df[(df['ML'] >= 5.0) & (df['ML'] < 6.0)]),
"6.0-6.9": len(df[(df['ML'] >= 6.0) & (df['ML'] < 7.0)]),
"7.0+": len(df[df['ML'] >= 7.0])
},
"depth_distribution": {
"shallow (0-10km)": len(df[(df['depth'] >= 0) & (df['depth'] <= 10)]),
"intermediate (10-70km)": len(df[(df['depth'] > 10) & (df['depth'] <= 70)]),
"deep (>70km)": len(df[df['depth'] > 70])
}
}
return stats
except Exception as e:
logger.error(f"Error getting earthquake stats: {e}")
return {"success": False, "error": str(e)}
class EarthquakeMCPServer:
"""MCP Server component for handling MCP protocol requests."""
def __init__(self, data_service: EarthquakeDataService):
self.data_service = data_service
self.tools = {
"query_earthquakes": {
"description": "Query earthquake data with various filters",
"parameters": {
"type": "object",
"properties": {
"start_date": {"type": "string", "description": "Start date (YYYY-MM-DD)", "default": "2024-01-01"},
"end_date": {"type": "string", "description": "End date (YYYY-MM-DD)", "default": "2024-12-31"},
"minimum_magnitude": {"type": "number", "description": "Minimum magnitude (ML)", "default": 4.0},
"limit": {"type": "integer", "description": "Maximum results", "default": 1000}
},
"required": ["start_date", "end_date", "minimum_magnitude"]
}
},
"create_earthquake_map": {
"description": "Create a map visualization of earthquake data",
"parameters": {
"type": "object",
"properties": {
"start_date": {"type": "string", "default": "2024-01-01"},
"end_date": {"type": "string", "default": "2024-12-31"},
"minimum_magnitude": {"type": "number", "default": 4.0},
"return_base64": {"type": "boolean", "default": True}
},
"required": ["start_date", "end_date", "minimum_magnitude"]
}
},
"get_earthquake_stats": {
"description": "Get statistical summary of earthquake data",
"parameters": {
"type": "object",
"properties": {
"start_date": {"type": "string", "default": "2024-01-01"},
"end_date": {"type": "string", "default": "2024-12-31"},
"minimum_magnitude": {"type": "number", "default": 4.0}
},
"required": ["start_date", "end_date", "minimum_magnitude"]
}
}
}
async def handle_mcp_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle MCP protocol requests."""
try:
method = request.get("method")
params = request.get("params", {})
if method == "initialize":
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {
"name": "earthquake-data-server",
"version": "1.2.0" # Version bump
}
}
}
elif method == "tools/list":
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"tools": [
{"name": name, **tool_info}
for name, tool_info in self.tools.items()
]
}
}
elif method == "tools/call":
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "query_earthquakes":
result = await self.data_service.query_earthquakes_data(**arguments)
result.pop("dataframe", None)
elif tool_name == "create_earthquake_map":
result = await self.data_service.create_earthquake_map(**arguments)
result.pop("figure", None)
elif tool_name == "get_earthquake_stats":
result = await self.data_service.get_earthquake_stats(**arguments)
else:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}
}
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, indent=2, ensure_ascii=False)
}
]
}
}
else:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {"code": -32601, "message": f"Unknown method: {method}"}
}
except Exception as e:
logger.error(f"Error handling MCP request: {e}", exc_info=True)
return {
"jsonrpc": "2.0",
"id": request.get("id", 0),
"error": {"code": -32603, "message": str(e)}
}
# Global instances
data_service = EarthquakeDataService()
mcp_server = EarthquakeMCPServer(data_service)
# --- Synchronous wrapper functions for Gradio ---
def fetch_and_plot_data_sync(
start_date: str, end_date: str, minimum_magnitude: float
) -> Tuple[pd.DataFrame, Any]:
"""Synchronous wrapper for Gradio interface function."""
try:
# Create new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Run the async function
query_result = loop.run_until_complete(
data_service.query_earthquakes_data(
start_date=start_date,
end_date=end_date,
minimum_magnitude=minimum_magnitude
)
)
df = query_result.get("dataframe", pd.DataFrame())
if query_result.get("count", 0) == 0:
return df, None
map_result = loop.run_until_complete(
data_service.create_earthquake_map(
start_date=start_date,
end_date=end_date,
minimum_magnitude=minimum_magnitude
)
)
figure = map_result.get("figure")
return df, figure
finally:
loop.close()
except Exception as e:
logger.error(f"Error in fetch_and_plot_data_sync: {e}", exc_info=True)
return pd.DataFrame({"Error": [str(e)]}), None
def handle_mcp_request_gradio_sync(request_json: str) -> str:
"""Synchronous wrapper for handling MCP requests through Gradio interface."""
try:
# Create new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
request = json.loads(request_json)
response = loop.run_until_complete(mcp_server.handle_mcp_request(request))
return json.dumps(response, indent=2, ensure_ascii=False)
finally:
loop.close()
except json.JSONDecodeError as e:
logger.error(f"JSON Parse Error: {e}")
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32700, "message": f"Parse error: {e}"}
}
return json.dumps(error_response, indent=2)
except Exception as e:
logger.error(f"Error in handle_mcp_request_gradio_sync: {e}", exc_info=True)
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32603, "message": f"Internal server error: {e}"}
}
return json.dumps(error_response, indent=2)
# --- Gradio Interface ---
with gr.Blocks(title="Earthquake Data MCP Server", theme=gr.themes.Soft()) as app:
gr.Markdown("# 🌍 Earthquake Data Explorer & MCP Server")
gr.Markdown("This application provides a simplified web interface and MCP server for earthquake data analysis.")
with gr.Tabs():
# Data Explorer Tab
with gr.TabItem("πŸ“Š Data Explorer"):
gr.Markdown("### Use the filters below to search the earthquake catalog and visualize the distribution.")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("#### Date Range")
start_date_input = gr.Textbox(label="Start Date (YYYY-MM-DD)", value="2024-01-01")
end_date_input = gr.Textbox(label="End Date (YYYY-MM-DD)", value="2024-12-31")
with gr.Column(scale=1):
gr.Markdown("#### Magnitude Filter")
minimum_magnitude_input = gr.Number(label="Minimum Magnitude (ML)", value=4.0)
filter_button = gr.Button("πŸ” Filter and Plot Data", variant="primary", size="lg")
with gr.Row():
with gr.Column(scale=2):
output_plot = gr.Plot(label="Earthquake Distribution Map")
with gr.Column(scale=3):
output_df = gr.DataFrame(label="Filtered Results")
# Use synchronous wrapper function
filter_button.click(
fn=fetch_and_plot_data_sync,
inputs=[
start_date_input, end_date_input, minimum_magnitude_input
],
outputs=[output_df, output_plot]
)
# MCP Interface Tab
with gr.TabItem("πŸ”Œ MCP Interface"):
gr.Markdown("### Model Context Protocol (MCP) Interface")
gr.Markdown("Interact with the MCP server directly. **Available Tools:** `query_earthquakes`, `create_earthquake_map`, `get_earthquake_stats`")
with gr.Row():
with gr.Column():
mcp_request_input = gr.Code(
label="MCP Request (JSON-RPC)",
language="json",
value='{\n "jsonrpc": "2.0",\n "id": 1,\n "method": "tools/list",\n "params": {}\n}'
)
mcp_submit_button = gr.Button("πŸ“€ Send MCP Request", variant="primary")
with gr.Column():
mcp_response_output = gr.Code(label="MCP Response", language="json")
# Use synchronous wrapper function
mcp_submit_button.click(
fn=handle_mcp_request_gradio_sync,
inputs=[mcp_request_input],
outputs=[mcp_response_output]
)
gr.Markdown("#### Example Requests:")
example_requests = [
("List Tools", '{\n "jsonrpc": "2.0",\n "id": 2,\n "method": "tools/list",\n "params": {}\n}'),
("Query Earthquakes", '{\n "jsonrpc": "2.0",\n "id": 3,\n "method": "tools/call",\n "params": {\n "name": "query_earthquakes",\n "arguments": {\n "start_date": "2024-04-01",\n "end_date": "2024-04-30",\n "minimum_magnitude": 5.0\n }\n }\n}'),
("Get Statistics", '{\n "jsonrpc": "2.0",\n "id": 4,\n "method": "tools/call",\n "params": {\n "name": "get_earthquake_stats",\n "arguments": {\n "start_date": "2024-01-01",\n "end_date": "2024-06-30",\n "minimum_magnitude": 6.0\n }\n }\n}')
]
for title, request in example_requests:
with gr.Accordion(title, open=False):
gr.Code(value=request, language="json")
# API Documentation Tab
with gr.TabItem("πŸ“š API Documentation"):
gr.Markdown("""
# API Documentation
## MCP Server Tools
### 1. query_earthquakes
Query earthquake data.
**Parameters:**
- `start_date` (string): Start date in YYYY-MM-DD format.
- `end_date` (string): End date in YYYY-MM-DD format.
- `minimum_magnitude` (number): Minimum earthquake magnitude (ML).
- `limit` (integer, optional): Maximum number of results.
### 2. create_earthquake_map
Create a visualization map of earthquake data.
**Parameters:** Same as `query_earthquakes` plus:
- `return_base64` (boolean, optional): If true, returns map as base64 encoded image.
### 3. get_earthquake_stats
Get a statistical summary of earthquake data.
**Parameters:**
- `start_date` (string): Start date range.
- `end_date` (string): End date range.
- `minimum_magnitude` (number): Minimum magnitude to include in stats.
""")
# --- Main Application ---
if __name__ == "__main__":
import os
port = int(os.environ.get("PORT", 7860))
logger.info("Starting Earthquake Data MCP Server...")
logger.info(f"Server will be available at: http://0.0.0.0:{port}")
app.launch(server_name="0.0.0.0", server_port=port, mcp_server=True)