|
|
""" |
|
|
MCP Server for CleanCity Agent |
|
|
|
|
|
Exposes trash detection and cleanup planning tools via the Model Context Protocol. |
|
|
Can be used by Claude Desktop or other MCP clients. |
|
|
""" |
|
|
|
|
|
from mcp.server import Server |
|
|
from mcp.types import Tool, TextContent |
|
|
import mcp.server.stdio |
|
|
import json |
|
|
from typing import Any |
|
|
|
|
|
from tools import ( |
|
|
detect_trash_mcp, |
|
|
plan_cleanup, |
|
|
log_event, |
|
|
query_events, |
|
|
generate_report |
|
|
) |
|
|
from tools.history_tool import get_hotspots, mark_cleaned |
|
|
|
|
|
|
|
|
|
|
|
server = Server("cleancity-agent") |
|
|
|
|
|
|
|
|
@server.list_tools() |
|
|
async def list_tools() -> list[Tool]: |
|
|
"""List available MCP tools.""" |
|
|
return [ |
|
|
Tool( |
|
|
name="detect_trash", |
|
|
description="Detect trash objects in an image using computer vision. Returns bounding boxes, labels, and confidence scores.", |
|
|
inputSchema={ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"image_data": { |
|
|
"type": "string", |
|
|
"description": "Base64 encoded image data" |
|
|
} |
|
|
}, |
|
|
"required": ["image_data"] |
|
|
} |
|
|
), |
|
|
Tool( |
|
|
name="plan_cleanup", |
|
|
description="Generate a cleanup action plan based on detected trash. Returns severity level, resource requirements, and recommendations.", |
|
|
inputSchema={ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"detections": { |
|
|
"type": "array", |
|
|
"description": "Array of trash detection objects from detect_trash", |
|
|
"items": {"type": "object"} |
|
|
}, |
|
|
"location": { |
|
|
"type": "string", |
|
|
"description": "Location description (optional)" |
|
|
}, |
|
|
"notes": { |
|
|
"type": "string", |
|
|
"description": "Additional context or notes (optional)" |
|
|
}, |
|
|
"use_llm": { |
|
|
"type": "boolean", |
|
|
"description": "Use LLM for enhanced planning (optional, default: false)" |
|
|
} |
|
|
}, |
|
|
"required": ["detections"] |
|
|
} |
|
|
), |
|
|
Tool( |
|
|
name="log_event", |
|
|
description="Log a trash detection event to the history database for tracking and analysis.", |
|
|
inputSchema={ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"detections": { |
|
|
"type": "array", |
|
|
"description": "Array of trash detection objects", |
|
|
"items": {"type": "object"} |
|
|
}, |
|
|
"severity": { |
|
|
"type": "string", |
|
|
"description": "Severity level: low, medium, or high" |
|
|
}, |
|
|
"location": { |
|
|
"type": "string", |
|
|
"description": "Location description (optional)" |
|
|
}, |
|
|
"notes": { |
|
|
"type": "string", |
|
|
"description": "User notes (optional)" |
|
|
} |
|
|
}, |
|
|
"required": ["detections", "severity"] |
|
|
} |
|
|
), |
|
|
Tool( |
|
|
name="query_events", |
|
|
description="Query trash events from history database with filtering options. Useful for finding patterns and hotspots.", |
|
|
inputSchema={ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"days": { |
|
|
"type": "integer", |
|
|
"description": "Only events from last N days (optional)" |
|
|
}, |
|
|
"location": { |
|
|
"type": "string", |
|
|
"description": "Filter by location (partial match, optional)" |
|
|
}, |
|
|
"severity": { |
|
|
"type": "string", |
|
|
"description": "Filter by severity: low, medium, high (optional)" |
|
|
}, |
|
|
"limit": { |
|
|
"type": "integer", |
|
|
"description": "Maximum results to return (default: 100)" |
|
|
} |
|
|
} |
|
|
} |
|
|
), |
|
|
Tool( |
|
|
name="get_hotspots", |
|
|
description="Identify locations with recurring trash problems based on historical data.", |
|
|
inputSchema={ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"min_events": { |
|
|
"type": "integer", |
|
|
"description": "Minimum events to qualify as hotspot (default: 2)" |
|
|
}, |
|
|
"days": { |
|
|
"type": "integer", |
|
|
"description": "Time window in days (optional, default: 30)" |
|
|
} |
|
|
} |
|
|
} |
|
|
), |
|
|
Tool( |
|
|
name="generate_report", |
|
|
description="Generate a formatted report for trash detection event, suitable for city authorities or documentation.", |
|
|
inputSchema={ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"detections": { |
|
|
"type": "array", |
|
|
"description": "Array of trash detection objects", |
|
|
"items": {"type": "object"} |
|
|
}, |
|
|
"severity": { |
|
|
"type": "string", |
|
|
"description": "Severity level" |
|
|
}, |
|
|
"location": { |
|
|
"type": "string", |
|
|
"description": "Location description (optional)" |
|
|
}, |
|
|
"notes": { |
|
|
"type": "string", |
|
|
"description": "Additional notes (optional)" |
|
|
}, |
|
|
"plan": { |
|
|
"type": "object", |
|
|
"description": "Cleanup plan object (optional)" |
|
|
}, |
|
|
"format": { |
|
|
"type": "string", |
|
|
"description": "Report format: email, markdown, or plain (default: email)" |
|
|
} |
|
|
}, |
|
|
"required": ["detections", "severity"] |
|
|
} |
|
|
), |
|
|
Tool( |
|
|
name="mark_cleaned", |
|
|
description="Mark a logged event as cleaned/resolved.", |
|
|
inputSchema={ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"event_id": { |
|
|
"type": "integer", |
|
|
"description": "Database event ID to mark as cleaned" |
|
|
} |
|
|
}, |
|
|
"required": ["event_id"] |
|
|
} |
|
|
) |
|
|
] |
|
|
|
|
|
|
|
|
@server.call_tool() |
|
|
async def call_tool(name: str, arguments) -> list[TextContent]: |
|
|
"""Handle tool execution.""" |
|
|
try: |
|
|
if name == "detect_trash": |
|
|
result = detect_trash_mcp(arguments["image_data"]) |
|
|
|
|
|
elif name == "plan_cleanup": |
|
|
result = plan_cleanup( |
|
|
detections=arguments["detections"], |
|
|
location=arguments.get("location"), |
|
|
notes=arguments.get("notes"), |
|
|
use_llm=arguments.get("use_llm", False) |
|
|
) |
|
|
|
|
|
elif name == "log_event": |
|
|
result = log_event( |
|
|
detections=arguments["detections"], |
|
|
severity=arguments["severity"], |
|
|
location=arguments.get("location"), |
|
|
notes=arguments.get("notes") |
|
|
) |
|
|
|
|
|
elif name == "query_events": |
|
|
result = query_events( |
|
|
days=arguments.get("days"), |
|
|
location=arguments.get("location"), |
|
|
severity=arguments.get("severity"), |
|
|
limit=arguments.get("limit", 100) |
|
|
) |
|
|
|
|
|
elif name == "get_hotspots": |
|
|
result = get_hotspots( |
|
|
min_events=arguments.get("min_events", 2), |
|
|
days=arguments.get("days", 30) |
|
|
) |
|
|
|
|
|
elif name == "generate_report": |
|
|
result = generate_report( |
|
|
detections=arguments["detections"], |
|
|
severity=arguments["severity"], |
|
|
location=arguments.get("location"), |
|
|
notes=arguments.get("notes"), |
|
|
plan=arguments.get("plan"), |
|
|
format=arguments.get("format", "email") |
|
|
) |
|
|
|
|
|
elif name == "mark_cleaned": |
|
|
result = mark_cleaned(arguments["event_id"]) |
|
|
|
|
|
else: |
|
|
return [TextContent( |
|
|
type="text", |
|
|
text=f"Unknown tool: {name}" |
|
|
)] |
|
|
|
|
|
|
|
|
return [TextContent( |
|
|
type="text", |
|
|
text=json.dumps(result, indent=2) |
|
|
)] |
|
|
|
|
|
except Exception as e: |
|
|
return [TextContent( |
|
|
type="text", |
|
|
text=f"Error executing {name}: {str(e)}" |
|
|
)] |
|
|
|
|
|
|
|
|
async def main(): |
|
|
"""Run the MCP server.""" |
|
|
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): |
|
|
await server.run( |
|
|
read_stream, |
|
|
write_stream, |
|
|
server.create_initialization_options() |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import asyncio |
|
|
asyncio.run(main()) |
|
|
|