Spaces:
Sleeping
Sleeping
| """ | |
| Agent System wrapper around the CleanEye MCP Tools. | |
| Coordinates full pipeline from detection β severity β location β stakeholders β | |
| cleanup estimation β resources β report β timeline. | |
| This agent simulates calls to the MCP tools. | |
| You must have your MCP server running OR import the tool functions directly. | |
| """ | |
| import asyncio | |
| import json | |
| from typing import Any, Dict, List, Optional | |
| # Import ALL tools from your MCP server module. | |
| # This assumes your MCP server file is named mcp_server.py | |
| from mcp_server import ( | |
| detect_garbage, | |
| analyze_severity, | |
| extract_location, | |
| identify_stakeholders, | |
| estimate_cleanup, | |
| calculate_resources, | |
| generate_report, | |
| create_timeline | |
| ) | |
class CleanEyeAgent:
    """Orchestrate the CleanEye MCP tools into a full 8-step pipeline.

    Pipeline order: detection → severity → location → stakeholders →
    cleanup estimation → resources → report → timeline.  Each step's
    parsed result is cached in ``self.results`` and a human-readable
    reasoning trace is accumulated in ``self.reasoning_chain``.
    """

    def __init__(self):
        # Ordered trace of "thoughts" emitted while the pipeline runs.
        self.reasoning_chain: List[str] = []
        # Parsed output of every pipeline step, keyed by step name.
        self.results: Dict[str, Any] = {}
        # Condensed overview built by _build_summary() after a full run.
        self.summary: Dict[str, Any] = {}

    # ----------------------------------------------------------------------
    # Helper: track thoughts
    # ----------------------------------------------------------------------
    def think(self, msg: str) -> None:
        """Append *msg* to the reasoning chain and echo it to stdout."""
        self.reasoning_chain.append(msg)
        print(f"🧠 {msg}")

    # ----------------------------------------------------------------------
    # Helper: invoke an MCP tool and parse its JSON payload
    # ----------------------------------------------------------------------
    @staticmethod
    async def _invoke(tool, args: Dict[str, Any]) -> Dict[str, Any]:
        """Await ``tool(args)`` and decode its first text content as JSON.

        Every MCP tool used here returns a list whose first element carries
        a ``.text`` attribute holding a JSON document — this factors out the
        ``json.loads(result[0].text)`` boilerplate repeated at each step.
        """
        raw = await tool(args)
        return json.loads(raw[0].text)

    # ----------------------------------------------------------------------
    # Core pipeline wrapper
    # ----------------------------------------------------------------------
    async def run_full_pipeline(self, image_path: str, confidence: float = 0.25):
        """Orchestrate the full 8-step pipeline.

        Args:
            image_path: Path to the image to analyze.
            confidence: Minimum detection confidence threshold.

        Returns:
            A tuple ``(report, reasoning_chain)`` where ``report`` is the
            parsed dict from ``generate_report`` and ``reasoning_chain`` is
            the accumulated list of thought messages.
        """
        # → STEP 1: DETECTION
        self.think("Running garbage detection...")
        det = await self._invoke(
            detect_garbage,
            {"image_path": image_path, "confidence": confidence},
        )
        self.results["detection"] = det
        self.think(f"Detected {det.get('total_detections', 0)} waste items.")

        # → STEP 2: SEVERITY ANALYSIS
        self.think("Analyzing severity...")
        sev = await self._invoke(
            analyze_severity,
            {
                "detections": det.get("detections", []),
                "image_dimensions": det.get("image_dimensions", {}),
            },
        )
        self.results["severity"] = sev
        self.think(f"Severity level → {sev.get('severity_level')}")

        # → STEP 3: LOCATION
        self.think("Extracting location (manual fallback)...")
        loc = await self._invoke(extract_location, {"image_path": image_path})
        self.results["location"] = loc
        self.think(f"Area type → {loc.get('area_type')}")

        # → STEP 4: STAKEHOLDERS
        self.think("Determining stakeholders...")
        stk = await self._invoke(
            identify_stakeholders,
            {
                "area_type": loc.get("area_type"),
                "severity_level": sev.get("severity_level"),
                "waste_categories": list(det.get("category_breakdown", {}).keys()),
            },
        )
        self.results["stakeholders"] = stk
        self.think(
            f"Stakeholders identified → {len(stk.get('primary_stakeholders', []))}"
        )

        # → STEP 5: CLEANUP ESTIMATION
        self.think("Estimating cleanup time and workers...")
        est = await self._invoke(
            estimate_cleanup,
            {
                "detection_count": det.get("total_detections", 0),
                "severity_level": sev.get("severity_level"),
                "area_coverage": sev.get("area_coverage_percent", 0.0),
                "waste_breakdown": det.get("category_breakdown", {}),
            },
        )
        self.results["estimate"] = est
        self.think(f"Cleanup → {est.get('estimated_duration_minutes')} minutes")

        # → STEP 6: RESOURCE CALCULATION
        self.think("Calculating resource requirements...")
        res = await self._invoke(
            calculate_resources,
            {
                "detection_count": det.get("total_detections", 0),
                "waste_categories": det.get("category_breakdown", {}),
                "area_type": loc.get("area_type"),
            },
        )
        self.results["resources"] = res
        self.think(f"Budget estimate → ${res.get('budget_estimate_usd', 0)}")

        # → STEP 7: REPORT GENERATION
        self.think("Generating final structured report...")
        rep = await self._invoke(
            generate_report,
            {
                "detection_results": det,
                "severity_analysis": sev,
                "location_info": loc,
                "stakeholders": stk,
                "cleanup_estimate": est,
                "resources": res,
            },
        )
        self.results["report"] = rep
        self.think("Report generated successfully.")

        # → STEP 8: TIMELINE CREATION
        self.think("Creating actionable timeline...")
        tl = await self._invoke(
            create_timeline,
            {
                "cleanup_estimate": est,
                # Default to "medium" urgency when the stakeholder step
                # did not supply one.
                "urgency_level": stk.get("urgency_level", "medium"),
            },
        )
        self.results["timeline"] = tl
        self.think("Timeline ready.")

        # → Summaries
        self._build_summary()
        return rep, self.reasoning_chain

    # ----------------------------------------------------------------------
    def _build_summary(self) -> None:
        """Build a simple high-level summary for quick reference.

        Reads the cached step results; missing steps yield ``None`` values.
        """
        det = self.results.get("detection", {})
        sev = self.results.get("severity", {})
        est = self.results.get("estimate", {})
        res = self.results.get("resources", {})
        self.summary = {
            "detections": det.get("total_detections"),
            "severity": sev.get("severity_level"),
            "cleanup_minutes": est.get("estimated_duration_minutes"),
            "workers": est.get("workers_required"),
            "budget_usd": res.get("budget_estimate_usd"),
        }

    # ----------------------------------------------------------------------
    def get_summary(self) -> Dict[str, Any]:
        """Return the high-level summary dict (empty until a run completes)."""
        # BUG FIX: original returned ``self.summaryw`` (typo) which raised
        # AttributeError on every call.
        return self.summary

    def get_reasoning_chain(self) -> List[str]:
        """Return the ordered list of thought messages recorded so far."""
        return self.reasoning_chain