#!/usr/bin/env python3
"""
Earthquake Data MCP Server for Hugging Face Spaces

Provides earthquake data querying and visualization capabilities
via both Gradio UI and MCP protocol.
"""

import asyncio
import json
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import base64
import io
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import gradio as gr
import logging
import threading
import queue
import time

# Configure matplotlib for non-interactive backend (required for headless Spaces)
plt.switch_backend('Agg')

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EarthquakeDataService:
    """Core earthquake data service used by both Gradio and MCP interfaces.

    Wraps a SQLite database (table ``earthquakes`` with at least the columns
    ``date``, ``time``, ``lat``, ``lon``, ``depth``, ``ML`` — assumed from the
    queries below; confirm against the actual schema).
    """

    def __init__(self, db_path: str = 'earthquake_data.db'):
        # Path to the SQLite database file; opened per-query, never held open.
        self.db_path = db_path

    async def query_earthquakes_data(self,
                                     start_date: str = "2024-01-01",
                                     start_time: str = "00:00:00",
                                     end_date: str = "2024-12-31",
                                     end_time: str = "23:59:59",
                                     lat_min: float = 21,
                                     lat_max: float = 26,
                                     lon_min: float = 119,
                                     lon_max: float = 123,
                                     depth_min: float = 0,
                                     depth_max: float = 100,
                                     ML_min: float = 4.5,
                                     ML_max: float = 8,
                                     limit: int = 1000) -> Dict[str, Any]:
        """Query earthquake data with filters.

        Returns a dict with ``success``, ``count``, ``data`` (list of row
        dicts for MCP/JSON use), ``dataframe`` (for the Gradio table), and a
        ``summary`` of value ranges. On failure, ``success`` is False and
        ``error`` holds the message; an error DataFrame is still provided so
        the Gradio table has something to display.
        """
        conn = None
        try:
            # The DB stores date and time in separate columns; compare on the
            # concatenated "date time" string, so inputs must match its format.
            start_datetime_str = f"{start_date.strip()} {start_time.strip()}"
            end_datetime_str = f"{end_date.strip()} {end_time.strip()}"

            conn = sqlite3.connect(self.db_path)

            query = "SELECT * FROM earthquakes WHERE (date || ' ' || time) BETWEEN ? AND ?"
            params: List[Any] = [start_datetime_str, end_datetime_str]

            # Optional range filters; a filter is applied only when both of
            # its bounds are provided.
            filters = {
                "lat BETWEEN ? AND ?": (lat_min, lat_max),
                "lon BETWEEN ? AND ?": (lon_min, lon_max),
                "depth BETWEEN ? AND ?": (depth_min, depth_max),
                "ML BETWEEN ? AND ?": (ML_min, ML_max),
            }
            for condition, values in filters.items():
                if values[0] is not None and values[1] is not None:
                    query += f" AND {condition}"
                    params.extend(values)

            # Bind LIMIT as a parameter (previously interpolated with an
            # f-string, which is an injection/type hazard for caller-supplied
            # values arriving over MCP).
            query += " LIMIT ?"
            params.append(int(limit))

            df = pd.read_sql_query(query, conn, params=tuple(params))

            if df.empty:
                return {
                    "success": True,
                    "message": "No data found for the selected filters",
                    "count": 0,
                    "data": [],
                    "dataframe": pd.DataFrame({"Message": ["No data found for the selected filters."]})
                }

            # Convert to list of dictionaries for MCP (JSON-serializable)
            data = df.to_dict('records')

            return {
                "success": True,
                "count": len(data),
                "data": data,
                "dataframe": df,
                "summary": {
                    "magnitude_range": [float(df['ML'].min()), float(df['ML'].max())],
                    "depth_range": [float(df['depth'].min()), float(df['depth'].max())],
                    "location_bounds": {
                        "lat_range": [float(df['lat'].min()), float(df['lat'].max())],
                        "lon_range": [float(df['lon'].min()), float(df['lon'].max())]
                    }
                }
            }

        except Exception as e:
            logger.error(f"Error querying earthquakes: {e}")
            return {
                "success": False,
                "error": str(e),
                "dataframe": pd.DataFrame({"Error": [str(e)]})
            }
        finally:
            # Always release the connection, even when the query raises
            # (previously leaked on any exception after connect()).
            if conn is not None:
                conn.close()

    async def create_earthquake_map(self,
                                    start_date: str = "2024-01-01",
                                    start_time: str = "00:00:00",
                                    end_date: str = "2024-12-31",
                                    end_time: str = "23:59:59",
                                    lat_min: float = 21,
                                    lat_max: float = 26,
                                    lon_min: float = 119,
                                    lon_max: float = 123,
                                    depth_min: float = 0,
                                    depth_max: float = 100,
                                    ML_min: float = 4.5,
                                    ML_max: float = 8,
                                    return_base64: bool = False) -> Dict[str, Any]:
        """Create earthquake distribution map.

        Queries the data with the same filters, then renders a Cartopy
        scatter map colored by magnitude. The live Matplotlib figure is
        returned under ``figure`` (for Gradio); when ``return_base64`` is
        True a PNG of it is also returned under ``map_image_base64`` (for
        MCP clients, which cannot receive a figure object).

        NOTE: the returned figure is NOT closed here — the caller that
        does not display it must ``plt.close(fig)`` to avoid leaking
        figures under the Agg backend.
        """
        try:
            # Get earthquake data first
            query_result = await self.query_earthquakes_data(
                start_date, start_time, end_date, end_time,
                lat_min, lat_max, lon_min, lon_max,
                depth_min, depth_max, ML_min, ML_max
            )

            if not query_result.get("success") or query_result.get("count", 0) == 0:
                return {"success": False, "error": "No data available for mapping", "figure": None}

            df = query_result["dataframe"]

            # Create map
            fig = plt.figure(figsize=(12, 10))
            ax = fig.add_subplot(1, 1, 1, projection=ccrs.PlateCarree())
            ax.set_extent([lon_min, lon_max, lat_min, lat_max], crs=ccrs.PlateCarree())

            # Add map features
            ax.add_feature(cfeature.LAND, edgecolor='black', alpha=0.8)
            ax.add_feature(cfeature.OCEAN, alpha=0.6)
            ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
            ax.add_feature(cfeature.BORDERS, linestyle=':', alpha=0.7)

            # Create scatter plot
            scatter = ax.scatter(
                df['lon'], df['lat'],
                c=df['ML'], cmap='viridis',
                alpha=0.7, s=60,
                transform=ccrs.PlateCarree(),
                edgecolors='black', linewidth=0.5
            )

            # Add colorbar and title
            fig.colorbar(scatter, ax=ax, orientation='vertical',
                         label='Magnitude (ML)', shrink=0.7, pad=0.05)
            ax.set_title(f'Earthquake Distribution Map\n'
                         f'{start_date} to {end_date} ({len(df)} events)',
                         fontsize=14, pad=20)

            # Add gridlines
            ax.gridlines(draw_labels=True, alpha=0.5)

            result = {
                "success": True,
                "earthquake_count": len(df),
                "date_range": f"{start_date} to {end_date}",
                "bounds": {
                    "lat_min": lat_min, "lat_max": lat_max,
                    "lon_min": lon_min, "lon_max": lon_max
                },
                "figure": fig
            }

            if return_base64:
                # Render THIS figure explicitly (plt.savefig relied on the
                # implicit "current figure", which is fragile if any other
                # code touches pyplot state between creation and save).
                buffer = io.BytesIO()
                fig.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
                buffer.seek(0)
                image_base64 = base64.b64encode(buffer.getvalue()).decode()
                result["map_image_base64"] = image_base64
                buffer.close()

            return result

        except Exception as e:
            logger.error(f"Error creating earthquake map: {e}")
            return {"success": False, "error": str(e), "figure": None}

    async def get_earthquake_stats(self,
                                   start_date: str = "2024-01-01",
                                   end_date: str = "2024-12-31",
                                   lat_min: float = 21,
                                   lat_max: float = 26,
                                   lon_min: float = 119,
                                   lon_max: float = 123) -> Dict[str, Any]:
        """Get statistical summary of earthquake data.

        Returns totals plus magnitude/depth descriptive statistics and
        binned distributions. Note the magnitude bins start at 4.0, so
        events below ML 4.0 (possible if the caller lowers ML_min via
        query defaults) are counted in the total but not in any bin.
        """
        try:
            query_result = await self.query_earthquakes_data(
                start_date=start_date, end_date=end_date,
                lat_min=lat_min, lat_max=lat_max,
                lon_min=lon_min, lon_max=lon_max
            )

            if not query_result.get("success") or query_result.get("count", 0) == 0:
                return {"success": False, "error": "No data available for statistics"}

            df = query_result["dataframe"]

            stats = {
                "success": True,
                "total_earthquakes": len(df),
                "date_range": {"start": start_date, "end": end_date},
                "magnitude_stats": {
                    "min": float(df['ML'].min()),
                    "max": float(df['ML'].max()),
                    "mean": float(df['ML'].mean()),
                    "median": float(df['ML'].median()),
                    # NaN for a single-row result (pandas sample std); JSON
                    # output will contain NaN in that case.
                    "std": float(df['ML'].std())
                },
                "depth_stats": {
                    "min": float(df['depth'].min()),
                    "max": float(df['depth'].max()),
                    "mean": float(df['depth'].mean()),
                    "median": float(df['depth'].median())
                },
                "magnitude_distribution": {
                    "4.0-4.9": len(df[(df['ML'] >= 4.0) & (df['ML'] < 5.0)]),
                    "5.0-5.9": len(df[(df['ML'] >= 5.0) & (df['ML'] < 6.0)]),
                    "6.0-6.9": len(df[(df['ML'] >= 6.0) & (df['ML'] < 7.0)]),
                    "7.0+": len(df[df['ML'] >= 7.0])
                },
                "depth_distribution": {
                    "shallow (0-10km)": len(df[(df['depth'] >= 0) & (df['depth'] <= 10)]),
                    "intermediate (10-70km)": len(df[(df['depth'] > 10) & (df['depth'] <= 70)]),
                    "deep (>70km)": len(df[df['depth'] > 70])
                }
            }

            return stats

        except Exception as e:
            logger.error(f"Error getting earthquake stats: {e}")
            return {"success": False, "error": str(e)}


class EarthquakeMCPServer:
    """MCP Server component for handling MCP protocol requests.

    Translates JSON-RPC 2.0 requests (initialize / tools/list / tools/call)
    into calls on the shared EarthquakeDataService.
    """

    def __init__(self, data_service: EarthquakeDataService):
        self.data_service = data_service
        # Static tool registry: name -> {description, JSON-schema parameters}.
        # Defaults here mirror the EarthquakeDataService method defaults.
        self.tools = {
            "query_earthquakes": {
                "description": "Query earthquake data with various filters",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start_date": {"type": "string", "description": "Start date (YYYY-MM-DD)", "default": "2024-01-01"},
                        "start_time": {"type": "string", "description": "Start time (HH:MM:SS)", "default": "00:00:00"},
                        "end_date": {"type": "string", "description": "End date (YYYY-MM-DD)", "default": "2024-12-31"},
                        "end_time": {"type": "string", "description": "End time (HH:MM:SS)", "default": "23:59:59"},
                        "lat_min": {"type": "number", "description": "Minimum latitude", "default": 21},
                        "lat_max": {"type": "number", "description": "Maximum latitude", "default": 26},
                        "lon_min": {"type": "number", "description": "Minimum longitude", "default": 119},
                        "lon_max": {"type": "number", "description": "Maximum longitude", "default": 123},
                        "depth_min": {"type": "number", "description": "Minimum depth (km)", "default": 0},
                        "depth_max": {"type": "number", "description": "Maximum depth (km)", "default": 100},
                        "ML_min": {"type": "number", "description": "Minimum magnitude", "default": 4.5},
                        "ML_max": {"type": "number", "description": "Maximum magnitude", "default": 8},
                        "limit": {"type": "integer", "description": "Maximum results", "default": 1000}
                    }
                }
            },
            "create_earthquake_map": {
                "description": "Create a map visualization of earthquake data",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start_date": {"type": "string", "default": "2024-01-01"},
                        "start_time": {"type": "string", "default": "00:00:00"},
                        "end_date": {"type": "string", "default": "2024-12-31"},
                        "end_time": {"type": "string", "default": "23:59:59"},
                        "lat_min": {"type": "number", "default": 21},
                        "lat_max": {"type": "number", "default": 26},
                        "lon_min": {"type": "number", "default": 119},
                        "lon_max": {"type": "number", "default": 123},
                        "depth_min": {"type": "number", "default": 0},
                        "depth_max": {"type": "number", "default": 100},
                        "ML_min": {"type": "number", "default": 4.5},
                        "ML_max": {"type": "number", "default": 8},
                        "return_base64": {"type": "boolean", "default": True}
                    }
                }
            },
            "get_earthquake_stats": {
                "description": "Get statistical summary of earthquake data",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start_date": {"type": "string", "default": "2024-01-01"},
                        "end_date": {"type": "string", "default": "2024-12-31"},
                        "lat_min": {"type": "number", "default": 21},
                        "lat_max": {"type": "number", "default": 26},
                        "lon_min": {"type": "number", "default": 119},
                        "lon_max": {"type": "number", "default": 123}
                    }
                }
            }
        }

    async def handle_mcp_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle MCP protocol requests.

        Supports ``initialize``, ``tools/list`` and ``tools/call``; anything
        else gets a JSON-RPC -32601 (method not found) error. Tool results
        are serialized to JSON text in the MCP ``content`` envelope, so
        non-serializable values (DataFrame, Matplotlib figure) are stripped
        before encoding.
        """
        try:
            method = request.get("method")
            params = request.get("params", {})

            if method == "initialize":
                return {
                    "jsonrpc": "2.0",
                    "id": request.get("id"),
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {}},
                        "serverInfo": {
                            "name": "earthquake-data-server",
                            "version": "1.0.0"
                        }
                    }
                }

            elif method == "tools/list":
                return {
                    "jsonrpc": "2.0",
                    "id": request.get("id"),
                    "result": {
                        "tools": [
                            {"name": name, **tool_info}
                            for name, tool_info in self.tools.items()
                        ]
                    }
                }

            elif method == "tools/call":
                tool_name = params.get("name")
                arguments = params.get("arguments", {})

                if tool_name == "query_earthquakes":
                    result = await self.data_service.query_earthquakes_data(**arguments)
                    # Remove dataframe for JSON serialization
                    result.pop("dataframe", None)
                elif tool_name == "create_earthquake_map":
                    result = await self.data_service.create_earthquake_map(**arguments)
                    # Remove figure for JSON serialization — and close it,
                    # otherwise every MCP map call leaks an Agg figure.
                    fig = result.pop("figure", None)
                    if fig is not None:
                        plt.close(fig)
                elif tool_name == "get_earthquake_stats":
                    result = await self.data_service.get_earthquake_stats(**arguments)
                else:
                    return {
                        "jsonrpc": "2.0",
                        "id": request.get("id"),
                        "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}
                    }

                return {
                    "jsonrpc": "2.0",
                    "id": request.get("id"),
                    "result": {
                        "content": [
                            {
                                "type": "text",
                                "text": json.dumps(result, indent=2, ensure_ascii=False)
                            }
                        ]
                    }
                }

            else:
                return {
                    "jsonrpc": "2.0",
                    "id": request.get("id"),
                    "error": {"code": -32601, "message": f"Unknown method: {method}"}
                }

        except Exception as e:
            logger.error(f"Error handling MCP request: {e}")
            return {
                "jsonrpc": "2.0",
                "id": request.get("id", 0),
                "error": {"code": -32603, "message": str(e)}
            }


# Global data service instance shared by the Gradio UI and the MCP server
data_service = EarthquakeDataService()
mcp_server = EarthquakeMCPServer(data_service)


# --- Gradio Interface Functions ---

async def fetch_and_plot_data(
    start_date, start_time, end_date, end_time,
    lat_min, lat_max, lon_min, lon_max,
    depth_min, depth_max, ML_min, ML_max
) -> Tuple[pd.DataFrame, Any]:
    """Gradio interface function for fetching and plotting data.

    Returns (dataframe, figure); the figure is None when there is nothing
    to plot or an error occurred (the dataframe then carries the message).
    """
    try:
        # Query data
        query_result = await data_service.query_earthquakes_data(
            start_date, start_time, end_date, end_time,
            lat_min, lat_max, lon_min, lon_max,
            depth_min, depth_max, ML_min, ML_max
        )

        df = query_result.get("dataframe", pd.DataFrame())

        if query_result.get("count", 0) == 0:
            return df, None

        # Create map
        map_result = await data_service.create_earthquake_map(
            start_date, start_time, end_date, end_time,
            lat_min, lat_max, lon_min, lon_max,
            depth_min, depth_max, ML_min, ML_max
        )

        figure = map_result.get("figure")
        return df, figure

    except Exception as e:
        logger.error(f"Error in fetch_and_plot_data: {e}")
        return pd.DataFrame({"Error": [str(e)]}), None


def gradio_fetch_and_plot_data(*args):
    """Synchronous wrapper for Gradio.

    asyncio.run creates, runs and tears down a fresh event loop per call
    (replaces the previous manual new_event_loop/close bookkeeping).
    """
    return asyncio.run(fetch_and_plot_data(*args))


async def handle_mcp_request_gradio(request_json: str) -> str:
    """Handle MCP requests through Gradio interface."""
    try:
        request = json.loads(request_json)
        response = await mcp_server.handle_mcp_request(request)
        return json.dumps(response, indent=2, ensure_ascii=False)
    except Exception as e:
        error_response = {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32700, "message": f"Parse error: {str(e)}"}
        }
        return json.dumps(error_response, indent=2)


def gradio_handle_mcp_request(request_json: str) -> str:
    """Synchronous wrapper for MCP requests in Gradio."""
    return asyncio.run(handle_mcp_request_gradio(request_json))


# --- Gradio Interface ---

with gr.Blocks(title="Earthquake Data MCP Server", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🌍 Earthquake Data Explorer & MCP Server")
    gr.Markdown("This application provides both a web interface and MCP server for earthquake data analysis.")

    with gr.Tabs():
        # Data Explorer Tab
        with gr.TabItem("📊 Data Explorer"):
            gr.Markdown("### Use the filters below to search the earthquake catalog and visualize the distribution.")

            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("#### Date & Time Range")
                    start_date_input = gr.Textbox(label="Start Date", value="2024-01-01")
                    start_time_input = gr.Textbox(label="Start Time (HH:MM:SS)", placeholder="00:00:00")
                    end_date_input = gr.Textbox(label="End Date", value="2024-12-31")
                    end_time_input = gr.Textbox(label="End Time (HH:MM:SS)", placeholder="23:59:59")

                with gr.Column(scale=1):
                    gr.Markdown("#### Geographical & Physical Filters")
                    lon_min_input = gr.Number(label="Longitude From", value=119)
                    lon_max_input = gr.Number(label="To", value=123)
                    lat_min_input = gr.Number(label="Latitude From", value=21)
                    lat_max_input = gr.Number(label="To", value=26)
                    depth_min_input = gr.Number(label="Depth From", value=0)
                    depth_max_input = gr.Number(label="To", value=100)
                    ML_min_input = gr.Number(label="Magnitude From", value=4.5)
                    ML_max_input = gr.Number(label="To", value=8)

            filter_button = gr.Button("🔍 Filter and Plot Data", variant="primary", size="lg")

            with gr.Row():
                with gr.Column(scale=2):
                    output_plot = gr.Plot(label="Earthquake Distribution Map")
                with gr.Column(scale=3):
                    output_df = gr.DataFrame(label="Filtered Results")

            filter_button.click(
                fn=gradio_fetch_and_plot_data,
                inputs=[
                    start_date_input, start_time_input, end_date_input, end_time_input,
                    lat_min_input, lat_max_input, lon_min_input, lon_max_input,
                    depth_min_input, depth_max_input, ML_min_input, ML_max_input
                ],
                outputs=[output_df, output_plot]
            )

        # MCP Interface Tab
        with gr.TabItem("🔌 MCP Interface"):
            gr.Markdown("### Model Context Protocol (MCP) Interface")
            gr.Markdown("""
            This tab allows you to interact with the MCP server directly.
            Send JSON-RPC requests to test the MCP functionality.

            **Available Methods:**
            - `initialize`: Initialize the MCP connection
            - `tools/list`: List available tools
            - `tools/call`: Call a specific tool

            **Available Tools:**
            - `query_earthquakes`: Query earthquake data
            - `create_earthquake_map`: Create earthquake maps
            - `get_earthquake_stats`: Get earthquake statistics
            """)

            with gr.Row():
                with gr.Column():
                    mcp_request_input = gr.Code(
                        label="MCP Request (JSON-RPC)",
                        language="json",
                        value='{\n  "jsonrpc": "2.0",\n  "id": 1,\n  "method": "tools/list",\n  "params": {}\n}'
                    )
                    mcp_submit_button = gr.Button("📤 Send MCP Request", variant="primary")

                with gr.Column():
                    mcp_response_output = gr.Code(
                        label="MCP Response",
                        language="json"
                    )

            mcp_submit_button.click(
                fn=gradio_handle_mcp_request,
                inputs=[mcp_request_input],
                outputs=[mcp_response_output]
            )

            # Example requests
            gr.Markdown("#### Example Requests:")
            example_requests = [
                ("Initialize Connection", '{\n  "jsonrpc": "2.0",\n  "id": 1,\n  "method": "initialize",\n  "params": {\n    "protocolVersion": "2024-11-05",\n    "capabilities": {},\n    "clientInfo": {"name": "web-client", "version": "1.0.0"}\n  }\n}'),
                ("List Tools", '{\n  "jsonrpc": "2.0",\n  "id": 2,\n  "method": "tools/list",\n  "params": {}\n}'),
                ("Query Earthquakes", '{\n  "jsonrpc": "2.0",\n  "id": 3,\n  "method": "tools/call",\n  "params": {\n    "name": "query_earthquakes",\n    "arguments": {\n      "start_date": "2024-01-01",\n      "end_date": "2024-01-31",\n      "ML_min": 5.0,\n      "limit": 10\n    }\n  }\n}'),
                ("Get Statistics", '{\n  "jsonrpc": "2.0",\n  "id": 4,\n  "method": "tools/call",\n  "params": {\n    "name": "get_earthquake_stats",\n    "arguments": {\n      "start_date": "2024-01-01",\n      "end_date": "2024-03-31"\n    }\n  }\n}')
            ]

            for title, request in example_requests:
                with gr.Accordion(title, open=False):
                    gr.Code(value=request, language="json")

        # API Documentation Tab
        with gr.TabItem("📚 API Documentation"):
            gr.Markdown("""
            # API Documentation

            ## MCP Server Endpoint

            **URL:** `https://your-space-name.hf.space` (when deployed)

            ## Available Tools

            ### 1. query_earthquakes
            Query earthquake data with various filters.

            **Parameters:**
            - `start_date` (string): Start date in YYYY-MM-DD format
            - `start_time` (string): Start time in HH:MM:SS format
            - `end_date` (string): End date in YYYY-MM-DD format
            - `end_time` (string): End time in HH:MM:SS format
            - `lat_min`, `lat_max` (number): Latitude range
            - `lon_min`, `lon_max` (number): Longitude range
            - `depth_min`, `depth_max` (number): Depth range in km
            - `ML_min`, `ML_max` (number): Magnitude range
            - `limit` (integer): Maximum number of results

            ### 2. create_earthquake_map
            Create a visualization map of earthquake data.

            **Parameters:** Same as query_earthquakes plus:
            - `return_base64` (boolean): Return map as base64 encoded image

            ### 3. get_earthquake_stats
            Get statistical summary of earthquake data.

            **Parameters:**
            - `start_date`, `end_date` (string): Date range
            - `lat_min`, `lat_max`, `lon_min`, `lon_max` (number): Geographic bounds

            ## Integration with AI Assistants

            To use this MCP server with Claude Desktop or other MCP clients, add this configuration:

            ```json
            {
              "mcpServers": {
                "earthquake-data": {
                  "command": "python",
                  "args": ["path/to/earthquake_mcp_server.py"],
                  "env": {
                    "EARTHQUAKE_DB_PATH": "earthquake_data.db"
                  }
                }
              }
            }
            ```

            For web-based integration, make HTTP requests to the deployed Hugging Face Space URL.
            """)


# --- Main Application ---

if __name__ == "__main__":
    # Check if running in Hugging Face Spaces
    import os
    port = int(os.environ.get("PORT", 7860))

    logger.info("Starting Earthquake Data MCP Server...")
    logger.info(f"Server will be available at: http://0.0.0.0:{port}")

    app.launch(
        server_name="0.0.0.0",
        server_port=port,
        # NOTE(review): share=True spawns a Gradio tunnel — unnecessary (and
        # typically ignored) on Hugging Face Spaces; confirm it is intended.
        share=True,
        show_api=True,
        mcp_server=True
    )