| """ |
| Shared singleton store that bridges pipeline output to API responses. |
| |
| After a pipeline run completes, call ``store.update_from_pipeline(pipeline)`` |
| to populate the store with real data. The API checks ``store.has_real_data`` |
| and serves from the store when True, falling back to synthetic demo data |
| when False. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import random |
| import threading |
| from datetime import datetime |
| from typing import Any |
|
|
| from config import ZONES, ZONE_MAP, CITIES, PAYOUT_PER_EVENT_USD |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class PipelineStore: |
| """Singleton that holds the latest pipeline run results.""" |
|
|
| def __init__(self): |
| self.has_real_data = False |
| self.zones: list[dict] = [] |
| self.indices: list[dict] = [] |
| self.triggers: list[dict] = [] |
| self.basis_risk: list[dict] = [] |
| self.notifications: list[dict] = [] |
| self.pipeline_runs: list[dict] = [] |
| self.stats: dict[str, Any] = {} |
| self._lock = threading.Lock() |
|
|
| def update_from_pipeline(self, pipeline, run_result=None): |
| """Convert pipeline state into the same dict shapes that |
| ``_generate_demo_data()`` in api.py produces, so the dashboard |
| works identically with real or synthetic data. |
| |
| Args: |
| pipeline: A ``HeatRiskPipeline`` instance after ``.run()`` |
| has completed. |
| run_result: Optional ``PipelineRunResult`` returned by |
| ``pipeline.run()``. |
| """ |
| with self._lock: |
| try: |
| now = datetime.utcnow() |
| zones = self._build_zones(pipeline, now) |
| triggers = self._build_triggers(pipeline, zones, now) |
| basis_risk = self._build_basis_risk(pipeline) |
| notifications = self._build_notifications(pipeline, triggers, now) |
| pipeline_run = self._build_pipeline_run(run_result) if run_result else None |
|
|
| self.zones = zones |
| self.triggers = triggers |
| self.basis_risk = basis_risk |
| self.notifications = notifications |
|
|
| if pipeline_run: |
| self.pipeline_runs.insert(0, pipeline_run) |
| self.pipeline_runs = self.pipeline_runs[:50] |
|
|
| self.stats = self._build_stats( |
| zones, triggers, self.pipeline_runs, now, |
| ) |
| self.has_real_data = True |
| logger.info( |
| "Store updated: %d zones, %d triggers, %d notifications", |
| len(zones), len(triggers), len(notifications), |
| ) |
| except Exception: |
| logger.exception("Failed to update store from pipeline") |
|
|
| |
| |
| |
|
|
| def _build_zones(self, pipeline, now: datetime) -> list[dict]: |
| """Build zone dicts from pipeline heat data.""" |
| zones: list[dict] = [] |
| rng = random.Random(42) |
|
|
| for z in ZONES: |
| zid = z.zone_id |
|
|
| heat = pipeline._heat_data.get(zid, {}) |
| corrected_temps = heat.get("corrected_temps", []) |
| uhi_deltas = heat.get("uhi_deltas", []) |
|
|
| current_temp = corrected_temps[-1] if corrected_temps else 30.0 |
| max_temp = max(corrected_temps) if corrected_temps else 33.0 |
| mean_uhi = sum(uhi_deltas) / len(uhi_deltas) if uhi_deltas else 2.0 |
|
|
| trigger_prob = heat.get("trigger_probability", 0.1) |
| pred_conf = heat.get("prediction_confidence", 0.3) |
| model_tier = heat.get("model_tier", "climatology") |
|
|
| |
| zone_triggers = [t for t in pipeline._triggers if t.zone_id == zid] |
| if zone_triggers: |
| levels_priority = {"critical": 0, "warning": 1, "watch": 2} |
| best = min(zone_triggers, key=lambda t: levels_priority.get(t.trigger_level, 9)) |
| risk_level = best.trigger_level |
| else: |
| risk_level = "normal" |
|
|
| healed = pipeline._healed.get(zid) |
| data_quality = healed.quality_score if healed else 0.85 |
|
|
| enrolled = rng.randint( |
| 200 if z.settlement_type == "informal" else 100, |
| 800 if z.settlement_type == "informal" else 500, |
| ) |
|
|
| zones.append({ |
| "zone_id": zid, |
| "name": z.name, |
| "city": z.city, |
| "country": z.country, |
| "latitude": z.latitude, |
| "longitude": z.longitude, |
| "elevation_m": z.elevation_m, |
| "settlement_type": z.settlement_type, |
| "worker_population_est": z.worker_population_est, |
| "outdoor_exposure_pct": z.outdoor_exposure_pct, |
| "heat_vulnerability": z.heat_vulnerability, |
| "risk_level": risk_level, |
| "current_temp_c": round(current_temp, 1), |
| "max_temp_c": round(max_temp, 1), |
| "grid_temp_c": round(current_temp - mean_uhi, 1), |
| "uhi_delta_c": round(mean_uhi, 1), |
| "corrected_temp_c": round(current_temp, 1), |
| "trigger_probability_7d": round(trigger_prob, 3), |
| "prediction_confidence": round(pred_conf, 3), |
| "model_tier": model_tier, |
| "enrolled_workers": enrolled, |
| "data_quality": round(data_quality, 2), |
| "last_updated": now.isoformat(), |
| }) |
|
|
| return zones |
|
|
| def _build_triggers( |
| self, pipeline, zones: list[dict], now: datetime, |
| ) -> list[dict]: |
| """Convert pipeline triggers into API-compatible dicts.""" |
| triggers: list[dict] = [] |
| enrolled_map = {z["zone_id"]: z["enrolled_workers"] for z in zones} |
|
|
| for te in pipeline._triggers: |
| payout = PAYOUT_PER_EVENT_USD.get(te.trigger_level, 5) |
| enrolled = enrolled_map.get(te.zone_id, 0) |
|
|
| triggers.append({ |
| "trigger_id": te.trigger_id, |
| "zone_id": te.zone_id, |
| "zone_name": te.zone_name, |
| "city": te.city, |
| "trigger_level": te.trigger_level, |
| "trigger_date": te.trigger_date, |
| "heat_risk_score": round(getattr(te, "heat_risk_score", 0), 1), |
| "max_temp_c": round(getattr(te, "max_temp_c", 0), 1), |
| "max_wbgt_c": round(getattr(te, "max_wbgt_c", 0), 1), |
| "consecutive_days": getattr(te, "consecutive_days", 0), |
| "total_days_above": getattr(te, "total_days_above", 0), |
| "settlement_type": te.settlement_type, |
| "payout_per_worker_usd": payout, |
| "enrolled_workers": enrolled, |
| "total_payout_usd": payout * enrolled, |
| "status": te.status, |
| }) |
|
|
| return triggers |
|
|
| def _build_basis_risk(self, pipeline) -> list[dict]: |
| """Convert pipeline basis risk into API-compatible list.""" |
| results: list[dict] = [] |
|
|
| for zone_id, report in pipeline._basis_risk.items(): |
| zone = ZONE_MAP.get(zone_id) |
| if zone is None: |
| continue |
|
|
| if hasattr(report, "overall_score"): |
| rec_text = "; ".join(report.recommendations) if report.recommendations else "Current calibration adequate" |
| results.append({ |
| "zone_id": zone_id, |
| "zone_name": report.zone_name, |
| "city": report.city, |
| "overall_score": round(report.overall_score, 3), |
| "false_positive_rate": round(report.false_positive_rate, 3), |
| "false_negative_rate": round(report.false_negative_rate, 3), |
| "correlation": round(report.correlation, 3), |
| "settlement_type": report.settlement_type, |
| "heat_vulnerability": zone.heat_vulnerability, |
| "recommendation": rec_text, |
| }) |
| else: |
| results.append({ |
| "zone_id": zone_id, |
| "zone_name": zone.name, |
| "city": zone.city, |
| "overall_score": round(report.get("overall_score", 0), 3), |
| "false_positive_rate": round(report.get("false_positive_rate", 0), 3), |
| "false_negative_rate": round(report.get("false_negative_rate", 0), 3), |
| "correlation": round(report.get("correlation", 0), 3), |
| "settlement_type": zone.settlement_type, |
| "heat_vulnerability": zone.heat_vulnerability, |
| "recommendation": report.get("recommendation", "Current calibration adequate"), |
| }) |
|
|
| return results |
|
|
| def _build_notifications( |
| self, pipeline, triggers: list[dict], now: datetime, |
| ) -> list[dict]: |
| """Convert pipeline explanations + notifications into API dicts.""" |
| notifications: list[dict] = [] |
| trigger_map = {t["zone_id"]: t for t in triggers} |
|
|
| for i, explanation in enumerate(pipeline._explanations, start=1): |
| zone_id = explanation.zone_id |
| trigger = trigger_map.get(zone_id, {}) |
| zone = ZONE_MAP.get(zone_id) |
| zone_name = zone.name if zone else zone_id |
| city = zone.city if zone else "" |
| enrolled = trigger.get("enrolled_workers", 0) |
|
|
| delivery = pipeline._notifications[i - 1] if i <= len(pipeline._notifications) else None |
|
|
| notifications.append({ |
| "id": f"NOT-{2*i - 1:04d}", |
| "zone_id": zone_id, |
| "zone_name": zone_name, |
| "city": city, |
| "trigger_level": explanation.trigger_level, |
| "channel": delivery.channel if delivery else "console", |
| "language": "en", |
| "recipient_count": enrolled, |
| "message_preview": ( |
| f"HEAT ALERT [{explanation.trigger_level.upper()}]: " |
| f"{zone_name}, {city}. " |
| f"{explanation.english_text[:120]}" |
| ), |
| "status": delivery.status if delivery else "dry_run", |
| "delivered_at": delivery.timestamp if delivery else now.isoformat(), |
| "cost_estimate": round(delivery.cost_estimate, 2) if delivery else 0.0, |
| }) |
|
|
| notifications.append({ |
| "id": f"NOT-{2*i:04d}", |
| "zone_id": zone_id, |
| "zone_name": zone_name, |
| "city": city, |
| "trigger_level": explanation.trigger_level, |
| "channel": "sms", |
| "language": "sw", |
| "recipient_count": enrolled, |
| "message_preview": ( |
| f"TAHADHARI YA JOTO [{explanation.trigger_level.upper()}]: " |
| f"{zone_name}, {city}. " |
| f"{explanation.swahili_text[:120]}" |
| ), |
| "status": delivery.status if delivery else "dry_run", |
| "delivered_at": delivery.timestamp if delivery else now.isoformat(), |
| "cost_estimate": round(enrolled * 0.0075, 2), |
| }) |
|
|
| return notifications |
|
|
| def _build_pipeline_run(self, run_result) -> dict: |
| return { |
| "run_id": run_result.run_id, |
| "started_at": run_result.started_at, |
| "ended_at": run_result.ended_at, |
| "status": run_result.status, |
| "duration_s": round(run_result.duration_s, 1), |
| "zones_processed": run_result.zones_processed, |
| "triggers_found": run_result.triggers_found, |
| "notifications_sent": run_result.notifications_sent, |
| "total_cost_usd": round(run_result.total_cost_usd, 4), |
| "steps": [ |
| { |
| "step": s.step, |
| "status": s.status, |
| "duration_s": round(s.duration_s, 1), |
| } |
| for s in run_result.steps |
| ], |
| } |
|
|
| def _build_stats( |
| self, |
| zones: list[dict], |
| triggers: list[dict], |
| pipeline_runs: list[dict], |
| now: datetime, |
| ) -> dict: |
| total_runs = len(pipeline_runs) |
| successful = sum(1 for r in pipeline_runs if r["status"] == "ok") |
| total_cost = sum(r.get("total_cost_usd", 0) for r in pipeline_runs) |
|
|
| return { |
| "total_runs": total_runs, |
| "successful_runs": successful, |
| "success_rate": round(successful / max(1, total_runs), 2), |
| "zones_monitored": len(ZONES), |
| "cities": len(CITIES), |
| "active_triggers": len([t for t in triggers if t.get("status") == "active"]), |
| "total_enrolled": sum(z["enrolled_workers"] for z in zones), |
| "total_cost_usd": round(total_cost, 2), |
| "avg_cost_per_run_usd": round(total_cost / max(1, total_runs), 4), |
| "last_run": pipeline_runs[0]["started_at"] if pipeline_runs else None, |
| "data_sources": ["ERA5-Land", "NASA POWER"], |
| } |
|
|
|
|
| |
| store = PipelineStore() |
|
|