""" Transformer Oil Temperature Forecasting & Anomaly Detection Using ARIMAX model with Gradio UI for Hugging Face Spaces """ import warnings warnings.filterwarnings("ignore") import pandas as pd import numpy as np import matplotlib matplotlib.use("Agg") # Non-interactive backend for server environments import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec import seaborn as sns import io import gradio as gr import tempfile from statsmodels.tsa.stattools import adfuller from statsmodels.tsa.arima.model import ARIMA from sklearn.metrics import mean_absolute_error, mean_squared_error # ───────────────────────────────────────────── # Aesthetic config # ───────────────────────────────────────────── STYLE = { "bg": "#0d1117", "panel": "#161b22", "accent": "#f78166", "accent2": "#58a6ff", "accent3": "#3fb950", "warn": "#d29922", "text": "#e6edf3", "subtext": "#8b949e", "grid": "#21262d", } def _apply_style(fig, axes_list): """Apply dark industrial style to all axes.""" fig.patch.set_facecolor(STYLE["bg"]) for ax in axes_list: ax.set_facecolor(STYLE["panel"]) ax.tick_params(colors=STYLE["subtext"], labelsize=8) ax.xaxis.label.set_color(STYLE["subtext"]) ax.yaxis.label.set_color(STYLE["subtext"]) ax.title.set_color(STYLE["text"]) for spine in ax.spines.values(): spine.set_edgecolor(STYLE["grid"]) ax.grid(color=STYLE["grid"], linewidth=0.5, linestyle="--", alpha=0.7) # ───────────────────────────────────────────── # 1. DATA LOADING # ───────────────────────────────────────────── def load_data(file_obj): """ Load CSV, parse 'date' as datetime index, fill missing values. Returns cleaned DataFrame. """ df = pd.read_csv(file_obj.name if hasattr(file_obj, "name") else file_obj) # Parse date column date_col = [c for c in df.columns if "date" in c.lower()] if not date_col: raise ValueError("No 'date' column found in CSV.") df[date_col[0]] = pd.to_datetime(df[date_col[0]]) df = df.set_index(date_col[0]).sort_index() # Forward-fill then back-fill missing values df = df.ffill().bfill() return df # ───────────────────────────────────────────── # 2. STATIONARITY CHECK # ───────────────────────────────────────────── def check_stationarity(series): """ Augmented Dickey-Fuller test. Returns (result_string, differenced_series, d_value). d=0 → already stationary; d=1 → once-differenced. """ result = adfuller(series.dropna(), autolag="AIC") adf_stat, p_value = result[0], result[1] lines = [ f"ADF Statistic : {adf_stat:.4f}", f"p-value : {p_value:.4f}", f"Critical vals : { {k: f'{v:.3f}' for k, v in result[4].items()} }", ] if p_value <= 0.05: lines.append("✅ Series is STATIONARY (p ≤ 0.05) — no differencing needed.") return "\n".join(lines), series, 0 else: lines.append("⚠️ Series is NON-STATIONARY (p > 0.05) — applying 1st differencing.") return "\n".join(lines), series.diff().dropna(), 1 # ───────────────────────────────────────────── # 3. ARIMAX TRAINING # ───────────────────────────────────────────── def train_arimax(endog, exog, d=0): """ Fit ARIMAX(p, d, q) model. Auto-selects best (p, q) by AIC over a small grid search. Returns fitted model result. """ best_aic = np.inf best_order = (1, d, 1) best_result = None # Grid search over small p/q space to keep it fast for p in range(0, 3): for q in range(0, 3): try: model = ARIMA(endog, exog=exog, order=(p, d, q), enforce_stationarity=False, enforce_invertibility=False) res = model.fit(method_kwargs={"warn_convergence": False}) if res.aic < best_aic: best_aic = res.aic best_order = (p, d, q) best_result = res except Exception: continue if best_result is None: # Fallback to simple ARIMA(1,d,1) model = ARIMA(endog, exog=exog, order=(1, d, 1), enforce_stationarity=False, enforce_invertibility=False) best_result = model.fit() return best_result, best_order # ───────────────────────────────────────────── # 4. FORECASTING # ───────────────────────────────────────────── def forecast(model_result, steps, exog_future): """ Produce out-of-sample forecast for `steps` periods. exog_future: DataFrame with same columns as training exog, length = steps. Returns forecast mean Series. """ pred = model_result.get_forecast(steps=steps, exog=exog_future) fc_mean = pred.predicted_mean fc_ci = pred.conf_int() return fc_mean, fc_ci # ───────────────────────────────────────────── # 5. ANOMALY DETECTION # ───────────────────────────────────────────── def detect_anomalies(actual, fitted, k=2.5): """ Residual-based anomaly detection. Flag points where |residual| > mean + k*std. Returns boolean mask of anomalies. """ residuals = actual - fitted threshold = residuals.mean() + k * residuals.std() anomalies = residuals.abs() > threshold return residuals, anomalies # ───────────────────────────────────────────── # PLOT HELPERS # ───────────────────────────────────────────── def _fig_to_pil(fig): """Save matplotlib figure to temp file and return filepath (Gradio-compatible).""" tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png") fig.savefig(tmp.name, dpi=130, bbox_inches="tight", facecolor=fig.get_facecolor()) return tmp.name def plot_overview(df): """OT time series + correlation heatmap.""" feat_cols = [c for c in df.columns if c != "OT"] fig = plt.figure(figsize=(14, 8), facecolor=STYLE["bg"]) gs = gridspec.GridSpec(2, 2, figure=fig, hspace=0.45, wspace=0.35) # --- OT over time --- ax0 = fig.add_subplot(gs[0, :]) ax0.plot(df.index, df["OT"], color=STYLE["accent2"], linewidth=0.8, alpha=0.9) ax0.set_title("Oil Temperature (OT) — Full Series", fontsize=11, fontweight="bold") ax0.set_ylabel("OT") # --- Feature lines --- ax1 = fig.add_subplot(gs[1, 0]) palette = [STYLE["accent"], STYLE["accent2"], STYLE["accent3"], STYLE["warn"], "#c9d1d9", "#a371f7"] for i, col in enumerate(feat_cols): ax1.plot(df.index, df[col], linewidth=0.6, alpha=0.7, color=palette[i % len(palette)], label=col) ax1.set_title("All Load Features", fontsize=10) ax1.legend(fontsize=6, ncol=2, facecolor=STYLE["panel"], edgecolor=STYLE["grid"], labelcolor=STYLE["text"]) # --- Correlation heatmap --- ax2 = fig.add_subplot(gs[1, 1]) corr = df.corr() mask = np.triu(np.ones_like(corr, dtype=bool)) cmap = sns.diverging_palette(220, 10, as_cmap=True) sns.heatmap(corr, mask=mask, cmap=cmap, ax=ax2, annot=True, fmt=".2f", annot_kws={"size": 7}, linewidths=0.4, linecolor=STYLE["grid"], cbar_kws={"shrink": 0.7}) ax2.set_title("Correlation Matrix", fontsize=10) ax2.tick_params(axis="x", rotation=45, labelsize=7) ax2.tick_params(axis="y", rotation=0, labelsize=7) _apply_style(fig, [ax0, ax1]) plt.tight_layout() return _fig_to_pil(fig) def plot_forecast(df, fc_mean, fc_ci, order, mae, rmse): """In-sample fit + out-of-sample forecast with confidence interval.""" fig, ax = plt.subplots(figsize=(14, 5), facecolor=STYLE["bg"]) # Training portion ax.plot(df.index, df["OT"], color=STYLE["subtext"], linewidth=0.7, alpha=0.6, label="Actual OT") # Forecast ax.plot(fc_mean.index, fc_mean.values, color=STYLE["accent"], linewidth=1.8, label="Forecast", zorder=5) ax.fill_between(fc_ci.index, fc_ci.iloc[:, 0], fc_ci.iloc[:, 1], color=STYLE["accent"], alpha=0.15, label="95% CI") # Dividing line split_t = df.index[-1] ax.axvline(split_t, color=STYLE["warn"], linewidth=1.2, linestyle="--", alpha=0.8, label="Forecast start") ax.set_title( f"ARIMAX{order} Forecast | MAE={mae:.3f} RMSE={rmse:.3f}", fontsize=11, fontweight="bold" ) ax.set_ylabel("OT") ax.legend(fontsize=8, facecolor=STYLE["panel"], edgecolor=STYLE["grid"], labelcolor=STYLE["text"]) _apply_style(fig, [ax]) plt.tight_layout() return _fig_to_pil(fig) def plot_anomalies(df_ot, fitted, residuals, anomalies): """Actual vs fitted + residual anomaly plot.""" fig, axes = plt.subplots(2, 1, figsize=(14, 8), facecolor=STYLE["bg"], sharex=True) # Top: actual vs fitted axes[0].plot(df_ot.index, df_ot.values, color=STYLE["accent2"], linewidth=0.8, alpha=0.8, label="Actual") axes[0].plot(fitted.index, fitted.values, color=STYLE["accent3"], linewidth=0.8, alpha=0.8, label="Fitted") axes[0].scatter(df_ot.index[anomalies], df_ot.values[anomalies], color=STYLE["accent"], s=18, zorder=6, label=f"Anomalies ({anomalies.sum()})", marker="^") axes[0].set_title("Actual vs Fitted — Anomalies Highlighted", fontsize=11, fontweight="bold") axes[0].set_ylabel("OT") axes[0].legend(fontsize=8, facecolor=STYLE["panel"], edgecolor=STYLE["grid"], labelcolor=STYLE["text"]) # Bottom: residuals axes[1].bar(residuals.index, residuals.values, color=STYLE["accent2"], alpha=0.5, width=0.8) axes[1].scatter(residuals.index[anomalies], residuals.values[anomalies], color=STYLE["accent"], s=18, zorder=6, marker="^") thr_val = residuals.mean() + 2.5 * residuals.std() axes[1].axhline( thr_val, color=STYLE["accent"], linewidth=1, linestyle="--", alpha=0.8, label=f"+ threshold ({thr_val:.2f})") axes[1].axhline(-thr_val, color=STYLE["accent"], linewidth=1, linestyle="--", alpha=0.8, label=f"- threshold ({-thr_val:.2f})") axes[1].set_title("Residuals with Anomaly Thresholds", fontsize=10) axes[1].set_ylabel("Residual") axes[1].legend(fontsize=7, facecolor=STYLE["panel"], edgecolor=STYLE["grid"], labelcolor=STYLE["text"]) _apply_style(fig, axes) plt.tight_layout() return _fig_to_pil(fig) # ───────────────────────────────────────────── # MAIN PIPELINE (called by Gradio) # ───────────────────────────────────────────── EXOG_COLS = ["HUFL", "HULL", "MUFL", "MULL", "LUFL", "LULL"] def run_pipeline(file_obj, horizon: int): """ Full pipeline: load → stationarity → ARIMAX → forecast → anomalies. Returns (overview_img, forecast_img, anomaly_img, adf_text). """ if file_obj is None: return None, None, None, "❌ Please upload a CSV file." try: horizon = int(horizon) if horizon < 1: horizon = 1 # 1. Load data df = load_data(file_obj) # Validate required columns missing = [c for c in EXOG_COLS + ["OT"] if c not in df.columns] if missing: return None, None, None, f"❌ Missing columns: {missing}" # Use at most 2000 rows for speed on free Spaces if len(df) > 2000: df = df.iloc[-2000:] # 2. Overview plot ov_img = plot_overview(df) # 3. Stationarity adf_text, _, d = check_stationarity(df["OT"]) # 4. Train ARIMAX (use 80% for fit, 20% held for evaluation) split = int(len(df) * 0.8) train_df = df.iloc[:split] test_df = df.iloc[split:] endog_train = train_df["OT"] exog_train = train_df[EXOG_COLS] model_result, best_order = train_arimax(endog_train, exog_train, d=d) # In-sample fitted values fitted = model_result.fittedvalues # Evaluate on test set (if we have enough rows) if len(test_df) > 0: exog_test = test_df[EXOG_COLS] fc_test, _ = forecast(model_result, len(test_df), exog_test) mae = mean_absolute_error(test_df["OT"], fc_test) rmse = np.sqrt(mean_squared_error(test_df["OT"], fc_test)) else: mae, rmse = 0.0, 0.0 # 5. Out-of-sample forecast # Repeat last known exog row for simplicity (flat extrapolation) last_exog = df[EXOG_COLS].iloc[[-1]] exog_future = pd.concat([last_exog] * horizon, ignore_index=True) # Build future datetime index freq_guess = pd.infer_freq(df.index) or "h" future_idx = pd.date_range(df.index[-1], periods=horizon + 1, freq=freq_guess)[1:] exog_future.index = future_idx fc_mean, fc_ci = forecast(model_result, horizon, exog_future) fc_mean.index = future_idx fc_ci.index = future_idx fc_img = plot_forecast(df, fc_mean, fc_ci, best_order, mae, rmse) # 6. Anomaly detection (on training in-sample residuals) residuals, anomaly_mask = detect_anomalies(endog_train, fitted) an_img = plot_anomalies(endog_train, fitted, residuals, anomaly_mask) # Append metrics + order info to ADF text adf_text += ( f"\n\n📐 Best ARIMAX order : {best_order}" f"\n📊 Test MAE : {mae:.4f}" f"\n📊 Test RMSE : {rmse:.4f}" f"\n🔴 Anomalies found : {anomaly_mask.sum()} / {len(anomaly_mask)}" ) return ov_img, fc_img, an_img, adf_text except Exception as e: import traceback tb = traceback.format_exc() return None, None, None, f"❌ Error:\n{e}\n\n{tb}" # ───────────────────────────────────────────── # GRADIO UI # ───────────────────────────────────────────── CSS = """ /* ── Global reset ── */ * { box-sizing: border-box; } body, .gradio-container { background: #0d1117 !important; font-family: 'JetBrains Mono', 'Fira Code', monospace !important; color: #e6edf3 !important; } /* ── Header ── */ .app-header { text-align: center; padding: 28px 0 8px; border-bottom: 1px solid #21262d; margin-bottom: 20px; } .app-header h1 { font-size: 1.7rem; font-weight: 700; color: #f78166; letter-spacing: -0.5px; margin: 0; } .app-header p { font-size: 0.82rem; color: #8b949e; margin-top: 6px; } /* ── Panels ── */ .gr-panel, .gr-box, .gr-form { background: #161b22 !important; border: 1px solid #21262d !important; border-radius: 8px !important; } /* ── Buttons ── */ button.primary { background: #f78166 !important; border: none !important; color: #0d1117 !important; font-weight: 700 !important; letter-spacing: 0.5px; border-radius: 6px !important; } button.primary:hover { background: #ff9580 !important; } /* ── Labels ── */ label, .gr-label { color: #8b949e !important; font-size: 0.78rem !important; text-transform: uppercase; letter-spacing: 0.8px; } /* ── Textbox (ADF output) ── */ textarea, .gr-textbox textarea { background: #0d1117 !important; color: #3fb950 !important; border: 1px solid #21262d !important; font-family: 'JetBrains Mono', monospace !important; font-size: 0.8rem !important; } /* ── Tab strip ── */ .tab-nav button { color: #8b949e !important; border-bottom: 2px solid transparent !important; } .tab-nav button.selected { color: #58a6ff !important; border-bottom-color: #58a6ff !important; } """ with gr.Blocks(css=CSS, title="⚡ Transformer OT Forecaster") as demo: gr.HTML("""
ARIMAX · Anomaly Detection · Time Series Analysis — Upload ETT-style CSV data to begin