# Author: jtlevine
# Commit f2b0895: Add LSTM neural model, ERA5 data, FAISS+BM25 RAG, Neon DB, eval suite; de-jargon frontend
"""
Shared singleton store that bridges pipeline output to API responses.
After a pipeline run completes, call ``store.update_from_pipeline(pipeline)``
to populate the store with real data. The API checks ``store.has_real_data``
and serves from the store when True, falling back to synthetic demo data
when False.
"""
from __future__ import annotations

import logging
import random
import threading
from datetime import datetime, timezone
from typing import Any

from config import ZONES, ZONE_MAP, CITIES, PAYOUT_PER_EVENT_USD
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class PipelineStore:
    """Singleton that holds the latest pipeline run results.

    :meth:`update_from_pipeline` replaces all public fields together while
    holding ``self._lock``; ``has_real_data`` becomes True after the first
    successful update.
    """

    # Number of most-recent pipeline runs retained in ``pipeline_runs``.
    MAX_PIPELINE_RUNS = 50
    # Per-recipient cost estimate (USD) applied to the Swahili SMS notification.
    SMS_COST_PER_RECIPIENT_USD = 0.0075
    # Trigger severity ranking, lower = more severe; unknown levels rank 9.
    _LEVEL_PRIORITY = {"critical": 0, "warning": 1, "watch": 2}

    def __init__(self):
        self.has_real_data = False
        self.zones: list[dict] = []
        self.indices: list[dict] = []
        self.triggers: list[dict] = []
        self.basis_risk: list[dict] = []
        self.notifications: list[dict] = []
        self.pipeline_runs: list[dict] = []
        self.stats: dict[str, Any] = {}
        self._lock = threading.Lock()

    def update_from_pipeline(self, pipeline, run_result=None):
        """Convert pipeline state into the same dict shapes that
        ``_generate_demo_data()`` in api.py produces, so the dashboard
        works identically with real or synthetic data.

        The update is all-or-nothing: every section is built first and the
        store's fields are only replaced once all builders have succeeded.
        On failure the exception is logged and the previous state is kept.

        Args:
            pipeline: A ``HeatRiskPipeline`` instance after ``.run()``
                has completed.
            run_result: Optional ``PipelineRunResult`` returned by
                ``pipeline.run()``.
        """
        with self._lock:
            try:
                # Naive UTC timestamp; same isoformat() output as the
                # deprecated datetime.utcnow().
                now = datetime.now(timezone.utc).replace(tzinfo=None)
                zones = self._build_zones(pipeline, now)
                triggers = self._build_triggers(pipeline, zones, now)
                basis_risk = self._build_basis_risk(pipeline)
                notifications = self._build_notifications(pipeline, triggers, now)
                pipeline_run = (
                    self._build_pipeline_run(run_result)
                    if run_result is not None
                    else None
                )
                # New list rather than an in-place insert, so a failure below
                # (or a reader holding the old list) never sees a partial state.
                pipeline_runs = self._prepend_run(
                    self.pipeline_runs, pipeline_run, self.MAX_PIPELINE_RUNS,
                )
                stats = self._build_stats(zones, triggers, pipeline_runs, now)
            except Exception:
                logger.exception("Failed to update store from pipeline")
                return
            # Commit: plain assignments only, nothing here can raise.
            self.zones = zones
            self.triggers = triggers
            self.basis_risk = basis_risk
            self.notifications = notifications
            self.pipeline_runs = pipeline_runs
            self.stats = stats
            self.has_real_data = True
            logger.info(
                "Store updated: %d zones, %d triggers, %d notifications",
                len(zones), len(triggers), len(notifications),
            )

    # ------------------------------------------------------------------
    # Conversion helpers (private)
    # ------------------------------------------------------------------

    @staticmethod
    def _prepend_run(
        runs: list[dict], run: dict | None, limit: int,
    ) -> list[dict]:
        """Return a new list with *run* (if not None) first, capped at *limit*.

        *runs* itself is never modified.
        """
        head = [run] if run is not None else []
        return (head + runs)[:limit]

    @staticmethod
    def _levels_by_zone(triggers) -> dict[str, list[str]]:
        """Map each trigger's ``zone_id`` to its ``trigger_level`` values.

        Levels keep the order in which the triggers appear.
        """
        grouped: dict[str, list[str]] = {}
        for t in triggers:
            grouped.setdefault(t.zone_id, []).append(t.trigger_level)
        return grouped

    @classmethod
    def _most_severe_level(cls, levels) -> str:
        """Return the most severe level in *levels*, or ``"normal"`` if empty.

        Ranking follows ``_LEVEL_PRIORITY``. Unrecognised levels rank below all
        known ones, and ties keep the first occurrence (``min`` semantics).
        """
        if not levels:
            return "normal"
        return min(levels, key=lambda lv: cls._LEVEL_PRIORITY.get(lv, 9))

    def _build_zones(self, pipeline, now: datetime) -> list[dict]:
        """Build one API zone dict per entry in ``ZONES`` from pipeline heat data.

        Zones without heat data fall back to the defaults used below:
        30.0 current temp, 33.0 max temp, 2.0 UHI delta, 0.1 trigger
        probability, 0.3 confidence, "climatology" tier.
        ``enrolled_workers`` is drawn from ``random.Random(42)``, so it is a
        placeholder that repeats the same draws on every call.
        """
        zones: list[dict] = []
        # Fixed seed, one draw per zone in ZONES order, so repeated calls
        # produce the same enrollment figures.
        rng = random.Random(42)
        # Group triggers once instead of rescanning them for every zone.
        levels_by_zone = self._levels_by_zone(pipeline._triggers)
        timestamp = now.isoformat()
        for z in ZONES:
            zid = z.zone_id
            heat = pipeline._heat_data.get(zid, {})
            corrected_temps = heat.get("corrected_temps", [])
            uhi_deltas = heat.get("uhi_deltas", [])
            current_temp = corrected_temps[-1] if corrected_temps else 30.0
            max_temp = max(corrected_temps) if corrected_temps else 33.0
            mean_uhi = sum(uhi_deltas) / len(uhi_deltas) if uhi_deltas else 2.0
            trigger_prob = heat.get("trigger_probability", 0.1)
            pred_conf = heat.get("prediction_confidence", 0.3)
            model_tier = heat.get("model_tier", "climatology")
            risk_level = self._most_severe_level(levels_by_zone.get(zid, ()))
            healed = pipeline._healed.get(zid)
            data_quality = healed.quality_score if healed else 0.85
            informal = z.settlement_type == "informal"
            enrolled = rng.randint(
                200 if informal else 100,
                800 if informal else 500,
            )
            zones.append({
                "zone_id": zid,
                "name": z.name,
                "city": z.city,
                "country": z.country,
                "latitude": z.latitude,
                "longitude": z.longitude,
                "elevation_m": z.elevation_m,
                "settlement_type": z.settlement_type,
                "worker_population_est": z.worker_population_est,
                "outdoor_exposure_pct": z.outdoor_exposure_pct,
                "heat_vulnerability": z.heat_vulnerability,
                "risk_level": risk_level,
                "current_temp_c": round(current_temp, 1),
                "max_temp_c": round(max_temp, 1),
                # Approximation: subtracts the *mean* UHI delta, not the
                # delta matching the latest reading.
                "grid_temp_c": round(current_temp - mean_uhi, 1),
                "uhi_delta_c": round(mean_uhi, 1),
                "corrected_temp_c": round(current_temp, 1),
                "trigger_probability_7d": round(trigger_prob, 3),
                "prediction_confidence": round(pred_conf, 3),
                "model_tier": model_tier,
                "enrolled_workers": enrolled,
                "data_quality": round(data_quality, 2),
                "last_updated": timestamp,
            })
        return zones

    def _build_triggers(
        self, pipeline, zones: list[dict], now: datetime,
    ) -> list[dict]:
        """Convert ``pipeline._triggers`` into API-compatible dicts.

        Payout per worker comes from ``PAYOUT_PER_EVENT_USD`` (default 5 for
        unknown levels). The enrolled count is looked up from *zones* (0 if
        absent). *now* is currently unused.
        """
        triggers: list[dict] = []
        enrolled_map = {z["zone_id"]: z["enrolled_workers"] for z in zones}
        for te in pipeline._triggers:
            payout = PAYOUT_PER_EVENT_USD.get(te.trigger_level, 5)
            enrolled = enrolled_map.get(te.zone_id, 0)
            triggers.append({
                "trigger_id": te.trigger_id,
                "zone_id": te.zone_id,
                "zone_name": te.zone_name,
                "city": te.city,
                "trigger_level": te.trigger_level,
                "trigger_date": te.trigger_date,
                "heat_risk_score": round(getattr(te, "heat_risk_score", 0), 1),
                "max_temp_c": round(getattr(te, "max_temp_c", 0), 1),
                "max_wbgt_c": round(getattr(te, "max_wbgt_c", 0), 1),
                "consecutive_days": getattr(te, "consecutive_days", 0),
                "total_days_above": getattr(te, "total_days_above", 0),
                "settlement_type": te.settlement_type,
                "payout_per_worker_usd": payout,
                "enrolled_workers": enrolled,
                "total_payout_usd": payout * enrolled,
                "status": te.status,
            })
        return triggers

    def _build_basis_risk(self, pipeline) -> list[dict]:
        """Convert ``pipeline._basis_risk`` into an API-compatible list.

        Accepts either report objects exposing ``overall_score`` and related
        attributes, or plain dicts. Zone IDs unknown to ``ZONE_MAP`` are
        skipped.
        """
        results: list[dict] = []
        for zone_id, report in pipeline._basis_risk.items():
            zone = ZONE_MAP.get(zone_id)
            if zone is None:
                continue
            if hasattr(report, "overall_score"):
                rec_text = (
                    "; ".join(report.recommendations)
                    if report.recommendations
                    else "Current calibration adequate"
                )
                results.append({
                    "zone_id": zone_id,
                    "zone_name": report.zone_name,
                    "city": report.city,
                    "overall_score": round(report.overall_score, 3),
                    "false_positive_rate": round(report.false_positive_rate, 3),
                    "false_negative_rate": round(report.false_negative_rate, 3),
                    "correlation": round(report.correlation, 3),
                    "settlement_type": report.settlement_type,
                    "heat_vulnerability": zone.heat_vulnerability,
                    "recommendation": rec_text,
                })
            else:
                results.append({
                    "zone_id": zone_id,
                    "zone_name": zone.name,
                    "city": zone.city,
                    "overall_score": round(report.get("overall_score", 0), 3),
                    "false_positive_rate": round(report.get("false_positive_rate", 0), 3),
                    "false_negative_rate": round(report.get("false_negative_rate", 0), 3),
                    "correlation": round(report.get("correlation", 0), 3),
                    "settlement_type": zone.settlement_type,
                    "heat_vulnerability": zone.heat_vulnerability,
                    "recommendation": report.get("recommendation", "Current calibration adequate"),
                })
        return results

    def _build_notifications(
        self, pipeline, triggers: list[dict], now: datetime,
    ) -> list[dict]:
        """Convert pipeline explanations + notifications into API dicts.

        Each explanation yields two entries: an English one, whose channel,
        status and cost come from the delivery record at the same index in
        ``pipeline._notifications`` (falling back to console / "dry_run" /
        0.0), and a Swahili SMS entry priced at
        ``SMS_COST_PER_RECIPIENT_USD`` per recipient.
        """
        notifications: list[dict] = []
        # If a zone has several triggers, the last one wins; only its
        # enrolled_workers value is read here.
        trigger_map = {t["zone_id"]: t for t in triggers}
        deliveries = pipeline._notifications
        fallback_time = now.isoformat()
        for i, explanation in enumerate(pipeline._explanations, start=1):
            zone_id = explanation.zone_id
            trigger = trigger_map.get(zone_id, {})
            zone = ZONE_MAP.get(zone_id)
            zone_name = zone.name if zone else zone_id
            city = zone.city if zone else ""
            enrolled = trigger.get("enrolled_workers", 0)
            level = explanation.trigger_level
            # Deliveries are paired with explanations by position.
            delivery = deliveries[i - 1] if i <= len(deliveries) else None
            status = delivery.status if delivery else "dry_run"
            delivered_at = delivery.timestamp if delivery else fallback_time
            notifications.append({
                "id": f"NOT-{2*i - 1:04d}",
                "zone_id": zone_id,
                "zone_name": zone_name,
                "city": city,
                "trigger_level": level,
                "channel": delivery.channel if delivery else "console",
                "language": "en",
                "recipient_count": enrolled,
                "message_preview": (
                    f"HEAT ALERT [{level.upper()}]: "
                    f"{zone_name}, {city}. "
                    f"{explanation.english_text[:120]}"
                ),
                "status": status,
                "delivered_at": delivered_at,
                "cost_estimate": round(delivery.cost_estimate, 2) if delivery else 0.0,
            })
            notifications.append({
                "id": f"NOT-{2*i:04d}",
                "zone_id": zone_id,
                "zone_name": zone_name,
                "city": city,
                "trigger_level": level,
                "channel": "sms",
                "language": "sw",
                "recipient_count": enrolled,
                "message_preview": (
                    f"TAHADHARI YA JOTO [{level.upper()}]: "
                    f"{zone_name}, {city}. "
                    f"{explanation.swahili_text[:120]}"
                ),
                # NOTE(review): reuses the English delivery's status/time;
                # confirm this is intended for the Swahili SMS.
                "status": status,
                "delivered_at": delivered_at,
                "cost_estimate": round(enrolled * self.SMS_COST_PER_RECIPIENT_USD, 2),
            })
        return notifications

    def _build_pipeline_run(self, run_result) -> dict:
        """Flatten a run result (and its ``steps``) into an API dict.

        Durations are rounded to 0.1 s and total cost to 4 decimals.
        """
        return {
            "run_id": run_result.run_id,
            "started_at": run_result.started_at,
            "ended_at": run_result.ended_at,
            "status": run_result.status,
            "duration_s": round(run_result.duration_s, 1),
            "zones_processed": run_result.zones_processed,
            "triggers_found": run_result.triggers_found,
            "notifications_sent": run_result.notifications_sent,
            "total_cost_usd": round(run_result.total_cost_usd, 4),
            "steps": [
                {
                    "step": s.step,
                    "status": s.status,
                    "duration_s": round(s.duration_s, 1),
                }
                for s in run_result.steps
            ],
        }

    def _build_stats(
        self,
        zones: list[dict],
        triggers: list[dict],
        pipeline_runs: list[dict],
        now: datetime,
    ) -> dict:
        """Aggregate dashboard statistics over the retained pipeline runs.

        A run counts as successful when its ``status`` is ``"ok"``.
        ``last_run`` is the ``started_at`` of the first (newest) run.
        *now* is currently unused.
        """
        total_runs = len(pipeline_runs)
        successful = sum(1 for r in pipeline_runs if r["status"] == "ok")
        total_cost = sum(r.get("total_cost_usd", 0) for r in pipeline_runs)
        # max(1, ...) avoids division by zero before any run is recorded.
        return {
            "total_runs": total_runs,
            "successful_runs": successful,
            "success_rate": round(successful / max(1, total_runs), 2),
            "zones_monitored": len(ZONES),
            "cities": len(CITIES),
            "active_triggers": len([t for t in triggers if t.get("status") == "active"]),
            "total_enrolled": sum(z["enrolled_workers"] for z in zones),
            "total_cost_usd": round(total_cost, 2),
            "avg_cost_per_run_usd": round(total_cost / max(1, total_runs), 4),
            "last_run": pipeline_runs[0]["started_at"] if pipeline_runs else None,
            "data_sources": ["ERA5-Land", "NASA POWER"],
        }
# Module-level singleton instance. Per the module docstring, pipeline runs
# feed it via ``store.update_from_pipeline(...)`` and the API reads from it.
store = PipelineStore()