""" Shared singleton store that bridges pipeline output to API responses. After a pipeline run completes, call ``store.update_from_pipeline(pipeline)`` to populate the store with real data. The API checks ``store.has_real_data`` and serves from the store when True, falling back to synthetic demo data when False. """ from __future__ import annotations import logging import random import threading from datetime import datetime from typing import Any from config import ZONES, ZONE_MAP, CITIES, PAYOUT_PER_EVENT_USD logger = logging.getLogger(__name__) class PipelineStore: """Singleton that holds the latest pipeline run results.""" def __init__(self): self.has_real_data = False self.zones: list[dict] = [] self.indices: list[dict] = [] self.triggers: list[dict] = [] self.basis_risk: list[dict] = [] self.notifications: list[dict] = [] self.pipeline_runs: list[dict] = [] self.stats: dict[str, Any] = {} self._lock = threading.Lock() def update_from_pipeline(self, pipeline, run_result=None): """Convert pipeline state into the same dict shapes that ``_generate_demo_data()`` in api.py produces, so the dashboard works identically with real or synthetic data. Args: pipeline: A ``HeatRiskPipeline`` instance after ``.run()`` has completed. run_result: Optional ``PipelineRunResult`` returned by ``pipeline.run()``. """ with self._lock: try: now = datetime.utcnow() zones = self._build_zones(pipeline, now) triggers = self._build_triggers(pipeline, zones, now) basis_risk = self._build_basis_risk(pipeline) notifications = self._build_notifications(pipeline, triggers, now) pipeline_run = self._build_pipeline_run(run_result) if run_result else None self.zones = zones self.triggers = triggers self.basis_risk = basis_risk self.notifications = notifications if pipeline_run: self.pipeline_runs.insert(0, pipeline_run) self.pipeline_runs = self.pipeline_runs[:50] self.stats = self._build_stats( zones, triggers, self.pipeline_runs, now, ) self.has_real_data = True logger.info( "Store updated: %d zones, %d triggers, %d notifications", len(zones), len(triggers), len(notifications), ) except Exception: logger.exception("Failed to update store from pipeline") # ------------------------------------------------------------------ # Conversion helpers (private) # ------------------------------------------------------------------ def _build_zones(self, pipeline, now: datetime) -> list[dict]: """Build zone dicts from pipeline heat data.""" zones: list[dict] = [] rng = random.Random(42) for z in ZONES: zid = z.zone_id heat = pipeline._heat_data.get(zid, {}) corrected_temps = heat.get("corrected_temps", []) uhi_deltas = heat.get("uhi_deltas", []) current_temp = corrected_temps[-1] if corrected_temps else 30.0 max_temp = max(corrected_temps) if corrected_temps else 33.0 mean_uhi = sum(uhi_deltas) / len(uhi_deltas) if uhi_deltas else 2.0 trigger_prob = heat.get("trigger_probability", 0.1) pred_conf = heat.get("prediction_confidence", 0.3) model_tier = heat.get("model_tier", "climatology") # Determine risk level from triggers zone_triggers = [t for t in pipeline._triggers if t.zone_id == zid] if zone_triggers: levels_priority = {"critical": 0, "warning": 1, "watch": 2} best = min(zone_triggers, key=lambda t: levels_priority.get(t.trigger_level, 9)) risk_level = best.trigger_level else: risk_level = "normal" healed = pipeline._healed.get(zid) data_quality = healed.quality_score if healed else 0.85 enrolled = rng.randint( 200 if z.settlement_type == "informal" else 100, 800 if z.settlement_type == "informal" else 500, ) zones.append({ "zone_id": zid, "name": z.name, "city": z.city, "country": z.country, "latitude": z.latitude, "longitude": z.longitude, "elevation_m": z.elevation_m, "settlement_type": z.settlement_type, "worker_population_est": z.worker_population_est, "outdoor_exposure_pct": z.outdoor_exposure_pct, "heat_vulnerability": z.heat_vulnerability, "risk_level": risk_level, "current_temp_c": round(current_temp, 1), "max_temp_c": round(max_temp, 1), "grid_temp_c": round(current_temp - mean_uhi, 1), "uhi_delta_c": round(mean_uhi, 1), "corrected_temp_c": round(current_temp, 1), "trigger_probability_7d": round(trigger_prob, 3), "prediction_confidence": round(pred_conf, 3), "model_tier": model_tier, "enrolled_workers": enrolled, "data_quality": round(data_quality, 2), "last_updated": now.isoformat(), }) return zones def _build_triggers( self, pipeline, zones: list[dict], now: datetime, ) -> list[dict]: """Convert pipeline triggers into API-compatible dicts.""" triggers: list[dict] = [] enrolled_map = {z["zone_id"]: z["enrolled_workers"] for z in zones} for te in pipeline._triggers: payout = PAYOUT_PER_EVENT_USD.get(te.trigger_level, 5) enrolled = enrolled_map.get(te.zone_id, 0) triggers.append({ "trigger_id": te.trigger_id, "zone_id": te.zone_id, "zone_name": te.zone_name, "city": te.city, "trigger_level": te.trigger_level, "trigger_date": te.trigger_date, "heat_risk_score": round(getattr(te, "heat_risk_score", 0), 1), "max_temp_c": round(getattr(te, "max_temp_c", 0), 1), "max_wbgt_c": round(getattr(te, "max_wbgt_c", 0), 1), "consecutive_days": getattr(te, "consecutive_days", 0), "total_days_above": getattr(te, "total_days_above", 0), "settlement_type": te.settlement_type, "payout_per_worker_usd": payout, "enrolled_workers": enrolled, "total_payout_usd": payout * enrolled, "status": te.status, }) return triggers def _build_basis_risk(self, pipeline) -> list[dict]: """Convert pipeline basis risk into API-compatible list.""" results: list[dict] = [] for zone_id, report in pipeline._basis_risk.items(): zone = ZONE_MAP.get(zone_id) if zone is None: continue if hasattr(report, "overall_score"): rec_text = "; ".join(report.recommendations) if report.recommendations else "Current calibration adequate" results.append({ "zone_id": zone_id, "zone_name": report.zone_name, "city": report.city, "overall_score": round(report.overall_score, 3), "false_positive_rate": round(report.false_positive_rate, 3), "false_negative_rate": round(report.false_negative_rate, 3), "correlation": round(report.correlation, 3), "settlement_type": report.settlement_type, "heat_vulnerability": zone.heat_vulnerability, "recommendation": rec_text, }) else: results.append({ "zone_id": zone_id, "zone_name": zone.name, "city": zone.city, "overall_score": round(report.get("overall_score", 0), 3), "false_positive_rate": round(report.get("false_positive_rate", 0), 3), "false_negative_rate": round(report.get("false_negative_rate", 0), 3), "correlation": round(report.get("correlation", 0), 3), "settlement_type": zone.settlement_type, "heat_vulnerability": zone.heat_vulnerability, "recommendation": report.get("recommendation", "Current calibration adequate"), }) return results def _build_notifications( self, pipeline, triggers: list[dict], now: datetime, ) -> list[dict]: """Convert pipeline explanations + notifications into API dicts.""" notifications: list[dict] = [] trigger_map = {t["zone_id"]: t for t in triggers} for i, explanation in enumerate(pipeline._explanations, start=1): zone_id = explanation.zone_id trigger = trigger_map.get(zone_id, {}) zone = ZONE_MAP.get(zone_id) zone_name = zone.name if zone else zone_id city = zone.city if zone else "" enrolled = trigger.get("enrolled_workers", 0) delivery = pipeline._notifications[i - 1] if i <= len(pipeline._notifications) else None notifications.append({ "id": f"NOT-{2*i - 1:04d}", "zone_id": zone_id, "zone_name": zone_name, "city": city, "trigger_level": explanation.trigger_level, "channel": delivery.channel if delivery else "console", "language": "en", "recipient_count": enrolled, "message_preview": ( f"HEAT ALERT [{explanation.trigger_level.upper()}]: " f"{zone_name}, {city}. " f"{explanation.english_text[:120]}" ), "status": delivery.status if delivery else "dry_run", "delivered_at": delivery.timestamp if delivery else now.isoformat(), "cost_estimate": round(delivery.cost_estimate, 2) if delivery else 0.0, }) notifications.append({ "id": f"NOT-{2*i:04d}", "zone_id": zone_id, "zone_name": zone_name, "city": city, "trigger_level": explanation.trigger_level, "channel": "sms", "language": "sw", "recipient_count": enrolled, "message_preview": ( f"TAHADHARI YA JOTO [{explanation.trigger_level.upper()}]: " f"{zone_name}, {city}. " f"{explanation.swahili_text[:120]}" ), "status": delivery.status if delivery else "dry_run", "delivered_at": delivery.timestamp if delivery else now.isoformat(), "cost_estimate": round(enrolled * 0.0075, 2), }) return notifications def _build_pipeline_run(self, run_result) -> dict: return { "run_id": run_result.run_id, "started_at": run_result.started_at, "ended_at": run_result.ended_at, "status": run_result.status, "duration_s": round(run_result.duration_s, 1), "zones_processed": run_result.zones_processed, "triggers_found": run_result.triggers_found, "notifications_sent": run_result.notifications_sent, "total_cost_usd": round(run_result.total_cost_usd, 4), "steps": [ { "step": s.step, "status": s.status, "duration_s": round(s.duration_s, 1), } for s in run_result.steps ], } def _build_stats( self, zones: list[dict], triggers: list[dict], pipeline_runs: list[dict], now: datetime, ) -> dict: total_runs = len(pipeline_runs) successful = sum(1 for r in pipeline_runs if r["status"] == "ok") total_cost = sum(r.get("total_cost_usd", 0) for r in pipeline_runs) return { "total_runs": total_runs, "successful_runs": successful, "success_rate": round(successful / max(1, total_runs), 2), "zones_monitored": len(ZONES), "cities": len(CITIES), "active_triggers": len([t for t in triggers if t.get("status") == "active"]), "total_enrolled": sum(z["enrolled_workers"] for z in zones), "total_cost_usd": round(total_cost, 2), "avg_cost_per_run_usd": round(total_cost / max(1, total_runs), 4), "last_run": pipeline_runs[0]["started_at"] if pipeline_runs else None, "data_sources": ["ERA5-Land", "NASA POWER"], } # Module-level singleton store = PipelineStore()