Spaces:
Running
Running
| import sqlite3 | |
| import json | |
| import os | |
| import math | |
| from datetime import datetime | |
| from typing import List, Dict, Optional | |
| DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data', 'gridsense_memory.db') | |
| def init_db(): | |
| os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) | |
| conn = sqlite3.connect(DB_PATH) | |
| c = conn.cursor() | |
| c.execute(""" | |
| CREATE TABLE IF NOT EXISTS neighborhood_reports ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| timestamp TEXT NOT NULL, | |
| lat REAL NOT NULL, | |
| lon REAL NOT NULL, | |
| city TEXT, | |
| neighborhood TEXT, | |
| report_text TEXT, | |
| weather_condition TEXT, | |
| predicted_probability INTEGER, | |
| actual_outcome TEXT, | |
| outcome_confirmed INTEGER DEFAULT 0, | |
| time_of_day TEXT, | |
| day_of_week TEXT, | |
| signal_keywords TEXT, | |
| confidence_level TEXT | |
| ) | |
| """) | |
| conn.commit() | |
| conn.close() | |
| def haversine_distance(lat1, lon1, lat2, lon2) -> float: | |
| R = 6371.0 | |
| lat1_r, lon1_r = math.radians(lat1), math.radians(lon1) | |
| lat2_r, lon2_r = math.radians(lat2), math.radians(lon2) | |
| dlat = lat2_r - lat1_r | |
| dlon = lon2_r - lon1_r | |
| a = math.sin(dlat/2)**2 + math.cos(lat1_r)*math.cos(lat2_r)*math.sin(dlon/2)**2 | |
| return R * 2 * math.asin(math.sqrt(a)) | |
| def save_report(lat: float, lon: float, city: str, neighborhood: str, | |
| report_text: str, weather_condition: str, | |
| predicted_probability: int, time_of_day: str, | |
| day_of_week: str, signal_keywords: list, | |
| confidence_level: str) -> int: | |
| init_db() | |
| conn = sqlite3.connect(DB_PATH) | |
| c = conn.cursor() | |
| c.execute(""" | |
| INSERT INTO neighborhood_reports | |
| (timestamp, lat, lon, city, neighborhood, report_text, | |
| weather_condition, predicted_probability, time_of_day, | |
| day_of_week, signal_keywords, confidence_level) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| datetime.utcnow().isoformat(), lat, lon, city, neighborhood, | |
| report_text, weather_condition, predicted_probability, | |
| time_of_day, day_of_week, json.dumps(signal_keywords), confidence_level | |
| )) | |
| report_id = c.lastrowid | |
| conn.commit() | |
| conn.close() | |
| return report_id | |
| def confirm_outcome(report_id: int, actual_outcome: str): | |
| init_db() | |
| conn = sqlite3.connect(DB_PATH) | |
| c = conn.cursor() | |
| c.execute(""" | |
| UPDATE neighborhood_reports | |
| SET actual_outcome = ?, outcome_confirmed = 1 | |
| WHERE id = ? | |
| """, (actual_outcome, report_id)) | |
| conn.commit() | |
| conn.close() | |
| def get_similar_past_reports(lat: float, lon: float, | |
| radius_km: float = 1.5, | |
| limit: int = 8) -> List[Dict]: | |
| init_db() | |
| conn = sqlite3.connect(DB_PATH) | |
| c = conn.cursor() | |
| lat_range = radius_km / 111.0 | |
| lon_range = radius_km / (111.0 * abs(math.cos(math.radians(lat))) + 0.001) | |
| c.execute(""" | |
| SELECT id, timestamp, lat, lon, neighborhood, report_text, | |
| weather_condition, predicted_probability, actual_outcome, | |
| outcome_confirmed, time_of_day, day_of_week, confidence_level | |
| FROM neighborhood_reports | |
| WHERE lat BETWEEN ? AND ? | |
| AND lon BETWEEN ? AND ? | |
| ORDER BY timestamp DESC | |
| LIMIT 50 | |
| """, (lat - lat_range, lat + lat_range, | |
| lon - lon_range, lon + lon_range)) | |
| rows = c.fetchall() | |
| conn.close() | |
| nearby = [] | |
| for row in rows: | |
| dist = haversine_distance(lat, lon, row[2], row[3]) | |
| if dist <= radius_km: | |
| nearby.append({ | |
| "id": row[0], | |
| "timestamp": row[1], | |
| "lat": row[2], | |
| "lon": row[3], | |
| "distance_km": round(dist, 2), | |
| "neighborhood": row[4], | |
| "report": row[5], | |
| "weather": row[6], | |
| "predicted_probability": row[7], | |
| "actual_outcome": row[8], | |
| "outcome_confirmed": bool(row[9]), | |
| "time_of_day": row[10], | |
| "day_of_week": row[11], | |
| "confidence": row[12] | |
| }) | |
| nearby.sort(key=lambda x: x["distance_km"]) | |
| return nearby[:limit] | |
| def get_neighborhood_accuracy(lat: float, lon: float, | |
| radius_km: float = 1.5) -> Dict: | |
| reports = get_similar_past_reports(lat, lon, radius_km, limit=100) | |
| confirmed = [r for r in reports if r["outcome_confirmed"]] | |
| if len(confirmed) < 3: | |
| return { | |
| "accuracy_available": False, | |
| "total_confirmed": len(confirmed), | |
| "message": f"Only {len(confirmed)} confirmed outcomes. Need 3+ for accuracy metrics." | |
| } | |
| correct = sum(1 for r in confirmed | |
| if (r["predicted_probability"] >= 65 and r["actual_outcome"] == "outage_occurred") or | |
| (r["predicted_probability"] < 65 and r["actual_outcome"] == "no_outage")) | |
| accuracy = correct / len(confirmed) | |
| outage_reports = [r for r in confirmed if r["actual_outcome"] == "outage_occurred"] | |
| return { | |
| "accuracy_available": True, | |
| "accuracy_percent": round(accuracy * 100, 1), | |
| "total_confirmed": len(confirmed), | |
| "total_outages_confirmed": len(outage_reports), | |
| "message": f"Based on {len(confirmed)} confirmed predictions in this neighborhood" | |
| } | |
| def format_memory_for_prompt(past_reports: List[Dict]) -> str: | |
| if not past_reports: | |
| return "No similar past reports found within 1.5km of this location." | |
| lines = [f"RETRIEVED MEMORY: {len(past_reports)} similar past reports from within 1.5km:\n"] | |
| for i, r in enumerate(past_reports, 1): | |
| outcome_str = "" | |
| if r["outcome_confirmed"]: | |
| outcome_str = f" ACTUAL OUTCOME: {r['actual_outcome']}" | |
| time_ago = _time_ago(r["timestamp"]) | |
| lines.append( | |
| f"Past Event {i} ({time_ago}, {r['distance_km']}km away):\n" | |
| f" Report: {r['report']}\n" | |
| f" Weather: {r['weather']}\n" | |
| f" Prediction: {r['predicted_probability']}%{outcome_str}\n" | |
| ) | |
| lines.append("\nUSE THESE PAST EVENTS to calibrate your current prediction.") | |
| return "\n".join(lines) | |
| def _time_ago(timestamp: str) -> str: | |
| try: | |
| dt = datetime.fromisoformat(timestamp) | |
| diff = datetime.utcnow() - dt | |
| if diff.days > 0: | |
| return f"{diff.days} days ago" | |
| hours = diff.seconds // 3600 | |
| if hours > 0: | |
| return f"{hours} hours ago" | |
| minutes = diff.seconds // 60 | |
| return f"{minutes} minutes ago" | |
| except: | |
| return "recently" | |
| def get_recent_reports_for_map(lat: float, lon: float, | |
| radius_km: float = 5.0, | |
| limit: int = 50) -> List[Dict]: | |
| init_db() | |
| conn = sqlite3.connect(DB_PATH) | |
| c = conn.cursor() | |
| c.execute(""" | |
| SELECT id, timestamp, lat, lon, neighborhood, | |
| predicted_probability, outcome_confirmed, actual_outcome | |
| FROM neighborhood_reports | |
| ORDER BY timestamp DESC | |
| LIMIT ? | |
| """, (limit,)) | |
| rows = c.fetchall() | |
| conn.close() | |
| return [{ | |
| "id": row[0], | |
| "timestamp": row[1], | |
| "lat": row[2], | |
| "lon": row[3], | |
| "neighborhood": row[4], | |
| "predicted_probability": row[5], | |
| "outcome_confirmed": bool(row[6]), | |
| "actual_outcome": row[7] | |
| } for row in rows] |