CC-CleanCity / test_agent.py
AlBaraa63's picture
Add comprehensive test suite for CleanEye B2B features
c9d941d
"""
Agent System wrapper around the CleanEye MCP Tools.
Coordinates the full pipeline: detection → severity → location → stakeholders →
cleanup estimation → resources → report → timeline.
This agent simulates calls to the MCP tools.
You must have your MCP server running OR import the tool functions directly.
"""
import asyncio
import json
from typing import Any, Dict, List, Optional
# Import ALL tools from your MCP server module.
# This assumes your MCP server file is named mcp_server.py
from mcp_server import (
detect_garbage,
analyze_severity,
extract_location,
identify_stakeholders,
estimate_cleanup,
calculate_resources,
generate_report,
create_timeline
)
class CleanEyeAgent:
    """Drive the CleanEye MCP tools as a single eight-step pipeline.

    Each step awaits one tool coroutine imported from ``mcp_server``, decodes
    the JSON text of its first content item, and stores the decoded payload in
    ``self.results`` under a stage name. Progress messages are both printed
    and recorded in ``self.reasoning_chain``.
    """

    def __init__(self):
        # Progress messages in the order they were recorded; never cleared,
        # so it keeps growing if the same agent runs the pipeline again.
        self.reasoning_chain: List[str] = []
        # Decoded tool payloads keyed by stage: "detection", "severity",
        # "location", "stakeholders", "estimate", "resources", "report",
        # "timeline".
        self.results: Dict[str, Any] = {}
        # Headline figures derived from ``results`` by ``_build_summary``.
        self.summary: Dict[str, Any] = {}

    # ----------------------------------------------------------------------
    # Helper: track thoughts
    # ----------------------------------------------------------------------
    def think(self, msg: str):
        """Record *msg* in the reasoning chain and echo it to stdout."""
        self.reasoning_chain.append(msg)
        print(f"🧠 {msg}")

    # ----------------------------------------------------------------------
    # Helper: invoke one tool and decode its reply
    # ----------------------------------------------------------------------
    @staticmethod
    async def _call_tool(tool, arguments: Dict[str, Any]) -> Any:
        """Await ``tool(arguments)`` and JSON-decode ``response[0].text``.

        The pipeline treats the decoded value as a dict (it calls ``.get`` on
        it). Errors from the tool, an empty response (``IndexError``) or
        invalid JSON propagate unchanged.
        """
        response = await tool(arguments)
        return json.loads(response[0].text)

    # ----------------------------------------------------------------------
    # Core pipeline wrapper
    # ----------------------------------------------------------------------
    async def run_full_pipeline(self, image_path: str, confidence: float = 0.25):
        """Run all eight tools in order and return the generated report.

        Args:
            image_path: Forwarded to ``detect_garbage`` and
                ``extract_location`` under the ``"image_path"`` key.
            confidence: Forwarded to ``detect_garbage`` under the
                ``"confidence"`` key.

        Returns:
            A ``(report, reasoning_chain)`` tuple: the decoded payload of
            ``generate_report`` and this agent's reasoning-chain list (the
            same list object held on the instance, not a copy).

        Raises:
            Whatever a tool call or JSON decoding raises; nothing is caught
            here, so a failure leaves ``self.results`` partially filled.
        """
        # STEP 1: DETECTION
        self.think("Running garbage detection...")
        det = await self._call_tool(
            detect_garbage,
            {"image_path": image_path, "confidence": confidence},
        )
        self.results["detection"] = det
        self.think(f"Detected {det.get('total_detections', 0)} waste items.")

        # STEP 2: SEVERITY ANALYSIS
        self.think("Analyzing severity...")
        sev = await self._call_tool(
            analyze_severity,
            {
                "detections": det.get("detections", []),
                "image_dimensions": det.get("image_dimensions", {}),
            },
        )
        self.results["severity"] = sev
        self.think(f"Severity level → {sev.get('severity_level')}")

        # STEP 3: LOCATION
        self.think("Extracting location (manual fallback)...")
        loc = await self._call_tool(extract_location, {"image_path": image_path})
        self.results["location"] = loc
        self.think(f"Area type → {loc.get('area_type')}")

        # STEP 4: STAKEHOLDERS
        self.think("Determining stakeholders...")
        stk = await self._call_tool(
            identify_stakeholders,
            {
                "area_type": loc.get("area_type"),
                "severity_level": sev.get("severity_level"),
                # Only the category names are sent here, not their counts.
                "waste_categories": list(det.get("category_breakdown", {}).keys()),
            },
        )
        self.results["stakeholders"] = stk
        self.think(
            f"Stakeholders identified → {len(stk.get('primary_stakeholders', []))}"
        )

        # STEP 5: CLEANUP ESTIMATION
        self.think("Estimating cleanup time and workers...")
        est = await self._call_tool(
            estimate_cleanup,
            {
                "detection_count": det.get("total_detections", 0),
                "severity_level": sev.get("severity_level"),
                "area_coverage": sev.get("area_coverage_percent", 0.0),
                "waste_breakdown": det.get("category_breakdown", {}),
            },
        )
        self.results["estimate"] = est
        self.think(f"Cleanup → {est.get('estimated_duration_minutes')} minutes")

        # STEP 6: RESOURCE CALCULATION
        self.think("Calculating resource requirements...")
        res = await self._call_tool(
            calculate_resources,
            {
                "detection_count": det.get("total_detections", 0),
                # Unlike step 4, the full category -> value mapping is sent.
                "waste_categories": det.get("category_breakdown", {}),
                "area_type": loc.get("area_type"),
            },
        )
        self.results["resources"] = res
        self.think(f"Budget estimate → ${res.get('budget_estimate_usd', 0)}")

        # STEP 7: REPORT GENERATION
        self.think("Generating final structured report...")
        rep = await self._call_tool(
            generate_report,
            {
                "detection_results": det,
                "severity_analysis": sev,
                "location_info": loc,
                "stakeholders": stk,
                "cleanup_estimate": est,
                "resources": res,
            },
        )
        self.results["report"] = rep
        self.think("Report generated successfully.")

        # STEP 8: TIMELINE CREATION
        self.think("Creating actionable timeline...")
        tl = await self._call_tool(
            create_timeline,
            {
                "cleanup_estimate": est,
                "urgency_level": stk.get("urgency_level", "medium"),
            },
        )
        self.results["timeline"] = tl
        self.think("Timeline ready.")

        # Summaries
        self._build_summary()
        return rep, self.reasoning_chain

    # ----------------------------------------------------------------------
    def _build_summary(self):
        """Rebuild ``self.summary`` from the payloads stored in ``self.results``.

        Stages or fields that are absent yield ``None`` for the corresponding
        summary entry rather than raising.
        """
        det = self.results.get("detection", {})
        sev = self.results.get("severity", {})
        est = self.results.get("estimate", {})
        res = self.results.get("resources", {})
        self.summary = {
            "detections": det.get("total_detections"),
            "severity": sev.get("severity_level"),
            "cleanup_minutes": est.get("estimated_duration_minutes"),
            "workers": est.get("workers_required"),
            "budget_usd": res.get("budget_estimate_usd"),
        }

    # ----------------------------------------------------------------------
    def get_summary(self) -> Dict[str, Any]:
        """Return the summary dict (empty until ``_build_summary`` has run)."""
        # Fixed: previously read the misspelled attribute ``summaryw``,
        # which raised AttributeError on every call.
        return self.summary

    def get_reasoning_chain(self) -> List[str]:
        """Return the list of recorded progress messages (not a copy)."""
        return self.reasoning_chain