#!/usr/bin/env python3
"""
Earthquake Data MCP Server for Hugging Face Spaces

Provides earthquake data querying and visualization capabilities
via both Gradio UI and MCP protocol.
"""

import asyncio
import base64
import contextlib
import io
import json
import logging
import os
import sqlite3
from typing import Any, Dict, Tuple

import cartopy.crs as ccrs
import cartopy.feature as cfeature
import gradio as gr
import matplotlib.pyplot as plt
import pandas as pd

# Configure matplotlib for non-interactive backend
plt.switch_backend('Agg')

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EarthquakeDataService:
    """Core earthquake data service used by both Gradio and MCP interfaces.

    Every public coroutine returns a plain ``dict`` with a ``"success"`` flag
    instead of raising, so both front ends can render failures directly.
    """

    def __init__(self, db_path: str = 'earthquake_data.db'):
        # Path of the SQLite file; a connection is opened per query.
        self.db_path = db_path

    async def query_earthquakes_data(self,
                                     start_date: str = "2024-01-01",
                                     end_date: str = "2024-12-31",
                                     minimum_magnitude: float = 4.0,
                                     limit: int = 1000) -> Dict[str, Any]:
        """Query earthquake data with simplified filters.

        Args:
            start_date: First day to include (the query text documents the
                format as YYYY-MM-DD); matched from 00:00:00.
            end_date: Last day to include; matched up to 23:59:59.
            minimum_magnitude: Lower bound applied to the ``ML`` column.
            limit: Maximum number of rows to return.

        Returns:
            A dict with ``success``, ``count``, ``data`` (list of row dicts),
            ``dataframe`` and, when rows were found, a ``summary`` of the
            magnitude/depth/location ranges. On failure ``success`` is False
            and ``error`` holds the message.
        """
        try:
            # Combine date with hardcoded time to span the full days
            start_datetime_str = f"{start_date.strip()} 00:00:00"
            end_datetime_str = f"{end_date.strip()} 23:59:59"

            # Every user-supplied value, including LIMIT, is bound as a
            # parameter: `limit` can come straight from an MCP request and
            # must never be spliced into the SQL text.
            query = (
                "SELECT * FROM earthquakes "
                "WHERE (date || ' ' || time) BETWEEN ? AND ? AND ML >= ? "
                "LIMIT ?"
            )
            params = (
                start_datetime_str,
                end_datetime_str,
                minimum_magnitude,
                int(limit),
            )

            # closing() guarantees the connection is released even when the
            # query raises; sqlite3's own context manager only manages the
            # transaction and does not close the connection.
            with contextlib.closing(sqlite3.connect(self.db_path)) as conn:
                df = pd.read_sql_query(query, conn, params=params)

            if df.empty:
                return {
                    "success": True,
                    "message": "No data found for the selected filters",
                    "count": 0,
                    "data": [],
                    "dataframe": pd.DataFrame(
                        {"Message": ["No data found for the selected filters."]}
                    ),
                }

            # Convert to list of dictionaries for MCP
            data = df.to_dict('records')

            return {
                "success": True,
                "count": len(data),
                "data": data,
                "dataframe": df,
                "summary": {
                    "magnitude_range": [float(df['ML'].min()), float(df['ML'].max())],
                    "depth_range": [float(df['depth'].min()), float(df['depth'].max())],
                    "location_bounds": {
                        "lat_range": [float(df['lat'].min()), float(df['lat'].max())],
                        "lon_range": [float(df['lon'].min()), float(df['lon'].max())],
                    },
                },
            }

        except Exception as e:
            logger.error("Error querying earthquakes: %s", e)
            return {
                "success": False,
                "error": str(e),
                "dataframe": pd.DataFrame({"Error": [str(e)]}),
            }

    async def create_earthquake_map(self,
                                    start_date: str = "2024-01-01",
                                    end_date: str = "2024-12-31",
                                    minimum_magnitude: float = 4.0,
                                    return_base64: bool = False) -> Dict[str, Any]:
        """Create earthquake distribution map.

        Args:
            start_date: First day to include.
            end_date: Last day to include.
            minimum_magnitude: Lower bound applied to the ``ML`` column.
            return_base64: When True, the rendered PNG is added to the result
                as ``map_image_base64`` and the figure is closed. When False
                the open figure is returned under ``figure`` and the caller
                owns it.

        Returns:
            A dict with ``success``, ``earthquake_count``, ``date_range``,
            ``bounds`` and ``figure``; or ``success=False`` with ``error``.
        """
        fig = None
        try:
            # Get earthquake data first
            query_result = await self.query_earthquakes_data(
                start_date=start_date,
                end_date=end_date,
                minimum_magnitude=minimum_magnitude,
            )

            if not query_result.get("success") or query_result.get("count", 0) == 0:
                return {
                    "success": False,
                    "error": "No data available for mapping",
                    "figure": None,
                }

            df = query_result["dataframe"]

            # Define a fixed extent for the map (Taiwan region)
            lon_min, lon_max, lat_min, lat_max = 119, 123, 21, 26

            # Create map
            fig = plt.figure(figsize=(12, 10))
            ax = fig.add_subplot(1, 1, 1, projection=ccrs.PlateCarree())
            ax.set_extent([lon_min, lon_max, lat_min, lat_max], crs=ccrs.PlateCarree())

            # Add map features
            ax.add_feature(cfeature.LAND, edgecolor='black', alpha=0.8)
            ax.add_feature(cfeature.OCEAN, alpha=0.6)
            ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
            ax.add_feature(cfeature.BORDERS, linestyle=':', alpha=0.7)

            # Create scatter plot
            scatter = ax.scatter(
                df['lon'], df['lat'],
                c=df['ML'],
                cmap='viridis',
                alpha=0.7,
                s=60,
                transform=ccrs.PlateCarree(),
                edgecolors='black',
                linewidth=0.5,
            )

            # Add colorbar and title
            fig.colorbar(scatter, ax=ax, orientation='vertical',
                         label='Magnitude (ML)', shrink=0.7, pad=0.05)
            ax.set_title(f'Earthquake Distribution Map\n'
                         f'{start_date} to {end_date} ({len(df)} events)',
                         fontsize=14, pad=20)

            # Add gridlines
            ax.gridlines(draw_labels=True, alpha=0.5)

            result = {
                "success": True,
                "earthquake_count": len(df),
                "date_range": f"{start_date} to {end_date}",
                "bounds": {
                    "lat_min": lat_min, "lat_max": lat_max,
                    "lon_min": lon_min, "lon_max": lon_max,
                },
                "figure": fig,
            }

            if return_base64:
                # Save through the figure object rather than pyplot's
                # "current figure", which is global state.
                with io.BytesIO() as buffer:
                    fig.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
                    result["map_image_base64"] = base64.b64encode(
                        buffer.getvalue()
                    ).decode()
                plt.close(fig)  # Close the figure to free memory

            return result

        except Exception as e:
            logger.error("Error creating earthquake map: %s", e)
            # A figure created before the failure would otherwise stay
            # registered with pyplot.
            if fig is not None:
                plt.close(fig)
            return {"success": False, "error": str(e), "figure": None}

    async def get_earthquake_stats(self,
                                   start_date: str = "2024-01-01",
                                   end_date: str = "2024-12-31",
                                   minimum_magnitude: float = 4.0) -> Dict[str, Any]:
        """Get statistical summary of earthquake data.

        Args:
            start_date: First day to include.
            end_date: Last day to include.
            minimum_magnitude: Lower bound applied to the ``ML`` column.

        Returns:
            A dict with ``total_earthquakes``, ``date_range``,
            ``magnitude_stats``, ``depth_stats`` and bucketed
            ``magnitude_distribution`` / ``depth_distribution`` counts; or
            ``success=False`` with ``error`` when no rows match or the query
            fails.
        """
        try:
            query_result = await self.query_earthquakes_data(
                start_date=start_date,
                end_date=end_date,
                minimum_magnitude=minimum_magnitude,
            )

            if not query_result.get("success") or query_result.get("count", 0) == 0:
                return {"success": False, "error": "No data available for statistics"}

            df = query_result["dataframe"]
            magnitude = df['ML']
            depth = df['depth']

            return {
                "success": True,
                "total_earthquakes": len(df),
                "date_range": {"start": start_date, "end": end_date},
                "magnitude_stats": {
                    "min": float(magnitude.min()),
                    "max": float(magnitude.max()),
                    "mean": float(magnitude.mean()),
                    "median": float(magnitude.median()),
                    "std": float(magnitude.std()),
                },
                "depth_stats": {
                    "min": float(depth.min()),
                    "max": float(depth.max()),
                    "mean": float(depth.mean()),
                    "median": float(depth.median()),
                },
                "magnitude_distribution": {
                    "4.0-4.9": len(df[(magnitude >= 4.0) & (magnitude < 5.0)]),
                    "5.0-5.9": len(df[(magnitude >= 5.0) & (magnitude < 6.0)]),
                    "6.0-6.9": len(df[(magnitude >= 6.0) & (magnitude < 7.0)]),
                    "7.0+": len(df[magnitude >= 7.0]),
                },
                "depth_distribution": {
                    "shallow (0-10km)": len(df[(depth >= 0) & (depth <= 10)]),
                    "intermediate (10-70km)": len(df[(depth > 10) & (depth <= 70)]),
                    "deep (>70km)": len(df[depth > 70]),
                },
            }

        except Exception as e:
            logger.error("Error getting earthquake stats: %s", e)
            return {"success": False, "error": str(e)}


class EarthquakeMCPServer:
    """MCP Server component for handling MCP protocol requests."""

    def __init__(self, data_service: EarthquakeDataService):
        self.data_service = data_service
        # Tool catalogue returned verbatim (plus "name") by `tools/list`.
        self.tools = {
            "query_earthquakes": {
                "description": "Query earthquake data with various filters",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start_date": {"type": "string", "description": "Start date (YYYY-MM-DD)", "default": "2024-01-01"},
                        "end_date": {"type": "string", "description": "End date (YYYY-MM-DD)", "default": "2024-12-31"},
                        "minimum_magnitude": {"type": "number", "description": "Minimum magnitude (ML)", "default": 4.0},
                        "limit": {"type": "integer", "description": "Maximum results", "default": 1000}
                    },
                    "required": ["start_date", "end_date", "minimum_magnitude"]
                }
            },
            "create_earthquake_map": {
                "description": "Create a map visualization of earthquake data",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start_date": {"type": "string", "default": "2024-01-01"},
                        "end_date": {"type": "string", "default": "2024-12-31"},
                        "minimum_magnitude": {"type": "number", "default": 4.0},
                        "return_base64": {"type": "boolean", "default": True}
                    },
                    "required": ["start_date", "end_date", "minimum_magnitude"]
                }
            },
            "get_earthquake_stats": {
                "description": "Get statistical summary of earthquake data",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start_date": {"type": "string", "default": "2024-01-01"},
                        "end_date": {"type": "string", "default": "2024-12-31"},
                        "minimum_magnitude": {"type": "number", "default": 4.0}
                    },
                    "required": ["start_date", "end_date", "minimum_magnitude"]
                }
            }
        }

    @staticmethod
    def _result(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
        """Wrap *result* in a JSON-RPC 2.0 success envelope."""
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    @staticmethod
    def _error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error envelope."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }

    async def handle_mcp_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle MCP protocol requests.

        Supports the ``initialize``, ``tools/list`` and ``tools/call``
        methods.

        Args:
            request: Decoded JSON-RPC request object.

        Returns:
            A JSON-RPC 2.0 response dict. Failures are reported through the
            ``error`` member rather than raised.
        """
        # Valid JSON is not necessarily an object (e.g. a list or a number);
        # without this guard the error path below would itself raise.
        if not isinstance(request, dict):
            return self._error(None, -32600, "Invalid Request: expected a JSON object")

        try:
            request_id = request.get("id")
            method = request.get("method")
            # `"params": null` is tolerated and treated as "no params".
            params = request.get("params") or {}

            if method == "initialize":
                return self._result(request_id, {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}},
                    "serverInfo": {
                        "name": "earthquake-data-server",
                        "version": "1.2.0",  # Version bump
                    },
                })

            if method == "tools/list":
                return self._result(request_id, {
                    "tools": [
                        {"name": name, **tool_info}
                        for name, tool_info in self.tools.items()
                    ]
                })

            if method == "tools/call":
                tool_name = params.get("name")
                arguments = params.get("arguments") or {}

                if tool_name == "query_earthquakes":
                    result = await self.data_service.query_earthquakes_data(**arguments)
                    # DataFrames are not JSON serializable.
                    result.pop("dataframe", None)
                elif tool_name == "create_earthquake_map":
                    # Apply the default advertised in the tool schema
                    # (return_base64=True); the service method itself
                    # defaults to False for the Gradio path.
                    arguments = {"return_base64": True, **arguments}
                    result = await self.data_service.create_earthquake_map(**arguments)
                    figure = result.pop("figure", None)
                    # The figure cannot be serialized; close it so it does
                    # not stay registered with pyplot after the request.
                    if figure is not None:
                        plt.close(figure)
                elif tool_name == "get_earthquake_stats":
                    result = await self.data_service.get_earthquake_stats(**arguments)
                else:
                    return self._error(request_id, -32601, f"Unknown tool: {tool_name}")

                return self._result(request_id, {
                    "content": [
                        {
                            "type": "text",
                            "text": json.dumps(result, indent=2, ensure_ascii=False),
                        }
                    ]
                })

            return self._error(request_id, -32601, f"Unknown method: {method}")

        except Exception as e:
            logger.error("Error handling MCP request: %s", e, exc_info=True)
            return self._error(request.get("id", 0), -32603, str(e))


# Global instances
data_service = EarthquakeDataService()
mcp_server = EarthquakeMCPServer(data_service)


# --- Synchronous wrapper functions for Gradio ---

def fetch_and_plot_data_sync(
    start_date: str,
    end_date: str,
    minimum_magnitude: float
) -> Tuple[pd.DataFrame, Any]:
    """Synchronous wrapper for Gradio interface function.

    Args:
        start_date: First day to include.
        end_date: Last day to include.
        minimum_magnitude: Lower bound applied to the ``ML`` column.

    Returns:
        ``(dataframe, figure)``. ``figure`` is None when no rows matched or
        the map could not be built; on an unexpected error the dataframe
        carries a single ``Error`` column.
    """

    async def _query_then_plot() -> Tuple[pd.DataFrame, Any]:
        query_result = await data_service.query_earthquakes_data(
            start_date=start_date,
            end_date=end_date,
            minimum_magnitude=minimum_magnitude,
        )
        df = query_result.get("dataframe", pd.DataFrame())

        if query_result.get("count", 0) == 0:
            return df, None

        map_result = await data_service.create_earthquake_map(
            start_date=start_date,
            end_date=end_date,
            minimum_magnitude=minimum_magnitude,
        )
        # NOTE(review): the returned figure is never closed here, so it stays
        # registered with pyplot unless the consumer closes it — confirm.
        return df, map_result.get("figure")

    try:
        # asyncio.run creates a fresh loop and tears it down afterwards, so
        # no closed loop is left installed as this thread's current loop.
        return asyncio.run(_query_then_plot())
    except Exception as e:
        logger.error("Error in fetch_and_plot_data_sync: %s", e, exc_info=True)
        return pd.DataFrame({"Error": [str(e)]}), None


def handle_mcp_request_gradio_sync(request_json: str) -> str:
    """Synchronous wrapper for handling MCP requests through Gradio interface.

    Args:
        request_json: JSON-RPC request as text.

    Returns:
        The JSON-RPC response serialized as indented JSON text. Malformed
        JSON yields a -32700 error, any other failure a -32603 error.
    """
    try:
        request = json.loads(request_json)
        response = asyncio.run(mcp_server.handle_mcp_request(request))
        return json.dumps(response, indent=2, ensure_ascii=False)
    except json.JSONDecodeError as e:
        logger.error("JSON Parse Error: %s", e)
        error_response = {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32700, "message": f"Parse error: {e}"}
        }
        return json.dumps(error_response, indent=2)
    except Exception as e:
        logger.error("Error in handle_mcp_request_gradio_sync: %s", e, exc_info=True)
        error_response = {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32603, "message": f"Internal server error: {e}"}
        }
        return json.dumps(error_response, indent=2)


# --- Static UI content ---

# Kept at column 0 so the rendered markdown does not depend on how the
# literal is indented inside the layout code.
API_DOCUMENTATION_MD = """
# API Documentation

## MCP Server Tools

### 1. query_earthquakes
Query earthquake data.

**Parameters:**
- `start_date` (string): Start date in YYYY-MM-DD format.
- `end_date` (string): End date in YYYY-MM-DD format.
- `minimum_magnitude` (number): Minimum earthquake magnitude (ML).
- `limit` (integer, optional): Maximum number of results.

### 2. create_earthquake_map
Create a visualization map of earthquake data.

**Parameters:** Same as `query_earthquakes` plus:
- `return_base64` (boolean, optional): If true, returns map as base64 encoded image.

### 3. get_earthquake_stats
Get a statistical summary of earthquake data.

**Parameters:**
- `start_date` (string): Start date range.
- `end_date` (string): End date range.
- `minimum_magnitude` (number): Minimum magnitude to include in stats.
"""

# (accordion title, JSON-RPC request text) pairs shown on the MCP tab.
EXAMPLE_REQUESTS = [
    ("List Tools", '{\n "jsonrpc": "2.0",\n "id": 2,\n "method": "tools/list",\n "params": {}\n}'),
    ("Query Earthquakes", '{\n "jsonrpc": "2.0",\n "id": 3,\n "method": "tools/call",\n "params": {\n "name": "query_earthquakes",\n "arguments": {\n "start_date": "2024-04-01",\n "end_date": "2024-04-30",\n "minimum_magnitude": 5.0\n }\n }\n}'),
    ("Get Statistics", '{\n "jsonrpc": "2.0",\n "id": 4,\n "method": "tools/call",\n "params": {\n "name": "get_earthquake_stats",\n "arguments": {\n "start_date": "2024-01-01",\n "end_date": "2024-06-30",\n "minimum_magnitude": 6.0\n }\n }\n}'),
]


# --- Gradio Interface ---

with gr.Blocks(title="Earthquake Data MCP Server", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🌍 Earthquake Data Explorer & MCP Server")
    gr.Markdown("This application provides a simplified web interface and MCP server for earthquake data analysis.")

    with gr.Tabs():
        # Data Explorer Tab
        with gr.TabItem("📊 Data Explorer"):
            gr.Markdown("### Use the filters below to search the earthquake catalog and visualize the distribution.")

            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("#### Date Range")
                    start_date_input = gr.Textbox(label="Start Date (YYYY-MM-DD)", value="2024-01-01")
                    end_date_input = gr.Textbox(label="End Date (YYYY-MM-DD)", value="2024-12-31")
                with gr.Column(scale=1):
                    gr.Markdown("#### Magnitude Filter")
                    minimum_magnitude_input = gr.Number(label="Minimum Magnitude (ML)", value=4.0)

            filter_button = gr.Button("🔍 Filter and Plot Data", variant="primary", size="lg")

            with gr.Row():
                with gr.Column(scale=2):
                    output_plot = gr.Plot(label="Earthquake Distribution Map")
                with gr.Column(scale=3):
                    output_df = gr.DataFrame(label="Filtered Results")

            # Use synchronous wrapper function
            filter_button.click(
                fn=fetch_and_plot_data_sync,
                inputs=[
                    start_date_input,
                    end_date_input,
                    minimum_magnitude_input,
                ],
                outputs=[output_df, output_plot],
            )

        # MCP Interface Tab
        with gr.TabItem("🔌 MCP Interface"):
            gr.Markdown("### Model Context Protocol (MCP) Interface")
            gr.Markdown("Interact with the MCP server directly. **Available Tools:** `query_earthquakes`, `create_earthquake_map`, `get_earthquake_stats`")

            with gr.Row():
                with gr.Column():
                    mcp_request_input = gr.Code(
                        label="MCP Request (JSON-RPC)",
                        language="json",
                        value='{\n "jsonrpc": "2.0",\n "id": 1,\n "method": "tools/list",\n "params": {}\n}'
                    )
                    mcp_submit_button = gr.Button("📤 Send MCP Request", variant="primary")
                with gr.Column():
                    mcp_response_output = gr.Code(label="MCP Response", language="json")

            # Use synchronous wrapper function
            mcp_submit_button.click(
                fn=handle_mcp_request_gradio_sync,
                inputs=[mcp_request_input],
                outputs=[mcp_response_output],
            )

            gr.Markdown("#### Example Requests:")
            for example_title, example_json in EXAMPLE_REQUESTS:
                with gr.Accordion(example_title, open=False):
                    gr.Code(value=example_json, language="json")

        # API Documentation Tab
        with gr.TabItem("📚 API Documentation"):
            gr.Markdown(API_DOCUMENTATION_MD)


# --- Main Application ---

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    logger.info("Starting Earthquake Data MCP Server...")
    logger.info("Server will be available at: http://0.0.0.0:%s", port)
    app.launch(server_name="0.0.0.0", server_port=port, mcp_server=True)