# CC-CleanCity / mcp_server.py
# AlBaraa63's picture
# Implement HF Hub fallback for model loading and add upload script for HuggingFace
# c4f5747
"""
CleanEye Agent MCP Server (Hybrid, Safe, Production-Ready)
----------------------------------------------------------
Autonomous waste management MCP server exposing 11 tools:
1) detect_garbage
2) analyze_severity
3) extract_location
4) identify_stakeholders
5) estimate_cleanup
6) calculate_resources
7) generate_report
8) create_timeline
9) generate_contract_bid (B2B)
10) calculate_profit_margin (B2B)
11) optimize_route (B2B)
Built for HF + Anthropic + Gradio MCP Hackathon Winter 2025
Track 2: MCP in Action | Category: Consumer
Key features:
- YOLO lazy-loading with clear error if weights missing
- Accepts image_path or base64 image
- Defensive argument validation + structured error responses
- Stable JSON outputs for every tool
- Works with MCP stdio server (default)
"""
from __future__ import annotations
import asyncio
import base64
import json
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Optional heavy deps: load lazily where possible
try:
import cv2 # type: ignore
except Exception as _e_cv2:
cv2 = None # type: ignore
try:
import numpy as np # type: ignore
except Exception as _e_np:
np = None # type: ignore
# YOLO is loaded lazily inside get_model() to avoid import cost if unused
YOLO = None # will be imported when needed
# ----- MCP imports (stdio server) -----
try:
    import mcp.server.stdio  # type: ignore
    import mcp.types as types  # type: ignore
    from mcp.server import NotificationOptions, Server  # type: ignore
    from mcp.server.models import InitializationOptions  # type: ignore
except Exception:
    # The MCP SDK could not be imported: flag it and install minimal stand-ins
    # so this module (and its tool functions) can still be imported and called.
    MCP_AVAILABLE = False

    @dataclass
    class _TextContent:
        # Mirrors the two fields this file passes to types.TextContent.
        type: str
        text: str

    class types:  # type: ignore
        # Namespace stand-in for `mcp.types`.
        TextContent = _TextContent
        ImageContent = _TextContent  # not used here
        EmbeddedResource = _TextContent  # not used here
        Tool = dict  # only for list-tools; not used without MCP

    class Server:  # type: ignore
        # No-op stand-in for `mcp.server.Server`.
        def __init__(self, *_args, **_kwargs):
            pass

        def get_capabilities(self, **_kwargs):
            return {}

        async def run(self, *_args, **_kwargs):
            pass

    class NotificationOptions:  # type: ignore
        pass

    class InitializationOptions:  # type: ignore
        def __init__(self, **_kwargs):
            pass
else:
    MCP_AVAILABLE = True
# -----------------------------------------------------------------------------
# Paths & Globals
# -----------------------------------------------------------------------------
# Project root: CLEANEYE_ROOT_DIR when set, otherwise the directory of this file.
ROOT_DIR = Path(
    os.environ.get("CLEANEYE_ROOT_DIR", Path(__file__).resolve().parent)
)
# YOLO weights: CLEANEYE_MODEL_PATH when set, otherwise <root>/Weights/best.pt.
MODEL_PATH = Path(
    os.environ.get("CLEANEYE_MODEL_PATH", ROOT_DIR / "Weights" / "best.pt")
)
# Process-wide YOLO model instance, populated lazily by get_model().
_model = None
def _get_model_path():
    """Resolve the weights file to load, falling back to the HuggingFace Hub.

    Resolution order:
      1. ``MODEL_PATH`` when it is a regular file larger than 1000 bytes
         (smaller files are treated as Git LFS pointer stubs).
      2. A download via ``huggingface_hub.hf_hub_download`` using the
         ``HF_MODEL_REPO`` / ``HF_MODEL_FILE`` environment variables, cached
         under ``ROOT_DIR / "model_cache"``.
      3. The bare name ``yolov8n.pt``.

    Returns:
        Path: the path (or bare model name) to hand to the YOLO constructor.
    """
    # Local import keeps this block self-contained. Progress messages go to
    # stderr because stdout carries the JSON-RPC stream when this module runs
    # as an MCP stdio server; stray prints there would corrupt the protocol.
    import sys

    def _log(message: str) -> None:
        print(message, file=sys.stderr)

    # is_file() (rather than exists()) so a directory at MODEL_PATH is never
    # mistaken for a weights file by the size check below.
    if MODEL_PATH.is_file():
        if MODEL_PATH.stat().st_size > 1000:  # Real models are much larger
            return MODEL_PATH
        _log("⚠️ Model file appears to be a Git LFS pointer")

    # Try to download from HuggingFace Hub (best effort).
    try:
        from huggingface_hub import hf_hub_download

        _log("📥 Downloading model from HuggingFace Hub...")
        model_path = hf_hub_download(
            repo_id=os.getenv("HF_MODEL_REPO", "ultralytics/yolov8n"),
            filename=os.getenv("HF_MODEL_FILE", "yolov8n.pt"),
            cache_dir=str(ROOT_DIR / "model_cache"),
        )
        _log(f"✅ Model downloaded successfully: {model_path}")
        return Path(model_path)
    except Exception as e:
        # Deliberately broad: missing package, network failure and missing
        # repo/file all fall through to the default model name below.
        _log(f"⚠️ Could not download from HF Hub: {e}")

    # Last resort: return default model name (ultralytics will auto-download)
    _log("📦 Using default YOLOv8n model from ultralytics")
    return Path("yolov8n.pt")
# Waste category metadata (tune for your dataset)
# Per-label metadata, keyed by detector class label:
# (label, display type, priority, cleanup_time).
WASTE_CATEGORIES = {
    label: {"type": display, "priority": priority, "cleanup_time": cleanup_time}
    for label, display, priority, cleanup_time in (
        ("0", "General Waste", 2, 5),
        ("c", "Containers", 3, 3),
        ("garbage", "General Garbage", 2, 5),
        ("garbage_bag", "Plastic Bags", 4, 7),
        ("waste", "Waste Items", 2, 4),
        ("trash", "Trash", 2, 4),
    )
}

# Per-area defaults: (area type, baseline urgency, stakeholders to notify).
AREA_TYPES = {
    area: {"urgency": urgency, "stakeholders": list(parties)}
    for area, urgency, parties in (
        ("residential", "medium", ("Municipal Waste Dept", "Residential Association")),
        ("commercial", "high", ("Municipal Waste Dept", "Business District Authority")),
        ("public_park", "high", ("Parks & Recreation", "Environmental Health")),
        ("street", "medium", ("Street Cleaning Dept", "Municipal Waste Dept")),
        ("industrial", "low", ("Industrial Waste Management", "Environmental Protection")),
    )
}
# -----------------------------------------------------------------------------
# Utilities
# -----------------------------------------------------------------------------
def _ok(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Mark *payload* as successful and return it.

    Adds ``"status": "success"`` in place unless a ``"status"`` key is already
    present; the same dict object is returned.
    """
    if "status" not in payload:
        payload["status"] = "success"
    return payload
def _err(message: str, code: str = "bad_request", extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build the structured error payload shared by all tools.

    Args:
        message: Human-readable description of the failure.
        code: Short machine-readable error code.
        extra: Optional details; attached under ``error.details`` when truthy.

    Returns:
        ``{"status": "error", "error": {"code", "message"[, "details"]}}``.
    """
    error: Dict[str, Any] = {"code": code, "message": message}
    if extra:
        error["details"] = extra
    return {"status": "error", "error": error}
def _json_text(payload: Dict[str, Any]) -> types.TextContent:
    """Serialize *payload* as 2-space-indented JSON inside a text content item."""
    serialized = json.dumps(payload, indent=2)
    return types.TextContent(type="text", text=serialized)
def _require(dep, name: str) -> Optional[types.TextContent]:
    """Return a ``missing_dependency`` error item when *dep* is ``None``.

    Args:
        dep: The imported module object, or ``None`` if its import failed.
        name: Package name to mention in the error message.

    Returns:
        ``None`` when the dependency is available, otherwise the error content.
    """
    if dep is not None:
        return None
    problem = _err(f"{name} not available. Please install it.", code="missing_dependency")
    return _json_text(problem)
def get_model():
    """Return the shared YOLO model, loading it on first use.

    The model is created once and cached in the module-level ``_model``.
    ``ultralytics`` is imported only here so that importing this module stays
    cheap when detection is never requested.

    Returns:
        The loaded YOLO model instance.

    Raises:
        RuntimeError: if numpy or opencv-python is unavailable, if
            ``ultralytics`` cannot be imported, or if the weights fail to load.
            The underlying exception is chained as ``__cause__``.
    """
    global _model, YOLO
    if _model is not None:
        return _model

    # Local import; stdout is reserved for the MCP stdio JSON-RPC stream, so
    # progress messages must go to stderr.
    import sys

    # Same wording as the `_require` helper, without the JSON round-trip.
    for dep, dep_name in ((np, "numpy"), (cv2, "opencv-python")):
        if dep is None:
            raise RuntimeError(f"{dep_name} not available. Please install it.")

    # Import YOLO only here (lazy heavy dependency).
    if YOLO is None:
        try:
            from ultralytics import YOLO as _YOLO  # type: ignore
        except Exception as exc:
            raise RuntimeError(
                "ultralytics not available. Install via: pip install ultralytics"
            ) from exc
        YOLO = _YOLO

    # Resolve weights (local file -> HF Hub -> default model name).
    model_path = _get_model_path()
    print(f"🔄 Loading YOLO model from: {model_path}", file=sys.stderr)
    try:
        _model = YOLO(str(model_path))
    except Exception as exc:
        raise RuntimeError(f"Failed to load YOLO model: {exc}") from exc
    print("✅ YOLO model loaded successfully", file=sys.stderr)
    return _model
def _read_image_from_args(args: Dict[str, Any]) -> Tuple[Optional["np.ndarray"], Optional[types.TextContent]]:
    """Load the input image named by ``image_path`` or ``image_base64``.

    ``image_path`` wins when both are given; relative paths are resolved
    against ``ROOT_DIR``. ``image_base64`` may be raw base64 or a data URL
    (``data:image/png;base64,....``); the data-URL header is stripped before
    decoding.

    Args:
        args: Tool arguments.

    Returns:
        ``(image, None)`` on success, where ``image`` is what
        ``cv2.imread`` / ``cv2.imdecode`` returned, or ``(None, error_content)``
        with a structured error on failure.
    """
    # Dependency check
    dep_missing = _require(np, "numpy") or _require(cv2, "opencv-python")
    if dep_missing:
        return None, dep_missing

    img = None
    img_path = args.get("image_path")
    img_b64 = args.get("image_base64")

    if img_path:
        try:
            p = Path(img_path)
        except TypeError:
            # e.g. a number or object was supplied instead of a string path
            return None, _json_text(_err("'image_path' must be a string path.", code="bad_request"))
        if not p.is_absolute():
            p = ROOT_DIR / p
        if not p.exists() or not p.is_file():
            return None, _json_text(_err(f"Image not found: {p}", code="not_found"))
        try:
            img = cv2.imread(str(p))
        except Exception as e:
            return None, _json_text(_err(f"Failed to read image: {e}", code="image_read_error"))
    elif img_b64:
        try:
            payload = img_b64
            # Browsers and many clients send data URLs. Without stripping the
            # "data:<mime>;base64," header, the non-strict decoder would fold
            # the header's characters into the payload and corrupt it.
            if isinstance(payload, str) and payload.startswith("data:") and "," in payload:
                payload = payload.split(",", 1)[1]
            raw = base64.b64decode(payload)
            arr = np.frombuffer(raw, np.uint8)
            img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
        except Exception as e:
            return None, _json_text(_err(f"Invalid base64 image: {e}", code="decode_error"))
    else:
        return None, _json_text(_err("Provide 'image_path' or 'image_base64'.", code="bad_request"))

    # OpenCV signals an unreadable/undecodable image by returning None.
    if img is None:
        return None, _json_text(_err("Image decoding returned None.", code="image_decode_error"))
    return img, None
def _round(x, n=2):
    """Round *x* to *n* decimals after converting it to ``float``.

    Anything that cannot be converted or rounded is returned unchanged, so the
    helper is safe to apply to arbitrary JSON-bound values.
    """
    try:
        as_float = float(x)
        rounded = round(as_float, n)
    except Exception:
        return x
    return rounded
# -----------------------------------------------------------------------------
# MCP Server (if available)
# -----------------------------------------------------------------------------
server = Server("cleaneye-agent") if MCP_AVAILABLE else None  # type: ignore

if MCP_AVAILABLE:

    @server.list_tools()  # type: ignore
    async def handle_list_tools() -> list[types.Tool]:
        """
        Advertise every tool this server exposes, with its JSON input schema.
        """

        def typed(kind: str, **extra: Any) -> Dict[str, Any]:
            # {"type": kind, ...extra} with "type" always first.
            return {"type": kind, **extra}

        def obj(properties: Dict[str, Any], required: Optional[List[str]] = None, **extra: Any) -> Dict[str, Any]:
            # Object schema; "required" is emitted only when given.
            schema: Dict[str, Any] = {"type": "object", "properties": properties}
            if required is not None:
                schema["required"] = required
            schema.update(extra)
            return schema

        area_types = ("residential", "commercial", "public_park", "street", "industrial")
        severities = ("clean", "moderate", "severe")

        catalogue = [
            (
                "detect_garbage",
                "🔍 STEP 1: Detect and identify garbage in an image. Returns detections with boxes, labels, confidences.",
                obj({
                    "image_path": typed("string", description="Path to the image file"),
                    "image_base64": typed("string", description="Base64-encoded image (alternative)"),
                    "confidence": typed("number", default=0.25, minimum=0.01, maximum=0.99),
                }),
            ),
            (
                "analyze_severity",
                "🧠 STEP 2: Analyze severity & health risk. Input detections + image_dimensions.",
                obj(
                    {
                        "detections": typed("array", items=typed("object")),
                        "image_dimensions": obj({
                            "width": typed("number"),
                            "height": typed("number"),
                        }),
                    },
                    ["detections", "image_dimensions"],
                ),
            ),
            (
                "extract_location",
                "📍 STEP 3: Extract/assign location and area type (exif/manual).",
                obj({
                    "image_path": typed("string"),
                    "manual_location": obj({
                        "coordinates": obj({
                            "latitude": typed("number"),
                            "longitude": typed("number"),
                        }),
                        "address": typed("string"),
                        "area_type": typed("string", enum=list(area_types)),
                    }),
                }),
            ),
            (
                "identify_stakeholders",
                "👥 STEP 4: Determine relevant stakeholders & urgency.",
                obj(
                    {
                        "area_type": typed("string", enum=list(area_types)),
                        "severity_level": typed("string", enum=list(severities)),
                        "waste_categories": typed("array", items=typed("string")),
                    },
                    ["area_type", "severity_level"],
                ),
            ),
            (
                "estimate_cleanup",
                "⏱️ STEP 5: Estimate cleanup time, workers, suggested schedule.",
                obj(
                    {
                        "detection_count": typed("integer"),
                        "waste_breakdown": typed("object"),
                        "area_coverage": typed("number"),
                        "severity_level": typed("string", enum=list(severities)),
                    },
                    ["detection_count", "severity_level"],
                ),
            ),
            (
                "calculate_resources",
                "🛠️ STEP 6: Calculate equipment, vehicle, and budget.",
                obj(
                    {
                        "detection_count": typed("integer"),
                        "waste_categories": typed("object"),
                        "area_type": typed("string"),
                    },
                    ["detection_count"],
                ),
            ),
            (
                "generate_report",
                "📊 STEP 7: Generate comprehensive report.",
                obj(
                    {
                        "detection_results": typed("object"),
                        "severity_analysis": typed("object"),
                        "location_info": typed("object"),
                        "stakeholders": typed("object"),
                        "cleanup_estimate": typed("object"),
                        "resources": typed("object"),
                    },
                    ["detection_results", "severity_analysis", "cleanup_estimate"],
                ),
            ),
            (
                "create_timeline",
                "📅 STEP 8: Build cleanup timeline & milestones.",
                obj(
                    {
                        "cleanup_estimate": typed("object"),
                        "urgency_level": typed("string", enum=["low", "medium", "high", "critical"]),
                        "preferred_date": typed("string"),
                    },
                    ["cleanup_estimate", "urgency_level"],
                ),
            ),
            # 🏆 NEW B2B TOOLS FOR WASTE COMPANIES
            (
                "generate_contract_bid",
                "💼 STEP 9 [B2B]: Generate professional contract bid for waste companies with cost breakdown, profit margin, and competitive analysis.",
                obj(
                    {
                        "cleanup_estimate": typed("object", description="Output from estimate_cleanup"),
                        "resources": typed("object", description="Output from calculate_resources"),
                        "detection_count": typed("integer", description="Number of waste items detected"),
                        "profit_margin_percent": typed("number", default=20.0, description="Desired profit margin (%)"),
                        "company_name": typed("string", default="Your Waste Management Co."),
                        "client_name": typed("string", default="Municipal Authority"),
                    },
                    ["cleanup_estimate", "resources", "detection_count"],
                ),
            ),
            (
                "calculate_profit_margin",
                "📊 STEP 10 [B2B]: Calculate profit margins and ROI for waste company operations.",
                obj(
                    {
                        "bid_amount": typed("number", description="Total bid amount in USD"),
                        "actual_costs": typed("number", description="Actual operational costs"),
                        "labor_hours": typed("number", description="Total labor hours"),
                        "monthly_jobs_estimate": typed("integer", default=1, description="Estimated monthly job volume"),
                    },
                    ["bid_amount", "actual_costs", "labor_hours"],
                ),
            ),
            (
                "optimize_route",
                "🗺️ STEP 11 [B2B]: Optimize collection routes for multiple waste sites to maximize efficiency and profitability.",
                obj(
                    {
                        "locations": typed(
                            "array",
                            items=typed("object"),
                            description="List of site locations with coordinates",
                        ),
                        "depot_location": obj(
                            {"lat": typed("number"), "lng": typed("number")},
                            description="Starting depot/office location",
                        ),
                    },
                    ["locations"],
                ),
            ),
        ]

        return [
            types.Tool(name=tool_name, description=summary, inputSchema=schema)  # type: ignore
            for tool_name, summary, schema in catalogue
        ]

    @server.call_tool()  # type: ignore
    async def handle_call_tool(
        name: str, arguments: dict | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """
        Route a tool invocation to its implementation.

        Unknown names yield an ``unknown_tool`` error; any exception escaping
        a tool is reported as a ``server_error`` payload instead of raised.
        """
        try:
            args = arguments or {}
            # Lambdas defer the function lookup until the chosen tool runs,
            # so only the requested implementation has to exist.
            routes = {
                "detect_garbage": lambda: detect_garbage(args),
                "analyze_severity": lambda: analyze_severity(args),
                "extract_location": lambda: extract_location(args),
                "identify_stakeholders": lambda: identify_stakeholders(args),
                "estimate_cleanup": lambda: estimate_cleanup(args),
                "calculate_resources": lambda: calculate_resources(args),
                "generate_report": lambda: generate_report(args),
                "create_timeline": lambda: create_timeline(args),
                # 🏆 NEW B2B TOOLS
                "generate_contract_bid": lambda: generate_contract_bid(args),
                "calculate_profit_margin": lambda: calculate_profit_margin(args),
                "optimize_route": lambda: optimize_route(args),
            }
            start = routes.get(name)
            if start is None:
                return [_json_text(_err(f"Unknown tool: {name}", code="unknown_tool"))]
            return await start()
        except Exception as e:
            return [_json_text(_err(f"Unhandled exception: {e}", code="server_error"))]
# -----------------------------------------------------------------------------
# TOOL IMPLEMENTATIONS
# -----------------------------------------------------------------------------
async def detect_garbage(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    🔍 STEP 1: Run the YOLO detector over the supplied image.

    Input: ``image_path`` or ``image_base64``; optional ``confidence``
    (0.01..0.99, default 0.25).
    Output: one JSON text item with the detections (category, confidence,
    bounding box, area), per-category counts and image dimensions, or a
    structured error.
    """
    try:
        conf = float(args.get("confidence", 0.25))
        if not (0.01 <= conf <= 0.99):
            raise ValueError
    except Exception:
        return [_json_text(_err("confidence must be a float between 0.01 and 0.99", code="bad_request"))]

    image, load_error = _read_image_from_args(args)
    if load_error:
        return [load_error]

    try:
        model = get_model()
        results = model.predict(image, conf=conf, verbose=False)
    except FileNotFoundError as exc:
        return [_json_text(_err(str(exc), code="model_missing"))]
    except Exception as exc:
        return [_json_text(_err(f"YOLO inference failed: {exc}", code="inference_error"))]

    detections: List[Dict[str, Any]] = []
    category_counts: Dict[str, int] = {}
    try:
        for result in results:
            # Ultralytics result format: one `boxes` collection per image.
            for box in result.boxes:
                left, top, right, bottom = (float(v) for v in box.xyxy[0].tolist())
                score = float(box.conf[0])
                label = str(model.names[int(box.cls[0])])
                box_area = (right - left) * (bottom - top)
                detections.append({
                    "category": label,
                    "confidence": _round(score, 3),
                    "bounding_box": {
                        "x1": _round(left, 1),
                        "y1": _round(top, 1),
                        "x2": _round(right, 1),
                        "y2": _round(bottom, 1),
                    },
                    "area": _round(box_area, 1),
                })
                category_counts[label] = category_counts.get(label, 0) + 1

        height, width = image.shape[:2]
        summary = _ok({
            "total_detections": len(detections),
            "category_breakdown": category_counts,
            "detections": detections,
            "image_dimensions": {"width": int(width), "height": int(height)},
            "reasoning": f"Detected {len(detections)} items across {len(category_counts)} categories at conf={conf}.",
        })
        return [_json_text(summary)]
    except Exception as exc:
        return [_json_text(_err(f"Post-processing failure: {exc}", code="postprocess_error"))]
async def analyze_severity(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    🧠 STEP 2: Classify severity and health risk from detection results.

    Input:
        detections: list of objects; each may carry ``area`` (numeric) and
            ``category``.
        image_dimensions: ``{"width", "height"}``, both positive.
    Output: one JSON text item with severity_level, health_risk,
    priority_score, coverage and density metrics, or a ``bad_request`` error
    when the input is malformed.
    """
    dets = args.get("detections", [])
    dims = args.get("image_dimensions", {})

    try:
        w = float(dims.get("width", 0))
        h = float(dims.get("height", 0))
        if w <= 0 or h <= 0:
            raise ValueError
    except Exception:
        return [_json_text(_err("image_dimensions must include positive width/height", code="bad_request"))]

    # Validate up front so malformed detections produce a structured error
    # instead of an AttributeError/TypeError further down.
    if not isinstance(dets, (list, tuple)) or not all(isinstance(d, dict) for d in dets):
        return [_json_text(_err("detections must be a list of objects", code="bad_request"))]
    try:
        total_waste_area = float(sum(float(d.get("area", 0)) for d in dets))
    except (TypeError, ValueError):
        return [_json_text(_err("each detection 'area' must be numeric", code="bad_request"))]

    count = len(dets)
    image_area = max(1.0, w * h)
    coverage_percent = (total_waste_area / image_area) * 100.0

    # Rule-of-thumb classification on item count and covered area.
    if count == 0:
        severity, health_risk, priority = "clean", "none", 0
    elif count < 5 and coverage_percent < 5:
        severity, health_risk, priority = "moderate", "low", 2
    elif count < 15 and coverage_percent < 15:
        severity, health_risk, priority = "moderate", "moderate", 3
    else:
        severity, health_risk, priority = "severe", "high", 4

    # Certain categories bump a "moderate" scene to at least moderate risk.
    categories = [str(d.get("category", "")) for d in dets]
    hazardous = any(c in ("garbage_bag", "waste") for c in categories)
    if hazardous and severity == "moderate":
        health_risk = "moderate"
        priority = max(priority, 3)

    out = _ok({
        "severity_level": severity,
        "health_risk": health_risk,
        "priority_score": int(priority),
        "area_coverage_percent": _round(coverage_percent, 2),
        "total_waste_area_pixels": _round(total_waste_area, 1),
        "metrics": {
            "detection_count": count,
            "density_per_megapixel": _round(count / (image_area / 1_000_000.0), 2),
            "average_item_size_px": _round((total_waste_area / count) if count else 0, 1),
        },
        "reasoning": f"{count} detections covering {_round(coverage_percent,1)}% area → {severity.upper()} / {health_risk.upper()}",
    })
    return [_json_text(out)]
async def extract_location(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    📍 STEP 3: Assign location details and area type.

    EXIF parsing is not implemented: when ``image_path`` is supplied the
    method is only recorded as ``attempted_exif``. A ``manual_location``
    object overrides address, coordinates and area type. Unknown area types
    fall back to ``street``.
    """
    method = "attempted_exif" if args.get("image_path") else "none"
    # In production: read EXIF here and fill coords/address if available.

    address = None
    coords = None
    area_type = "street"

    manual = args.get("manual_location")
    if isinstance(manual, dict):
        method = "manual_input"
        address = manual.get("address") or None
        coords = manual.get("coordinates") or None
        area_type = manual.get("area_type") or "street"

    if area_type not in AREA_TYPES:
        area_type = "street"

    result = _ok({
        "coordinates": coords,
        "address": address or "Not provided",
        "area_type": area_type,
        "area_characteristics": AREA_TYPES[area_type],
        "extraction_method": method,
        "reasoning": f"Area type set to {area_type.upper()} via {method}.",
    })
    return [_json_text(result)]
async def identify_stakeholders(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    👥 STEP 4: Pick stakeholders, urgency and notification channels.

    Input:
        area_type: key of ``AREA_TYPES``; anything else falls back to
            ``street``.
        severity_level: ``severe`` adds health stakeholders and forces high
            urgency.
    Output: one JSON text item with primary_stakeholders, urgency_level,
    notification_channels, contact_priority and a reasoning string.
    """
    area_type = args.get("area_type", "street")
    # Coerce so a non-string value cannot break `.upper()` below.
    severity = str(args.get("severity_level", "moderate"))

    # isinstance guard: an unhashable value (list/dict) would raise on lookup.
    if not isinstance(area_type, str) or area_type not in AREA_TYPES:
        area_type = "street"

    base = AREA_TYPES[area_type]
    stakeholders = list(base["stakeholders"])  # copy: never mutate the table
    urgency = base["urgency"]

    if severity == "severe":
        stakeholders += ["Environmental Health Department", "Public Health Office"]
        urgency = "high"

    channels_by_urgency = {
        "high": ["immediate_sms", "email", "system_alert"],
        "medium": ["email", "system_alert"],
    }
    channels = channels_by_urgency.get(urgency, ["system_alert", "email"])

    out = _ok({
        "primary_stakeholders": stakeholders,
        "urgency_level": urgency,
        "notification_channels": channels,
        "contact_priority": "high" if severity == "severe" else "medium",
        # Separator restored between severity and the stakeholder count
        # (previously rendered as e.g. "SEVERE4 stakeholders").
        "reasoning": f"{area_type.upper()} + {severity.upper()} → {len(stakeholders)} stakeholders, {urgency.upper()} urgency.",
    })
    return [_json_text(out)]
async def estimate_cleanup(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    ⏱️ STEP 5: Estimate duration, crew size, equipment and a suggested slot.

    Input:
        detection_count: number of items (must be >= 0).
        severity_level: ``clean`` / ``moderate`` / ``severe``; scales the time.
        area_coverage: optional number, echoed in the calculation details.
        waste_breakdown: optional object, echoed in the calculation details.
    Output: one JSON text item with estimated_duration_minutes,
    workers_required, equipment_needed, suggested_schedule (tomorrow 10:00
    local time) and calculation details, or a ``bad_request`` error.
    """
    try:
        count = int(args.get("detection_count", 0))
        severity = str(args.get("severity_level", "moderate"))
        # `or` lets an explicit null behave like an omitted optional field.
        coverage = float(args.get("area_coverage") or 0.0)
        waste_breakdown = dict(args.get("waste_breakdown") or {})
    except Exception:
        return [_json_text(_err("Invalid input types for estimate_cleanup.", code="bad_request"))]

    # A negative count would yield negative durations further down.
    if count < 0:
        return [_json_text(_err("detection_count must be >= 0.", code="bad_request"))]

    base_time_per_item = 3.0  # minutes
    severity_mult = {"clean": 0.5, "moderate": 1.0, "severe": 1.5}.get(severity, 1.0)
    total_time = count * base_time_per_item * severity_mult

    # Simple worker logic: more total work -> larger crew (max 3).
    if total_time < 30:
        workers = 1
    elif total_time < 90:
        workers = 2
    else:
        workers = 3

    actual_minutes = total_time / max(1, workers)
    actual_minutes += 15  # setup/teardown

    equipment = ["Trash bags", "Gloves", "Grabber tools"]
    if count > 20:
        equipment.append("Waste collection cart")
    if severity == "severe":
        equipment += ["Safety vests", "Broom and dustpan"]

    suggested_date = (datetime.now() + timedelta(days=1)).replace(hour=10, minute=0, second=0, microsecond=0)

    out = _ok({
        "estimated_duration_minutes": int(round(actual_minutes)),
        "workers_required": int(workers),
        "equipment_needed": equipment,
        "suggested_schedule": {
            "date": suggested_date.strftime("%Y-%m-%d"),
            "time": suggested_date.strftime("%H:%M"),
            "day_of_week": suggested_date.strftime("%A"),
        },
        "calculation_details": {
            "items_to_collect": count,
            "base_time_minutes": int(round(total_time)),
            "severity_multiplier": severity_mult,
            "area_coverage": _round(coverage, 2),
            "waste_breakdown": waste_breakdown,
        },
        "reasoning": f"{count} items → ~{int(round(actual_minutes))} min with {workers} worker(s).",
    })
    return [_json_text(out)]
async def calculate_resources(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    🛠️ STEP 6: Calculate bags, gloves, vehicle and a budget estimate.

    Input:
        detection_count: number of items (must be >= 0).
        waste_categories: optional object; validated but not used in the
            calculation.
        area_type: optional; accepted but not used in the calculation.
    Output: one JSON text item with waste_bags, gloves_pairs,
    vehicle_requirement, budget_estimate_usd and a detailed breakdown, or a
    ``bad_request`` error.
    """
    try:
        count = int(args.get("detection_count", 0))
        # Shape check only; `or` treats an explicit null as "not provided".
        dict(args.get("waste_categories") or {})
    except Exception:
        return [_json_text(_err("Invalid input types for calculate_resources.", code="bad_request"))]

    # A negative count would produce a meaningless (possibly negative) budget.
    if count < 0:
        return [_json_text(_err("detection_count must be >= 0.", code="bad_request"))]

    bags = max(1, count // 10 + 1)
    gloves_pairs = max(2, count // 15)

    if count < 20:
        vehicle = "None (hand collection)"
    elif count <= 50:
        vehicle = "Small waste collection vehicle"
    else:
        vehicle = "Standard waste collection truck"

    labor_hours = (count * 3.0) / 60.0  # 3 mins per item
    budget = (labor_hours * 15.0) + 50.0  # hourly rate 15.0 plus flat 50.0

    needs_sweeping = 1 if count > 10 else 0

    out = _ok({
        "waste_bags": int(bags),
        "gloves_pairs": int(gloves_pairs),
        "vehicle_requirement": vehicle,
        "budget_estimate_usd": _round(budget, 2),
        "detailed_breakdown": {
            "consumables": {
                "heavy_duty_bags": int(bags),
                "disposable_gloves": int(gloves_pairs),
                "sanitizer": "1 bottle",
            },
            "tools": {
                "grabber_sticks": 2,
                "brooms": needs_sweeping,
                "dustpan": needs_sweeping,
            },
            "transport": vehicle,
        },
        "reasoning": f"{count} items → {bags} bags, {vehicle}. Est. budget ${_round(budget,2)}.",
    })
    return [_json_text(out)]
async def generate_report(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    📊 STEP 7: Assemble the outputs of the earlier steps into one report.

    Input (all objects; missing or null sections are treated as empty):
        detection_results, severity_analysis, location_info, stakeholders,
        cleanup_estimate, resources.
    Output: one JSON text item with report_id, generated_at, executive
    summary, detection/location details, stakeholder notifications, cleanup
    plan, resource requirements and recommendations.
    """

    def _section(key: str) -> Dict[str, Any]:
        # Tolerate omitted *and* explicit-null sections (dict(None) raises).
        value = args.get(key)
        return dict(value) if value else {}

    def _number(value: Any) -> float:
        # The ".1f"/".2f" format specs below need a real number.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    detection = _section("detection_results")
    severity = _section("severity_analysis")
    location = _section("location_info")
    stakeholders = _section("stakeholders")
    estimate = _section("cleanup_estimate")
    resources = _section("resources")

    # One timestamp so report_id and generated_at always agree.
    now = datetime.now()
    rid = f"CLN-{now.strftime('%Y%m%d-%H%M%S')}"

    schedule = estimate.get("suggested_schedule")
    if not isinstance(schedule, dict):
        schedule = {}
    notify = stakeholders.get("primary_stakeholders") or []
    coverage = _number(severity.get("area_coverage_percent", 0))
    budget = _number(resources.get("budget_estimate_usd", 0))
    workers = estimate.get("workers_required", 1)
    is_severe = severity.get("severity_level") == "severe"

    out = _ok({
        "report_id": rid,
        "generated_at": now.isoformat(),
        "executive_summary": {
            "total_waste_items": detection.get("total_detections", 0),
            "severity": str(severity.get("severity_level", "unknown")).upper(),
            "health_risk": str(severity.get("health_risk", "unknown")).upper(),
            "priority": severity.get("priority_score", 0),
            "location_type": str(location.get("area_type", "unknown")).upper(),
            "cleanup_duration": f"{estimate.get('estimated_duration_minutes', 0)} minutes",
            "workers_needed": workers,
        },
        "detection_details": {
            "categories_found": detection.get("category_breakdown", {}),
            "area_coverage": f"{coverage:.1f}%",
            "detection_confidence": "High (YOLOv8)",
        },
        "location_information": {
            "area_type": location.get("area_type", "unknown"),
            "coordinates": location.get("coordinates"),
            "address": location.get("address", "Not provided"),
        },
        "stakeholder_notifications": {
            "notify": notify,
            "urgency": stakeholders.get("urgency_level", "medium"),
            "channels": stakeholders.get("notification_channels", []),
        },
        "cleanup_plan": {
            "scheduled_date": schedule.get("date"),
            "scheduled_time": schedule.get("time"),
            "duration_minutes": estimate.get("estimated_duration_minutes"),
            "team_size": estimate.get("workers_required"),
            "equipment": estimate.get("equipment_needed", []),
        },
        "resource_requirements": {
            "budget": f"${budget:.2f}",
            "materials": resources.get("detailed_breakdown", {}),
            "vehicle": resources.get("vehicle_requirement", "Not required"),
        },
        "recommendations": [
            f"Deploy {workers} worker(s) on {schedule.get('date', 'TBD')}",
            # str() so non-string entries cannot break the join.
            f"Notify {', '.join(str(s) for s in notify[:2])}",
            f"Allocate ${budget:.2f} from municipal cleaning budget",
            "Schedule follow-up inspection after 1 week" if is_severe else "No follow-up required",
        ],
        "agent_reasoning": "Report combines detections, severity metrics, location context, and resource estimates.",
    })
    return [_json_text(out)]
async def create_timeline(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    📅 STEP 8: Build a six-step milestone timeline from estimate + urgency.

    Input:
        cleanup_estimate: object; ``estimated_duration_minutes`` (default 60)
            and ``workers_required`` (default 1) are read.
        urgency_level: ``critical`` / ``high`` / ``medium``; any other value
            uses the slowest ("low") schedule.
        preferred_date: optional ISO date string. When valid it replaces the
            deployment *date* (time of day is kept); invalid values are
            ignored.
    Output: one JSON text item with timeline_id, milestones, summary and a
    reasoning string. All times are naive local time.
    """
    estimate_arg = args.get("cleanup_estimate")
    estimate = dict(estimate_arg) if estimate_arg else {}  # tolerate null
    urgency = str(args.get("urgency_level", "medium"))
    preferred_date = args.get("preferred_date")  # ISO date string (optional)

    now = datetime.now()

    # urgency -> (delay before notifying, delay before deployment, label for
    # step 1). The fallback label is "4 hours": lower urgencies really are
    # notified after 4 hours, not "1-2 hours".
    schedule = {
        "critical": (timedelta(minutes=15), timedelta(hours=2), "15 minutes"),
        "high": (timedelta(hours=1), timedelta(hours=4), "1-2 hours"),
        "medium": (timedelta(hours=2), timedelta(days=1), "1-2 hours"),
    }
    notify_delay, deploy_delay, notify_label = schedule.get(
        urgency, (timedelta(hours=4), timedelta(days=2), "4 hours")
    )
    notification_time = now + notify_delay
    deployment_time = now + deploy_delay

    # Override date if preferred_date provided (keep time)
    if preferred_date:
        try:
            _d = datetime.fromisoformat(preferred_date)
            deployment_time = deployment_time.replace(year=_d.year, month=_d.month, day=_d.day)
        except (TypeError, ValueError):
            # Best effort by design: an unparsable preferred_date is ignored.
            pass

    try:
        cleanup_minutes = int(estimate.get("estimated_duration_minutes", 60))
    except (TypeError, ValueError):
        cleanup_minutes = 60  # same default as a missing value

    completion_time = deployment_time + timedelta(minutes=cleanup_minutes)
    verification_time = completion_time + timedelta(hours=2)

    out = _ok({
        "timeline_id": f"TL-{now.strftime('%Y%m%d-%H%M%S')}",
        "created_at": now.isoformat(),
        "milestones": [
            {
                "step": 1,
                "action": "Send notifications to stakeholders",
                "scheduled_time": notification_time.isoformat(),
                "relative_time": notify_label,
                "responsible": "Automated system",
                "status": "pending",
            },
            {
                "step": 2,
                "action": "Assign cleanup crew and allocate resources",
                "scheduled_time": (notification_time + timedelta(hours=1)).isoformat(),
                "relative_time": "1 hour after notification",
                "responsible": "Municipal Waste Department",
                "status": "pending",
            },
            {
                "step": 3,
                "action": "Deploy cleanup team to location",
                "scheduled_time": deployment_time.isoformat(),
                "relative_time": deployment_time.strftime("%A, %B %d at %H:%M"),
                "responsible": "Cleanup crew",
                "status": "scheduled",
            },
            {
                "step": 4,
                "action": f"Execute cleanup operation ({cleanup_minutes} min)",
                "scheduled_time": deployment_time.isoformat(),
                "relative_time": f"{cleanup_minutes} minutes",
                "responsible": f"{estimate.get('workers_required', 1)} worker(s)",
                "status": "scheduled",
            },
            {
                "step": 5,
                "action": "Complete cleanup and remove waste",
                "scheduled_time": completion_time.isoformat(),
                "relative_time": completion_time.strftime("%A, %B %d at %H:%M"),
                "responsible": "Cleanup crew",
                "status": "scheduled",
            },
            {
                "step": 6,
                "action": "Verification and photo documentation",
                "scheduled_time": verification_time.isoformat(),
                "relative_time": "2 hours after completion",
                "responsible": "Quality assurance team",
                "status": "scheduled",
            },
        ],
        "summary": {
            "total_duration_hours": _round((verification_time - now).total_seconds() / 3600.0, 1),
            "urgency_classification": urgency.upper(),
            "estimated_completion": completion_time.strftime("%A, %B %d, %Y at %H:%M"),
            "cleanup_window": f"{cleanup_minutes} minutes",
        },
        "reasoning": f"Urgency {urgency.upper()} → notify at {notification_time:%H:%M}, deploy {deployment_time:%a %H:%M}, complete {completion_time:%H:%M}.",
    })
    return [_json_text(out)]
# -----------------------------------------------------------------------------
# 🏆 NEW B2B TOOLS FOR WASTE COMPANIES
# -----------------------------------------------------------------------------
async def generate_contract_bid(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    💼 TOOL 9 [B2B]: Generate a contract bid from a cleanup estimate.

    Input:
        cleanup_estimate: object; reads ``workers_required`` (default 1),
            ``estimated_duration_minutes`` (default 60), ``suggested_schedule``.
        resources: object; reads ``vehicle_requirement``, ``waste_bags``
            (default 3), ``gloves_pairs`` (default 2).
        detection_count: number of items, used in the scope text.
        profit_margin_percent: percentage added on top of total cost
            (default 20).
        company_name / client_name: names printed on the bid.
    Output: one JSON text item with bid summary, cost breakdown, simulated
    market range, contract terms and a plain-text preview. Any failure is
    returned as a ``calculation_error`` payload.

    Pricing: labor (workers x hours x 18.50) + vehicle + supplies, plus 18%
    overhead, plus the profit percentage.
    """
    try:
        # Input data
        cleanup_estimate = dict(args.get("cleanup_estimate", {}))
        resources = dict(args.get("resources", {}))
        detection_count = int(args.get("detection_count", 0))
        profit_margin = float(args.get("profit_margin_percent", 20.0))  # Default 20%
        company_name = str(args.get("company_name", "Your Waste Management Co."))
        client_name = str(args.get("client_name", "Municipal Authority"))

        # One timestamp so bid id, generated_at and the printed date agree.
        now = datetime.now()

        # Labor
        workers = int(cleanup_estimate.get("workers_required", 1))
        duration_minutes = float(cleanup_estimate.get("estimated_duration_minutes", 60))
        duration_hours = duration_minutes / 60.0
        labor_rate_per_hour = 18.50
        labor_cost = workers * duration_hours * labor_rate_per_hour

        # Vehicle: matched by keyword. A null/missing requirement is treated
        # as the string "None" instead of failing on `.lower()`.
        vehicle_req = resources.get("vehicle_requirement")
        vehicle_req = "None" if vehicle_req is None else str(vehicle_req)
        vehicle_key = vehicle_req.lower()
        if "truck" in vehicle_key:
            vehicle_cost = 85.0  # Truck rental/amortization
        elif "small" in vehicle_key:
            vehicle_cost = 45.0
        else:
            vehicle_cost = 0.0

        # Supplies
        bags = int(resources.get("waste_bags", 3))
        gloves = int(resources.get("gloves_pairs", 2))
        supplies_cost = (bags * 2.5) + (gloves * 3.0) + 15.0  # bags + gloves + misc

        # Roll-up: direct costs -> overhead -> profit -> bid
        direct_costs = labor_cost + vehicle_cost + supplies_cost
        overhead_rate = 0.18
        overhead_cost = direct_costs * overhead_rate
        total_cost = direct_costs + overhead_cost
        # NOTE(review): the percentage is applied to cost (a markup), so the
        # margin measured against the bid amount is lower than
        # profit_margin_percent - confirm this is intended.
        profit_amount = total_cost * (profit_margin / 100.0)
        bid_amount = total_cost + profit_amount

        # Competitive analysis (simulated: a fixed band around our own bid)
        market_avg_low = bid_amount * 0.85
        market_avg_high = bid_amount * 1.25
        competitive_position = "COMPETITIVE" if profit_margin <= 22 else "PREMIUM"

        bid_id = f"BID-{now.strftime('%Y%m%d-%H%M%S')}"

        # Built with an explicit join so the text does not depend on how this
        # source file is indented.
        preview = "\n".join([
            "PROFESSIONAL SERVICE BID",
            f"Bid ID: {bid_id}",
            f"From: {company_name}",
            f"To: {client_name}",
            f"Date: {now.strftime('%B %d, %Y')}",
            "SCOPE: Complete waste removal and site cleanup",
            f"ITEMS: {detection_count} waste items identified",
            f"CREW: {workers} trained personnel",
            f"DURATION: {_round(duration_hours, 2)} hours",
            f"EQUIPMENT: {vehicle_req}",
            f"TOTAL BID AMOUNT: ${_round(bid_amount, 2)} USD",
            "Detailed breakdown available upon request.",
            "Valid for 30 days from issue date.",
        ]).strip()

        out = _ok({
            "bid_id": bid_id,
            "generated_at": now.isoformat(),
            "company_name": company_name,
            "client_name": client_name,
            "bid_summary": {
                "total_bid_amount": _round(bid_amount, 2),
                "profit_margin_percent": profit_margin,
                "profit_amount": _round(profit_amount, 2),
                "competitive_position": competitive_position,
                "estimated_win_probability": "78%" if profit_margin <= 22 else "62%",
            },
            "cost_breakdown": {
                "labor": {
                    "workers": workers,
                    "hours": _round(duration_hours, 2),
                    "rate_per_hour": labor_rate_per_hour,
                    "subtotal": _round(labor_cost, 2),
                },
                "equipment_vehicle": {
                    "vehicle_type": vehicle_req,
                    "subtotal": _round(vehicle_cost, 2),
                },
                "supplies": {
                    "waste_bags": bags,
                    "gloves_pairs": gloves,
                    "misc_supplies": "Safety vests, sanitizer",
                    "subtotal": _round(supplies_cost, 2),
                },
                "direct_costs_total": _round(direct_costs, 2),
                "overhead_18_percent": _round(overhead_cost, 2),
                "total_costs": _round(total_cost, 2),
                "profit_margin": _round(profit_amount, 2),
            },
            "market_intelligence": {
                "estimated_market_range": {
                    "low": _round(market_avg_low, 2),
                    "high": _round(market_avg_high, 2),
                },
                "your_bid_position": competitive_position,
                "recommendation": "Strong bid" if profit_margin <= 25 else "Consider reducing margin for higher win rate",
            },
            "contract_terms": {
                "service_description": f"Complete waste removal service for {detection_count} items",
                "timeline": cleanup_estimate.get("suggested_schedule", {}),
                "payment_terms": "Net 30 days",
                "warranty": "90-day service guarantee",
                "insurance": "Full liability and workers compensation",
            },
            "bid_document_preview": preview,
            "reasoning": f"Generated competitive bid: ${_round(bid_amount, 2)} with {profit_margin}% margin. Market position: {competitive_position}.",
        })
        return [_json_text(out)]
    except Exception as e:
        # Tool boundary: report any failure as a structured error payload.
        return [_json_text(_err(f"Bid generation failed: {e}", code="calculation_error"))]
async def calculate_profit_margin(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    Tool 10: profit-margin and ROI analysis for a single cleanup job.

    Reads from ``args`` (all optional):
        bid_amount: price charged for the job (default 0).
        actual_costs: cost incurred delivering the job (default 0).
        labor_hours: hours of labor spent on the job (default 0).
        monthly_jobs_estimate: jobs per month used for projections (default 1).

    Returns:
        A single-element list holding the JSON text payload with the
        profitability analysis, monthly/annual projections, a comparison
        against a fixed 18.5% benchmark margin, recommendations and a
        reasoning string. Any exception raised while computing is reported
        as a ``calculation_error`` payload instead of propagating.
    """
    try:
        revenue = float(args.get("bid_amount", 0))
        costs = float(args.get("actual_costs", 0))
        hours = float(args.get("labor_hours", 0))
        jobs_per_month = int(args.get("monthly_jobs_estimate", 1))

        profit = revenue - costs

        # Ratios fall back to 0 whenever their denominator is not positive.
        margin_pct = 0
        if revenue > 0:
            margin_pct = profit / revenue * 100
        hourly_profit = 0
        if hours > 0:
            hourly_profit = profit / hours

        # Projections assume every job in the month looks like this one.
        revenue_per_month = revenue * jobs_per_month
        profit_per_month = profit * jobs_per_month
        profit_per_year = profit_per_month * 12

        industry_margin = 18.5
        margin_gap = margin_pct - industry_margin

        if margin_pct > 25:
            rating = "EXCELLENT"
        elif margin_pct > 18:
            rating = "GOOD"
        elif margin_pct > 12:
            rating = "ACCEPTABLE"
        else:
            rating = "LOW"

        if rating == "EXCELLENT":
            pricing_tip = "Maintain current pricing strategy"
        else:
            pricing_tip = "Consider optimizing labor efficiency"

        if rating != "LOW":
            growth_tip = f"Target {jobs_per_month * 1.2:.0f} jobs/month to reach ${profit_per_year * 1.2:.2f}/year"
        else:
            growth_tip = "Review cost structure and pricing"

        if margin_pct < 20:
            margin_tip = "Invest in route optimization to reduce fuel costs"
        else:
            margin_tip = "Strong margins - consider competitive expansion"

        margin_rounded = _round(margin_pct, 2)

        profitability = {
            "gross_profit": _round(profit, 2),
            "profit_margin_percent": margin_rounded,
            "profit_per_labor_hour": _round(hourly_profit, 2),
            "performance_rating": rating,
        }
        projections = {
            "monthly_revenue": _round(revenue_per_month, 2),
            "monthly_profit": _round(profit_per_month, 2),
            "annual_profit_projection": _round(profit_per_year, 2),
            "jobs_per_month": jobs_per_month,
        }
        comparison = {
            "your_margin": margin_rounded,
            "industry_average": industry_margin,
            "difference": _round(margin_gap, 2),
            "status": "Above average" if margin_gap > 0 else "Below average",
        }

        payload = _ok({
            "profitability_analysis": profitability,
            "projections": projections,
            "industry_comparison": comparison,
            "recommendations": [pricing_tip, growth_tip, margin_tip],
            "reasoning": f"Profit margin of {margin_rounded}% rated as {rating}. {margin_gap:+.1f}% vs industry benchmark.",
        })
        return [_json_text(payload)]
    except Exception as e:
        return [_json_text(_err(f"Profit calculation failed: {e}", code="calculation_error"))]
async def optimize_route(args: Dict[str, Any]) -> list[types.TextContent]:
    """
    Tool 11: plan a collection route across multiple waste sites.

    Demo-grade estimator: stops are visited in the order given and every
    figure is derived from fixed per-site averages rather than from the
    coordinates themselves (a real implementation would use a TSP solver).

    Args:
        args: Tool arguments.
            locations: non-empty list of site entries; each entry is echoed
                back unchanged in its stop record.
            depot_location: currently ignored by the estimate.

    Returns:
        A single-element list holding the JSON text payload: route summary,
        cost analysis, the first 10 stops, efficiency metrics,
        recommendations and a reasoning string. A missing, empty or
        non-list ``locations`` yields a ``bad_request`` error payload; any
        unexpected failure yields a ``calculation_error`` payload.
    """
    try:
        raw_locations = args.get("locations")
        if raw_locations is None:
            raw_locations = []
        # A str or dict would iterate "successfully" (characters / keys) and
        # be costed as bogus sites, so insist on a real sequence of entries.
        if not isinstance(raw_locations, (list, tuple)):
            return [_json_text(_err("'locations' must be a list of sites", code="bad_request"))]
        locations = list(raw_locations)
        if not locations:
            return [_json_text(_err("No locations provided for route optimization", code="bad_request"))]

        # Fixed planning assumptions used by the simplified model.
        avg_distance_between_sites = 3.5  # km average
        avg_service_time_per_site = 45  # minutes
        drive_time_per_km = 2.5  # minutes (city driving)
        max_sites_per_crew_per_day = 8
        fuel_cost_per_km = 0.65  # $/km for commercial vehicle
        labor_rate_per_hour = 18.50
        workers_per_crew = 2
        revenue_per_site = 150.0  # Avg $150/site
        max_stops_shown = 10

        num_sites = len(locations)

        # Distance and time totals. num_sites >= 1 past the guard above, so
        # every total below is strictly positive and safe to divide by.
        total_distance_km = num_sites * avg_distance_between_sites
        total_service_time = num_sites * avg_service_time_per_site
        total_drive_time = total_distance_km * drive_time_per_km
        total_time_minutes = total_service_time + total_drive_time
        total_time_hours = total_time_minutes / 60.0

        # Ceiling division: one crew handles at most a day's worth of sites.
        crews_needed = (num_sites + max_sites_per_crew_per_day - 1) // max_sites_per_crew_per_day

        fuel_cost = total_distance_km * fuel_cost_per_km
        labor_cost = total_time_hours * labor_rate_per_hour * workers_per_crew
        total_route_cost = fuel_cost + labor_cost
        cost_per_site = total_route_cost / num_sites
        revenue_potential = num_sites * revenue_per_site

        # One clock reading for the whole response so every arrival time and
        # the completion estimate share the same baseline.
        now = datetime.now()
        minutes_between_stops = avg_service_time_per_site + drive_time_per_km * avg_distance_between_sites

        # Only the stops that are actually reported get materialised.
        optimized_sequence = [
            {
                "stop_number": stop,
                "site_id": f"SITE-{stop:03d}",
                "estimated_arrival": (now + timedelta(minutes=(stop - 1) * minutes_between_stops)).strftime("%H:%M"),
                "service_duration_minutes": avg_service_time_per_site,
                "location": site,
            }
            for stop, site in enumerate(locations[:max_stops_shown], start=1)
        ]

        if num_sites <= max_sites_per_crew_per_day:
            crew_plan = f"Complete route in single day with {crews_needed} crew(s)"
        else:
            crew_plan = f"Split into {crews_needed} days or deploy {crews_needed} crews"

        out = _ok({
            "route_summary": {
                "total_sites": num_sites,
                "total_distance_km": _round(total_distance_km, 1),
                "total_time_hours": _round(total_time_hours, 1),
                "crews_required": crews_needed,
                "sites_per_crew": min(num_sites, max_sites_per_crew_per_day),
            },
            "cost_analysis": {
                "fuel_cost": _round(fuel_cost, 2),
                "labor_cost": _round(labor_cost, 2),
                "total_route_cost": _round(total_route_cost, 2),
                "cost_per_site": _round(cost_per_site, 2),
            },
            "optimized_sequence": optimized_sequence,  # First 10 stops
            "efficiency_metrics": {
                "sites_per_hour": _round(num_sites / total_time_hours, 1),
                "revenue_potential": _round(revenue_potential, 2),
                "estimated_profit": _round(revenue_potential - total_route_cost, 2),
            },
            "recommendations": [
                crew_plan,
                "Start route at 7:00 AM for optimal traffic conditions",
                f"Expected completion: {(now + timedelta(minutes=total_time_minutes)).strftime('%I:%M %p')}",
                "Prioritize high-value contracts in morning slots",
            ],
            "reasoning": f"Optimized {num_sites} sites into {_round(total_time_hours, 1)}hr route. Cost: ${_round(total_route_cost, 2)}, Revenue potential: ${_round(revenue_potential, 2)}.",
        })
        return [_json_text(out)]
    except Exception as e:
        return [_json_text(_err(f"Route optimization failed: {e}", code="calculation_error"))]
# -----------------------------------------------------------------------------
# Entry point (MCP stdio)
# -----------------------------------------------------------------------------
async def main():
    """
    Serve the CleanEye agent over MCP stdio.

    When the MCP package could not be imported, print install guidance and
    return without starting anything. Otherwise open the stdio transport and
    run the server on it until the session ends.
    """
    if MCP_AVAILABLE:
        async with mcp.server.stdio.stdio_server() as (incoming, outgoing):  # type: ignore
            # Capabilities are computed after the transport is open, right
            # before they are handed to the server.
            capabilities = server.get_capabilities(notification_options=NotificationOptions())  # type: ignore
            init_options = InitializationOptions(
                server_name="cleaneye-agent",
                server_version="2.0.0",
                capabilities=capabilities,
            )
            await server.run(incoming, outgoing, init_options)  # type: ignore
        return

    print("⚠️ MCP is not installed. Install with: pip install mcp")
    print(" This module exposes functions even without MCP, but stdio server won't run.")
if __name__ == "__main__":
    # Script entry point: run the async main() to completion via asyncio.run.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Swallow Ctrl+C so interrupting the process exits without a traceback.
        pass