CleanCity / tools /history_tool.py
AlBaraa63
Recover
5369733
"""
History Tracking MCP Tool
Logs trash detection events and provides querying capabilities for analysis.
Uses SQLite for persistent storage.
"""
import sqlite3
import json
from datetime import datetime, timedelta
from typing import Optional, Any
from pathlib import Path
from trash_model import Detection
DB_PATH = Path(__file__).parent.parent / "data" / "trash_events.db"
def _get_connection() -> sqlite3.Connection:
"""Get SQLite connection and ensure schema exists."""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
# Create schema if not exists
conn.execute("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
location TEXT,
latitude REAL,
longitude REAL,
severity TEXT NOT NULL,
trash_count INTEGER NOT NULL,
categories TEXT NOT NULL,
detections_json TEXT NOT NULL,
notes TEXT,
image_path TEXT,
cleaned BOOLEAN DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp ON events(timestamp)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_location ON events(location)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_severity ON events(severity)
""")
conn.commit()
return conn
def log_event(
detections: list[Detection],
severity: str,
location: Optional[str] = None,
notes: Optional[str] = None,
image_path: Optional[str] = None,
latitude: Optional[float] = None,
longitude: Optional[float] = None
):
"""
Log a trash detection event to the database.
Args:
detections: List of trash detections
severity: "low" | "medium" | "high"
location: Human-readable location description
notes: Optional user notes
image_path: Optional path to saved image
latitude: Optional GPS latitude
longitude: Optional GPS longitude
Returns:
Dict with event_id and confirmation message
"""
conn = _get_connection()
timestamp = datetime.now().isoformat()
trash_count = len(detections)
categories = json.dumps(list(set(d["label"] for d in detections)))
detections_json = json.dumps(detections)
cursor = conn.execute("""
INSERT INTO events (
timestamp, location, latitude, longitude, severity,
trash_count, categories, detections_json, notes, image_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
timestamp, location, latitude, longitude, severity,
trash_count, categories, detections_json, notes, image_path
))
event_id = cursor.lastrowid
conn.commit()
conn.close()
return {
"event_id": event_id,
"timestamp": timestamp,
"message": f"Event logged successfully (ID: {event_id})"
}
def query_events(
days: Optional[int] = None,
location: Optional[str] = None,
severity: Optional[str] = None,
min_trash_count: Optional[int] = None,
cleaned_only: bool = False,
limit: int = 100
):
"""
Query trash events with filtering options.
Args:
days: Only events from last N days
location: Filter by location (partial match)
severity: Filter by severity level
min_trash_count: Minimum trash items threshold
cleaned_only: Only show cleaned events
limit: Maximum results to return
Returns:
Dict containing:
- events: List of matching events
- total_count: Total matching events
- summary: Aggregate statistics
"""
conn = _get_connection()
# Build query
conditions = []
params = []
if days:
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
conditions.append("timestamp >= ?")
params.append(cutoff)
if location:
conditions.append("location LIKE ?")
params.append(f"%{location}%")
if severity:
conditions.append("severity = ?")
params.append(severity)
if min_trash_count:
conditions.append("trash_count >= ?")
params.append(min_trash_count)
if cleaned_only:
conditions.append("cleaned = 1")
where_clause = " AND ".join(conditions) if conditions else "1=1"
# Get events
query = f"""
SELECT
id, timestamp, location, latitude, longitude,
severity, trash_count, categories, notes, image_path, cleaned
FROM events
WHERE {where_clause}
ORDER BY timestamp DESC
LIMIT ?
"""
params.append(limit)
cursor = conn.execute(query, params)
events = [dict(row) for row in cursor.fetchall()]
# Parse JSON fields
for event in events:
event["categories"] = json.loads(event["categories"])
# Get summary statistics
summary_query = f"""
SELECT
COUNT(*) as total_events,
SUM(trash_count) as total_trash_items,
AVG(trash_count) as avg_trash_per_event,
COUNT(DISTINCT location) as unique_locations
FROM events
WHERE {where_clause}
"""
summary_cursor = conn.execute(summary_query, params[:-1]) # Exclude limit param
summary = dict(summary_cursor.fetchone())
conn.close()
return {
"events": events,
"total_count": len(events),
"summary": summary,
"filters_applied": {
"days": days,
"location": location,
"severity": severity,
"min_trash_count": min_trash_count
}
}
def get_hotspots(min_events: int = 2, days: Optional[int] = 30):
"""
Identify locations with recurring trash problems.
Args:
min_events: Minimum number of events to qualify as hotspot
days: Time window to analyze (None = all time)
Returns:
Dict with hotspot locations and their statistics
"""
conn = _get_connection()
conditions = ["location IS NOT NULL"]
params = []
if days:
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
conditions.append("timestamp >= ?")
params.append(cutoff)
where_clause = " AND ".join(conditions)
query = f"""
SELECT
location,
COUNT(*) as event_count,
SUM(trash_count) as total_trash,
AVG(trash_count) as avg_trash,
MAX(timestamp) as last_event,
GROUP_CONCAT(DISTINCT severity) as severities
FROM events
WHERE {where_clause}
GROUP BY location
HAVING event_count >= ?
ORDER BY event_count DESC, total_trash DESC
"""
params.append(min_events)
cursor = conn.execute(query, params)
hotspots = [dict(row) for row in cursor.fetchall()]
conn.close()
return {
"hotspots": hotspots,
"count": len(hotspots),
"criteria": f"Locations with {min_events}+ events" + (f" in last {days} days" if days else "")
}
def mark_cleaned(event_id: int):
"""Mark an event as cleaned."""
conn = _get_connection()
conn.execute("UPDATE events SET cleaned = 1 WHERE id = ?", (event_id,))
conn.commit()
conn.close()
return {"message": f"Event {event_id} marked as cleaned"}