"""
PostgreSQL schema for the Climate Risk Index Engine.

All table definitions as SQL strings with proper types, foreign keys,
indexes, and constraints. Tables are designed to be created in order
(referenced tables first).

Every table has one ``CREATE_<TABLE>`` constant holding its ``CREATE TABLE``
statement followed by its ``CREATE INDEX`` statements. All statements use
``IF NOT EXISTS``. ``TABLES_ORDERED`` and ``ALL_DDL`` list the same tables
in the same order; keep them in sync when adding a table.
"""

from __future__ import annotations

# ── Table creation order (respects foreign keys) ─────────────────────────
# Must contain every key of ALL_DDL, in the same order: a table appears
# only after all tables it REFERENCES.

TABLES_ORDERED: list[str] = [
    "zones",
    "daily_readings",
    "healed_readings",
    "healing_log",
    "heat_indices",
    "predictions",
    "trigger_events",
    "basis_risk",
    "explanations",
    "notifications",
    "pipeline_runs",
    "workers",
    "zone_thresholds",
]

# ── DDL statements ───────────────────────────────────────────────────────

# Root table: every other table references zones(zone_id).
CREATE_ZONES = """
CREATE TABLE IF NOT EXISTS zones (
    zone_id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    city TEXT NOT NULL,
    country TEXT NOT NULL,
    latitude DOUBLE PRECISION NOT NULL,
    longitude DOUBLE PRECISION NOT NULL,
    elevation_m DOUBLE PRECISION,
    area_km2 DOUBLE PRECISION,
    population_est INTEGER,
    settlement_type TEXT NOT NULL CHECK (settlement_type IN ('formal', 'informal', 'mixed', 'commercial')),
    worker_population_est INTEGER,
    outdoor_exposure_pct DOUBLE PRECISION,
    heat_vulnerability TEXT NOT NULL CHECK (heat_vulnerability IN ('high', 'moderate', 'low')),
    hot_months INTEGER[] DEFAULT '{}',
    notes TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_zones_city ON zones (city);
CREATE INDEX IF NOT EXISTS idx_zones_settlement ON zones (settlement_type);
CREATE INDEX IF NOT EXISTS idx_zones_vulnerability ON zones (heat_vulnerability);
"""

# One row per (zone_id, date), enforced by the UNIQUE constraint.
CREATE_DAILY_READINGS = """
CREATE TABLE IF NOT EXISTS daily_readings (
    id BIGSERIAL PRIMARY KEY,
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    date DATE NOT NULL,
    temp_mean_c DOUBLE PRECISION,
    temp_max_c DOUBLE PRECISION,
    temp_min_c DOUBLE PRECISION,
    humidity_pct DOUBLE PRECISION,
    wind_speed_ms DOUBLE PRECISION,
    solar_rad_wm2 DOUBLE PRECISION,
    precip_mm DOUBLE PRECISION,
    source TEXT DEFAULT 'unknown',
    data_quality DOUBLE PRECISION DEFAULT 0.0,
    ingested_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (zone_id, date)
);
CREATE INDEX IF NOT EXISTS idx_daily_readings_zone_date ON daily_readings (zone_id, date DESC);
CREATE INDEX IF NOT EXISTS idx_daily_readings_date ON daily_readings (date DESC);
"""

# One row per (zone_id, date); raw_reading_id optionally links to daily_readings.
CREATE_HEALED_READINGS = """
CREATE TABLE IF NOT EXISTS healed_readings (
    id BIGSERIAL PRIMARY KEY,
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    date DATE NOT NULL,
    raw_reading_id BIGINT REFERENCES daily_readings(id),
    temp_mean_c DOUBLE PRECISION,
    temp_max_c DOUBLE PRECISION,
    temp_min_c DOUBLE PRECISION,
    humidity_pct DOUBLE PRECISION,
    wind_speed_ms DOUBLE PRECISION,
    quality_score DOUBLE PRECISION DEFAULT 0.0,
    heal_action TEXT DEFAULT 'passthrough',
    fields_corrected TEXT[] DEFAULT '{}',
    healed_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (zone_id, date)
);
CREATE INDEX IF NOT EXISTS idx_healed_readings_zone_date ON healed_readings (zone_id, date DESC);
CREATE INDEX IF NOT EXISTS idx_healed_readings_quality ON healed_readings (quality_score);
"""

# No uniqueness on (zone_id, date): several log rows per zone/day are allowed.
CREATE_HEALING_LOG = """
CREATE TABLE IF NOT EXISTS healing_log (
    id BIGSERIAL PRIMARY KEY,
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    date DATE NOT NULL,
    healed_reading_id BIGINT REFERENCES healed_readings(id),
    agent_type TEXT DEFAULT 'rule_based',
    reasoning TEXT,
    corrections JSONB DEFAULT '{}',
    tools_used TEXT[] DEFAULT '{}',
    confidence DOUBLE PRECISION,
    tokens_used INTEGER DEFAULT 0,
    latency_ms INTEGER DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_healing_log_zone ON healing_log (zone_id, date DESC);
"""

# One row per (zone_id, date); heat_risk_score is constrained to 0..100.
CREATE_HEAT_INDICES = """
CREATE TABLE IF NOT EXISTS heat_indices (
    id BIGSERIAL PRIMARY KEY,
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    date DATE NOT NULL,
    grid_temp_c DOUBLE PRECISION,
    uhi_delta_c DOUBLE PRECISION,
    corrected_temp_c DOUBLE PRECISION,
    wbgt_c DOUBLE PRECISION,
    heat_index_c DOUBLE PRECISION,
    heat_risk_score DOUBLE PRECISION CHECK (heat_risk_score BETWEEN 0 AND 100),
    risk_level TEXT CHECK (risk_level IN ('low', 'moderate', 'high', 'critical')),
    consecutive_hot_days INTEGER DEFAULT 0,
    computed_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (zone_id, date)
);
CREATE INDEX IF NOT EXISTS idx_heat_indices_zone_date ON heat_indices (zone_id, date DESC);
CREATE INDEX IF NOT EXISTS idx_heat_indices_risk ON heat_indices (risk_level);
"""

# One row per (zone_id, date); both probability columns are constrained to 0..1.
CREATE_PREDICTIONS = """
CREATE TABLE IF NOT EXISTS predictions (
    id BIGSERIAL PRIMARY KEY,
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    date DATE NOT NULL,
    trigger_probability_7d DOUBLE PRECISION CHECK (trigger_probability_7d BETWEEN 0 AND 1),
    prediction_confidence DOUBLE PRECISION CHECK (prediction_confidence BETWEEN 0 AND 1),
    model_tier TEXT DEFAULT 'climatology',
    xgb_probability DOUBLE PRECISION,
    lstm_probability DOUBLE PRECISION,
    ensemble_method TEXT DEFAULT 'average',
    predicted_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (zone_id, date)
);
CREATE INDEX IF NOT EXISTS idx_predictions_zone_date ON predictions (zone_id, date DESC);
CREATE INDEX IF NOT EXISTS idx_predictions_tier ON predictions (model_tier);
"""

# The partial index covers only rows whose resolved_at IS NULL.
CREATE_TRIGGER_EVENTS = """
CREATE TABLE IF NOT EXISTS trigger_events (
    id BIGSERIAL PRIMARY KEY,
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    trigger_level TEXT NOT NULL CHECK (trigger_level IN ('critical', 'warning', 'watch')),
    triggered_at TIMESTAMPTZ NOT NULL,
    max_temp_c DOUBLE PRECISION,
    max_wbgt_c DOUBLE PRECISION,
    consecutive_days INTEGER,
    heat_risk_score DOUBLE PRECISION,
    settlement_type TEXT,
    payout_per_worker_usd DOUBLE PRECISION,
    enrolled_workers INTEGER,
    total_payout_usd DOUBLE PRECISION,
    resolved_at TIMESTAMPTZ,
    resolution_notes TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_trigger_events_zone ON trigger_events (zone_id, triggered_at DESC);
CREATE INDEX IF NOT EXISTS idx_trigger_events_level ON trigger_events (trigger_level);
CREATE INDEX IF NOT EXISTS idx_trigger_events_unresolved ON trigger_events (zone_id) WHERE resolved_at IS NULL;
"""

# Multiple assessments per zone are allowed; overall_score is constrained to 0..1.
CREATE_BASIS_RISK = """
CREATE TABLE IF NOT EXISTS basis_risk (
    id BIGSERIAL PRIMARY KEY,
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    assessed_at TIMESTAMPTZ DEFAULT NOW(),
    overall_score DOUBLE PRECISION NOT NULL CHECK (overall_score BETWEEN 0 AND 1),
    false_positive_rate DOUBLE PRECISION,
    false_negative_rate DOUBLE PRECISION,
    correlation DOUBLE PRECISION,
    mae DOUBLE PRECISION,
    total_events INTEGER,
    true_positives INTEGER,
    true_negatives INTEGER,
    false_positives INTEGER,
    false_negatives INTEGER,
    trigger_accuracy DOUBLE PRECISION,
    tier_accuracy JSONB DEFAULT '{}',
    recommendations TEXT[] DEFAULT '{}',
    confidence_low DOUBLE PRECISION,
    confidence_high DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS idx_basis_risk_zone ON basis_risk (zone_id, assessed_at DESC);
"""

# trigger_event_id is nullable, so a row need not be tied to a trigger event.
CREATE_EXPLANATIONS = """
CREATE TABLE IF NOT EXISTS explanations (
    id BIGSERIAL PRIMARY KEY,
    trigger_event_id BIGINT REFERENCES trigger_events(id),
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    trigger_level TEXT NOT NULL,
    english_text TEXT NOT NULL,
    swahili_text TEXT NOT NULL,
    payout_amount DOUBLE PRECISION,
    payout_currency TEXT DEFAULT 'USD',
    settlement_type TEXT,
    protective_actions TEXT[] DEFAULT '{}',
    provider TEXT DEFAULT 'template',
    generated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_explanations_trigger ON explanations (trigger_event_id);
CREATE INDEX IF NOT EXISTS idx_explanations_zone ON explanations (zone_id, generated_at DESC);
"""

# channel and status are restricted to the value sets in their CHECK constraints.
CREATE_NOTIFICATIONS = """
CREATE TABLE IF NOT EXISTS notifications (
    id BIGSERIAL PRIMARY KEY,
    explanation_id BIGINT REFERENCES explanations(id),
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    recipient TEXT NOT NULL,
    channel TEXT NOT NULL CHECK (channel IN ('console', 'sms', 'whatsapp')),
    status TEXT NOT NULL CHECK (status IN ('sent', 'failed', 'dry_run', 'pending')),
    language TEXT DEFAULT 'en',
    message_preview TEXT,
    message_sid TEXT,
    cost_estimate DOUBLE PRECISION DEFAULT 0.0,
    error TEXT,
    sent_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_notifications_zone ON notifications (zone_id, sent_at DESC);
CREATE INDEX IF NOT EXISTS idx_notifications_status ON notifications (status);
"""

# Standalone table (no foreign keys); run_id is unique per run.
CREATE_PIPELINE_RUNS = """
CREATE TABLE IF NOT EXISTS pipeline_runs (
    id BIGSERIAL PRIMARY KEY,
    run_id TEXT UNIQUE NOT NULL,
    started_at TIMESTAMPTZ NOT NULL,
    finished_at TIMESTAMPTZ,
    status TEXT DEFAULT 'running' CHECK (status IN ('running', 'completed', 'failed', 'partial')),
    zones_processed INTEGER DEFAULT 0,
    triggers_found INTEGER DEFAULT 0,
    notifications_sent INTEGER DEFAULT 0,
    steps_completed TEXT[] DEFAULT '{}',
    step_status JSONB DEFAULT '{}',
    total_cost_usd DOUBLE PRECISION DEFAULT 0,
    error TEXT,
    duration_s DOUBLE PRECISION,
    config_snapshot JSONB DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_pipeline_runs_status ON pipeline_runs (status);
CREATE INDEX IF NOT EXISTS idx_pipeline_runs_started ON pipeline_runs (started_at DESC);
"""

# Each worker row belongs to exactly one zone (zone_id is NOT NULL).
CREATE_WORKERS = """
CREATE TABLE IF NOT EXISTS workers (
    worker_id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    name_swahili TEXT,
    nida_id TEXT,
    phone TEXT NOT NULL,
    zone_id TEXT NOT NULL REFERENCES zones(zone_id),
    occupation TEXT NOT NULL,
    age INTEGER,
    years_outdoor INTEGER,
    household_size INTEGER,
    mobile_money TEXT,
    tasaf_enrolled BOOLEAN DEFAULT false,
    enrolled_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_workers_zone ON workers (zone_id);
"""

# At most one row per zone: zone_id is both primary key and foreign key.
CREATE_ZONE_THRESHOLDS = """
CREATE TABLE IF NOT EXISTS zone_thresholds (
    zone_id TEXT PRIMARY KEY REFERENCES zones(zone_id),
    alert_threshold_c DOUBLE PRECISION NOT NULL,
    payout_threshold_c DOUBLE PRECISION NOT NULL,
    uhi_model TEXT NOT NULL,
    threshold_mode TEXT NOT NULL,
    computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS zone_thresholds_computed_at_idx ON zone_thresholds(computed_at DESC);
"""

# ── Aggregate DDL ────────────────────────────────────────────────────────
# Maps table name -> DDL script. Insertion order is the creation order and
# must match TABLES_ORDERED.

ALL_DDL: dict[str, str] = {
    "zones": CREATE_ZONES,
    "daily_readings": CREATE_DAILY_READINGS,
    "healed_readings": CREATE_HEALED_READINGS,
    "healing_log": CREATE_HEALING_LOG,
    "heat_indices": CREATE_HEAT_INDICES,
    "predictions": CREATE_PREDICTIONS,
    "trigger_events": CREATE_TRIGGER_EVENTS,
    "basis_risk": CREATE_BASIS_RISK,
    "explanations": CREATE_EXPLANATIONS,
    "notifications": CREATE_NOTIFICATIONS,
    "pipeline_runs": CREATE_PIPELINE_RUNS,
    "workers": CREATE_WORKERS,
    "zone_thresholds": CREATE_ZONE_THRESHOLDS,
}


def get_full_ddl() -> str:
    """Return the complete DDL script to create all tables in order.

    Each table's DDL is preceded by a ``-- Table: <name>`` SQL comment line,
    and the per-table sections are separated by a blank line. Tables are
    emitted in ``ALL_DDL`` insertion order.
    """
    parts = [f"-- Table: {name}\n{ddl}" for name, ddl in ALL_DDL.items()]
    return "\n\n".join(parts)


def get_table_names() -> list[str]:
    """Return table names in creation order.

    The result is a fresh copy of ``TABLES_ORDERED``, so callers may mutate
    it without affecting the module-level list.
    """
    return list(TABLES_ORDERED)