gridsense-gemma4 / brain /memory.py
Nestroy2003's picture
Sync from GitHub via hub-sync
91bf1ce verified
import sqlite3
import json
import os
import math
from datetime import datetime
from typing import List, Dict, Optional
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data', 'gridsense_memory.db')
def init_db():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS neighborhood_reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
lat REAL NOT NULL,
lon REAL NOT NULL,
city TEXT,
neighborhood TEXT,
report_text TEXT,
weather_condition TEXT,
predicted_probability INTEGER,
actual_outcome TEXT,
outcome_confirmed INTEGER DEFAULT 0,
time_of_day TEXT,
day_of_week TEXT,
signal_keywords TEXT,
confidence_level TEXT
)
""")
conn.commit()
conn.close()
def haversine_distance(lat1, lon1, lat2, lon2) -> float:
R = 6371.0
lat1_r, lon1_r = math.radians(lat1), math.radians(lon1)
lat2_r, lon2_r = math.radians(lat2), math.radians(lon2)
dlat = lat2_r - lat1_r
dlon = lon2_r - lon1_r
a = math.sin(dlat/2)**2 + math.cos(lat1_r)*math.cos(lat2_r)*math.sin(dlon/2)**2
return R * 2 * math.asin(math.sqrt(a))
def save_report(lat: float, lon: float, city: str, neighborhood: str,
report_text: str, weather_condition: str,
predicted_probability: int, time_of_day: str,
day_of_week: str, signal_keywords: list,
confidence_level: str) -> int:
init_db()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
INSERT INTO neighborhood_reports
(timestamp, lat, lon, city, neighborhood, report_text,
weather_condition, predicted_probability, time_of_day,
day_of_week, signal_keywords, confidence_level)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
datetime.utcnow().isoformat(), lat, lon, city, neighborhood,
report_text, weather_condition, predicted_probability,
time_of_day, day_of_week, json.dumps(signal_keywords), confidence_level
))
report_id = c.lastrowid
conn.commit()
conn.close()
return report_id
def confirm_outcome(report_id: int, actual_outcome: str):
init_db()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
UPDATE neighborhood_reports
SET actual_outcome = ?, outcome_confirmed = 1
WHERE id = ?
""", (actual_outcome, report_id))
conn.commit()
conn.close()
def get_similar_past_reports(lat: float, lon: float,
radius_km: float = 1.5,
limit: int = 8) -> List[Dict]:
init_db()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
lat_range = radius_km / 111.0
lon_range = radius_km / (111.0 * abs(math.cos(math.radians(lat))) + 0.001)
c.execute("""
SELECT id, timestamp, lat, lon, neighborhood, report_text,
weather_condition, predicted_probability, actual_outcome,
outcome_confirmed, time_of_day, day_of_week, confidence_level
FROM neighborhood_reports
WHERE lat BETWEEN ? AND ?
AND lon BETWEEN ? AND ?
ORDER BY timestamp DESC
LIMIT 50
""", (lat - lat_range, lat + lat_range,
lon - lon_range, lon + lon_range))
rows = c.fetchall()
conn.close()
nearby = []
for row in rows:
dist = haversine_distance(lat, lon, row[2], row[3])
if dist <= radius_km:
nearby.append({
"id": row[0],
"timestamp": row[1],
"lat": row[2],
"lon": row[3],
"distance_km": round(dist, 2),
"neighborhood": row[4],
"report": row[5],
"weather": row[6],
"predicted_probability": row[7],
"actual_outcome": row[8],
"outcome_confirmed": bool(row[9]),
"time_of_day": row[10],
"day_of_week": row[11],
"confidence": row[12]
})
nearby.sort(key=lambda x: x["distance_km"])
return nearby[:limit]
def get_neighborhood_accuracy(lat: float, lon: float,
radius_km: float = 1.5) -> Dict:
reports = get_similar_past_reports(lat, lon, radius_km, limit=100)
confirmed = [r for r in reports if r["outcome_confirmed"]]
if len(confirmed) < 3:
return {
"accuracy_available": False,
"total_confirmed": len(confirmed),
"message": f"Only {len(confirmed)} confirmed outcomes. Need 3+ for accuracy metrics."
}
correct = sum(1 for r in confirmed
if (r["predicted_probability"] >= 65 and r["actual_outcome"] == "outage_occurred") or
(r["predicted_probability"] < 65 and r["actual_outcome"] == "no_outage"))
accuracy = correct / len(confirmed)
outage_reports = [r for r in confirmed if r["actual_outcome"] == "outage_occurred"]
return {
"accuracy_available": True,
"accuracy_percent": round(accuracy * 100, 1),
"total_confirmed": len(confirmed),
"total_outages_confirmed": len(outage_reports),
"message": f"Based on {len(confirmed)} confirmed predictions in this neighborhood"
}
def format_memory_for_prompt(past_reports: List[Dict]) -> str:
if not past_reports:
return "No similar past reports found within 1.5km of this location."
lines = [f"RETRIEVED MEMORY: {len(past_reports)} similar past reports from within 1.5km:\n"]
for i, r in enumerate(past_reports, 1):
outcome_str = ""
if r["outcome_confirmed"]:
outcome_str = f" ACTUAL OUTCOME: {r['actual_outcome']}"
time_ago = _time_ago(r["timestamp"])
lines.append(
f"Past Event {i} ({time_ago}, {r['distance_km']}km away):\n"
f" Report: {r['report']}\n"
f" Weather: {r['weather']}\n"
f" Prediction: {r['predicted_probability']}%{outcome_str}\n"
)
lines.append("\nUSE THESE PAST EVENTS to calibrate your current prediction.")
return "\n".join(lines)
def _time_ago(timestamp: str) -> str:
try:
dt = datetime.fromisoformat(timestamp)
diff = datetime.utcnow() - dt
if diff.days > 0:
return f"{diff.days} days ago"
hours = diff.seconds // 3600
if hours > 0:
return f"{hours} hours ago"
minutes = diff.seconds // 60
return f"{minutes} minutes ago"
except:
return "recently"
def get_recent_reports_for_map(lat: float, lon: float,
radius_km: float = 5.0,
limit: int = 50) -> List[Dict]:
init_db()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
SELECT id, timestamp, lat, lon, neighborhood,
predicted_probability, outcome_confirmed, actual_outcome
FROM neighborhood_reports
ORDER BY timestamp DESC
LIMIT ?
""", (limit,))
rows = c.fetchall()
conn.close()
return [{
"id": row[0],
"timestamp": row[1],
"lat": row[2],
"lon": row[3],
"neighborhood": row[4],
"predicted_probability": row[5],
"outcome_confirmed": bool(row[6]),
"actual_outcome": row[7]
} for row in rows]