| """ |
| T2.3 Β· forecaster.py |
| 24-hour-ahead probabilistic outage forecaster. |
| Outputs P(outage) and E[duration | outage] per hour. |
| |
| Usage: |
| from forecaster import Forecaster |
| fc = Forecaster() |
| fc.fit("grid_history.csv") |
| forecast = fc.predict_next_24h() # list of 24 dicts |
| |
| API endpoint (fast path): |
| python forecaster.py --serve # prints JSON, <300ms on CPU |
| python forecaster.py --eval # rolling 30-day Brier + MAE |
| """ |
|
|
| import argparse |
| import json |
| import time |
| import warnings |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| from lightgbm import LGBMClassifier, LGBMRegressor |
|
|
| warnings.filterwarnings("ignore") |
|
|
| MODEL_PATH_CLF = "model_outage_clf.pkl" |
| MODEL_PATH_REG = "model_duration_reg.pkl" |
|
|
|
|
| |
|
|
| def build_features(df: pd.DataFrame) -> pd.DataFrame: |
| df = df.copy() |
| df["timestamp"] = pd.to_datetime(df["timestamp"]) |
| df = df.sort_values("timestamp").reset_index(drop=True) |
|
|
| df["hour"] = df["timestamp"].dt.hour |
| df["dayofweek"] = df["timestamp"].dt.dayofweek |
| df["month"] = df["timestamp"].dt.month |
| df["is_weekend"] = (df["dayofweek"] >= 5).astype(int) |
| df["is_peak_morning"] = ((df["hour"] >= 7) & (df["hour"] <= 10)).astype(int) |
| df["is_peak_evening"] = ((df["hour"] >= 17) & (df["hour"] <= 21)).astype(int) |
| df["is_rainy_season"] = df["month"].isin([4, 5, 10, 11]).astype(int) |
|
|
| |
| for lag in [1, 2, 24, 48]: |
| df[f"load_lag{lag}"] = df["load_mw"].shift(lag) |
|
|
| |
| df["load_roll3_mean"] = df["load_mw"].shift(1).rolling(3).mean() |
| df["load_roll6_std"] = df["load_mw"].shift(1).rolling(6).std() |
| df["rain_roll3_sum"] = df["rain_mm"].shift(1).rolling(3).sum() |
| df["outage_lag1"] = df["outage"].shift(1) |
| df["outage_roll6_sum"] = df["outage"].shift(1).rolling(6).sum() |
|
|
| df = df.dropna().reset_index(drop=True) |
| return df |
|
|
|
|
| FEATURE_COLS = [ |
| "load_lag1", "load_lag2", "load_lag24", "load_lag48", |
| "load_roll3_mean", "load_roll6_std", "rain_roll3_sum", |
| "temp_c", "humidity", "wind_ms", "rain_mm", |
| "hour", "dayofweek", "month", "is_weekend", |
| "is_peak_morning", "is_peak_evening", "is_rainy_season", |
| "outage_lag1", "outage_roll6_sum", |
| ] |
|
|
|
|
| |
|
|
| class Forecaster: |
| def __init__(self): |
| self.clf = LGBMClassifier( |
| n_estimators=200, |
| learning_rate=0.05, |
| max_depth=5, |
| num_leaves=31, |
| class_weight="balanced", |
| random_state=42, |
| verbose=-1, |
| ) |
| self.reg = LGBMRegressor( |
| n_estimators=200, |
| learning_rate=0.05, |
| max_depth=5, |
| num_leaves=31, |
| random_state=42, |
| verbose=-1, |
| ) |
| self.df_features = None |
| self.is_fitted = False |
|
|
| def fit(self, csv_path: str = "grid_history.csv"): |
| df_raw = pd.read_csv(csv_path) |
| df = build_features(df_raw) |
| self.df_features = df |
|
|
| X = df[FEATURE_COLS] |
| y_clf = df["outage"] |
| y_reg = df.loc[df["outage"] == 1, "duration_min"] |
| X_reg = df.loc[df["outage"] == 1, FEATURE_COLS] |
|
|
| self.clf.fit(X, y_clf) |
| self.reg.fit(X_reg, y_reg) |
| self.is_fitted = True |
| print(f"β Forecaster fitted on {len(df)} rows") |
| return self |
|
|
| def predict_next_24h(self, reference_time=None) -> list[dict]: |
| """ |
| Build a 24-hour ahead forecast from the last known data point. |
| Returns list of 24 dicts: {hour, timestamp, p_outage, expected_duration_min, risk_level} |
| """ |
| if not self.is_fitted: |
| raise RuntimeError("Call fit() first.") |
|
|
| df = self.df_features |
| |
| last = df.iloc[-1] |
| if reference_time is None: |
| reference_time = pd.to_datetime(last["timestamp"]) + pd.Timedelta(hours=1) |
|
|
| forecast = [] |
| |
| for offset in range(24): |
| ts = reference_time + pd.Timedelta(hours=offset) |
| h = ts.hour |
| dow = ts.dayofweek |
| month = ts.month |
|
|
| |
| row = { |
| "load_lag1": last["load_mw"], |
| "load_lag2": df.iloc[-2]["load_mw"] if len(df) > 2 else last["load_mw"], |
| "load_lag24": df.iloc[-24]["load_mw"] if len(df) >= 24 else last["load_mw"], |
| "load_lag48": df.iloc[-48]["load_mw"] if len(df) >= 48 else last["load_mw"], |
| "load_roll3_mean": df["load_mw"].iloc[-3:].mean(), |
| "load_roll6_std": df["load_mw"].iloc[-6:].std(), |
| "rain_roll3_sum": df["rain_mm"].iloc[-3:].sum(), |
| "temp_c": last["temp_c"] + 2 * np.sin(2 * np.pi * (h - 14) / 24), |
| "humidity": float(np.clip(last["humidity"] + np.random.normal(0, 2), 30, 99)), |
| "wind_ms": max(0, float(last["wind_ms"])), |
| "rain_mm": float(last["rain_mm"] * 0.7), |
| "hour": h, |
| "dayofweek": dow, |
| "month": month, |
| "is_weekend": int(dow >= 5), |
| "is_peak_morning": int(7 <= h <= 10), |
| "is_peak_evening": int(17 <= h <= 21), |
| "is_rainy_season": int(month in [4, 5, 10, 11]), |
| "outage_lag1": int(last["outage"]), |
| "outage_roll6_sum": float(df["outage"].iloc[-6:].sum()), |
| } |
|
|
| X_row = pd.DataFrame([row])[FEATURE_COLS] |
| p_out = float(self.clf.predict_proba(X_row)[0, 1]) |
| exp_dur = float(self.reg.predict(X_row)[0]) if p_out > 0.05 else 0.0 |
| exp_dur = max(0, exp_dur) |
|
|
| |
| p_low = max(0.0, p_out - 0.08) |
| p_high = min(1.0, p_out + 0.08) |
|
|
| risk = "HIGH" if p_out >= 0.25 else "MEDIUM" if p_out >= 0.12 else "LOW" |
|
|
| forecast.append({ |
| "hour_offset": offset, |
| "timestamp": ts.strftime("%Y-%m-%d %H:%M"), |
| "hour": h, |
| "p_outage": round(p_out, 4), |
| "p_outage_low": round(p_low, 4), |
| "p_outage_high": round(p_high, 4), |
| "expected_duration_min": round(exp_dur, 1), |
| "risk_level": risk, |
| }) |
|
|
| return forecast |
|
|
| def save(self): |
| import pickle |
| with open(MODEL_PATH_CLF, "wb") as f: |
| pickle.dump(self.clf, f) |
| with open(MODEL_PATH_REG, "wb") as f: |
| pickle.dump(self.reg, f) |
| print(f"β Models saved: {MODEL_PATH_CLF}, {MODEL_PATH_REG}") |
|
|
| @classmethod |
| def load(cls): |
| import pickle |
| fc = cls() |
| with open(MODEL_PATH_CLF, "rb") as f: |
| fc.clf = pickle.load(f) |
| with open(MODEL_PATH_REG, "rb") as f: |
| fc.reg = pickle.load(f) |
| |
| if Path("grid_history.csv").exists(): |
| df_raw = pd.read_csv("grid_history.csv") |
| fc.df_features = build_features(df_raw) |
| fc.is_fitted = True |
| return fc |
|
|
|
|
| |
|
|
| def rolling_eval(csv_path: str = "grid_history.csv", window_days: int = 30): |
| """ |
| Rolling 30-day held-out evaluation. |
| Returns: brier_score, mae_duration, avg_lead_time_hours |
| """ |
| df_raw = pd.read_csv(csv_path) |
| df = build_features(df_raw) |
|
|
| |
| test_cutoff = df["timestamp"].max() - pd.Timedelta(days=window_days) |
| df_train = df[df["timestamp"] <= test_cutoff] |
| df_test = df[df["timestamp"] > test_cutoff] |
|
|
| X_train = df_train[FEATURE_COLS] |
| y_train = df_train["outage"] |
| X_test = df_test[FEATURE_COLS] |
| y_test = df_test["outage"] |
|
|
| clf = LGBMClassifier(n_estimators=200, learning_rate=0.05, max_depth=5, |
| class_weight="balanced", random_state=42, verbose=-1) |
| clf.fit(X_train, y_train) |
| probs = clf.predict_proba(X_test)[:, 1] |
|
|
| |
| brier = float(np.mean((probs - y_test.values) ** 2)) |
|
|
| |
| df_train_out = df_train[df_train["outage"] == 1] |
| df_test_out = df_test[df_test["outage"] == 1] |
| mae_dur = None |
| if len(df_train_out) > 5 and len(df_test_out) > 0: |
| reg = LGBMRegressor(n_estimators=200, random_state=42, verbose=-1) |
| reg.fit(df_train_out[FEATURE_COLS], df_train_out["duration_min"]) |
| preds_dur = reg.predict(df_test_out[FEATURE_COLS]) |
| mae_dur = float(np.mean(np.abs(preds_dur - df_test_out["duration_min"].values))) |
|
|
| |
| df_test2 = df_test.copy() |
| df_test2["pred_prob"] = probs |
| df_test2["flagged"] = (probs >= 0.15).astype(int) |
| outage_hours = df_test2[df_test2["outage"] == 1].index |
| lead_times = [] |
| for idx in outage_hours: |
| |
| look_back = df_test2.loc[max(df_test2.index[0], idx-3):idx-1] |
| if len(look_back) > 0 and look_back["flagged"].any(): |
| lead_times.append(look_back["flagged"].sum()) |
| avg_lead = float(np.mean(lead_times)) if lead_times else 0.0 |
|
|
| return { |
| "brier_score": round(brier, 4), |
| "mae_duration_min": round(mae_dur, 1) if mae_dur else None, |
| "avg_lead_time_hours": round(avg_lead, 2), |
| "n_test_hours": len(df_test), |
| "n_test_outages": int(y_test.sum()), |
| } |
|
|
|
|
| |
|
|
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--serve", action="store_true", help="Print 24h forecast JSON") |
| parser.add_argument("--eval", action="store_true", help="Run rolling evaluation") |
| parser.add_argument("--fit", action="store_true", help="Fit and save model") |
| args = parser.parse_args() |
|
|
| if args.eval: |
| print("Running rolling 30-day evaluation...") |
| metrics = rolling_eval() |
| print(json.dumps(metrics, indent=2)) |
|
|
| elif args.serve: |
| t0 = time.time() |
| fc = Forecaster().fit("grid_history.csv") |
| forecast = fc.predict_next_24h() |
| elapsed_ms = (time.time() - t0) * 1000 |
| output = {"forecast": forecast, "generated_at": pd.Timestamp.now().isoformat(), |
| "latency_ms": round(elapsed_ms, 1)} |
| print(json.dumps(output, indent=2)) |
| print(f"\nβ± Total latency: {elapsed_ms:.0f}ms", flush=True) |
|
|
| elif args.fit: |
| fc = Forecaster().fit("grid_history.csv") |
| fc.save() |
|
|
| else: |
| |
| fc = Forecaster().fit("grid_history.csv") |
| forecast = fc.predict_next_24h() |
| print("\n24-Hour Forecast Preview:") |
| print(f"{'Hour':>5} {'Time':>15} {'P(outage)':>10} {'ExpDur(min)':>12} {'Risk':>8}") |
| print("-" * 55) |
| for row in forecast: |
| print(f"{row['hour']:>5} {row['timestamp']:>15} {row['p_outage']:>10.3f} " |
| f"{row['expected_duration_min']:>12.0f} {row['risk_level']:>8}") |
|
|