| """ |
| Database CRUD operations for the Climate Risk Index Engine. |
| |
| Uses psycopg2 (sync) for PostgreSQL when DATABASE_URL is set, with an in-memory |
| fallback for demo/testing. Matches Weather AI 2's PgConnection pattern exactly. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import uuid |
| from collections import defaultdict |
| from datetime import datetime, timezone |
| from typing import Any, Dict, List, Optional |
|
|
| from src.database.schema import get_full_ddl, get_table_names |
|
|
| log = logging.getLogger(__name__) |
|
|
|
|
| def _to_date(val): |
| """Convert a string or date to a datetime.date for psycopg2 DATE columns.""" |
| if val is None: |
| return None |
| if isinstance(val, datetime): |
| return val.date() |
| if hasattr(val, "date") and callable(getattr(val, "date", None)): |
| return val.date() |
| if hasattr(val, "year") and hasattr(val, "month") and hasattr(val, "day"): |
| return val |
| if isinstance(val, str): |
| return datetime.fromisoformat(val.replace("Z", "+00:00")).date() if "T" in val else datetime.strptime(val, "%Y-%m-%d").date() |
| return val |
|
|
|
|
| def _to_datetime(val) -> Optional[datetime]: |
| """Convert a string to a datetime for psycopg2 TIMESTAMPTZ columns.""" |
| if val is None: |
| return None |
| if isinstance(val, datetime): |
| return val |
| if isinstance(val, str): |
| try: |
| return datetime.fromisoformat(val.replace("Z", "+00:00")) |
| except ValueError: |
| return datetime.strptime(val, "%Y-%m-%d") |
| return val |
|
|
|
|
| |
|
|
| _pool = None |
|
|
|
|
| def _get_pool(dsn: str): |
| """Get or create a SimpleConnectionPool for the given DSN.""" |
| global _pool |
| from psycopg2.pool import SimpleConnectionPool |
| if _pool is None or _pool.closed: |
| _pool = SimpleConnectionPool(minconn=2, maxconn=10, dsn=dsn) |
| return _pool |
|
|
|
|
| class PgConnection: |
| """Thin psycopg2 wrapper matching Weather AI 2's PgConnection exactly. |
| |
| Uses SimpleConnectionPool. All methods are sync. |
| conn.execute(sql, params) returns a cursor. |
| """ |
|
|
| def __init__(self, dsn: str): |
| pool = _get_pool(dsn) |
| self._conn = pool.getconn() |
| self._conn.autocommit = True |
| self._pool = pool |
| self._last_cur = None |
| self._refresh_conn() |
|
|
| def _refresh_conn(self): |
| """Handle Neon cold starts — test with SELECT 1, reconnect if stale. |
| |
| Retries up to 3 times. SimpleConnectionPool initializes minconn=2 |
| fresh connections at startup, which typically go stale together |
| after Neon's ~5-min idle timeout; a single retry can hand back a |
| second dead conn. |
| """ |
| try: |
| cur = self._conn.cursor() |
| cur.execute("SELECT 1") |
| cur.close() |
| return |
| except Exception: |
| pass |
| log.info("Connection stale, reconnecting...") |
| last_exc = None |
| for _ in range(3): |
| try: |
| self._pool.putconn(self._conn, close=True) |
| except Exception: |
| pass |
| try: |
| self._conn = self._pool.getconn() |
| self._conn.autocommit = True |
| cur = self._conn.cursor() |
| cur.execute("SELECT 1") |
| cur.close() |
| return |
| except Exception as exc: |
| last_exc = exc |
| raise RuntimeError( |
| f"Could not acquire a healthy DB connection after 3 attempts: {last_exc}" |
| ) |
|
|
| def execute(self, sql: str, params=None): |
| """Execute SQL with %s placeholders. Returns cursor. |
| |
| To avoid leaking server-side cursor resources over long-lived |
| connections (e.g. during a pipeline run with hundreds of inserts), |
| we close the previously-returned cursor before creating a new one. |
| Callers that need to keep two cursors open simultaneously must use |
| ``self._conn.cursor()`` directly. |
| """ |
| if self._last_cur is not None: |
| try: |
| self._last_cur.close() |
| except Exception: |
| pass |
| self._last_cur = None |
| cur = self._conn.cursor() |
| cur.execute(sql, params) |
| self._last_cur = cur |
| return cur |
|
|
| def close(self): |
| """Return connection to pool.""" |
| if self._last_cur is not None: |
| try: |
| self._last_cur.close() |
| except Exception: |
| pass |
| self._last_cur = None |
| if self._pool and self._conn: |
| self._pool.putconn(self._conn) |
| self._conn = None |
|
|
| def __enter__(self): |
| return self |
|
|
| def __exit__(self, *args): |
| self.close() |
|
|
|
|
| def init_db(database_url: str | None = None): |
| """Initialize database connection. Returns PgConnection or None. |
| |
| If no DATABASE_URL, returns None (caller should use InMemoryStore). |
| Creates schema tables on first connect. |
| """ |
| url = database_url or os.environ.get("DATABASE_URL", "") |
| if not url: |
| log.info("No DATABASE_URL set, DB disabled") |
| return None |
| try: |
| conn = PgConnection(url) |
| |
| conn.execute(get_full_ddl()) |
| log.info("Database connected, schema initialized (%d tables)", len(get_table_names())) |
| return conn |
| except Exception as exc: |
| log.warning("Database connection failed: %s", exc) |
| return None |
|
|
|
|
| |
|
|
| class InMemoryStore: |
| """ |
| Simple in-memory storage for demo mode. |
| Stores rows as dicts keyed by table name. |
| """ |
|
|
| def __init__(self): |
| self.tables: Dict[str, list[dict]] = defaultdict(list) |
| self._id_counters: Dict[str, int] = defaultdict(int) |
|
|
| def insert(self, table: str, row: dict) -> int: |
| """Insert a row, returning a synthetic ID.""" |
| self._id_counters[table] += 1 |
| row_copy = dict(row) |
| row_copy["id"] = self._id_counters[table] |
| if "created_at" not in row_copy: |
| row_copy["created_at"] = datetime.now(timezone.utc).isoformat() |
| self.tables[table].append(row_copy) |
| return row_copy["id"] |
|
|
| def query( |
| self, table: str, filters: Optional[Dict[str, Any]] = None, limit: int = 100 |
| ) -> list[dict]: |
| """Query rows with optional simple equality filters.""" |
| rows = self.tables.get(table, []) |
| if filters: |
| rows = [ |
| r for r in rows |
| if all(r.get(k) == v for k, v in filters.items()) |
| ] |
| return rows[:limit] |
|
|
| def count(self, table: str) -> int: |
| return len(self.tables.get(table, [])) |
|
|
| |
|
|
| def insert_zone(self, zone_id: str, data: dict) -> int: |
| """Insert or overwrite a zone.""" |
| row = dict(data) |
| row["zone_id"] = zone_id |
| existing = [r for r in self.tables["zones"] if r["zone_id"] == zone_id] |
| if existing: |
| existing[0].update(row) |
| return existing[0].get("id", 0) |
| return self.insert("zones", row) |
|
|
| def get_zone(self, zone_id: str) -> Optional[dict]: |
| rows = self.query("zones", {"zone_id": zone_id}, limit=1) |
| return rows[0] if rows else None |
|
|
| def insert_heat_index(self, zone_id: str, date: str, data: dict) -> int: |
| row = dict(data) |
| row["zone_id"] = zone_id |
| row["date"] = date |
| return self.insert("heat_indices", row) |
|
|
| def insert_prediction(self, zone_id: str, date: str, data: dict) -> int: |
| row = dict(data) |
| row["zone_id"] = zone_id |
| row["date"] = date |
| return self.insert("predictions", row) |
|
|
| def get_recent_heat_indices(self, zone_id: str, limit: int = 90) -> list[dict]: |
| rows = self.query("heat_indices", {"zone_id": zone_id}, limit=limit) |
| return sorted(rows, key=lambda r: r.get("date", ""), reverse=True) |
|
|
| def get_recent_predictions(self, zone_id: str, limit: int = 30) -> list[dict]: |
| rows = self.query("predictions", {"zone_id": zone_id}, limit=limit) |
| return sorted(rows, key=lambda r: r.get("date", ""), reverse=True) |
|
|
|
|
| |
|
|
| |
|
|
| def upsert_zone(conn, zone_data: dict) -> None: |
| """Insert or update a zone.""" |
| conn.execute( |
| """ |
| INSERT INTO zones (zone_id, name, city, country, latitude, longitude, |
| elevation_m, area_km2, population_est, settlement_type, |
| worker_population_est, outdoor_exposure_pct, |
| heat_vulnerability, hot_months, notes) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| ON CONFLICT (zone_id) DO UPDATE SET |
| name = EXCLUDED.name, |
| population_est = EXCLUDED.population_est, |
| worker_population_est = EXCLUDED.worker_population_est, |
| outdoor_exposure_pct = EXCLUDED.outdoor_exposure_pct, |
| heat_vulnerability = EXCLUDED.heat_vulnerability, |
| hot_months = EXCLUDED.hot_months, |
| notes = EXCLUDED.notes |
| """, |
| (zone_data["zone_id"], zone_data["name"], zone_data["city"], |
| zone_data["country"], zone_data["latitude"], zone_data["longitude"], |
| zone_data.get("elevation_m"), zone_data.get("area_km2"), |
| zone_data.get("population_est"), zone_data["settlement_type"], |
| zone_data.get("worker_population_est"), |
| zone_data.get("outdoor_exposure_pct"), |
| zone_data["heat_vulnerability"], |
| zone_data.get("hot_months", []), |
| zone_data.get("notes", "")), |
| ) |
|
|
|
|
| def get_zone(conn, zone_id: str) -> Optional[dict]: |
| """Fetch a single zone.""" |
| cur = conn.execute("SELECT * FROM zones WHERE zone_id = %s", (zone_id,)) |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| row = cur.fetchone() |
| return dict(zip(cols, row)) if row else None |
|
|
|
|
| def get_all_zones(conn) -> list[dict]: |
| """Fetch all zones.""" |
| cur = conn.execute("SELECT * FROM zones ORDER BY city, name") |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| rows = cur.fetchall() |
| return [dict(zip(cols, r)) for r in rows] |
|
|
|
|
| |
|
|
| def insert_daily_reading(conn, reading: dict) -> Optional[int]: |
| """Insert a daily reading. Returns the row ID.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO daily_readings (zone_id, date, temp_mean_c, temp_max_c, |
| temp_min_c, humidity_pct, wind_speed_ms, solar_rad_wm2, |
| precip_mm, source, data_quality) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| ON CONFLICT (zone_id, date) DO UPDATE SET |
| temp_mean_c = COALESCE(EXCLUDED.temp_mean_c, daily_readings.temp_mean_c), |
| temp_max_c = COALESCE(EXCLUDED.temp_max_c, daily_readings.temp_max_c), |
| temp_min_c = COALESCE(EXCLUDED.temp_min_c, daily_readings.temp_min_c), |
| humidity_pct = COALESCE(EXCLUDED.humidity_pct, daily_readings.humidity_pct), |
| wind_speed_ms = COALESCE(EXCLUDED.wind_speed_ms, daily_readings.wind_speed_ms), |
| solar_rad_wm2 = COALESCE(EXCLUDED.solar_rad_wm2, daily_readings.solar_rad_wm2), |
| precip_mm = COALESCE(EXCLUDED.precip_mm, daily_readings.precip_mm), |
| source = EXCLUDED.source, |
| data_quality = GREATEST(EXCLUDED.data_quality, daily_readings.data_quality) |
| RETURNING id |
| """, |
| (reading["zone_id"], _to_date(reading["date"]), |
| reading.get("temp_mean_c"), reading.get("temp_max_c"), |
| reading.get("temp_min_c"), reading.get("humidity_pct"), |
| reading.get("wind_speed_ms"), reading.get("solar_rad_wm2"), |
| reading.get("precip_mm"), |
| reading.get("source", "unknown"), |
| reading.get("data_quality", 0.0)), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| def insert_daily_readings_batch(conn, readings: list[dict]) -> int: |
| """Bulk-insert daily readings with a single round-trip per chunk. |
| |
| Uses psycopg2's executemany (single statement, many rows). Much faster |
| than insert_daily_reading() in a loop during pipeline runs. Does not |
| return row IDs — use insert_daily_reading() if you need the ID. |
| """ |
| if not readings: |
| return 0 |
| rows = [ |
| (r["zone_id"], _to_date(r["date"]), |
| r.get("temp_mean_c"), r.get("temp_max_c"), |
| r.get("temp_min_c"), r.get("humidity_pct"), |
| r.get("wind_speed_ms"), r.get("solar_rad_wm2"), |
| r.get("precip_mm"), |
| r.get("source", "unknown"), |
| r.get("data_quality", 0.0)) |
| for r in readings |
| ] |
| cur = conn._conn.cursor() |
| try: |
| cur.executemany( |
| """ |
| INSERT INTO daily_readings (zone_id, date, temp_mean_c, temp_max_c, |
| temp_min_c, humidity_pct, wind_speed_ms, solar_rad_wm2, |
| precip_mm, source, data_quality) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| ON CONFLICT (zone_id, date) DO UPDATE SET |
| temp_mean_c = COALESCE(EXCLUDED.temp_mean_c, daily_readings.temp_mean_c), |
| temp_max_c = COALESCE(EXCLUDED.temp_max_c, daily_readings.temp_max_c), |
| temp_min_c = COALESCE(EXCLUDED.temp_min_c, daily_readings.temp_min_c), |
| humidity_pct = COALESCE(EXCLUDED.humidity_pct, daily_readings.humidity_pct), |
| wind_speed_ms = COALESCE(EXCLUDED.wind_speed_ms, daily_readings.wind_speed_ms), |
| solar_rad_wm2 = COALESCE(EXCLUDED.solar_rad_wm2, daily_readings.solar_rad_wm2), |
| precip_mm = COALESCE(EXCLUDED.precip_mm, daily_readings.precip_mm), |
| source = EXCLUDED.source, |
| data_quality = GREATEST(EXCLUDED.data_quality, daily_readings.data_quality) |
| """, |
| rows, |
| ) |
| return len(rows) |
| finally: |
| cur.close() |
|
|
|
|
| def get_daily_readings(conn, zone_id: str, limit: int = 90) -> list[dict]: |
| """Fetch recent daily readings for a zone.""" |
| cur = conn.execute( |
| "SELECT * FROM daily_readings WHERE zone_id = %s ORDER BY date DESC LIMIT %s", |
| (zone_id, limit), |
| ) |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| rows = cur.fetchall() |
| return [dict(zip(cols, r)) for r in rows] |
|
|
|
|
| |
|
|
| def insert_healed_reading(conn, reading: dict) -> Optional[int]: |
| """Insert a healed reading.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO healed_readings (zone_id, date, raw_reading_id, |
| temp_mean_c, temp_max_c, temp_min_c, |
| humidity_pct, wind_speed_ms, quality_score, heal_action, fields_corrected) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| ON CONFLICT (zone_id, date) DO UPDATE SET |
| temp_mean_c = EXCLUDED.temp_mean_c, |
| temp_max_c = EXCLUDED.temp_max_c, |
| temp_min_c = EXCLUDED.temp_min_c, |
| humidity_pct = EXCLUDED.humidity_pct, |
| wind_speed_ms = EXCLUDED.wind_speed_ms, |
| quality_score = EXCLUDED.quality_score, |
| heal_action = EXCLUDED.heal_action, |
| fields_corrected = EXCLUDED.fields_corrected, |
| healed_at = NOW() |
| RETURNING id |
| """, |
| (reading["zone_id"], _to_date(reading["date"]), reading.get("raw_reading_id"), |
| reading.get("temp_mean_c"), |
| reading.get("temp_max_c"), reading.get("temp_min_c"), |
| reading.get("humidity_pct"), reading.get("wind_speed_ms"), |
| reading.get("quality_score", 0.0), |
| reading.get("heal_action", "passthrough"), |
| reading.get("fields_corrected", [])), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| |
|
|
| def insert_healing_log(conn, entry: dict) -> Optional[int]: |
| """Insert a healing log entry.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO healing_log (zone_id, date, healed_reading_id, |
| agent_type, reasoning, corrections, tools_used, |
| confidence, tokens_used, latency_ms) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| RETURNING id |
| """, |
| (entry["zone_id"], _to_date(entry["date"]), entry.get("healed_reading_id"), |
| entry.get("agent_type", "rule_based"), entry.get("reasoning"), |
| json.dumps(entry.get("corrections", {})), |
| entry.get("tools_used", []), |
| entry.get("confidence"), entry.get("tokens_used", 0), |
| entry.get("latency_ms", 0)), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| |
|
|
| def insert_heat_index(conn, record: dict) -> Optional[int]: |
| """Insert a daily heat index record.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO heat_indices (zone_id, date, grid_temp_c, uhi_delta_c, |
| corrected_temp_c, wbgt_c, heat_index_c, heat_risk_score, |
| risk_level, consecutive_hot_days) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| ON CONFLICT (zone_id, date) DO UPDATE SET |
| grid_temp_c = EXCLUDED.grid_temp_c, |
| uhi_delta_c = EXCLUDED.uhi_delta_c, |
| corrected_temp_c = EXCLUDED.corrected_temp_c, |
| wbgt_c = EXCLUDED.wbgt_c, |
| heat_index_c = EXCLUDED.heat_index_c, |
| heat_risk_score = EXCLUDED.heat_risk_score, |
| risk_level = EXCLUDED.risk_level, |
| consecutive_hot_days = EXCLUDED.consecutive_hot_days, |
| computed_at = NOW() |
| RETURNING id |
| """, |
| (record["zone_id"], _to_date(record["date"]), |
| record.get("grid_temp_c"), record.get("uhi_delta_c"), |
| record.get("corrected_temp_c"), record.get("wbgt_c"), |
| record.get("heat_index_c"), record.get("heat_risk_score"), |
| record.get("risk_level"), record.get("consecutive_hot_days", 0)), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| def get_recent_heat_indices(conn, zone_id: str, limit: int = 90) -> list[dict]: |
| """Fetch recent heat index records for a zone.""" |
| cur = conn.execute( |
| "SELECT * FROM heat_indices WHERE zone_id = %s ORDER BY date DESC LIMIT %s", |
| (zone_id, limit), |
| ) |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| rows = cur.fetchall() |
| return [dict(zip(cols, r)) for r in rows] |
|
|
|
|
| |
|
|
| def insert_prediction(conn, record: dict) -> Optional[int]: |
| """Insert a daily prediction record.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO predictions (zone_id, date, trigger_probability_7d, |
| prediction_confidence, model_tier, xgb_probability, |
| lstm_probability, ensemble_method, |
| annual_cost_per_worker, payout_factor, learned_frequency) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| ON CONFLICT (zone_id, date) DO UPDATE SET |
| trigger_probability_7d = EXCLUDED.trigger_probability_7d, |
| prediction_confidence = EXCLUDED.prediction_confidence, |
| model_tier = EXCLUDED.model_tier, |
| xgb_probability = EXCLUDED.xgb_probability, |
| lstm_probability = EXCLUDED.lstm_probability, |
| ensemble_method = EXCLUDED.ensemble_method, |
| annual_cost_per_worker = EXCLUDED.annual_cost_per_worker, |
| payout_factor = EXCLUDED.payout_factor, |
| learned_frequency = EXCLUDED.learned_frequency, |
| predicted_at = NOW() |
| RETURNING id |
| """, |
| (record["zone_id"], _to_date(record["date"]), |
| record.get("trigger_probability_7d"), |
| record.get("prediction_confidence"), |
| record.get("model_tier", "climatology"), |
| record.get("xgb_probability"), |
| record.get("lstm_probability"), |
| record.get("ensemble_method", "average"), |
| record.get("annual_cost_per_worker"), |
| record.get("payout_factor"), |
| record.get("learned_frequency")), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| def get_recent_predictions(conn, zone_id: str, limit: int = 30) -> list[dict]: |
| """Fetch recent prediction records for a zone.""" |
| cur = conn.execute( |
| "SELECT * FROM predictions WHERE zone_id = %s ORDER BY date DESC LIMIT %s", |
| (zone_id, limit), |
| ) |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| rows = cur.fetchall() |
| return [dict(zip(cols, r)) for r in rows] |
|
|
|
|
| |
|
|
| def insert_trigger_event(conn, event: dict) -> Optional[int]: |
| """Insert a trigger event.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO trigger_events (zone_id, trigger_level, triggered_at, |
| max_temp_c, max_wbgt_c, consecutive_days, heat_risk_score, |
| settlement_type, payout_per_worker_usd, enrolled_workers, |
| total_payout_usd) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| RETURNING id |
| """, |
| (event["zone_id"], event["trigger_level"], _to_datetime(event["triggered_at"]), |
| event.get("max_temp_c"), event.get("max_wbgt_c"), |
| event.get("consecutive_days"), event.get("heat_risk_score"), |
| event.get("settlement_type"), event.get("payout_per_worker_usd"), |
| event.get("enrolled_workers"), event.get("total_payout_usd")), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| def get_trigger_events(conn, zone_id: Optional[str] = None, limit: int = 50) -> list[dict]: |
| """Fetch trigger events, optionally filtered by zone.""" |
| if zone_id: |
| cur = conn.execute( |
| "SELECT * FROM trigger_events WHERE zone_id = %s ORDER BY triggered_at DESC LIMIT %s", |
| (zone_id, limit), |
| ) |
| else: |
| cur = conn.execute( |
| "SELECT * FROM trigger_events ORDER BY triggered_at DESC LIMIT %s", |
| (limit,), |
| ) |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| rows = cur.fetchall() |
| return [dict(zip(cols, r)) for r in rows] |
|
|
|
|
| |
|
|
| def insert_basis_risk(conn, report: dict) -> Optional[int]: |
| """Insert a basis risk assessment.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO basis_risk (zone_id, overall_score, false_positive_rate, |
| false_negative_rate, correlation, mae, total_events, |
| true_positives, true_negatives, false_positives, false_negatives, |
| trigger_accuracy, tier_accuracy, recommendations, |
| confidence_low, confidence_high) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| RETURNING id |
| """, |
| (report["zone_id"], report["overall_score"], |
| report.get("false_positive_rate"), report.get("false_negative_rate"), |
| report.get("correlation"), report.get("mae"), |
| report.get("total_events"), report.get("true_positives"), |
| report.get("true_negatives"), report.get("false_positives"), |
| report.get("false_negatives"), report.get("trigger_accuracy"), |
| json.dumps(report.get("tier_accuracy", {})), |
| report.get("recommendations", []), |
| report.get("confidence_low"), report.get("confidence_high")), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| |
|
|
| def insert_explanation(conn, explanation: dict) -> Optional[int]: |
| """Insert a generated explanation.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO explanations (trigger_event_id, zone_id, trigger_level, |
| english_text, swahili_text, payout_amount, payout_currency, |
| settlement_type, protective_actions, provider) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| RETURNING id |
| """, |
| (explanation.get("trigger_event_id"), explanation["zone_id"], |
| explanation["trigger_level"], explanation["english_text"], |
| explanation["swahili_text"], explanation.get("payout_amount"), |
| explanation.get("payout_currency", "KES"), |
| explanation.get("settlement_type"), |
| explanation.get("protective_actions", []), |
| explanation.get("provider", "template")), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| def get_explanations(conn, zone_id: Optional[str] = None, limit: int = 20) -> list[dict]: |
| """Fetch explanations, optionally by zone.""" |
| if zone_id: |
| cur = conn.execute( |
| "SELECT * FROM explanations WHERE zone_id = %s ORDER BY generated_at DESC LIMIT %s", |
| (zone_id, limit), |
| ) |
| else: |
| cur = conn.execute( |
| "SELECT * FROM explanations ORDER BY generated_at DESC LIMIT %s", |
| (limit,), |
| ) |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| rows = cur.fetchall() |
| return [dict(zip(cols, r)) for r in rows] |
|
|
|
|
| |
|
|
| def insert_notification(conn, notif: dict) -> Optional[int]: |
| """Insert a notification delivery record.""" |
| cur = conn.execute( |
| """ |
| INSERT INTO notifications (explanation_id, zone_id, recipient, channel, |
| status, message_preview, message_sid, cost_estimate, error) |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) |
| RETURNING id |
| """, |
| (notif.get("explanation_id"), notif["zone_id"], |
| notif["recipient"], notif["channel"], |
| notif["status"], notif.get("message_preview"), |
| notif.get("message_sid"), notif.get("cost_estimate", 0.0), |
| notif.get("error")), |
| ) |
| row = cur.fetchone() |
| return row[0] if row else None |
|
|
|
|
| def get_notifications(conn, zone_id: Optional[str] = None, limit: int = 50) -> list[dict]: |
| """Fetch notification records.""" |
| if zone_id: |
| cur = conn.execute( |
| "SELECT * FROM notifications WHERE zone_id = %s ORDER BY sent_at DESC LIMIT %s", |
| (zone_id, limit), |
| ) |
| else: |
| cur = conn.execute( |
| "SELECT * FROM notifications ORDER BY sent_at DESC LIMIT %s", |
| (limit,), |
| ) |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| rows = cur.fetchall() |
| return [dict(zip(cols, r)) for r in rows] |
|
|
|
|
| |
|
|
| def start_pipeline_run(conn, run_id: Optional[str] = None) -> str: |
| """Record the start of a pipeline run. Returns run_id.""" |
| rid = run_id or f"run-{uuid.uuid4().hex[:12]}" |
| now = datetime.now(timezone.utc) |
| conn.execute( |
| """ |
| INSERT INTO pipeline_runs (run_id, started_at, status) |
| VALUES (%s, %s, 'running') |
| """, |
| (rid, now), |
| ) |
| return rid |
|
|
|
|
| def finish_pipeline_run( |
| conn, |
| run_id: str, |
| status: str = "completed", |
| steps_completed: Optional[list[str]] = None, |
| step_status: Optional[dict] = None, |
| error: Optional[str] = None, |
| zones_processed: int = 0, |
| total_cost_usd: float = 0.0, |
| ) -> None: |
| """Record the completion of a pipeline run.""" |
| now = datetime.now(timezone.utc) |
| conn.execute( |
| """ |
| UPDATE pipeline_runs |
| SET finished_at = %s, status = %s, steps_completed = %s, |
| step_status = %s, error = %s, zones_processed = %s, |
| total_cost_usd = %s |
| WHERE run_id = %s |
| """, |
| (now, status, |
| steps_completed or [], |
| json.dumps(step_status or {}), |
| error, zones_processed, |
| total_cost_usd, |
| run_id), |
| ) |
|
|
|
|
| def get_recent_runs(conn, limit: int = 10) -> list[dict]: |
| """Fetch recent pipeline runs.""" |
| cur = conn.execute( |
| "SELECT * FROM pipeline_runs ORDER BY started_at DESC LIMIT %s", |
| (limit,), |
| ) |
| cols = [d[0] for d in cur.description] if cur.description else [] |
| rows = cur.fetchall() |
| return [dict(zip(cols, r)) for r in rows] |
|
|