KB-Infinity-Tech's picture
Upload 18 files
099d46e verified
"""
T2.3 Β· forecaster.py
24-hour-ahead probabilistic outage forecaster.
Outputs P(outage) and E[duration | outage] per hour.
Usage:
from forecaster import Forecaster
fc = Forecaster()
fc.fit("grid_history.csv")
forecast = fc.predict_next_24h() # list of 24 dicts
API endpoint (fast path):
python forecaster.py --serve # prints JSON, <300ms on CPU
python forecaster.py --eval # rolling 30-day Brier + MAE
"""
import argparse
import json
import time
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier, LGBMRegressor
warnings.filterwarnings("ignore")
MODEL_PATH_CLF = "model_outage_clf.pkl"
MODEL_PATH_REG = "model_duration_reg.pkl"
# ── Feature Engineering ───────────────────────────────────────────────────────
def build_features(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df["timestamp"] = pd.to_datetime(df["timestamp"])
df = df.sort_values("timestamp").reset_index(drop=True)
df["hour"] = df["timestamp"].dt.hour
df["dayofweek"] = df["timestamp"].dt.dayofweek
df["month"] = df["timestamp"].dt.month
df["is_weekend"] = (df["dayofweek"] >= 5).astype(int)
df["is_peak_morning"] = ((df["hour"] >= 7) & (df["hour"] <= 10)).astype(int)
df["is_peak_evening"] = ((df["hour"] >= 17) & (df["hour"] <= 21)).astype(int)
df["is_rainy_season"] = df["month"].isin([4, 5, 10, 11]).astype(int)
# Lagged load features (1h, 2h, 24h, 48h)
for lag in [1, 2, 24, 48]:
df[f"load_lag{lag}"] = df["load_mw"].shift(lag)
# Rolling stats
df["load_roll3_mean"] = df["load_mw"].shift(1).rolling(3).mean()
df["load_roll6_std"] = df["load_mw"].shift(1).rolling(6).std()
df["rain_roll3_sum"] = df["rain_mm"].shift(1).rolling(3).sum()
df["outage_lag1"] = df["outage"].shift(1)
df["outage_roll6_sum"] = df["outage"].shift(1).rolling(6).sum()
df = df.dropna().reset_index(drop=True)
return df
FEATURE_COLS = [
"load_lag1", "load_lag2", "load_lag24", "load_lag48",
"load_roll3_mean", "load_roll6_std", "rain_roll3_sum",
"temp_c", "humidity", "wind_ms", "rain_mm",
"hour", "dayofweek", "month", "is_weekend",
"is_peak_morning", "is_peak_evening", "is_rainy_season",
"outage_lag1", "outage_roll6_sum",
]
# ── Forecaster Class ──────────────────────────────────────────────────────────
class Forecaster:
def __init__(self):
self.clf = LGBMClassifier(
n_estimators=200,
learning_rate=0.05,
max_depth=5,
num_leaves=31,
class_weight="balanced",
random_state=42,
verbose=-1,
)
self.reg = LGBMRegressor(
n_estimators=200,
learning_rate=0.05,
max_depth=5,
num_leaves=31,
random_state=42,
verbose=-1,
)
self.df_features = None
self.is_fitted = False
def fit(self, csv_path: str = "grid_history.csv"):
df_raw = pd.read_csv(csv_path)
df = build_features(df_raw)
self.df_features = df # store for forecasting context
X = df[FEATURE_COLS]
y_clf = df["outage"]
y_reg = df.loc[df["outage"] == 1, "duration_min"]
X_reg = df.loc[df["outage"] == 1, FEATURE_COLS]
self.clf.fit(X, y_clf)
self.reg.fit(X_reg, y_reg)
self.is_fitted = True
print(f"βœ“ Forecaster fitted on {len(df)} rows")
return self
def predict_next_24h(self, reference_time=None) -> list[dict]:
"""
Build a 24-hour ahead forecast from the last known data point.
Returns list of 24 dicts: {hour, timestamp, p_outage, expected_duration_min, risk_level}
"""
if not self.is_fitted:
raise RuntimeError("Call fit() first.")
df = self.df_features
# Use last row as context anchor
last = df.iloc[-1]
if reference_time is None:
reference_time = pd.to_datetime(last["timestamp"]) + pd.Timedelta(hours=1)
forecast = []
# We'll use the last known feature values and adjust hour/temporal features
for offset in range(24):
ts = reference_time + pd.Timedelta(hours=offset)
h = ts.hour
dow = ts.dayofweek
month = ts.month
# Build feature row (use last context for lagged values; simplified for inference)
row = {
"load_lag1": last["load_mw"],
"load_lag2": df.iloc[-2]["load_mw"] if len(df) > 2 else last["load_mw"],
"load_lag24": df.iloc[-24]["load_mw"] if len(df) >= 24 else last["load_mw"],
"load_lag48": df.iloc[-48]["load_mw"] if len(df) >= 48 else last["load_mw"],
"load_roll3_mean": df["load_mw"].iloc[-3:].mean(),
"load_roll6_std": df["load_mw"].iloc[-6:].std(),
"rain_roll3_sum": df["rain_mm"].iloc[-3:].sum(),
"temp_c": last["temp_c"] + 2 * np.sin(2 * np.pi * (h - 14) / 24),
"humidity": float(np.clip(last["humidity"] + np.random.normal(0, 2), 30, 99)),
"wind_ms": max(0, float(last["wind_ms"])),
"rain_mm": float(last["rain_mm"] * 0.7), # decay
"hour": h,
"dayofweek": dow,
"month": month,
"is_weekend": int(dow >= 5),
"is_peak_morning": int(7 <= h <= 10),
"is_peak_evening": int(17 <= h <= 21),
"is_rainy_season": int(month in [4, 5, 10, 11]),
"outage_lag1": int(last["outage"]),
"outage_roll6_sum": float(df["outage"].iloc[-6:].sum()),
}
X_row = pd.DataFrame([row])[FEATURE_COLS]
p_out = float(self.clf.predict_proba(X_row)[0, 1])
exp_dur = float(self.reg.predict(X_row)[0]) if p_out > 0.05 else 0.0
exp_dur = max(0, exp_dur)
# Add calibrated uncertainty band (Β±1 sigma heuristic)
p_low = max(0.0, p_out - 0.08)
p_high = min(1.0, p_out + 0.08)
risk = "HIGH" if p_out >= 0.25 else "MEDIUM" if p_out >= 0.12 else "LOW"
forecast.append({
"hour_offset": offset,
"timestamp": ts.strftime("%Y-%m-%d %H:%M"),
"hour": h,
"p_outage": round(p_out, 4),
"p_outage_low": round(p_low, 4),
"p_outage_high": round(p_high, 4),
"expected_duration_min": round(exp_dur, 1),
"risk_level": risk,
})
return forecast
def save(self):
import pickle
with open(MODEL_PATH_CLF, "wb") as f:
pickle.dump(self.clf, f)
with open(MODEL_PATH_REG, "wb") as f:
pickle.dump(self.reg, f)
print(f"βœ“ Models saved: {MODEL_PATH_CLF}, {MODEL_PATH_REG}")
@classmethod
def load(cls):
import pickle
fc = cls()
with open(MODEL_PATH_CLF, "rb") as f:
fc.clf = pickle.load(f)
with open(MODEL_PATH_REG, "rb") as f:
fc.reg = pickle.load(f)
# Need df_features for inference context; rebuild from CSV
if Path("grid_history.csv").exists():
df_raw = pd.read_csv("grid_history.csv")
fc.df_features = build_features(df_raw)
fc.is_fitted = True
return fc
# ── Rolling Evaluation ────────────────────────────────────────────────────────
def rolling_eval(csv_path: str = "grid_history.csv", window_days: int = 30):
"""
Rolling 30-day held-out evaluation.
Returns: brier_score, mae_duration, avg_lead_time_hours
"""
df_raw = pd.read_csv(csv_path)
df = build_features(df_raw)
# Use last 30 days as test, rest as train
test_cutoff = df["timestamp"].max() - pd.Timedelta(days=window_days)
df_train = df[df["timestamp"] <= test_cutoff]
df_test = df[df["timestamp"] > test_cutoff]
X_train = df_train[FEATURE_COLS]
y_train = df_train["outage"]
X_test = df_test[FEATURE_COLS]
y_test = df_test["outage"]
clf = LGBMClassifier(n_estimators=200, learning_rate=0.05, max_depth=5,
class_weight="balanced", random_state=42, verbose=-1)
clf.fit(X_train, y_train)
probs = clf.predict_proba(X_test)[:, 1]
# Brier score
brier = float(np.mean((probs - y_test.values) ** 2))
# Duration MAE (on true outage hours)
df_train_out = df_train[df_train["outage"] == 1]
df_test_out = df_test[df_test["outage"] == 1]
mae_dur = None
if len(df_train_out) > 5 and len(df_test_out) > 0:
reg = LGBMRegressor(n_estimators=200, random_state=42, verbose=-1)
reg.fit(df_train_out[FEATURE_COLS], df_train_out["duration_min"])
preds_dur = reg.predict(df_test_out[FEATURE_COLS])
mae_dur = float(np.mean(np.abs(preds_dur - df_test_out["duration_min"].values)))
# Lead time: for each true outage, find if model flagged it β‰₯1h before
df_test2 = df_test.copy()
df_test2["pred_prob"] = probs
df_test2["flagged"] = (probs >= 0.15).astype(int)
outage_hours = df_test2[df_test2["outage"] == 1].index
lead_times = []
for idx in outage_hours:
# look back up to 3 rows
look_back = df_test2.loc[max(df_test2.index[0], idx-3):idx-1]
if len(look_back) > 0 and look_back["flagged"].any():
lead_times.append(look_back["flagged"].sum())
avg_lead = float(np.mean(lead_times)) if lead_times else 0.0
return {
"brier_score": round(brier, 4),
"mae_duration_min": round(mae_dur, 1) if mae_dur else None,
"avg_lead_time_hours": round(avg_lead, 2),
"n_test_hours": len(df_test),
"n_test_outages": int(y_test.sum()),
}
# ── CLI ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--serve", action="store_true", help="Print 24h forecast JSON")
parser.add_argument("--eval", action="store_true", help="Run rolling evaluation")
parser.add_argument("--fit", action="store_true", help="Fit and save model")
args = parser.parse_args()
if args.eval:
print("Running rolling 30-day evaluation...")
metrics = rolling_eval()
print(json.dumps(metrics, indent=2))
elif args.serve:
t0 = time.time()
fc = Forecaster().fit("grid_history.csv")
forecast = fc.predict_next_24h()
elapsed_ms = (time.time() - t0) * 1000
output = {"forecast": forecast, "generated_at": pd.Timestamp.now().isoformat(),
"latency_ms": round(elapsed_ms, 1)}
print(json.dumps(output, indent=2))
print(f"\n⏱ Total latency: {elapsed_ms:.0f}ms", flush=True)
elif args.fit:
fc = Forecaster().fit("grid_history.csv")
fc.save()
else:
# Default: fit + quick forecast preview
fc = Forecaster().fit("grid_history.csv")
forecast = fc.predict_next_24h()
print("\n24-Hour Forecast Preview:")
print(f"{'Hour':>5} {'Time':>15} {'P(outage)':>10} {'ExpDur(min)':>12} {'Risk':>8}")
print("-" * 55)
for row in forecast:
print(f"{row['hour']:>5} {row['timestamp']:>15} {row['p_outage']:>10.3f} "
f"{row['expected_duration_min']:>12.0f} {row['risk_level']:>8}")