mcp-alert-generator / mcp_server.py
aakashdg's picture
fix (import issue)
3d2c52e verified
raw
history blame
3.07 kB
"""
Farmer.chat MCP Server
Minimal wrapper exposing existing servers as MCP tools
Usage:
Local (stdio): python mcp_server.py
Remote (SSE): Mount in FastAPI with mcp.sse_app()
"""
from mcp.server.fastmcp import FastMCP
import asyncio
import sys
import os
# Add the directory containing this file to sys.path so the `src.*` imports below resolve
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from src.servers.weather import WeatherServer
from src.servers.soil import SoilPropertiesServer
from src.servers.water import WaterServer
from src.servers.elevation import ElevationServer
from src.servers.pests import PestsServer
# Create the MCP server object; the tool functions below register themselves
# on it through the @mcp.tool() decorator.
mcp = FastMCP("farmer-chat")
# Initialize the data servers once, at import time. Each tool function reuses
# these module-level instances instead of constructing its own.
weather = WeatherServer()
soil = SoilPropertiesServer()
water = WaterServer()
elevation = ElevationServer()
pests = PestsServer()
# ============================================================================
# MCP TOOLS
# ============================================================================
@mcp.tool()
async def get_weather(latitude: float, longitude: float) -> dict:
    """Get current weather and 7-day forecast from Open-Meteo API."""
    # Delegate to the shared module-level WeatherServer instance; whatever it
    # produces is handed back to the caller unchanged.
    weather_payload = await weather.get_data(latitude, longitude)
    return weather_payload
@mcp.tool()
async def get_soil_properties(latitude: float, longitude: float) -> dict:
    """Get soil composition (clay, sand, silt, pH, organic carbon) from Google Earth Engine."""
    # Thin pass-through: the module-level SoilPropertiesServer does the lookup
    # and its result is returned as-is.
    soil_payload = await soil.get_data(latitude, longitude)
    return soil_payload
@mcp.tool()
async def get_groundwater(latitude: float, longitude: float) -> dict:
    """Get groundwater levels and drought status from NASA GRACE satellite data."""
    # Forward the coordinates to the module-level WaterServer and return its
    # answer without post-processing.
    groundwater_payload = await water.get_data(latitude, longitude)
    return groundwater_payload
@mcp.tool()
async def get_elevation(latitude: float, longitude: float) -> dict:
    """Get terrain elevation from OpenElevation API."""
    # The module-level ElevationServer performs the query; this tool only
    # relays its result.
    elevation_payload = await elevation.get_data(latitude, longitude)
    return elevation_payload
@mcp.tool()
async def get_pest_observations(latitude: float, longitude: float) -> dict:
    """Get recent pest/insect observations within 50km from iNaturalist."""
    # Hand the coordinates to the module-level PestsServer and return what it
    # reports, untouched.
    pest_payload = await pests.get_data(latitude, longitude)
    return pest_payload
@mcp.tool()
async def get_all_data(latitude: float, longitude: float) -> dict:
    """Get data from all sources in parallel (weather, soil, water, elevation, pests)."""
    # Result key -> data server that fills it. Keeping the pairing in one
    # place avoids positional indexing into the gather() result list.
    sources = {
        "weather": weather,
        "soil": soil,
        "groundwater": water,
        "elevation": elevation,
        "pests": pests,
    }

    # return_exceptions=True: one failing source must not abort the others;
    # its exception object is delivered in that source's result slot instead.
    results = await asyncio.gather(
        *(server.get_data(latitude, longitude) for server in sources.values()),
        return_exceptions=True,
    )

    def safe(result):
        """Return *result* unchanged, or an error dict if it is an exception."""
        # BaseException rather than Exception: gather() can also return
        # asyncio.CancelledError, which does not derive from Exception and
        # would otherwise leak into the response as a raw exception object.
        if isinstance(result, BaseException):
            # Some exceptions (e.g. CancelledError) stringify to "", so fall
            # back to the class name to keep the error field informative.
            return {"status": "error", "error": str(result) or type(result).__name__}
        return result

    # gather() preserves argument order, so results line up with the keys.
    return {key: safe(result) for key, result in zip(sources, results)}
# ============================================================================
# RUN
# ============================================================================
if __name__ == "__main__":
    # Executed directly rather than imported: serve the registered tools over
    # the stdio transport (the "Local" mode from the module docstring).
    mcp.run(transport="stdio")