Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import traceback | |
| from datetime import datetime | |
| import gradio as gr | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| from statsforecast import StatsForecast | |
| from statsforecast.models import ( | |
| HistoricAverage, | |
| Naive, | |
| SeasonalNaive, | |
| WindowAverage, | |
| SeasonalWindowAverage, | |
| AutoETS, | |
| AutoARIMA, | |
| ) | |
| from utilsforecast.evaluation import evaluate | |
| from utilsforecast.losses import mae, mape, mase, mse, rmse, smape | |
# Columns every training CSV must contain (Nixtla long format: series id,
# timestamp, target value).
REQUIRED_COLS = ["unique_id", "ds", "y"]
# Maximum historical points per series kept when plotting, to keep the
# figure readable on long histories.
PLOT_TAIL_POINTS = 300
| def _to_int(x, name): | |
| if x is None: | |
| raise ValueError(f"{name} is None") | |
| return int(round(float(x))) | |
| def _ensure_pandas_df(obj): | |
| if obj is None: | |
| return None | |
| if isinstance(obj, pd.DataFrame): | |
| return obj | |
| if hasattr(obj, "to_pandas"): | |
| return obj.to_pandas() | |
| return pd.DataFrame(obj) | |
def _tail_history(df_y, n=PLOT_TAIL_POINTS):
    """Return the last *n* observations of each series, time-ordered.

    None or empty frames are returned unchanged.
    """
    if df_y is None or df_y.empty:
        return df_y
    ordered = df_y.sort_values(["unique_id", "ds"])
    tails = ordered.groupby("unique_id", as_index=False, group_keys=False).tail(n)
    return tails.reset_index(drop=True)
def load_training_data(file_obj):
    """Read and validate the training CSV.

    Returns a 3-tuple ``(df, candidate_predictor_cols, status_message)``;
    ``df`` is None on any validation failure, with the reason in the message.
    """
    if file_obj is None:
        return None, [], "Please upload a CSV file."
    try:
        # Gradio file objects expose .name; plain path strings are used as-is.
        path = getattr(file_obj, "name", file_obj)
        frame = pd.read_csv(path)
        missing = [col for col in REQUIRED_COLS if col not in frame.columns]
        if missing:
            return None, [], f"Missing required columns: {', '.join(missing)}"
        frame["ds"] = pd.to_datetime(frame["ds"])
        frame = frame.sort_values(["unique_id", "ds"]).reset_index(drop=True)
        if frame["y"].isna().any():
            return None, [], "Data contains missing values in the 'y' column."
        # Any extra column beyond the required trio may serve as a predictor.
        predictors = [col for col in frame.columns if col not in REQUIRED_COLS]
        return frame, predictors, "Training data loaded successfully!"
    except Exception as e:
        # UI boundary: surface the failure as a status message, never crash.
        return None, [], f"Error loading data: {type(e).__name__}: {e}"
def load_future_x(file_obj, xcols):
    """Read and validate the future-predictors CSV (X_df).

    Returns ``(X_df, status_message)``; ``X_df`` is None when no predictors
    were selected, no file was given, or validation fails.
    """
    if not xcols:
        return None, "No predictors selected."
    if file_obj is None:
        return None, "Future predictors CSV (X_df) is required to use predictors."
    try:
        path = getattr(file_obj, "name", file_obj)
        future = pd.read_csv(path)
        needed = ["unique_id", "ds"] + list(xcols)
        absent = [col for col in needed if col not in future.columns]
        if absent:
            return None, f"Future predictors CSV missing: {', '.join(absent)}"
        future["ds"] = pd.to_datetime(future["ds"])
        future = future.sort_values(["unique_id", "ds"]).reset_index(drop=True)
        # Duplicate (series, timestamp) pairs would make the merge ambiguous.
        if future.duplicated(["unique_id", "ds"]).any():
            return None, "Future predictors CSV has duplicate (unique_id, ds) rows."
        return future, "Future predictors loaded."
    except Exception as e:
        return None, f"Error loading future predictors: {type(e).__name__}: {e}"
def align_future_x_to_horizon(df_train, X_future, xcols, h):
    """Trim future predictors to exactly *h* rows after each series' last ds.

    Parameters
    ----------
    df_train : DataFrame with at least unique_id and ds columns.
    X_future : candidate future predictor rows (unique_id, ds, *xcols).
    xcols : predictor column names to keep; values are coerced to numeric.
    h : forecast horizon; each series must supply at least h future rows.

    Returns the aligned X_df. Raises ValueError when any series has no
    future rows, or fewer than h rows, strictly after its training end.
    """
    last_ds = df_train.groupby("unique_id", as_index=False)["ds"].max().rename(columns={"ds": "last_ds"})
    X = X_future.merge(last_ds, on="unique_id", how="inner")
    # Only rows strictly after the end of each series' training data count.
    X = X[X["ds"] > X["last_ds"]].copy()
    X = X.sort_values(["unique_id", "ds"]).reset_index(drop=True)
    X_h = X.groupby("unique_id", as_index=False).head(h).copy()
    counts = X_h.groupby("unique_id")["ds"].size()
    missing_ids = sorted(set(last_ds["unique_id"]) - set(counts.index))
    short_ids = sorted([uid for uid, c in counts.items() if c < h])
    if missing_ids or short_ids:
        parts = []
        # map(str, ...): unique_id values may be numeric; bare str.join on
        # non-strings raised TypeError instead of the intended ValueError.
        if missing_ids:
            parts.append(f"Missing future X rows for: {', '.join(map(str, missing_ids))}")
        if short_ids:
            parts.append(f"Not enough future X rows (need {h}) for: {', '.join(map(str, short_ids))}")
        raise ValueError(" | ".join(parts))
    X_h = X_h[["unique_id", "ds"] + list(xcols)]
    for c in xcols:
        # errors="raise": fail loudly on non-numeric predictor values rather
        # than silently producing NaNs.
        X_h[c] = pd.to_numeric(X_h[c], errors="raise")
    return X_h.reset_index(drop=True)
def _normalize_timegpt_output(tgpt_raw, df_y, h):
    """Normalize raw TimeGPT output to a (unique_id, ds, timegpt) frame.

    Picks the forecast column (conventional name first, else the first
    non-identifier column), keeps only rows strictly after each series' last
    training timestamp, and trims to *h* rows per series.

    Raises ValueError on empty/malformed output, or when any series is
    missing or has fewer than *h* future rows.
    """
    tgpt = _ensure_pandas_df(tgpt_raw)
    if tgpt is None or tgpt.empty:
        raise ValueError("TimeGPT returned empty output.")
    if "unique_id" not in tgpt.columns or "ds" not in tgpt.columns:
        raise ValueError(f"TimeGPT output missing required columns. Got: {list(tgpt.columns)}")
    # Work on a copy: _ensure_pandas_df may return the caller's own frame,
    # and the ds assignment below would otherwise mutate it in place.
    tgpt = tgpt.copy()
    tgpt["ds"] = pd.to_datetime(tgpt["ds"], errors="coerce")
    if tgpt["ds"].isna().any():
        raise ValueError("TimeGPT output has invalid ds values.")
    # Locate the forecast column: prefer conventional names, else fall back
    # to the first column that is not an identifier.
    out_col = None
    for c in tgpt.columns:
        if c.lower() in ("timegpt", "yhat", "y_hat", "forecast"):
            out_col = c
            break
    if out_col is None:
        non_id = [c for c in tgpt.columns if c not in ["unique_id", "ds"]]
        if non_id:
            out_col = non_id[0]
    if out_col is None:
        raise ValueError("TimeGPT output has no forecast column.")
    if out_col != "timegpt":
        tgpt = tgpt.rename(columns={out_col: "timegpt"})
    last_ds = df_y.groupby("unique_id", as_index=False)["ds"].max().rename(columns={"ds": "last_ds"})
    tgpt = tgpt.merge(last_ds, on="unique_id", how="inner")
    # Drop any in-sample rows TimeGPT may have echoed back.
    tgpt = tgpt[tgpt["ds"] > tgpt["last_ds"]].copy()
    tgpt = tgpt.sort_values(["unique_id", "ds"]).reset_index(drop=True)
    tgpt = tgpt.groupby("unique_id", as_index=False).head(h).copy()
    counts = tgpt.groupby("unique_id")["ds"].size()
    missing_ids = sorted(set(last_ds["unique_id"]) - set(counts.index))
    short_ids = sorted([uid for uid, c in counts.items() if c < h])
    if missing_ids or short_ids:
        parts = []
        # map(str, ...): unique_id values may be numeric; bare str.join on
        # non-strings raised TypeError instead of the intended ValueError.
        if missing_ids:
            parts.append(f"Missing TimeGPT future rows for: {', '.join(map(str, missing_ids))}")
        if short_ids:
            parts.append(f"Not enough TimeGPT rows (need {h}) for: {', '.join(map(str, short_ids))}")
        raise ValueError(" | ".join(parts))
    tgpt = tgpt[["unique_id", "ds", "timegpt"]].copy()
    return tgpt
| def _ensure_forecast_cols(df_fc, colname): | |
| if df_fc is None or df_fc.empty: | |
| return None | |
| df_fc = _ensure_pandas_df(df_fc) | |
| if df_fc is None or df_fc.empty: | |
| return None | |
| if colname not in df_fc.columns: | |
| non_id = [c for c in df_fc.columns if c not in ["unique_id", "ds"]] | |
| if not non_id: | |
| raise ValueError(f"Forecast output missing forecast column. Got: {list(df_fc.columns)}") | |
| df_fc = df_fc.rename(columns={non_id[0]: colname}) | |
| df_fc = df_fc[["unique_id", "ds", colname]].copy() | |
| df_fc["ds"] = pd.to_datetime(df_fc["ds"]) | |
| return df_fc | |
def create_future_plot(fcst_df, original_df, title="Forecast"):
    """Plot per-series history plus every forecast column on one axis.

    Historical data is drawn in black; each forecast column gets a dashed
    line from the tab10 palette. Returns the (closed) Figure for Gradio,
    or None when there is nothing to plot.
    """
    if fcst_df is None or fcst_df.empty:
        return None
    plt.figure(figsize=(12, 7))
    forecast_cols = [c for c in fcst_df.columns if c not in ["unique_id", "ds"]]
    palette = plt.cm.tab10.colors
    for uid in fcst_df["unique_id"].unique():
        hist = original_df[original_df["unique_id"] == uid]
        fc = fcst_df[fcst_df["unique_id"] == uid]
        plt.plot(hist["ds"], hist["y"], "k-", linewidth=2, label=f"{uid} (Historical)")
        if fc.empty:
            continue
        # Mark the boundary between history and forecast horizon.
        start = fc["ds"].min()
        plt.axvline(x=start, color="gray", linestyle="--", alpha=0.5)
        plt.axvspan(start, fc["ds"].max(), alpha=0.1)
        for j, col in enumerate(forecast_cols):
            label = col.replace("_", " ").title()
            if label == "Timegpt":
                label = "TimeGPT"
            plt.plot(
                fc["ds"],
                fc[col],
                linestyle="--",
                linewidth=1.5,
                color=palette[j % len(palette)],
                label=label,
            )
    plt.title(title, fontsize=16)
    plt.xlabel("Date")
    plt.ylabel("Value")
    plt.grid(True, alpha=0.3)
    plt.legend(loc="upper center", bbox_to_anchor=(0.5, -0.15), ncol=3, fontsize=10)
    plt.tight_layout(rect=[0, 0.05, 1, 0.95])
    fig = plt.gcf()
    fig.autofmt_xdate()
    # Close so pyplot's global state doesn't leak; Gradio renders the
    # returned Figure object directly.
    plt.close(fig)
    return fig
def export_results(eval_df, future_df):
    """Write the non-empty result frames to timestamped CSVs in a fresh
    temp directory and return the list of file paths (possibly empty)."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = tempfile.mkdtemp()
    paths = []
    for frame, stem in ((eval_df, "evaluation_metrics"), (future_df, "future_forecasts")):
        if frame is not None and not frame.empty:
            dest = os.path.join(out_dir, f"{stem}_{stamp}.csv")
            frame.to_csv(dest, index=False)
            paths.append(dest)
    return paths
def run_forecast(
    train_file,
    freq,
    season_length,
    cv_h,
    cv_step_size,
    cv_windows,
    future_h,
    loss_name,
    run_cv,
    use_histavg,
    use_naive,
    use_snaive,
    use_winavg,
    use_swinavg,
    use_autoets,
    use_autoarima,
    use_timegpt,
    nixtla_api_key,
    use_nf_lstm,
    use_tc_prophet,
    xcols,
    future_x_file,
    last_eval,
    last_future,
    last_files,
    last_train_tail,
):
    """Run every selected forecasting pipeline and return the UI outputs.

    Returns a 9-tuple matching the Gradio outputs list: (eval_df, plot,
    future_df, status message, download files, then the four State copies:
    eval_df, future_df, files, train_tail). Every error path goes through
    keep(), which preserves the previous results and only updates the
    status message, so a failed run never wipes the last good output.

    NOTE(review): heavy optional dependencies (nixtla, neuralforecast,
    timecopilot) are imported lazily inside their branches so the app can
    start — and the other models can run — without them installed.
    """
    def rebuild_plot():
        # Re-draw the figure from the previous successful run (if any) so
        # error paths keep showing it instead of a blank plot.
        if isinstance(last_future, pd.DataFrame) and isinstance(last_train_tail, pd.DataFrame) and (not last_future.empty):
            return create_future_plot(last_future, last_train_tail, title="Forecast")
        return None
    def keep(msg):
        # Early-exit helper: return all prior state unchanged, with *msg*
        # surfaced in the status textbox.
        return (
            last_eval,
            rebuild_plot(),
            last_future,
            msg,
            last_files,
            last_eval,
            last_future,
            last_files,
            last_train_tail,
        )
    try:
        # Gradio Number components deliver floats; coerce all counts to int.
        season_length = _to_int(season_length, "season_length")
        cv_h = _to_int(cv_h, "cv_h")
        cv_step_size = _to_int(cv_step_size, "cv_step_size")
        cv_windows = _to_int(cv_windows, "cv_windows")
        future_h = _to_int(future_h, "future_h")
        df, candidate_xcols, msg = load_training_data(train_file)
        if df is None:
            return keep(msg)
        df_y = df[["unique_id", "ds", "y"]].copy()
        # Only the recent tail of history is plotted, to keep figures readable.
        train_tail = _tail_history(df_y, n=PLOT_TAIL_POINTS)
        use_exog = bool(xcols)
        # Only these model families can consume future exogenous regressors.
        needs_future_x = use_exog and (use_timegpt or use_nf_lstm or use_autoarima)
        X_future_h = None
        df_exog = None
        if needs_future_x:
            X_future, xmsg = load_future_x(future_x_file, xcols)
            if X_future is None:
                return keep(xmsg)
            try:
                # Trim/validate X_df to exactly future_h rows per series.
                X_future_h = align_future_x_to_horizon(df_y, X_future, xcols, future_h)
            except Exception as e:
                return keep(f"Future predictors invalid: {type(e).__name__}: {e}")
        if use_exog:
            df_exog = df[["unique_id", "ds", "y"] + list(xcols)].copy()
            for c in xcols:
                # errors="raise": fail fast on non-numeric predictor values.
                df_exog[c] = pd.to_numeric(df_exog[c], errors="raise")
        # Metric used for cross-validation evaluation; defaults to rmse when
        # the dropdown value is unrecognized.
        loss_fn = {
            "rmse": rmse,
            "mae": mae,
            "mse": mse,
            "mape": mape,
            "smape": smape,
            "mase": mase,
        }.get(loss_name, rmse)
        # Univariate models vs. models that take exogenous regressors are run
        # through separate StatsForecast instances.
        uni_models = []
        exog_models = []
        if use_histavg:
            uni_models.append(HistoricAverage(alias="hist_avg"))
        if use_naive:
            uni_models.append(Naive(alias="naive"))
        if use_snaive:
            uni_models.append(SeasonalNaive(alias="seasonal_naive", season_length=season_length))
        if use_winavg:
            uni_models.append(WindowAverage(alias="window_avg", window_size=season_length))
        if use_swinavg:
            uni_models.append(SeasonalWindowAverage(alias="seasonal_window_avg", season_length=season_length))
        if use_autoets:
            uni_models.append(AutoETS(alias="autoets", season_length=season_length))
        if use_autoarima:
            # AutoARIMA is the only StatsForecast model here that supports
            # exogenous regressors; route it accordingly.
            m = AutoARIMA(alias="autoarima", season_length=season_length)
            if use_exog:
                exog_models.append(m)
            else:
                uni_models.append(m)
        eval_parts = []
        future_parts = []
        if uni_models:
            sf_u = StatsForecast(models=uni_models, freq=freq, n_jobs=1)
            if run_cv:
                # Rolling-origin cross-validation; evaluated with loss_fn.
                cv_u = sf_u.cross_validation(df=df_y, h=cv_h, step_size=cv_step_size, n_windows=cv_windows)
                ev_u = evaluate(cv_u, metrics=[loss_fn])
                ev_u["pipeline"] = "univariate"
                eval_parts.append(ev_u)
            fc_u = sf_u.forecast(df=df_y, h=future_h)
            fc_u = _ensure_forecast_cols(fc_u, "statsforecast")
            future_parts.append(fc_u)
        if exog_models:
            if not use_exog or X_future_h is None or df_exog is None:
                return keep("Exogenous model selected but predictors/X_df are missing.")
            sf_x = StatsForecast(models=exog_models, freq=freq, n_jobs=1)
            fc_x = sf_x.forecast(df=df_exog, h=future_h, X_df=X_future_h)
            fc_x = _ensure_forecast_cols(fc_x, "autoarima")
            future_parts.append(fc_x)
        if use_timegpt:
            if not nixtla_api_key:
                return keep("TimeGPT selected but no Nixtla API key provided.")
            if use_exog and X_future_h is None:
                return keep("TimeGPT predictors selected but X_df is missing/invalid.")
            try:
                # Lazy import: nixtla is optional.
                from nixtla import NixtlaClient
            except Exception as e:
                return keep(f"TimeGPT selected but nixtla import failed: {type(e).__name__}: {e}")
            try:
                client = NixtlaClient(api_key=nixtla_api_key)
                df_tgpt = df_exog if use_exog else df_y
                tgpt_raw = client.forecast(
                    df=df_tgpt,
                    h=future_h,
                    freq=freq,
                    X_df=X_future_h if use_exog else None,
                    hist_exog_list=list(xcols) if use_exog else None,
                )
                tgpt = _normalize_timegpt_output(tgpt_raw, df_y, future_h)
                future_parts.append(tgpt)
            # BaseException: some client/SDK failures (e.g. SystemExit-style
            # aborts) are not Exception subclasses; the UI must survive them.
            except BaseException as e:
                return keep(f"TimeGPT failed: {type(e).__name__}: {e}")
        if use_nf_lstm:
            if use_exog and X_future_h is None:
                return keep("LSTM selected with predictors but X_df is missing/invalid.")
            try:
                # Lazy import: neuralforecast is optional.
                from neuralforecast import NeuralForecast
                from neuralforecast.models import LSTM
            except Exception as e:
                return keep(f"LSTM selected but neuralforecast import failed: {type(e).__name__}: {e}")
            try:
                nf_df = df_exog if use_exog else df_y
                futr_df = None
                futr_exog_list = None
                if use_exog:
                    futr_df = X_future_h.copy()
                    futr_exog_list = list(xcols)
                # Small, fast-training configuration suitable for a demo app.
                model = LSTM(
                    h=future_h,
                    max_steps=80,
                    input_size=max(2 * future_h, 30),
                    encoder_hidden_size=32,
                    decoder_hidden_size=32,
                    batch_size=16,
                    futr_exog_list=futr_exog_list,
                    alias="lstm",
                )
                nf = NeuralForecast(models=[model], freq=freq)
                nf.fit(df=nf_df)
                # reset_index(drop=False) keeps unique_id as a column when the
                # library returns it as the index.
                pred = nf.predict(futr_df=futr_df).reset_index(drop=False) if use_exog else nf.predict().reset_index(drop=False)
                cols = [c for c in pred.columns if c not in ["unique_id", "ds"]]
                if not cols:
                    return keep("LSTM produced no forecast columns.")
                main_col = cols[0]
                lstm_df = pred[["unique_id", "ds", main_col]].rename(columns={main_col: "lstm"})
                lstm_df["ds"] = pd.to_datetime(lstm_df["ds"])
                future_parts.append(lstm_df)
            except Exception as e:
                return keep(f"LSTM failed: {type(e).__name__}: {e}")
        if use_tc_prophet:
            try:
                # Lazy import: timecopilot is optional.
                from timecopilot.models.prophet import Prophet
            except Exception as e:
                return keep(f"Prophet selected but timecopilot import failed: {type(e).__name__}: {e}")
            try:
                p = Prophet(alias="prophet")
                prop = p.forecast(df=df_y, h=future_h, freq=freq)
                prop = _ensure_forecast_cols(prop, "prophet")
                future_parts.append(prop)
            except Exception as e:
                return keep(f"Prophet failed: {type(e).__name__}: {e}")
        if not future_parts and not eval_parts:
            return keep("No models selected.")
        eval_df = pd.concat(eval_parts, ignore_index=True) if eval_parts else None
        # Merge each pipeline's forecasts into one wide frame on (unique_id, ds);
        # drop overlapping value columns from later parts so the first wins.
        future_df = future_parts[0].copy()
        for part in future_parts[1:]:
            overlap = set(future_df.columns).intersection(set(part.columns)) - {"unique_id", "ds"}
            future_df = future_df.merge(part.drop(columns=list(overlap), errors="ignore"), on=["unique_id", "ds"], how="outer")
        future_df = future_df.sort_values(["unique_id", "ds"]).reset_index(drop=True)
        fig = create_future_plot(future_df, train_tail, title="Forecast")
        files = export_results(eval_df, future_df)
        status = msg
        if use_exog:
            status += f" | Predictors: {', '.join(xcols)}"
        return (
            eval_df,
            fig,
            future_df,
            status,
            files,
            eval_df,
            future_df,
            files,
            train_tail,
        )
    except Exception as e:
        # Last-resort guard: log the traceback server-side, keep the UI alive.
        print(traceback.format_exc())
        return keep(f"Run failed: {type(e).__name__}: {e}")
def on_train_upload(file_obj):
    """Gradio callback: validate the uploaded CSV and refresh the predictor
    dropdown choices (clearing any previous selection)."""
    _frame, choices, message = load_training_data(file_obj)
    return message, gr.update(choices=choices, value=[])
# ---------------------------------------------------------------------------
# Gradio UI: layout, state holders, and event wiring for the forecasting demo.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Forecasting Demo") as demo:
    gr.Markdown(
        """
        # Forecasting Demo
        Training CSV required columns: unique_id, ds, y
        Optional predictors: extra columns in training CSV + X_df for horizon.
        """
    )
    with gr.Row():
        train_file = gr.File(label="Upload training CSV", file_types=[".csv"])
        status = gr.Textbox(label="Status", value="Upload a CSV to begin.", interactive=False)
    # Predictor choices are populated dynamically by on_train_upload.
    xcols = gr.Dropdown(choices=[], multiselect=True, label="Predictor columns (optional)")
    future_x_file = gr.File(label="Upload future predictors CSV (X_df)", file_types=[".csv"])
    with gr.Accordion("Forecast settings", open=True):
        with gr.Row():
            freq = gr.Dropdown(choices=["H", "D", "B", "WS", "MS", "QS", "YS"], value="D", label="Frequency")
            season_length = gr.Number(value=7, label="Season length", precision=0)
        with gr.Row():
            cv_h = gr.Number(value=14, label="CV horizon (h)", precision=0)
            cv_step_size = gr.Number(value=1, label="CV step size", precision=0)
            cv_windows = gr.Number(value=3, label="CV windows", precision=0)
        future_h = gr.Number(value=30, label="Future forecast horizon", precision=0)
        loss_name = gr.Dropdown(choices=["rmse", "mae", "mse", "mape", "smape", "mase"], value="rmse", label="Metric")
        run_cv = gr.Checkbox(value=False, label="Run cross-validation (slower)")
    with gr.Accordion("StatsForecast models", open=True):
        with gr.Row():
            use_histavg = gr.Checkbox(value=False, label="HistoricAverage")
            use_naive = gr.Checkbox(value=True, label="Naive")
            use_snaive = gr.Checkbox(value=False, label="SeasonalNaive")
            use_winavg = gr.Checkbox(value=False, label="WindowAverage")
            use_swinavg = gr.Checkbox(value=False, label="SeasonalWindowAverage")
            use_autoets = gr.Checkbox(value=True, label="AutoETS")
            use_autoarima = gr.Checkbox(value=True, label="AutoARIMA (supports predictors)")
    with gr.Accordion("TimeGPT", open=False):
        use_timegpt = gr.Checkbox(value=False, label="Enable TimeGPT")
        nixtla_api_key = gr.Textbox(label="Nixtla API Key", type="password")
    with gr.Accordion("Additional models", open=True):
        with gr.Row():
            use_nf_lstm = gr.Checkbox(value=False, label="NeuralForecast LSTM")
            use_tc_prophet = gr.Checkbox(value=False, label="TimeCopilot Prophet")
    # Per-session state: last successful results, so failed runs can keep
    # showing them (see run_forecast.keep()).
    last_eval = gr.State(None)
    last_future = gr.State(None)
    last_files = gr.State([])
    last_train_tail = gr.State(None)
    run_btn = gr.Button("Run Forecast", variant="primary")
    eval_df_out = gr.Dataframe(label="Evaluation metrics")
    plot_out = gr.Plot(label="Forecast plot")
    future_df_out = gr.Dataframe(label="Future forecasts")
    files_out = gr.Files(label="Download results")
    # Uploading a new training CSV refreshes status + predictor choices.
    train_file.change(fn=on_train_upload, inputs=[train_file], outputs=[status, xcols])
    # Input/output order here must match run_forecast's signature and its
    # 9-tuple return value exactly.
    run_btn.click(
        fn=run_forecast,
        inputs=[
            train_file,
            freq,
            season_length,
            cv_h,
            cv_step_size,
            cv_windows,
            future_h,
            loss_name,
            run_cv,
            use_histavg,
            use_naive,
            use_snaive,
            use_winavg,
            use_swinavg,
            use_autoets,
            use_autoarima,
            use_timegpt,
            nixtla_api_key,
            use_nf_lstm,
            use_tc_prophet,
            xcols,
            future_x_file,
            last_eval,
            last_future,
            last_files,
            last_train_tail,
        ],
        outputs=[
            eval_df_out,
            plot_out,
            future_df_out,
            status,
            files_out,
            last_eval,
            last_future,
            last_files,
            last_train_tail,
        ],
    )
# queue() enables request queuing for the potentially long-running forecasts.
demo.queue().launch()