"""Gradio app that validates and forecasts time series.

The user uploads a CSV with ``unique_id``, ``ds`` and ``y`` columns, picks
models, and gets evaluation metrics, a validation plot, a future forecast and
downloadable CSV exports.
"""
import os
import tempfile
import traceback
from datetime import datetime

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from lightgbm import LGBMRegressor
from mlforecast import MLForecast
from sklearn.metrics import (
    mean_absolute_error,
    mean_absolute_percentage_error,
    mean_squared_error,
)
from statsforecast import StatsForecast
from statsforecast.models import (
    HistoricAverage,
    Naive,
    SeasonalNaive,
    WindowAverage,
    SeasonalWindowAverage,
    AutoETS,
    AutoARIMA,
    AutoCES,
    AutoTheta,
    DynamicOptimizedTheta,
    MSTL,
)
from utilsforecast.evaluation import evaluate

# Foundation models are optional. The import can fail with more than
# ImportError (e.g. a broken torch install), so catch Exception -- but never
# a bare ``except``, which would also swallow KeyboardInterrupt/SystemExit.
try:
    from chronos import ChronosPipeline
    import torch
    CHRONOS_AVAILABLE = True
except Exception:
    CHRONOS_AVAILABLE = False

try:
    from uni2ts.model.moirai import MoiraiForecast
    MOIRAI_AVAILABLE = True
except Exception:
    MOIRAI_AVAILABLE = False


# Frequency code -> (pd.Timedelta keyword, units per forecast step).
# 'WS' is not a pandas alias; it is kept only for backward compatibility with
# callers that still pass the old dropdown value.
_FREQ_TO_TIMEDELTA = {
    'H': ('hours', 1),
    'h': ('hours', 1),
    'D': ('days', 1),
    'B': ('days', 1.4),  # ~7 calendar days per 5 business days
    'W': ('weeks', 1),
    'WS': ('weeks', 1),
    'MS': ('days', 30),
    'QS': ('days', 90),
    'YS': ('days', 365),
}

# Columns of a forecast frame that are not model outputs.
_NON_MODEL_COLUMNS = ('unique_id', 'ds', 'cutoff', 'y', 'index')

# Outputs returned to Gradio (everything except the status message) on failure.
_EMPTY_OUTPUTS = (None, None, None, None, None, [])


def calculate_date_offset(freq, horizon):
    """Return an approximate ``pd.Timedelta`` spanning ``horizon`` steps.

    Args:
        freq: Frequency code ('H', 'D', 'B', 'W', 'MS', 'QS', 'YS').
            Unknown codes are treated as daily.
        horizon: Number of steps.

    Returns:
        A ``pd.Timedelta``; months, quarters and years use 30/90/365 days.
    """
    unit, factor = _FREQ_TO_TIMEDELTA.get(freq, ('days', 1))
    return pd.Timedelta(**{unit: int(horizon * factor)})


def create_forecast_plot(forecast_df, original_df, title="Forecasting Results",
                         horizon=None, freq='D'):
    """Plot actual values and model forecasts for every series.

    Args:
        forecast_df: Frame with ``unique_id``, ``ds`` and one column per model;
            an optional ``cutoff`` column marks cross-validation cutoffs.
            May be empty, in which case only the actuals are drawn.
        original_df: Frame with ``unique_id``, ``ds`` and ``y``.
        title: Figure title.
        horizon: If given (together with a cutoff), the x-axis is zoomed to
            start one horizon before the earliest cutoff.
        freq: Frequency code used to convert ``horizon`` into a time span.

    Returns:
        The matplotlib ``Figure``.
    """
    # Use the Axes API instead of pyplot's implicit "current figure", which is
    # global state shared by concurrent requests.
    fig, ax = plt.subplots(figsize=(12, 7))

    has_forecasts = (
        forecast_df is not None
        and not forecast_df.empty
        and 'unique_id' in forecast_df.columns
    )
    if has_forecasts:
        unique_ids = forecast_df['unique_id'].unique()
        forecast_cols = [c for c in forecast_df.columns if c not in _NON_MODEL_COLUMNS]
    else:
        unique_ids = original_df['unique_id'].unique()
        forecast_cols = []

    colors = plt.cm.tab10.colors
    min_cutoff = None

    for i, unique_id in enumerate(unique_ids):
        actual = original_df[original_df['unique_id'] == unique_id]
        ax.plot(actual['ds'], actual['y'], 'k-', linewidth=2,
                label=f'{unique_id} (Actual)')
        if not has_forecasts:
            continue

        forecast_data = forecast_df[forecast_df['unique_id'] == unique_id]

        if 'cutoff' in forecast_data.columns:
            # Outer merges can leave NaT cutoffs; they cannot be drawn.
            cutoffs = pd.to_datetime(forecast_data['cutoff'].dropna().unique())
            if len(cutoffs) > 0:
                earliest_cutoff = cutoffs.min()
                if min_cutoff is None or earliest_cutoff < min_cutoff:
                    min_cutoff = earliest_cutoff
                for cutoff in cutoffs:
                    ax.axvline(x=cutoff, color='gray', linestyle='--', alpha=0.4)

        for j, col in enumerate(forecast_cols):
            model_name = col.replace('_', ' ').title()
            # A model keeps one colour across series, so label it only once
            # to avoid duplicated legend entries.
            label = f'{model_name}' if i == 0 else '_nolegend_'
            ax.plot(forecast_data['ds'], forecast_data[col],
                    color=colors[j % len(colors)], linestyle='--',
                    linewidth=1.5, label=label)

    ax.set_title(title, fontsize=16)
    ax.set_xlabel('Date', fontsize=12)
    ax.set_ylabel('Value', fontsize=12)
    ax.grid(True, alpha=0.3)
    ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=3, fontsize=10)
    fig.tight_layout(rect=[0, 0.05, 1, 0.95])

    if min_cutoff is not None and horizon is not None:
        start_date = min_cutoff - calculate_date_offset(freq, horizon)
        ax.set_xlim(start_date, forecast_df['ds'].max())
        ax.annotate('Training | Test', xy=(min_cutoff, ax.get_ylim()[0]),
                    xytext=(0, -40), textcoords='offset points',
                    horizontalalignment='center', fontsize=10)

    fig.autofmt_xdate()
    # Deregister the figure from pyplot so repeated runs do not accumulate
    # open figures; the Figure object itself stays renderable.
    plt.close(fig)
    return fig


def load_data(file):
    """Load and validate the uploaded CSV.

    Args:
        file: A path string, or an upload object exposing the path as
            ``.name``; ``None`` when nothing was uploaded.

    Returns:
        ``(df, message)``. ``df`` is ``None`` when loading or validation
        failed; otherwise it is sorted by ``unique_id`` and ``ds`` with
        ``ds`` parsed as datetime.
    """
    if file is None:
        return None, "Please upload a CSV file"

    try:
        df = pd.read_csv(getattr(file, 'name', file))

        required_cols = ['unique_id', 'ds', 'y']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            return None, f"Missing required columns: {', '.join(missing_cols)}"

        df['ds'] = pd.to_datetime(df['ds'])
        df = df.sort_values(['unique_id', 'ds']).reset_index(drop=True)

        if df['y'].isna().any():
            return None, "Data contains missing values in the 'y' column"

        return df, "Data loaded successfully!"
    except Exception as e:
        return None, f"Error loading data: {str(e)}"


def _ensure_id_column(df):
    """Return ``df`` with ``unique_id`` as a column (older StatsForecast puts it in the index)."""
    if 'unique_id' not in df.columns and df.index.name == 'unique_id':
        return df.reset_index()
    return df


def _holdout_split(df, horizon):
    """Return ``df`` without the last ``horizon`` rows of each series."""
    parts = [group.iloc[:-horizon] for _, group in df.groupby('unique_id', sort=False)]
    return pd.concat(parts)


def _build_stat_models(
    use_historical_avg, use_naive, use_seasonal_naive, seasonality,
    use_window_avg, window_size, use_seasonal_window_avg, seasonal_window_size,
    use_autoets, use_autoarima, use_autoces, use_autotheta,
):
    """Instantiate the selected StatsForecast models (AutoARIMA last)."""
    season = int(seasonality)
    models = []
    if use_historical_avg:
        models.append(HistoricAverage())
    if use_naive:
        models.append(Naive())
    if use_seasonal_naive:
        models.append(SeasonalNaive(season_length=season))
    if use_window_avg:
        models.append(WindowAverage(window_size=int(window_size)))
    if use_seasonal_window_avg:
        models.append(SeasonalWindowAverage(
            season_length=season, window_size=int(seasonal_window_size)))
    if use_autoets:
        models.append(AutoETS(season_length=season))
    if use_autoces:
        models.append(AutoCES(season_length=season))
    if use_autotheta:
        models.append(AutoTheta(season_length=season))
    # AutoARIMA could take exogenous predictors; for now it is fitted on the
    # same target-only data as every other model.
    if use_autoarima:
        models.append(AutoARIMA(season_length=season))
    return models


def _chronos_forecast(train_df, horizon, frequency):
    """Forecast ``horizon`` steps after the end of each training series with Chronos."""
    pipeline = ChronosPipeline.from_pretrained(
        "amazon/chronos-t5-tiny",
        device_map="auto",
        torch_dtype=torch.bfloat16,
    )
    step = pd.tseries.frequencies.to_offset(frequency)
    frames = []
    for uid, series in train_df.groupby('unique_id', sort=False):
        context = torch.tensor(series['y'].values)
        forecast = pipeline.predict(context, prediction_length=horizon)
        forecast_median = np.median(forecast[0].numpy(), axis=0)
        frames.append(pd.DataFrame({
            'unique_id': uid,
            # Advance by one step of the chosen frequency (not one day), so
            # hourly/weekly/monthly series get correctly aligned timestamps.
            'ds': pd.date_range(start=series['ds'].max() + step,
                                periods=horizon, freq=frequency),
            'Chronos': forecast_median,
        }))
    return pd.concat(frames)


def _compute_metrics(validation_df):
    """Return a frame with MAE, RMSE and MAPE (percent) per point-forecast column."""
    # Prediction-interval bounds ('<model>-lo-90', '<model>-hi-95', ...) are
    # not point forecasts and must not be scored as models.
    eval_cols = [
        col for col in validation_df.columns
        if col not in _NON_MODEL_COLUMNS and '-lo-' not in col and '-hi-' not in col
    ]
    y_true = validation_df['y'].to_numpy(dtype=float)
    rows = []
    for col in eval_cols:
        y_pred = validation_df[col].to_numpy(dtype=float)
        mask = ~(np.isnan(y_true) | np.isnan(y_pred))
        if not mask.any():
            continue
        actual, predicted = y_true[mask], y_pred[mask]
        rows.append({
            'Model': col,
            'MAE': mean_absolute_error(actual, predicted),
            'RMSE': np.sqrt(mean_squared_error(actual, predicted)),
            'MAPE': mean_absolute_percentage_error(actual, predicted) * 100,
        })
    return pd.DataFrame(rows, columns=['Model', 'MAE', 'RMSE', 'MAPE'])


def _export_csv(df):
    """Write ``df`` to a persistent temporary CSV file and return its path."""
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv',
                                     newline='') as f:
        df.to_csv(f, index=False)
    return f.name


def run_forecast(
    file,
    frequency,
    eval_strategy,
    horizon,
    step_size,
    num_windows,
    use_historical_avg,
    use_naive,
    use_seasonal_naive,
    seasonality,
    use_window_avg,
    window_size,
    use_seasonal_window_avg,
    seasonal_window_size,
    use_autoets,
    use_autoarima,
    use_autoces,
    use_autotheta,
    use_lgbm,
    use_chronos,
    use_moirai,
    future_horizon
):
    """Validate the selected models and produce a future forecast.

    Validation uses either rolling cross-validation or a single fixed
    hold-out of the last ``horizon`` observations per series. Chronos is
    always evaluated on that final hold-out window. ``use_moirai`` is
    accepted for interface compatibility but no Moirai forecast is produced.

    Returns:
        A 7-tuple ``(metrics_df, validation_df, validation_figure,
        future_df, future_figure, export_paths, message)``. On failure the
        first five items are ``None``, ``export_paths`` is ``[]`` and
        ``message`` describes the error.
    """
    try:
        df, message = load_data(file)
        if df is None:
            return (*_EMPTY_OUTPUTS, message)

        # Only the target columns are used; no exogenous predictors yet.
        df_basic = df[['unique_id', 'ds', 'y']].copy()

        h = int(horizon)
        cross_validate = eval_strategy == "Cross Validation"
        notes = []

        stat_models = _build_stat_models(
            use_historical_avg, use_naive, use_seasonal_naive, seasonality,
            use_window_avg, window_size, use_seasonal_window_avg,
            seasonal_window_size, use_autoets, use_autoarima, use_autoces,
            use_autotheta,
        )
        if not stat_models:
            return (*_EMPTY_OUTPUTS, "No models selected")

        shortest = int(df_basic.groupby('unique_id').size().min())
        if shortest <= h:
            return (
                *_EMPTY_OUTPUTS,
                f"Validation horizon ({h}) must be smaller than the shortest "
                f"series ({shortest} observations)",
            )

        # Needed by the fixed-window branch AND by Chronos in either mode.
        train_df = _holdout_split(df_basic, h)

        sf = StatsForecast(models=stat_models, freq=frequency, n_jobs=-1)
        if cross_validate:
            validation_df = sf.cross_validation(
                df=df_basic,
                h=h,
                step_size=int(step_size),
                n_windows=int(num_windows)
            )
        else:
            sf.fit(train_df)
            validation_df = sf.predict(h=h, level=[90, 95])
        validation_df = _ensure_id_column(validation_df)

        if use_lgbm:
            mlf = MLForecast(
                models={'LightGBM': LGBMRegressor(verbose=-1)},
                freq=frequency,
                lags=[1, 7, 14],
                num_threads=1
            )
            if cross_validate:
                ml_cv = _ensure_id_column(mlf.cross_validation(
                    df=df_basic,
                    h=h,
                    step_size=int(step_size),
                    n_windows=int(num_windows)
                ))
                # Both frames carry 'y'; keeping it on both sides would yield
                # 'y_x'/'y_y' columns that get scored as if they were models.
                ml_cv = ml_cv.drop(columns=['y'], errors='ignore')
                validation_df = validation_df.merge(
                    ml_cv, on=['unique_id', 'ds', 'cutoff'], how='outer')
            else:
                mlf.fit(train_df)
                ml_pred = _ensure_id_column(mlf.predict(h=h))
                validation_df = validation_df.merge(
                    ml_pred, on=['unique_id', 'ds'], how='outer')

        if use_chronos and CHRONOS_AVAILABLE:
            # Best effort: a Chronos failure must not discard the other
            # models' results, but it is reported instead of only printed.
            try:
                chronos_df = _chronos_forecast(train_df, h, frequency)
                validation_df = validation_df.merge(
                    chronos_df, on=['unique_id', 'ds'], how='outer')
            except Exception as e:
                print(f"Chronos error: {e}")
                notes.append(f"Chronos skipped: {e}")

        if use_moirai:
            notes.append("Moirai forecasting is not implemented; selection ignored.")

        if 'y' not in validation_df.columns:
            # Fixed-window predictions carry no actuals; attach them.
            validation_df = validation_df.merge(
                df_basic[['unique_id', 'ds', 'y']],
                on=['unique_id', 'ds'],
                how='left'
            )

        eval_metrics = _compute_metrics(validation_df)

        validation_fig = create_forecast_plot(
            validation_df, df_basic, "Validation Results", horizon, frequency
        )

        # Future forecast: refit the statistical models on the full history.
        sf_future = StatsForecast(models=stat_models, freq=frequency, n_jobs=-1)
        sf_future.fit(df_basic)
        future_df = _ensure_id_column(
            sf_future.predict(h=int(future_horizon), level=[90, 95]))

        future_fig = create_forecast_plot(
            future_df, df_basic, "Future Forecast", future_horizon, frequency
        )

        exported = [_export_csv(frame)
                    for frame in (eval_metrics, validation_df, future_df)]

        status = " Forecasting completed successfully!"
        if notes:
            status += "\n" + "\n".join(notes)

        return (
            eval_metrics,
            validation_df,
            validation_fig,
            future_df,
            future_fig,
            exported,
            status
        )

    except Exception as e:
        error_msg = f"Error: {str(e)}\n\n{traceback.format_exc()}"
        return (*_EMPTY_OUTPUTS, error_msg)


# Gradio Interface
with gr.Blocks(title="Duke Energy Forecasting App") as app:
    gr.Markdown("""
    # Time Series Forecasting
    Upload your time series data and select models to generate forecasts.
    Supports StatsForecast, MLForecast, and Foundation Models (Chronos, Moirai).
    """)

    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload CSV File", file_types=['.csv'])

            with gr.Accordion("Forecast Configuration", open=True):
                frequency = gr.Dropdown(
                    choices=[
                        ("Hourly", "H"),
                        ("Daily", "D"),
                        ("Business Day", "B"),
                        # "W" is the pandas weekly alias; "WS" is not a valid
                        # frequency string and made every weekly run fail.
                        ("Weekly", "W"),
                        ("Monthly", "MS"),
                        ("Quarterly", "QS"),
                        ("Yearly", "YS")
                    ],
                    label="Data Frequency",
                    value="D"
                )

                eval_strategy = gr.Radio(
                    choices=["Fixed Window", "Cross Validation"],
                    label="Evaluation Strategy",
                    value="Cross Validation"
                )

                # The horizon is used by BOTH strategies, so it stays visible
                # regardless of the selected strategy.
                with gr.Group():
                    gr.Markdown("### Validation Settings")
                    horizon = gr.Slider(1, 100, value=10, step=1, label="Validation Horizon")

                with gr.Group(visible=True) as cv_box:
                    gr.Markdown("### Cross Validation Settings")
                    with gr.Row():
                        step_size = gr.Slider(1, 50, value=10, step=1, label="Step Size")
                        num_windows = gr.Slider(1, 20, value=5, step=1, label="Number of Windows")

                with gr.Group():
                    gr.Markdown("### Future Forecast Settings")
                    future_horizon = gr.Slider(1, 100, value=10, step=1, label="Future Forecast Horizon")

            with gr.Accordion("Model Configuration", open=True):
                with gr.Tabs():
                    with gr.TabItem("Statistical Models"):
                        gr.Markdown("## Basic Models")
                        with gr.Row():
                            use_historical_avg = gr.Checkbox(label="Historical Average", value=True)
                            use_naive = gr.Checkbox(label="Naive", value=True)

                        with gr.Group():
                            gr.Markdown("### Seasonality Configuration")
                            seasonality = gr.Number(label="Seasonality Period", value=7)

                        gr.Markdown("### Seasonal Models")
                        use_seasonal_naive = gr.Checkbox(label="Seasonal Naive", value=True)

                        gr.Markdown("### Window-based Models")
                        with gr.Row():
                            use_window_avg = gr.Checkbox(label="Window Average", value=False)
                            window_size = gr.Number(label="Window Size", value=10)

                        with gr.Row():
                            use_seasonal_window_avg = gr.Checkbox(label="Seasonal Window Average", value=False)
                            seasonal_window_size = gr.Number(label="Seasonal Window Size", value=2)

                        gr.Markdown("### Advanced Models")
                        with gr.Row():
                            use_autoets = gr.Checkbox(label="AutoETS", value=False)
                            use_autoarima = gr.Checkbox(label="AutoARIMA", value=False)
                        with gr.Row():
                            use_autoces = gr.Checkbox(label="AutoCES", value=False)
                            use_autotheta = gr.Checkbox(label="AutoTheta", value=False)

                    with gr.TabItem("Machine Learning"):
                        gr.Markdown("## Gradient Boosting Models")
                        use_lgbm = gr.Checkbox(label="LightGBM", value=True)

                    with gr.TabItem("Foundation Models"):
                        gr.Markdown("## State-of-the-Art Foundation Models")
                        with gr.Row():
                            use_chronos = gr.Checkbox(
                                label="Chronos (Amazon)",
                                value=CHRONOS_AVAILABLE,
                                interactive=CHRONOS_AVAILABLE
                            )
                            use_moirai = gr.Checkbox(
                                label="Moirai (Salesforce)",
                                value=False,
                                interactive=MOIRAI_AVAILABLE
                            )

                        if not CHRONOS_AVAILABLE:
                            gr.Markdown(" Chronos not available. Install: `pip install chronos-forecasting`")
                        if not MOIRAI_AVAILABLE:
                            gr.Markdown(" Moirai not available. Install: `pip install uni2ts`")

        with gr.Column(scale=3):
            message_output = gr.Textbox(label="Status Message")

            with gr.Tabs():
                with gr.TabItem("Validation Results"):
                    eval_output = gr.Dataframe(label="Evaluation Metrics")
                    validation_plot = gr.Plot(label="Validation Plot")
                    validation_output = gr.Dataframe(label="Validation Data", visible=False)

                    with gr.Row():
                        show_data_btn = gr.Button("Show Validation Data")
                        hide_data_btn = gr.Button("Hide Validation Data", visible=False)

                with gr.TabItem("Future Forecast"):
                    forecast_plot = gr.Plot(label="Future Forecast Plot")
                    forecast_output = gr.Dataframe(label="Future Forecast Data", visible=False)

                    with gr.Row():
                        show_forecast_btn = gr.Button("Show Forecast Data")
                        hide_forecast_btn = gr.Button("Hide Forecast Data", visible=False)

                with gr.TabItem("Export Results"):
                    export_files = gr.Files(label="Download Results")

    with gr.Row():
        submit_btn = gr.Button("Run Validation and Forecast", variant="primary", size="lg")

    # Event handlers
    def update_eval_boxes(strategy):
        """Show the cross-validation settings only for that strategy."""
        return gr.update(visible=strategy == "Cross Validation")

    eval_strategy.change(
        fn=update_eval_boxes,
        inputs=[eval_strategy],
        outputs=[cv_box]
    )

    def show_data():
        """Reveal the data table and swap the show/hide buttons."""
        return gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)

    def hide_data():
        """Hide the data table and swap the show/hide buttons back."""
        return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)

    show_data_btn.click(fn=show_data, outputs=[validation_output, hide_data_btn, show_data_btn])
    hide_data_btn.click(fn=hide_data, outputs=[validation_output, hide_data_btn, show_data_btn])
    show_forecast_btn.click(fn=show_data, outputs=[forecast_output, hide_forecast_btn, show_forecast_btn])
    hide_forecast_btn.click(fn=hide_data, outputs=[forecast_output, hide_forecast_btn, show_forecast_btn])

    submit_btn.click(
        fn=run_forecast,
        inputs=[
            file_input, frequency, eval_strategy, horizon, step_size, num_windows,
            use_historical_avg, use_naive, use_seasonal_naive, seasonality,
            use_window_avg, window_size, use_seasonal_window_avg, seasonal_window_size,
            use_autoets, use_autoarima, use_autoces, use_autotheta,
            use_lgbm, use_chronos, use_moirai, future_horizon
        ],
        outputs=[
            eval_output,
            validation_output,
            validation_plot,
            forecast_output,
            forecast_plot,
            export_files,
            message_output
        ]
    )

if __name__ == "__main__":
    app.launch(share=True)