Spaces:
Sleeping
Sleeping
| """ | |
| MCP Server for CleanCity Agent | |
| Exposes trash detection and cleanup planning tools via the Model Context Protocol. | |
| Can be used by Claude Desktop or other MCP clients. | |
| """ | |
| from mcp.server import Server | |
| from mcp.types import Tool, TextContent | |
| import mcp.server.stdio | |
| import json | |
| from typing import Any | |
| from tools import ( | |
| detect_trash_mcp, | |
| plan_cleanup, | |
| log_event, | |
| query_events, | |
| generate_report | |
| ) | |
| from tools.history_tool import get_hotspots, mark_cleaned | |
| # Module-level MCP server instance named "cleancity-agent"; main() runs it over stdio. | |
| server = Server("cleancity-agent") | |
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the tool definitions this server advertises to MCP clients.

    The function is registered on ``server`` as its list-tools handler; without
    that registration the definitions below would never be exposed to a client.

    Returns:
        One ``Tool`` per supported operation, each carrying a name, a
        human-readable description, and a JSON Schema (``inputSchema``)
        describing the arguments that ``call_tool`` reads for that name.
    """
    return [
        Tool(
            name="detect_trash",
            description=(
                "Detect trash objects in an image using computer vision. "
                "Returns bounding boxes, labels, and confidence scores."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "image_data": {
                        "type": "string",
                        "description": "Base64 encoded image data",
                    },
                },
                "required": ["image_data"],
            },
        ),
        Tool(
            name="plan_cleanup",
            description=(
                "Generate a cleanup action plan based on detected trash. "
                "Returns severity level, resource requirements, and recommendations."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "detections": {
                        "type": "array",
                        "description": "Array of trash detection objects from detect_trash",
                        "items": {"type": "object"},
                    },
                    "location": {
                        "type": "string",
                        "description": "Location description (optional)",
                    },
                    "notes": {
                        "type": "string",
                        "description": "Additional context or notes (optional)",
                    },
                    "use_llm": {
                        "type": "boolean",
                        "description": "Use LLM for enhanced planning (optional, default: false)",
                    },
                },
                "required": ["detections"],
            },
        ),
        Tool(
            name="log_event",
            description=(
                "Log a trash detection event to the history database "
                "for tracking and analysis."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "detections": {
                        "type": "array",
                        "description": "Array of trash detection objects",
                        "items": {"type": "object"},
                    },
                    "severity": {
                        "type": "string",
                        "description": "Severity level: low, medium, or high",
                    },
                    "location": {
                        "type": "string",
                        "description": "Location description (optional)",
                    },
                    "notes": {
                        "type": "string",
                        "description": "User notes (optional)",
                    },
                },
                "required": ["detections", "severity"],
            },
        ),
        # The two history-query tools below declare no "required" list:
        # every filter is optional and call_tool supplies the defaults.
        Tool(
            name="query_events",
            description=(
                "Query trash events from history database with filtering options. "
                "Useful for finding patterns and hotspots."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "days": {
                        "type": "integer",
                        "description": "Only events from last N days (optional)",
                    },
                    "location": {
                        "type": "string",
                        "description": "Filter by location (partial match, optional)",
                    },
                    "severity": {
                        "type": "string",
                        "description": "Filter by severity: low, medium, high (optional)",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum results to return (default: 100)",
                    },
                },
            },
        ),
        Tool(
            name="get_hotspots",
            description=(
                "Identify locations with recurring trash problems "
                "based on historical data."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "min_events": {
                        "type": "integer",
                        "description": "Minimum events to qualify as hotspot (default: 2)",
                    },
                    "days": {
                        "type": "integer",
                        "description": "Time window in days (optional, default: 30)",
                    },
                },
            },
        ),
        Tool(
            name="generate_report",
            description=(
                "Generate a formatted report for trash detection event, "
                "suitable for city authorities or documentation."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "detections": {
                        "type": "array",
                        "description": "Array of trash detection objects",
                        "items": {"type": "object"},
                    },
                    "severity": {
                        "type": "string",
                        "description": "Severity level",
                    },
                    "location": {
                        "type": "string",
                        "description": "Location description (optional)",
                    },
                    "notes": {
                        "type": "string",
                        "description": "Additional notes (optional)",
                    },
                    "plan": {
                        "type": "object",
                        "description": "Cleanup plan object (optional)",
                    },
                    "format": {
                        "type": "string",
                        "description": "Report format: email, markdown, or plain (default: email)",
                    },
                },
                "required": ["detections", "severity"],
            },
        ),
        Tool(
            name="mark_cleaned",
            description="Mark a logged event as cleaned/resolved.",
            inputSchema={
                "type": "object",
                "properties": {
                    "event_id": {
                        "type": "integer",
                        "description": "Database event ID to mark as cleaned",
                    },
                },
                "required": ["event_id"],
            },
        ),
    ]
@server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Execute the tool called ``name`` and return its outcome as text.

    The function is registered on ``server`` as its call-tool handler; without
    that registration no tool invocation would ever reach this code.

    Args:
        name: Tool name, one of those advertised by ``list_tools``.
        arguments: Mapping of argument names to values for the tool. ``None``
            (or any falsy value) is treated as "no arguments" so that tools
            whose parameters are all optional still run with their defaults.

    Returns:
        A single-element list holding one ``TextContent``:
        the tool result serialised with ``json.dumps(..., indent=2)`` on
        success, ``"Unknown tool: <name>"`` for an unrecognised name, or
        ``"Error executing <name>: <message>"`` if anything raised, including
        a missing required argument or a result that is not JSON-serialisable.
    """
    # A client may omit arguments entirely; normalise so the lookups below
    # behave the same as for an empty mapping instead of failing on None.
    args = arguments or {}

    try:
        if name == "detect_trash":
            result = detect_trash_mcp(args["image_data"])
        elif name == "plan_cleanup":
            result = plan_cleanup(
                detections=args["detections"],
                location=args.get("location"),
                notes=args.get("notes"),
                use_llm=args.get("use_llm", False),
            )
        elif name == "log_event":
            result = log_event(
                detections=args["detections"],
                severity=args["severity"],
                location=args.get("location"),
                notes=args.get("notes"),
            )
        elif name == "query_events":
            result = query_events(
                days=args.get("days"),
                location=args.get("location"),
                severity=args.get("severity"),
                limit=args.get("limit", 100),
            )
        elif name == "get_hotspots":
            result = get_hotspots(
                min_events=args.get("min_events", 2),
                days=args.get("days", 30),
            )
        elif name == "generate_report":
            result = generate_report(
                detections=args["detections"],
                severity=args["severity"],
                location=args.get("location"),
                notes=args.get("notes"),
                plan=args.get("plan"),
                format=args.get("format", "email"),
            )
        elif name == "mark_cleaned":
            result = mark_cleaned(args["event_id"])
        else:
            return [TextContent(type="text", text=f"Unknown tool: {name}")]

        # Serialisation stays inside the try so a non-JSON-serialisable result
        # is reported to the client as text rather than escaping the handler.
        return [TextContent(type="text", text=json.dumps(result, indent=2))]
    except Exception as e:
        # Top-level boundary for a tool call: report every failure as text.
        return [TextContent(type="text", text=f"Error executing {name}: {str(e)}")]
async def main():
    """Serve the module-level ``server`` over the stdio transport.

    Opens the stdio stream pair and runs the server on it, using the
    initialization options the server itself creates, until the run ends.
    """
    async with mcp.server.stdio.stdio_server() as streams:
        incoming, outgoing = streams
        await server.run(
            incoming,
            outgoing,
            server.create_initialization_options(),
        )


# Script entry point: start the event loop only when executed directly.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())