Spaces:
Sleeping
Sleeping
| """ | |
| Transformer Oil Temperature Forecasting & Anomaly Detection | |
| Using ARIMAX model with Gradio UI for Hugging Face Spaces | |
| """ | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib | |
| matplotlib.use("Agg") # Non-interactive backend for server environments | |
| import matplotlib.pyplot as plt | |
| import matplotlib.gridspec as gridspec | |
| import seaborn as sns | |
| import io | |
| import gradio as gr | |
| import tempfile | |
| from statsmodels.tsa.stattools import adfuller | |
| from statsmodels.tsa.arima.model import ARIMA | |
| from sklearn.metrics import mean_absolute_error, mean_squared_error | |
| # ───────────────────────────────────────────── | |
| # Aesthetic config | |
| # ───────────────────────────────────────────── | |
# Shared dark-theme palette used by every plot helper in this file.
# Most of these hex values also appear in the Gradio CSS string, so the
# figures and the surrounding UI use the same colours.
STYLE = {
    "bg": "#0d1117",        # figure background
    "panel": "#161b22",     # axes and legend background
    "accent": "#f78166",    # forecast line, anomaly markers, threshold lines
    "accent2": "#58a6ff",   # actual OT line, residual bars
    "accent3": "#3fb950",   # fitted-values line
    "warn": "#d29922",      # "forecast start" divider
    "text": "#e6edf3",      # titles and legend labels
    "subtext": "#8b949e",   # tick labels and axis labels
    "grid": "#21262d",      # grid lines, spines, legend edges
}
def _apply_style(fig, axes_list):
    """Paint a figure and the given axes with the dark STYLE palette.

    Sets the figure background, then for each axes in ``axes_list`` sets the
    panel background, tick/axis-label/title colours, spine edge colour and a
    dashed grid. Axes not listed are left untouched.
    """
    muted = STYLE["subtext"]
    edge = STYLE["grid"]

    fig.patch.set_facecolor(STYLE["bg"])
    for axis in axes_list:
        axis.set_facecolor(STYLE["panel"])
        axis.tick_params(colors=muted, labelsize=8)
        for axis_label in (axis.xaxis.label, axis.yaxis.label):
            axis_label.set_color(muted)
        axis.title.set_color(STYLE["text"])
        for side in axis.spines.values():
            side.set_edgecolor(edge)
        axis.grid(color=edge, linewidth=0.5, linestyle="--", alpha=0.7)
| # ───────────────────────────────────────────── | |
| # 1. DATA LOADING | |
| # ───────────────────────────────────────────── | |
def load_data(file_obj):
    """Load a CSV into a DataFrame indexed by its date column.

    Parameters
    ----------
    file_obj
        A path, a file-like object, or an upload wrapper exposing the path
        as ``.name``.

    Returns
    -------
    pandas.DataFrame
        Rows sorted by the parsed datetime index, with missing values
        forward-filled and then back-filled.

    Raises
    ------
    ValueError
        If no column name contains "date".
    """
    source = file_obj.name if hasattr(file_obj, "name") else file_obj
    df = pd.read_csv(source)

    # Any column whose name contains "date" is a candidate, but an exact
    # "date" header wins: otherwise an unrelated column such as "updated_by"
    # that happens to come first would be parsed as the timestamp.
    candidates = [c for c in df.columns if "date" in str(c).lower()]
    if not candidates:
        raise ValueError("No 'date' column found in CSV.")
    exact = [c for c in candidates if str(c).strip().lower() == "date"]
    date_col = exact[0] if exact else candidates[0]

    df[date_col] = pd.to_datetime(df[date_col])
    df = df.set_index(date_col).sort_index()

    # Forward-fill first, then back-fill whatever is still missing at the top
    return df.ffill().bfill()
| # ───────────────────────────────────────────── | |
| # 2. STATIONARITY CHECK | |
| # ───────────────────────────────────────────── | |
def check_stationarity(series):
    """Run an Augmented Dickey-Fuller test and pick a differencing order.

    Returns a 3-tuple ``(report, series_out, d)``:

    * ``report``: multi-line text with the ADF statistic, p-value,
      critical values and a verdict line;
    * ``series_out``: the input unchanged when p <= 0.05, otherwise its
      first difference with the leading NaN dropped;
    * ``d``: 0 when p <= 0.05, otherwise 1.
    """
    outcome = adfuller(series.dropna(), autolag="AIC")
    statistic, pval, critical = outcome[0], outcome[1], outcome[4]
    critical_fmt = {level: f"{value:.3f}" for level, value in critical.items()}

    report = [
        f"ADF Statistic : {statistic:.4f}",
        f"p-value : {pval:.4f}",
        f"Critical vals : {critical_fmt}",
    ]

    stationary = pval <= 0.05
    if not stationary:
        report.append("⚠️ Series is NON-STATIONARY (p > 0.05) — applying 1st differencing.")
        return "\n".join(report), series.diff().dropna(), 1

    report.append("✅ Series is STATIONARY (p ≤ 0.05) — no differencing needed.")
    return "\n".join(report), series, 0
| # ───────────────────────────────────────────── | |
| # 3. ARIMAX TRAINING | |
| # ───────────────────────────────────────────── | |
def train_arimax(endog, exog, d=0):
    """Fit ARIMA models with exogenous regressors and keep the lowest-AIC one.

    Tries every (p, d, q) with p and q in 0..2. A candidate that raises is
    skipped. If no candidate succeeds, a plain (1, d, 1) fit is attempted and
    any error from it propagates.

    Returns a 2-tuple ``(fitted_result, order)``.
    """
    relaxed = {"enforce_stationarity": False, "enforce_invertibility": False}
    winner = None
    winner_order = (1, d, 1)
    lowest_aic = np.inf

    # Small 3x3 grid keeps the search fast
    for order in [(p, d, q) for p in range(3) for q in range(3)]:
        try:
            candidate = ARIMA(endog, exog=exog, order=order, **relaxed)
            fit = candidate.fit(method_kwargs={"warn_convergence": False})
            if fit.aic < lowest_aic:
                lowest_aic, winner_order, winner = fit.aic, order, fit
        except Exception:
            # Best-effort search: a failing order is simply not a candidate
            continue

    if winner is None:
        # Nothing in the grid could be fitted; fall back to ARIMA(1, d, 1)
        winner = ARIMA(endog, exog=exog, order=(1, d, 1), **relaxed).fit()
    return winner, winner_order
| # ───────────────────────────────────────────── | |
| # 4. FORECASTING | |
| # ───────────────────────────────────────────── | |
def forecast(model_result, steps, exog_future):
    """Forecast ``steps`` periods past the end of the fitted sample.

    ``exog_future`` supplies the exogenous values for those periods and is
    passed straight to ``model_result.get_forecast``.

    Returns a 2-tuple ``(predicted_mean, conf_int)`` taken from the forecast
    object.
    """
    prediction = model_result.get_forecast(steps=steps, exog=exog_future)
    return prediction.predicted_mean, prediction.conf_int()
| # ───────────────────────────────────────────── | |
| # 5. ANOMALY DETECTION | |
| # ───────────────────────────────────────────── | |
def detect_anomalies(actual, fitted, k=2.5):
    """Flag points whose absolute residual exceeds ``mean + k * std``.

    The residual is ``actual - fitted``; mean and std are taken over those
    residuals.

    Returns a 2-tuple ``(residuals, anomalies)`` where ``anomalies`` is a
    boolean mask aligned with ``residuals``.
    """
    residuals = actual - fitted
    cutoff = residuals.mean() + k * residuals.std()
    flagged = residuals.abs().gt(cutoff)
    return residuals, flagged
| # ───────────────────────────────────────────── | |
| # PLOT HELPERS | |
| # ───────────────────────────────────────────── | |
def _fig_to_pil(fig):
    """Render ``fig`` to a temporary PNG and return the file path.

    Despite the name, the return value is a path string, not a PIL image.
    The figure is closed afterwards, so callers must not reuse it.
    """
    # Only the path is needed. Closing our own handle straight away avoids
    # leaking a file descriptor and lets savefig reopen the path itself.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
        png_path = tmp.name

    try:
        fig.savefig(png_path, dpi=130, bbox_inches="tight",
                    facecolor=fig.get_facecolor())
    finally:
        # pyplot keeps every figure alive until it is closed; without this a
        # long-running server accumulates one figure per plot per request.
        plt.close(fig)
    return png_path
def plot_overview(df):
    """Draw the overview figure and return the path of the saved image.

    Layout: the OT series across the top row, all other numeric columns as
    lines at bottom-left, and a lower-triangle correlation heatmap at
    bottom-right. ``df`` must contain an "OT" column.
    """
    # Only numeric columns can be correlated or drawn as lines; a stray text
    # column would otherwise make DataFrame.corr() raise on recent pandas.
    numeric = df.select_dtypes(include="number")
    feat_cols = [c for c in numeric.columns if c != "OT"]

    fig = plt.figure(figsize=(14, 8), facecolor=STYLE["bg"])
    gs = gridspec.GridSpec(2, 2, figure=fig, hspace=0.45, wspace=0.35)

    # --- OT over time ---
    ax0 = fig.add_subplot(gs[0, :])
    ax0.plot(df.index, df["OT"], color=STYLE["accent2"], linewidth=0.8, alpha=0.9)
    ax0.set_title("Oil Temperature (OT) — Full Series", fontsize=11, fontweight="bold")
    ax0.set_ylabel("OT")

    # --- Feature lines ---
    ax1 = fig.add_subplot(gs[1, 0])
    palette = [STYLE["accent"], STYLE["accent2"], STYLE["accent3"],
               STYLE["warn"], "#c9d1d9", "#a371f7"]
    for i, col in enumerate(feat_cols):
        ax1.plot(df.index, df[col], linewidth=0.6, alpha=0.7,
                 color=palette[i % len(palette)], label=col)
    ax1.set_title("All Load Features", fontsize=10)
    if feat_cols:
        # A legend with no labelled lines only produces a warning
        ax1.legend(fontsize=6, ncol=2, facecolor=STYLE["panel"],
                   edgecolor=STYLE["grid"], labelcolor=STYLE["text"])

    # --- Correlation heatmap ---
    ax2 = fig.add_subplot(gs[1, 1])
    corr = numeric.corr()
    mask = np.triu(np.ones_like(corr, dtype=bool))  # hide upper triangle + diagonal
    cmap = sns.diverging_palette(220, 10, as_cmap=True)
    sns.heatmap(corr, mask=mask, cmap=cmap, ax=ax2, annot=True,
                fmt=".2f", annot_kws={"size": 7},
                linewidths=0.4, linecolor=STYLE["grid"],
                cbar_kws={"shrink": 0.7})

    # The heatmap is deliberately kept out of _apply_style (no grid over the
    # cells), so its text colours are set by hand; left at the defaults they
    # are dark-on-dark and unreadable.
    ax2.set_facecolor(STYLE["panel"])
    ax2.set_title("Correlation Matrix", fontsize=10, color=STYLE["text"])
    ax2.tick_params(axis="x", rotation=45, labelsize=7, colors=STYLE["subtext"])
    ax2.tick_params(axis="y", rotation=0, labelsize=7, colors=STYLE["subtext"])
    cbar = getattr(ax2.collections[0], "colorbar", None) if ax2.collections else None
    if cbar is not None:
        cbar.ax.tick_params(colors=STYLE["subtext"], labelsize=7)

    _apply_style(fig, [ax0, ax1])
    fig.tight_layout()
    return _fig_to_pil(fig)
def plot_forecast(df, fc_mean, fc_ci, order, mae, rmse):
    """Plot the observed OT series with the forecast and its interval band.

    ``fc_ci`` is read positionally: column 0 is the lower bound, column 1 the
    upper bound. A dashed vertical line marks the last observed timestamp.
    The title carries the model order and the supplied MAE / RMSE.

    Returns the path of the saved image.
    """
    legend_kw = {
        "facecolor": STYLE["panel"],
        "edgecolor": STYLE["grid"],
        "labelcolor": STYLE["text"],
    }
    forecast_colour = STYLE["accent"]

    fig, ax = plt.subplots(figsize=(14, 5), facecolor=STYLE["bg"])

    # Observed series
    ax.plot(df.index, df["OT"], color=STYLE["subtext"],
            linewidth=0.7, alpha=0.6, label="Actual OT")

    # Forecast mean and interval band
    ax.plot(fc_mean.index, fc_mean.values,
            color=forecast_colour, linewidth=1.8, label="Forecast", zorder=5)
    lower, upper = fc_ci.iloc[:, 0], fc_ci.iloc[:, 1]
    ax.fill_between(fc_ci.index, lower, upper,
                    color=forecast_colour, alpha=0.15, label="95% CI")

    # Divider at the last observed point
    last_observed = df.index[-1]
    ax.axvline(last_observed, color=STYLE["warn"], linewidth=1.2,
               linestyle="--", alpha=0.8, label="Forecast start")

    heading = f"ARIMAX{order} Forecast | MAE={mae:.3f} RMSE={rmse:.3f}"
    ax.set_title(heading, fontsize=11, fontweight="bold")
    ax.set_ylabel("OT")
    ax.legend(fontsize=8, **legend_kw)

    _apply_style(fig, [ax])
    plt.tight_layout()
    return _fig_to_pil(fig)
def plot_anomalies(df_ot, fitted, residuals, anomalies, k=2.5):
    """Plot actual vs fitted values and the residuals, marking anomalies.

    Parameters
    ----------
    df_ot, fitted
        Observed and fitted series (pandas objects with an index).
    residuals
        Residual series drawn as bars in the lower panel.
    anomalies
        Boolean mask aligned position-by-position with ``df_ot`` and
        ``residuals``.
    k
        Multiplier for the dashed threshold lines at
        ``±(mean + k * std)`` of the residuals. Pass the same value that was
        used to build ``anomalies`` so lines and markers agree. Defaults to
        2.5.

    Returns the path of the saved image.
    """
    # One plain boolean array works for both Index and ndarray indexing
    flags = np.asarray(anomalies, dtype=bool)
    n_flagged = int(flags.sum())
    legend_kw = {
        "facecolor": STYLE["panel"],
        "edgecolor": STYLE["grid"],
        "labelcolor": STYLE["text"],
    }

    fig, axes = plt.subplots(2, 1, figsize=(14, 8),
                             facecolor=STYLE["bg"], sharex=True)
    top, bottom = axes

    # Top: actual vs fitted
    top.plot(df_ot.index, df_ot.values,
             color=STYLE["accent2"], linewidth=0.8, alpha=0.8, label="Actual")
    top.plot(fitted.index, fitted.values,
             color=STYLE["accent3"], linewidth=0.8, alpha=0.8, label="Fitted")
    top.scatter(df_ot.index[flags], df_ot.values[flags],
                color=STYLE["accent"], s=18, zorder=6,
                label=f"Anomalies ({n_flagged})", marker="^")
    top.set_title("Actual vs Fitted — Anomalies Highlighted", fontsize=11, fontweight="bold")
    top.set_ylabel("OT")
    top.legend(fontsize=8, **legend_kw)

    # Bottom: residuals
    # NOTE(review): width=0.8 is in x-axis data units — confirm this renders
    # acceptably when the index is sub-daily timestamps.
    bottom.bar(residuals.index, residuals.values,
               color=STYLE["accent2"], alpha=0.5, width=0.8)
    bottom.scatter(residuals.index[flags], residuals.values[flags],
                   color=STYLE["accent"], s=18, zorder=6, marker="^")
    thr_val = residuals.mean() + k * residuals.std()
    bottom.axhline(thr_val, color=STYLE["accent"], linewidth=1,
                   linestyle="--", alpha=0.8, label=f"+ threshold ({thr_val:.2f})")
    bottom.axhline(-thr_val, color=STYLE["accent"], linewidth=1,
                   linestyle="--", alpha=0.8, label=f"- threshold ({-thr_val:.2f})")
    bottom.set_title("Residuals with Anomaly Thresholds", fontsize=10)
    bottom.set_ylabel("Residual")
    bottom.legend(fontsize=7, **legend_kw)

    _apply_style(fig, axes)
    plt.tight_layout()
    return _fig_to_pil(fig)
| # ───────────────────────────────────────────── | |
| # MAIN PIPELINE (called by Gradio) | |
| # ───────────────────────────────────────────── | |
# Exogenous load columns expected in the uploaded CSV (alongside "OT")
EXOG_COLS = ["HUFL", "HULL", "MUFL", "MULL", "LUFL", "LULL"]


def run_pipeline(file_obj, horizon: int):
    """Run the full analysis for the Gradio callback.

    Steps: load the CSV, draw the overview, run the stationarity test, select
    an ARIMAX order on the first 80% of rows, score it on the remaining 20%,
    refit that order on all rows to forecast ``horizon`` steps past the last
    timestamp, and flag anomalies from the training residuals.

    Parameters
    ----------
    file_obj
        Uploaded CSV (anything ``load_data`` accepts), or ``None``.
    horizon
        Number of future steps to forecast; values below 1 are raised to 1.

    Returns
    -------
    tuple
        ``(overview_img, forecast_img, anomaly_img, report_text)``. On any
        failure the three images are ``None`` and the text holds the error
        message and traceback; this function does not raise.
    """
    if file_obj is None:
        return None, None, None, "❌ Please upload a CSV file."
    try:
        horizon = max(int(horizon), 1)

        # 1. Load data
        df = load_data(file_obj)

        # Validate required columns
        missing = [c for c in EXOG_COLS + ["OT"] if c not in df.columns]
        if missing:
            return None, None, None, f"❌ Missing columns: {missing}"

        # Use at most 2000 rows for speed on free Spaces
        if len(df) > 2000:
            df = df.iloc[-2000:]

        # 2. Overview plot
        ov_img = plot_overview(df)

        # 3. Stationarity (only the differencing order is used; the model
        #    differences internally)
        adf_text, _, d = check_stationarity(df["OT"])

        # 4. Train ARIMAX (use 80% for fit, 20% held for evaluation)
        split = int(len(df) * 0.8)
        train_df = df.iloc[:split]
        test_df = df.iloc[split:]
        endog_train = train_df["OT"]
        exog_train = train_df[EXOG_COLS]
        model_result, best_order = train_arimax(endog_train, exog_train, d=d)

        # In-sample fitted values
        fitted = model_result.fittedvalues

        # Evaluate on test set (if we have enough rows)
        if len(test_df) > 0:
            exog_test = test_df[EXOG_COLS]
            fc_test, _ = forecast(model_result, len(test_df), exog_test)
            mae = mean_absolute_error(test_df["OT"], fc_test)
            rmse = np.sqrt(mean_squared_error(test_df["OT"], fc_test))
        else:
            mae, rmse = 0.0, 0.0

        # 5. Out-of-sample forecast
        # The evaluation model only knows the first 80% of rows, so its
        # forecast origin is the start of the hold-out period. Refit the
        # selected order on every row so the forecast really begins after
        # the last observed timestamp.
        final_model = ARIMA(df["OT"], exog=df[EXOG_COLS], order=best_order,
                            enforce_stationarity=False,
                            enforce_invertibility=False)
        final_result = final_model.fit(method_kwargs={"warn_convergence": False})

        # Repeat last known exog row for simplicity (flat extrapolation)
        last_exog = df[EXOG_COLS].iloc[[-1]]
        exog_future = pd.concat([last_exog] * horizon, ignore_index=True)

        # Build future datetime index; fall back to hourly when the spacing
        # cannot be inferred
        freq_guess = pd.infer_freq(df.index) or "h"
        future_idx = pd.date_range(df.index[-1], periods=horizon + 1,
                                   freq=freq_guess)[1:]
        exog_future.index = future_idx
        fc_mean, fc_ci = forecast(final_result, horizon, exog_future)
        fc_mean.index = future_idx
        fc_ci.index = future_idx
        fc_img = plot_forecast(df, fc_mean, fc_ci, best_order, mae, rmse)

        # 6. Anomaly detection (on training in-sample residuals)
        residuals, anomaly_mask = detect_anomalies(endog_train, fitted)
        an_img = plot_anomalies(endog_train, fitted, residuals, anomaly_mask)

        # Append metrics + order info to ADF text
        adf_text += (
            f"\n\n📐 Best ARIMAX order : {best_order}"
            f"\n📊 Test MAE : {mae:.4f}"
            f"\n📊 Test RMSE : {rmse:.4f}"
            f"\n🔴 Anomalies found : {anomaly_mask.sum()} / {len(anomaly_mask)}"
        )
        return ov_img, fc_img, an_img, adf_text
    except Exception as e:
        # UI boundary: report the failure in the text box instead of raising
        import traceback
        tb = traceback.format_exc()
        return None, None, None, f"❌ Error:\n{e}\n\n{tb}"
| # ───────────────────────────────────────────── | |
| # GRADIO UI | |
| # ───────────────────────────────────────────── | |
# Custom stylesheet passed to gr.Blocks(css=...). It restyles the Gradio
# container, panels, primary button, labels, textareas and tab strip with
# the same dark colours used for the plots.
CSS = """
/* ── Global reset ── */
* { box-sizing: border-box; }
body, .gradio-container {
background: #0d1117 !important;
font-family: 'JetBrains Mono', 'Fira Code', monospace !important;
color: #e6edf3 !important;
}
/* ── Header ── */
.app-header {
text-align: center;
padding: 28px 0 8px;
border-bottom: 1px solid #21262d;
margin-bottom: 20px;
}
.app-header h1 {
font-size: 1.7rem;
font-weight: 700;
color: #f78166;
letter-spacing: -0.5px;
margin: 0;
}
.app-header p {
font-size: 0.82rem;
color: #8b949e;
margin-top: 6px;
}
/* ── Panels ── */
.gr-panel, .gr-box, .gr-form {
background: #161b22 !important;
border: 1px solid #21262d !important;
border-radius: 8px !important;
}
/* ── Buttons ── */
button.primary {
background: #f78166 !important;
border: none !important;
color: #0d1117 !important;
font-weight: 700 !important;
letter-spacing: 0.5px;
border-radius: 6px !important;
}
button.primary:hover {
background: #ff9580 !important;
}
/* ── Labels ── */
label, .gr-label {
color: #8b949e !important;
font-size: 0.78rem !important;
text-transform: uppercase;
letter-spacing: 0.8px;
}
/* ── Textbox (ADF output) ── */
textarea, .gr-textbox textarea {
background: #0d1117 !important;
color: #3fb950 !important;
border: 1px solid #21262d !important;
font-family: 'JetBrains Mono', monospace !important;
font-size: 0.8rem !important;
}
/* ── Tab strip ── */
.tab-nav button {
color: #8b949e !important;
border-bottom: 2px solid transparent !important;
}
.tab-nav button.selected {
color: #58a6ff !important;
border-bottom-color: #58a6ff !important;
}
"""
# Build the UI at import time so `demo` exists as a module-level object.
# Layout: header, then one row with a narrow controls column and a wide
# tabbed results column.
with gr.Blocks(css=CSS, title="⚡ Transformer OT Forecaster") as demo:
    # Header banner; the "app-header" class is styled in CSS
    gr.HTML("""
<div class="app-header">
<h1>⚡ Transformer Oil Temperature Forecaster</h1>
<p>ARIMAX · Anomaly Detection · Time Series Analysis — Upload ETT-style CSV data to begin</p>
</div>
""")
    with gr.Row():
        # ── Left column: controls ──
        with gr.Column(scale=1, min_width=260):
            gr.Markdown("### 📂 Data Input")
            file_input = gr.File(
                label="Upload CSV (date, HUFL, HULL, MUFL, MULL, LUFL, LULL, OT)",
                file_types=[".csv"]
            )
            # Whole-number input; run_pipeline additionally raises values
            # below 1 to 1
            horizon_input = gr.Number(
                label="Forecast Horizon (steps)",
                value=24, minimum=1, maximum=500, step=1,
                precision=0
            )
            run_btn = gr.Button("▶ Run Analysis", variant="primary")
            # Static summary of what the pipeline does
            gr.Markdown("""
---
**Model:** ARIMAX (auto p,d,q)
**Endog:** OT (oil temperature)
**Exog:** HUFL, HULL, MUFL, MULL, LUFL, LULL
**Anomaly:** Residual ± 2.5σ threshold
**Eval:** MAE + RMSE on 20% hold-out
""")
        # ── Right column: outputs ──
        with gr.Column(scale=3):
            with gr.Tabs():
                # The image components take file paths, which is what the
                # plot helpers return
                with gr.TabItem("📈 Overview"):
                    overview_img = gr.Image(
                        label="Time Series Overview & Correlations",
                        type="filepath"
                    )
                with gr.TabItem("🔮 Forecast"):
                    forecast_img = gr.Image(
                        label="ARIMAX Forecast",
                        type="filepath"
                    )
                with gr.TabItem("🚨 Anomalies"):
                    anomaly_img = gr.Image(
                        label="Anomaly Detection",
                        type="filepath"
                    )
                with gr.TabItem("📋 ADF Report"):
                    adf_output = gr.Textbox(
                        label="Stationarity Test + Model Metrics",
                        lines=14, max_lines=20
                    )
    # Wire up: the outputs list is in the same order as run_pipeline's
    # return tuple
    run_btn.click(
        fn=run_pipeline,
        inputs=[file_input, horizon_input],
        outputs=[overview_img, forecast_img, anomaly_img, adf_output],
    )

# Start the server only when executed as a script
if __name__ == "__main__":
    demo.launch()