|
|
""" |
|
|
Cleanup Planning Agent |
|
|
|
|
|
Orchestrates the full workflow from image analysis to actionable cleanup plans. |
|
|
""" |
|
|
|
|
|
from typing import Optional, Any |
|
|
from PIL import Image |
|
|
import base64 |
|
|
from io import BytesIO |
|
|
|
|
|
from tools.trash_detection_tool import detect_trash_mcp, format_detections_for_display |
|
|
from tools.cleanup_planner_tool import plan_cleanup |
|
|
from tools.history_tool import log_event, get_hotspots |
|
|
from tools.report_generator_tool import generate_report |
|
|
from trash_model import Detection |
|
|
|
|
|
|
|
|
def run_cleanup_workflow( |
|
|
image: Image.Image, |
|
|
location: Optional[str] = None, |
|
|
notes: Optional[str] = None, |
|
|
save_to_history: bool = True, |
|
|
use_llm_enhancement: bool = False, |
|
|
latitude: Optional[float] = None, |
|
|
longitude: Optional[float] = None |
|
|
): |
|
|
""" |
|
|
Execute the complete cleanup workflow for a trash detection event. |
|
|
|
|
|
This is the main orchestration function that: |
|
|
1. Detects trash in the image |
|
|
2. Plans cleanup actions |
|
|
3. Optionally logs to history |
|
|
4. Generates reports |
|
|
5. Returns all results for UI display |
|
|
|
|
|
Args: |
|
|
image: PIL Image to analyze |
|
|
location: Optional location description |
|
|
notes: Optional user notes |
|
|
save_to_history: Whether to log event to database |
|
|
use_llm_enhancement: Whether to use LLM for enhanced text generation |
|
|
latitude: Optional GPS latitude |
|
|
longitude: Optional GPS longitude |
|
|
|
|
|
Returns: |
|
|
Dict containing: |
|
|
- detection_results: Raw detection data |
|
|
- plan: Cleanup plan |
|
|
- report: Generated report text |
|
|
- event_id: Database ID (if saved) |
|
|
- visualization_data: Data for UI overlay |
|
|
- summary: Human-readable summary |
|
|
""" |
|
|
|
|
|
print("π Step 1: Detecting trash in image...") |
|
|
|
|
|
|
|
|
buffered = BytesIO() |
|
|
image.save(buffered, format="PNG") |
|
|
img_b64 = base64.b64encode(buffered.getvalue()).decode() |
|
|
|
|
|
detection_result = detect_trash_mcp(img_b64) |
|
|
detections: list[Detection] = detection_result["detections"] |
|
|
|
|
|
print(f" Found {detection_result['count']} items") |
|
|
|
|
|
if not detections: |
|
|
return { |
|
|
"detection_results": detection_result, |
|
|
"plan": None, |
|
|
"report": None, |
|
|
"event_id": None, |
|
|
"visualization_data": None, |
|
|
"summary": "No trash detected in this image. The area appears clean!", |
|
|
"status": "no_trash" |
|
|
} |
|
|
|
|
|
|
|
|
print("π Step 2: Planning cleanup actions...") |
|
|
plan = plan_cleanup( |
|
|
detections, |
|
|
location=location, |
|
|
notes=notes, |
|
|
use_llm=use_llm_enhancement |
|
|
) |
|
|
print(f" Severity: {plan['severity']}, Volunteers: {plan['recommended_volunteers']}") |
|
|
|
|
|
|
|
|
event_id = None |
|
|
if save_to_history: |
|
|
print("πΎ Step 3: Logging event to history...") |
|
|
log_result = log_event( |
|
|
detections=detections, |
|
|
severity=plan["severity"], |
|
|
location=location, |
|
|
notes=notes, |
|
|
image_path=None, |
|
|
latitude=latitude, |
|
|
longitude=longitude |
|
|
) |
|
|
event_id = log_result["event_id"] |
|
|
print(f" Saved as event #{event_id}") |
|
|
|
|
|
|
|
|
print("π Step 4: Generating report...") |
|
|
report_result = generate_report( |
|
|
detections=detections, |
|
|
severity=plan["severity"], |
|
|
location=location, |
|
|
notes=notes, |
|
|
event_id=event_id, |
|
|
plan=plan, |
|
|
format="email" |
|
|
) |
|
|
report_text = report_result["report"] |
|
|
|
|
|
|
|
|
visualization_data = _prepare_visualization_data(image, detections) |
|
|
|
|
|
|
|
|
summary = _create_workflow_summary(detection_result, plan, event_id) |
|
|
|
|
|
print("β
Workflow complete!") |
|
|
|
|
|
return { |
|
|
"detection_results": detection_result, |
|
|
"plan": plan, |
|
|
"report": report_text, |
|
|
"event_id": event_id, |
|
|
"visualization_data": visualization_data, |
|
|
"summary": summary, |
|
|
"status": "success" |
|
|
} |
|
|
|
|
|
|
|
|
def _prepare_visualization_data(image: Image.Image, detections: list[Detection]): |
|
|
"""Prepare data for drawing bounding boxes on image.""" |
|
|
return { |
|
|
"image_size": {"width": image.width, "height": image.height}, |
|
|
"boxes": [ |
|
|
{ |
|
|
"bbox": det["bbox"], |
|
|
"label": det["label"], |
|
|
"score": det["score"], |
|
|
"color": _get_color_for_label(det["label"]) |
|
|
} |
|
|
for det in detections |
|
|
] |
|
|
} |
|
|
|
|
|
|
|
|
def _get_color_for_label(label: str) -> str: |
|
|
"""Get consistent color for trash category.""" |
|
|
color_map = { |
|
|
"plastic_bottle": "#FF6B6B", |
|
|
"plastic_bag": "#4ECDC4", |
|
|
"food_wrapper": "#FFD93D", |
|
|
"cigarette_butt": "#95E1D3", |
|
|
"paper_cup": "#F38181", |
|
|
"aluminum_can": "#AA96DA", |
|
|
"food_container": "#FCBAD3", |
|
|
"cardboard_box": "#A8D8EA", |
|
|
"glass_bottle": "#FFA07A", |
|
|
"other_trash": "#B0B0B0" |
|
|
} |
|
|
return color_map.get(label, "#FF0000") |
|
|
|
|
|
|
|
|
def _create_workflow_summary( |
|
|
detection_result, |
|
|
plan, |
|
|
event_id: Optional[int] |
|
|
) -> str: |
|
|
"""Create human-readable workflow summary.""" |
|
|
count = detection_result["count"] |
|
|
categories = detection_result["categories"] |
|
|
severity = plan["severity"] |
|
|
volunteers = plan["recommended_volunteers"] |
|
|
time = plan["estimated_time_minutes"] |
|
|
|
|
|
category_text = ", ".join(categories[:3]) |
|
|
if len(categories) > 3: |
|
|
category_text += f" and {len(categories) - 3} more" |
|
|
|
|
|
summary = f"""**Analysis Complete** |
|
|
|
|
|
Detected **{count} trash items** across {len(categories)} categories ({category_text}). |
|
|
|
|
|
**Severity:** {severity.upper()} |
|
|
|
|
|
**Recommended Action:** |
|
|
- {volunteers} volunteer(s) needed |
|
|
- Approximately {time} minutes |
|
|
- Action within {plan['urgency_days']} day(s) |
|
|
""" |
|
|
|
|
|
if event_id: |
|
|
summary += f"\nβ Event saved to history (ID: {event_id})" |
|
|
|
|
|
return summary |
|
|
|
|
|
|
|
|
def analyze_hotspots(days: int = 30): |
|
|
""" |
|
|
Analyze trash hotspots from historical data. |
|
|
|
|
|
Args: |
|
|
days: Time window for analysis |
|
|
|
|
|
Returns: |
|
|
Hotspot analysis with recommendations |
|
|
""" |
|
|
hotspots_data = get_hotspots(min_events=2, days=days) |
|
|
|
|
|
if not hotspots_data["hotspots"]: |
|
|
return { |
|
|
"hotspots": [], |
|
|
"message": f"No recurring hotspots found in the last {days} days.", |
|
|
"recommendation": "Continue monitoring and logging new events." |
|
|
} |
|
|
|
|
|
|
|
|
top_hotspot = hotspots_data["hotspots"][0] |
|
|
|
|
|
recommendation = f"""**Hotspot Alert** |
|
|
|
|
|
{hotspots_data['count']} location(s) with recurring trash issues identified. |
|
|
|
|
|
**Top Problem Area:** {top_hotspot['location']} |
|
|
- {top_hotspot['event_count']} events recorded |
|
|
- {top_hotspot['total_trash']} total items |
|
|
- Last event: {top_hotspot['last_event']} |
|
|
|
|
|
**Recommendation:** Consider setting up regular cleanup schedule or requesting permanent waste receptacles for this location. |
|
|
""" |
|
|
|
|
|
return { |
|
|
"hotspots": hotspots_data["hotspots"], |
|
|
"count": hotspots_data["count"], |
|
|
"recommendation": recommendation, |
|
|
"top_hotspot": top_hotspot |
|
|
} |
|
|
|