# forecasting/app.py — Gradio forecasting demo (source: adamkahle's Space, commit 8c39017)
import os
import tempfile
import traceback
from datetime import datetime
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from statsforecast import StatsForecast
from statsforecast.models import (
HistoricAverage,
Naive,
SeasonalNaive,
WindowAverage,
SeasonalWindowAverage,
AutoETS,
AutoARIMA,
)
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mae, mape, mase, mse, rmse, smape
# Schema every uploaded training CSV must provide:
# unique_id (series key), ds (timestamp), y (target value).
REQUIRED_COLS = ["unique_id", "ds", "y"]
# Number of trailing history rows kept per series when plotting (see _tail_history).
PLOT_TAIL_POINTS = 300
def _to_int(x, name):
if x is None:
raise ValueError(f"{name} is None")
return int(round(float(x)))
def _ensure_pandas_df(obj):
if obj is None:
return None
if isinstance(obj, pd.DataFrame):
return obj
if hasattr(obj, "to_pandas"):
return obj.to_pandas()
return pd.DataFrame(obj)
def _tail_history(df_y, n=PLOT_TAIL_POINTS):
    """Return the last *n* rows of each series in *df_y*, sorted by
    (unique_id, ds), with a fresh RangeIndex.

    None and empty frames pass through untouched.
    """
    if df_y is None or df_y.empty:
        return df_y
    ordered = df_y.sort_values(["unique_id", "ds"])
    per_series = ordered.groupby("unique_id", as_index=False, group_keys=False)
    return per_series.tail(n).reset_index(drop=True)
def load_training_data(file_obj):
    """Load and validate the training CSV.

    Returns a (dataframe, candidate predictor columns, status message)
    triple; the dataframe is None on any failure, with the reason in the
    message.
    """
    if file_obj is None:
        return None, [], "Please upload a CSV file."
    try:
        # Gradio may hand over a file wrapper (with .name) or a plain path.
        path = getattr(file_obj, "name", file_obj)
        frame = pd.read_csv(path)
        absent = [col for col in REQUIRED_COLS if col not in frame.columns]
        if absent:
            return None, [], f"Missing required columns: {', '.join(absent)}"
        frame["ds"] = pd.to_datetime(frame["ds"])
        frame = frame.sort_values(["unique_id", "ds"]).reset_index(drop=True)
        if frame["y"].isna().any():
            return None, [], "Data contains missing values in the 'y' column."
        # Any extra column is offered as an optional exogenous predictor.
        extras = [col for col in frame.columns if col not in REQUIRED_COLS]
        return frame, extras, "Training data loaded successfully!"
    except Exception as e:
        return None, [], f"Error loading data: {type(e).__name__}: {e}"
def load_future_x(file_obj, xcols):
    """Load the future-predictors CSV (X_df) that covers the forecast horizon.

    Returns a (dataframe, status message) pair; the dataframe is None on
    failure. Requires unique_id, ds and every selected predictor column,
    and rejects duplicate (unique_id, ds) rows.
    """
    if not xcols:
        return None, "No predictors selected."
    if file_obj is None:
        return None, "Future predictors CSV (X_df) is required to use predictors."
    try:
        path = getattr(file_obj, "name", file_obj)
        future = pd.read_csv(path)
        needed = ["unique_id", "ds"] + list(xcols)
        absent = [col for col in needed if col not in future.columns]
        if absent:
            return None, f"Future predictors CSV missing: {', '.join(absent)}"
        future["ds"] = pd.to_datetime(future["ds"])
        future = future.sort_values(["unique_id", "ds"]).reset_index(drop=True)
        if future.duplicated(["unique_id", "ds"]).any():
            return None, "Future predictors CSV has duplicate (unique_id, ds) rows."
        return future, "Future predictors loaded."
    except Exception as e:
        return None, f"Error loading future predictors: {type(e).__name__}: {e}"
def align_future_x_to_horizon(df_train, X_future, xcols, h):
    """Trim *X_future* to exactly *h* rows per series, keeping only rows
    strictly after each series' last training timestamp.

    Returns a frame with columns [unique_id, ds, *xcols], predictors coerced
    to numeric (errors raise). Raises ValueError listing every series that
    has no future rows or fewer than *h* of them.
    """
    last_ds = df_train.groupby("unique_id", as_index=False)["ds"].max().rename(columns={"ds": "last_ds"})
    X = X_future.merge(last_ds, on="unique_id", how="inner")
    # Keep only rows beyond the end of the training data for each series.
    X = X[X["ds"] > X["last_ds"]].copy()
    X = X.sort_values(["unique_id", "ds"]).reset_index(drop=True)
    X_h = X.groupby("unique_id", as_index=False).head(h).copy()
    counts = X_h.groupby("unique_id")["ds"].size()
    missing_ids = sorted(set(last_ds["unique_id"]) - set(counts.index))
    short_ids = sorted([uid for uid, c in counts.items() if c < h])
    if missing_ids or short_ids:
        parts = []
        # map(str, ...): unique_id values may be numeric, and str.join
        # raises TypeError on non-string elements.
        if missing_ids:
            parts.append(f"Missing future X rows for: {', '.join(map(str, missing_ids))}")
        if short_ids:
            parts.append(f"Not enough future X rows (need {h}) for: {', '.join(map(str, short_ids))}")
        raise ValueError(" | ".join(parts))
    X_h = X_h[["unique_id", "ds"] + list(xcols)]
    for c in xcols:
        X_h[c] = pd.to_numeric(X_h[c], errors="raise")
    return X_h.reset_index(drop=True)
def _normalize_timegpt_output(tgpt_raw, df_y, h):
    """Coerce raw TimeGPT output into a [unique_id, ds, timegpt] frame with
    exactly *h* future rows per series.

    Raises ValueError on empty/malformed output, invalid timestamps, or when
    any series present in *df_y* lacks the full horizon.
    """
    tgpt = _ensure_pandas_df(tgpt_raw)
    if tgpt is None or tgpt.empty:
        raise ValueError("TimeGPT returned empty output.")
    if "unique_id" not in tgpt.columns or "ds" not in tgpt.columns:
        raise ValueError(f"TimeGPT output missing required columns. Got: {list(tgpt.columns)}")
    tgpt["ds"] = pd.to_datetime(tgpt["ds"], errors="coerce")
    if tgpt["ds"].isna().any():
        raise ValueError("TimeGPT output has invalid ds values.")
    # Locate the forecast column: known names first, else the first
    # non-key column as a fallback.
    out_col = None
    for c in tgpt.columns:
        if c.lower() in ("timegpt", "yhat", "y_hat", "forecast"):
            out_col = c
            break
    if out_col is None:
        non_id = [c for c in tgpt.columns if c not in ["unique_id", "ds"]]
        if non_id:
            out_col = non_id[0]
    if out_col is None:
        raise ValueError("TimeGPT output has no forecast column.")
    if out_col != "timegpt":
        tgpt = tgpt.rename(columns={out_col: "timegpt"})
    # Keep only rows strictly after each series' last training timestamp,
    # then take the first h per series.
    last_ds = df_y.groupby("unique_id", as_index=False)["ds"].max().rename(columns={"ds": "last_ds"})
    tgpt = tgpt.merge(last_ds, on="unique_id", how="inner")
    tgpt = tgpt[tgpt["ds"] > tgpt["last_ds"]].copy()
    tgpt = tgpt.sort_values(["unique_id", "ds"]).reset_index(drop=True)
    tgpt = tgpt.groupby("unique_id", as_index=False).head(h).copy()
    counts = tgpt.groupby("unique_id")["ds"].size()
    missing_ids = sorted(set(last_ds["unique_id"]) - set(counts.index))
    short_ids = sorted([uid for uid, c in counts.items() if c < h])
    if missing_ids or short_ids:
        parts = []
        # map(str, ...): unique_id values may be numeric, and str.join
        # raises TypeError on non-string elements.
        if missing_ids:
            parts.append(f"Missing TimeGPT future rows for: {', '.join(map(str, missing_ids))}")
        if short_ids:
            parts.append(f"Not enough TimeGPT rows (need {h}) for: {', '.join(map(str, short_ids))}")
        raise ValueError(" | ".join(parts))
    tgpt = tgpt[["unique_id", "ds", "timegpt"]].copy()
    return tgpt
def _ensure_forecast_cols(df_fc, colname):
    """Normalize a forecast frame to [unique_id, ds, <forecast column(s)>]
    with a datetime ds column; returns None for empty input.

    If *colname* is already present, only that forecast column is kept.
    Otherwise a single unnamed forecast column is renamed to *colname*;
    when several forecast columns exist (one per model alias) they are ALL
    kept — previously only the first survived, silently discarding the
    other models' forecasts.
    """
    if df_fc is None or df_fc.empty:
        return None
    df_fc = _ensure_pandas_df(df_fc)
    if df_fc is None or df_fc.empty:
        return None
    if colname in df_fc.columns:
        keep = ["unique_id", "ds", colname]
    else:
        non_id = [c for c in df_fc.columns if c not in ["unique_id", "ds"]]
        if not non_id:
            raise ValueError(f"Forecast output missing forecast column. Got: {list(df_fc.columns)}")
        if len(non_id) == 1:
            df_fc = df_fc.rename(columns={non_id[0]: colname})
            keep = ["unique_id", "ds", colname]
        else:
            # Multiple models: keep every column under its own alias.
            keep = ["unique_id", "ds"] + non_id
    df_fc = df_fc[keep].copy()
    df_fc["ds"] = pd.to_datetime(df_fc["ds"])
    return df_fc
def create_future_plot(fcst_df, original_df, title="Forecast"):
    """Plot historical series plus every forecast column; returns a closed
    matplotlib Figure (or None when there is nothing to plot).

    Model-column legend labels are emitted only once, fixing duplicate
    legend entries when several unique_ids are plotted.
    """
    if fcst_df is None or fcst_df.empty:
        return None
    plt.figure(figsize=(12, 7))
    forecast_cols = [c for c in fcst_df.columns if c not in ["unique_id", "ds"]]
    unique_ids = fcst_df["unique_id"].unique()
    colors = plt.cm.tab10.colors
    labeled = set()  # model names already added to the legend
    for uid in unique_ids:
        hist = original_df[original_df["unique_id"] == uid]
        f = fcst_df[fcst_df["unique_id"] == uid]
        # Historical labels are per-series, hence naturally unique.
        plt.plot(hist["ds"], hist["y"], "k-", linewidth=2, label=f"{uid} (Historical)")
        if not f.empty:
            # Shade the forecast horizon and mark its start.
            start = f["ds"].min()
            plt.axvline(x=start, color="gray", linestyle="--", alpha=0.5)
            plt.axvspan(start, f["ds"].max(), alpha=0.1)
            for j, col in enumerate(forecast_cols):
                name = col.replace("_", " ").title()
                if name == "Timegpt":
                    name = "TimeGPT"
                # "_nolegend_" suppresses repeat legend entries for later series.
                label = name if name not in labeled else "_nolegend_"
                labeled.add(name)
                plt.plot(
                    f["ds"],
                    f[col],
                    linestyle="--",
                    linewidth=1.5,
                    color=colors[j % len(colors)],
                    label=label,
                )
    plt.title(title, fontsize=16)
    plt.xlabel("Date")
    plt.ylabel("Value")
    plt.grid(True, alpha=0.3)
    plt.legend(loc="upper center", bbox_to_anchor=(0.5, -0.15), ncol=3, fontsize=10)
    plt.tight_layout(rect=[0, 0.05, 1, 0.95])
    fig = plt.gcf()
    fig.autofmt_xdate()
    # Close so the figure is not re-rendered by pyplot; Gradio displays the
    # returned Figure object directly.
    plt.close(fig)
    return fig
def export_results(eval_df, future_df):
    """Write any non-empty result frames to timestamped CSVs in a fresh
    temp directory and return the list of file paths (possibly empty)."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = tempfile.mkdtemp()
    targets = [
        (eval_df, f"evaluation_metrics_{stamp}.csv"),
        (future_df, f"future_forecasts_{stamp}.csv"),
    ]
    paths = []
    for frame, fname in targets:
        if frame is None or frame.empty:
            continue
        full_path = os.path.join(out_dir, fname)
        frame.to_csv(full_path, index=False)
        paths.append(full_path)
    return paths
def run_forecast(
    train_file,
    freq,
    season_length,
    cv_h,
    cv_step_size,
    cv_windows,
    future_h,
    loss_name,
    run_cv,
    use_histavg,
    use_naive,
    use_snaive,
    use_winavg,
    use_swinavg,
    use_autoets,
    use_autoarima,
    use_timegpt,
    nixtla_api_key,
    use_nf_lstm,
    use_tc_prophet,
    xcols,
    future_x_file,
    last_eval,
    last_future,
    last_files,
    last_train_tail,
):
    """Main "Run Forecast" handler: validate inputs, run the selected models,
    and return a 9-tuple of (eval table, plot, forecast table, status, files,
    then the same eval/forecast/files plus train tail echoed into gr.State).

    On any failure it returns the PREVIOUS results via keep(), so the UI
    keeps the last successful run and only the status message changes.
    """

    def rebuild_plot():
        # Re-render the previous run's plot from the stashed state frames.
        if isinstance(last_future, pd.DataFrame) and isinstance(last_train_tail, pd.DataFrame) and (not last_future.empty):
            return create_future_plot(last_future, last_train_tail, title="Forecast")
        return None

    def keep(msg):
        # Prior outputs unchanged; only the status message is replaced.
        return (
            last_eval,
            rebuild_plot(),
            last_future,
            msg,
            last_files,
            last_eval,
            last_future,
            last_files,
            last_train_tail,
        )

    try:
        # Gradio Number inputs arrive as floats; coerce to ints up front.
        season_length = _to_int(season_length, "season_length")
        cv_h = _to_int(cv_h, "cv_h")
        cv_step_size = _to_int(cv_step_size, "cv_step_size")
        cv_windows = _to_int(cv_windows, "cv_windows")
        future_h = _to_int(future_h, "future_h")
        df, candidate_xcols, msg = load_training_data(train_file)
        if df is None:
            return keep(msg)
        df_y = df[["unique_id", "ds", "y"]].copy()
        # Only the tail of history is plotted, to keep charts readable.
        train_tail = _tail_history(df_y, n=PLOT_TAIL_POINTS)
        use_exog = bool(xcols)
        # Only these model paths consume future exogenous regressors here.
        needs_future_x = use_exog and (use_timegpt or use_nf_lstm or use_autoarima)
        X_future_h = None
        df_exog = None
        if needs_future_x:
            X_future, xmsg = load_future_x(future_x_file, xcols)
            if X_future is None:
                return keep(xmsg)
            try:
                X_future_h = align_future_x_to_horizon(df_y, X_future, xcols, future_h)
            except Exception as e:
                return keep(f"Future predictors invalid: {type(e).__name__}: {e}")
        if use_exog:
            # Training frame with predictors, coerced to numeric (raises on bad data).
            df_exog = df[["unique_id", "ds", "y"] + list(xcols)].copy()
            for c in xcols:
                df_exog[c] = pd.to_numeric(df_exog[c], errors="raise")
        # Map the metric name to its utilsforecast loss; default to rmse.
        loss_fn = {
            "rmse": rmse,
            "mae": mae,
            "mse": mse,
            "mape": mape,
            "smape": smape,
            "mase": mase,
        }.get(loss_name, rmse)
        uni_models = []
        exog_models = []
        if use_histavg:
            uni_models.append(HistoricAverage(alias="hist_avg"))
        if use_naive:
            uni_models.append(Naive(alias="naive"))
        if use_snaive:
            uni_models.append(SeasonalNaive(alias="seasonal_naive", season_length=season_length))
        if use_winavg:
            uni_models.append(WindowAverage(alias="window_avg", window_size=season_length))
        if use_swinavg:
            # NOTE(review): SeasonalWindowAverage typically also requires a
            # window_size argument — confirm against the installed
            # statsforecast version.
            uni_models.append(SeasonalWindowAverage(alias="seasonal_window_avg", season_length=season_length))
        if use_autoets:
            uni_models.append(AutoETS(alias="autoets", season_length=season_length))
        if use_autoarima:
            # AutoARIMA is the only StatsForecast model here routed through
            # the exogenous (X_df) pipeline when predictors are selected.
            m = AutoARIMA(alias="autoarima", season_length=season_length)
            if use_exog:
                exog_models.append(m)
            else:
                uni_models.append(m)
        eval_parts = []
        future_parts = []
        if uni_models:
            # Univariate StatsForecast pipeline: optional CV + future forecast.
            sf_u = StatsForecast(models=uni_models, freq=freq, n_jobs=1)
            if run_cv:
                cv_u = sf_u.cross_validation(df=df_y, h=cv_h, step_size=cv_step_size, n_windows=cv_windows)
                ev_u = evaluate(cv_u, metrics=[loss_fn])
                ev_u["pipeline"] = "univariate"
                eval_parts.append(ev_u)
            fc_u = sf_u.forecast(df=df_y, h=future_h)
            fc_u = _ensure_forecast_cols(fc_u, "statsforecast")
            future_parts.append(fc_u)
        if exog_models:
            if not use_exog or X_future_h is None or df_exog is None:
                return keep("Exogenous model selected but predictors/X_df are missing.")
            sf_x = StatsForecast(models=exog_models, freq=freq, n_jobs=1)
            fc_x = sf_x.forecast(df=df_exog, h=future_h, X_df=X_future_h)
            fc_x = _ensure_forecast_cols(fc_x, "autoarima")
            future_parts.append(fc_x)
        if use_timegpt:
            if not nixtla_api_key:
                return keep("TimeGPT selected but no Nixtla API key provided.")
            if use_exog and X_future_h is None:
                return keep("TimeGPT predictors selected but X_df is missing/invalid.")
            try:
                # Imported lazily so the app runs without the nixtla package.
                from nixtla import NixtlaClient
            except Exception as e:
                return keep(f"TimeGPT selected but nixtla import failed: {type(e).__name__}: {e}")
            try:
                client = NixtlaClient(api_key=nixtla_api_key)
                df_tgpt = df_exog if use_exog else df_y
                tgpt_raw = client.forecast(
                    df=df_tgpt,
                    h=future_h,
                    freq=freq,
                    X_df=X_future_h if use_exog else None,
                    hist_exog_list=list(xcols) if use_exog else None,
                )
                tgpt = _normalize_timegpt_output(tgpt_raw, df_y, future_h)
                future_parts.append(tgpt)
            except BaseException as e:
                # NOTE(review): BaseException is very broad — presumably to
                # survive non-Exception errors raised by the remote client;
                # confirm this is intentional.
                return keep(f"TimeGPT failed: {type(e).__name__}: {e}")
        if use_nf_lstm:
            if use_exog and X_future_h is None:
                return keep("LSTM selected with predictors but X_df is missing/invalid.")
            try:
                # Lazy import: neuralforecast (and torch) are optional deps.
                from neuralforecast import NeuralForecast
                from neuralforecast.models import LSTM
            except Exception as e:
                return keep(f"LSTM selected but neuralforecast import failed: {type(e).__name__}: {e}")
            try:
                nf_df = df_exog if use_exog else df_y
                futr_df = None
                futr_exog_list = None
                if use_exog:
                    futr_df = X_future_h.copy()
                    futr_exog_list = list(xcols)
                model = LSTM(
                    h=future_h,
                    max_steps=80,
                    input_size=max(2 * future_h, 30),
                    encoder_hidden_size=32,
                    decoder_hidden_size=32,
                    batch_size=16,
                    futr_exog_list=futr_exog_list,
                    alias="lstm",
                )
                nf = NeuralForecast(models=[model], freq=freq)
                nf.fit(df=nf_df)
                # reset_index(drop=False) keeps unique_id if it comes back as the index.
                pred = nf.predict(futr_df=futr_df).reset_index(drop=False) if use_exog else nf.predict().reset_index(drop=False)
                cols = [c for c in pred.columns if c not in ["unique_id", "ds"]]
                if not cols:
                    return keep("LSTM produced no forecast columns.")
                main_col = cols[0]
                lstm_df = pred[["unique_id", "ds", main_col]].rename(columns={main_col: "lstm"})
                lstm_df["ds"] = pd.to_datetime(lstm_df["ds"])
                future_parts.append(lstm_df)
            except Exception as e:
                return keep(f"LSTM failed: {type(e).__name__}: {e}")
        if use_tc_prophet:
            try:
                from timecopilot.models.prophet import Prophet
            except Exception as e:
                return keep(f"Prophet selected but timecopilot import failed: {type(e).__name__}: {e}")
            try:
                p = Prophet(alias="prophet")
                prop = p.forecast(df=df_y, h=future_h, freq=freq)
                prop = _ensure_forecast_cols(prop, "prophet")
                future_parts.append(prop)
            except Exception as e:
                return keep(f"Prophet failed: {type(e).__name__}: {e}")
        if not future_parts and not eval_parts:
            return keep("No models selected.")
        eval_df = pd.concat(eval_parts, ignore_index=True) if eval_parts else None
        # Outer-merge all model outputs on (unique_id, ds); on a column-name
        # collision the earlier part wins (the duplicate is dropped from the
        # incoming part before merging).
        future_df = future_parts[0].copy()
        for part in future_parts[1:]:
            overlap = set(future_df.columns).intersection(set(part.columns)) - {"unique_id", "ds"}
            future_df = future_df.merge(part.drop(columns=list(overlap), errors="ignore"), on=["unique_id", "ds"], how="outer")
        future_df = future_df.sort_values(["unique_id", "ds"]).reset_index(drop=True)
        fig = create_future_plot(future_df, train_tail, title="Forecast")
        files = export_results(eval_df, future_df)
        status = msg
        if use_exog:
            status += f" | Predictors: {', '.join(xcols)}"
        return (
            eval_df,
            fig,
            future_df,
            status,
            files,
            eval_df,
            future_df,
            files,
            train_tail,
        )
    except Exception as e:
        # Log the full traceback server-side; surface a short message in the UI.
        print(traceback.format_exc())
        return keep(f"Run failed: {type(e).__name__}: {e}")
def on_train_upload(file_obj):
    """Refresh the status text and predictor choices when a CSV is uploaded;
    any previously selected predictors are cleared."""
    _frame, predictor_choices, message = load_training_data(file_obj)
    return message, gr.update(choices=predictor_choices, value=[])
# ---------------------------------------------------------------------------
# Gradio UI: layout, state, and event wiring.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Forecasting Demo") as demo:
    gr.Markdown(
        """
# Forecasting Demo
Training CSV required columns: unique_id, ds, y
Optional predictors: extra columns in training CSV + X_df for horizon.
"""
    )
    # Data inputs: training CSV plus optional future predictors (X_df).
    with gr.Row():
        train_file = gr.File(label="Upload training CSV", file_types=[".csv"])
        status = gr.Textbox(label="Status", value="Upload a CSV to begin.", interactive=False)
    xcols = gr.Dropdown(choices=[], multiselect=True, label="Predictor columns (optional)")
    future_x_file = gr.File(label="Upload future predictors CSV (X_df)", file_types=[".csv"])
    # Horizon / seasonality / cross-validation knobs (Numbers arrive as floats).
    with gr.Accordion("Forecast settings", open=True):
        with gr.Row():
            freq = gr.Dropdown(choices=["H", "D", "B", "WS", "MS", "QS", "YS"], value="D", label="Frequency")
            season_length = gr.Number(value=7, label="Season length", precision=0)
        with gr.Row():
            cv_h = gr.Number(value=14, label="CV horizon (h)", precision=0)
            cv_step_size = gr.Number(value=1, label="CV step size", precision=0)
            cv_windows = gr.Number(value=3, label="CV windows", precision=0)
        future_h = gr.Number(value=30, label="Future forecast horizon", precision=0)
        loss_name = gr.Dropdown(choices=["rmse", "mae", "mse", "mape", "smape", "mase"], value="rmse", label="Metric")
        run_cv = gr.Checkbox(value=False, label="Run cross-validation (slower)")
    # Classical StatsForecast model toggles.
    with gr.Accordion("StatsForecast models", open=True):
        with gr.Row():
            use_histavg = gr.Checkbox(value=False, label="HistoricAverage")
            use_naive = gr.Checkbox(value=True, label="Naive")
            use_snaive = gr.Checkbox(value=False, label="SeasonalNaive")
            use_winavg = gr.Checkbox(value=False, label="WindowAverage")
            use_swinavg = gr.Checkbox(value=False, label="SeasonalWindowAverage")
            use_autoets = gr.Checkbox(value=True, label="AutoETS")
            use_autoarima = gr.Checkbox(value=True, label="AutoARIMA (supports predictors)")
    # Hosted TimeGPT (requires an API key; key is held only in this session).
    with gr.Accordion("TimeGPT", open=False):
        use_timegpt = gr.Checkbox(value=False, label="Enable TimeGPT")
        nixtla_api_key = gr.Textbox(label="Nixtla API Key", type="password")
    with gr.Accordion("Additional models", open=True):
        with gr.Row():
            use_nf_lstm = gr.Checkbox(value=False, label="NeuralForecast LSTM")
            use_tc_prophet = gr.Checkbox(value=False, label="TimeCopilot Prophet")
    # Per-session state: last successful run, echoed back by run_forecast so
    # failed runs keep the previous results visible.
    last_eval = gr.State(None)
    last_future = gr.State(None)
    last_files = gr.State([])
    last_train_tail = gr.State(None)
    run_btn = gr.Button("Run Forecast", variant="primary")
    # Output widgets.
    eval_df_out = gr.Dataframe(label="Evaluation metrics")
    plot_out = gr.Plot(label="Forecast plot")
    future_df_out = gr.Dataframe(label="Future forecasts")
    files_out = gr.Files(label="Download results")
    # Event wiring: upload refreshes predictor choices; the button runs models.
    train_file.change(fn=on_train_upload, inputs=[train_file], outputs=[status, xcols])
    run_btn.click(
        fn=run_forecast,
        inputs=[
            train_file,
            freq,
            season_length,
            cv_h,
            cv_step_size,
            cv_windows,
            future_h,
            loss_name,
            run_cv,
            use_histavg,
            use_naive,
            use_snaive,
            use_winavg,
            use_swinavg,
            use_autoets,
            use_autoarima,
            use_timegpt,
            nixtla_api_key,
            use_nf_lstm,
            use_tc_prophet,
            xcols,
            future_x_file,
            last_eval,
            last_future,
            last_files,
            last_train_tail,
        ],
        # Order must match run_forecast's 9-tuple return.
        outputs=[
            eval_df_out,
            plot_out,
            future_df_out,
            status,
            files_out,
            last_eval,
            last_future,
            last_files,
            last_train_tail,
        ],
    )
demo.queue().launch()