# Hugging Face Spaces page header residue ("Spaces: Sleeping") from the web
# export — kept as a comment so the module remains importable.
import os
import tempfile
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import gradio as gr

from statsforecast import StatsForecast
from statsforecast.models import (
    HistoricAverage,
    Naive,
    SeasonalNaive,
    WindowAverage,
    SeasonalWindowAverage,
    AutoETS,
    AutoARIMA,
    AutoCES,
    AutoTheta,
    DynamicOptimizedTheta,
    MSTL
)
from utilsforecast.evaluation import evaluate
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
# Import for MLForecast
from mlforecast import MLForecast
from lightgbm import LGBMRegressor

# Foundation models are optional dependencies: probe for them and record
# availability flags so the UI can disable the corresponding checkboxes.
# Fixed: the probes used bare `except:` clauses, which also swallow
# SystemExit/KeyboardInterrupt; `except Exception` keeps the best-effort
# behavior without masking interpreter-level signals.
try:
    from chronos import ChronosPipeline
    import torch
    CHRONOS_AVAILABLE = True
except Exception:
    CHRONOS_AVAILABLE = False
try:
    from uni2ts.model.moirai import MoiraiForecast
    MOIRAI_AVAILABLE = True
except Exception:
    MOIRAI_AVAILABLE = False
# Helper translating a (frequency code, horizon) pair into a Timedelta span.
def calculate_date_offset(freq, horizon):
    """Return a pd.Timedelta covering `horizon` periods of frequency `freq`.

    Calendar frequencies are approximated with fixed day counts
    (month=30, quarter=90, year=365); business days use ~1.4 calendar
    days per period; unrecognized codes fall back to calendar days.
    """
    # Business days need truncation of the fractional-day estimate,
    # so they are handled separately from the simple per-period table.
    if freq == 'B':
        return pd.Timedelta(days=int(horizon * 1.4))
    # Length of a single period for each supported frequency code.
    per_period = {
        'H': pd.Timedelta(hours=1),
        'D': pd.Timedelta(days=1),
        'WS': pd.Timedelta(weeks=1),
        'MS': pd.Timedelta(days=30),
        'QS': pd.Timedelta(days=90),
        'YS': pd.Timedelta(days=365),
    }
    return per_period.get(freq, pd.Timedelta(days=1)) * horizon
# Function to generate and return a plot for validation results
def create_forecast_plot(forecast_df, original_df, title="Forecasting Results", horizon=None, freq='D'):
    """Build and return a matplotlib Figure overlaying actuals and forecasts.

    Args:
        forecast_df: long-format frame with 'unique_id', 'ds' and one column
            per model; may also carry a 'cutoff' column from cross-validation.
        original_df: frame with 'unique_id', 'ds', 'y' holding the actuals.
        title: figure title.
        horizon: forecast horizon, used only to pad the x-axis left of the
            earliest cutoff so some training history is visible.
        freq: frequency code passed to calculate_date_offset for that padding.

    Returns:
        The current matplotlib Figure (callers hand it to gr.Plot).
    """
    plt.figure(figsize=(12, 7))
    unique_ids = forecast_df['unique_id'].unique()
    # Every column that is not a key/actual column is treated as a model's
    # point forecast.
    forecast_cols = [col for col in forecast_df.columns if col not in ['unique_id', 'ds', 'cutoff', 'y']]
    colors = plt.cm.tab10.colors
    # Earliest cross-validation cutoff across all series; drives axis limits.
    min_cutoff = None
    # NOTE(review): loop index `i` is unused — only `j` selects line colors.
    for i, unique_id in enumerate(unique_ids):
        # Solid black line for the actual history of this series.
        original_data = original_df[original_df['unique_id'] == unique_id]
        plt.plot(original_data['ds'], original_data['y'], 'k-', linewidth=2, label=f'{unique_id} (Actual)')
        forecast_data = forecast_df[forecast_df['unique_id'] == unique_id]
        if 'cutoff' in forecast_data.columns:
            cutoffs = pd.to_datetime(forecast_data['cutoff'].unique())
            if len(cutoffs) > 0:
                earliest_cutoff = cutoffs.min()
                if min_cutoff is None or earliest_cutoff < min_cutoff:
                    min_cutoff = earliest_cutoff
                # Dashed vertical line at each cutoff marks a CV window start.
                for cutoff in cutoffs:
                    plt.axvline(x=cutoff, color='gray', linestyle='--', alpha=0.4)
        for j, col in enumerate(forecast_cols):
            if col in forecast_data.columns:
                # "model_name" -> "Model Name" for the legend.
                model_name = col.replace('_', ' ').title()
                plt.plot(forecast_data['ds'], forecast_data[col],
                         color=colors[j % len(colors)],
                         linestyle='--',
                         linewidth=1.5,
                         label=f'{model_name}')
    plt.title(title, fontsize=16)
    plt.xlabel('Date', fontsize=12)
    plt.ylabel('Value', fontsize=12)
    plt.grid(True, alpha=0.3)
    # Legend below the axes so it never covers the series.
    plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=3, fontsize=10)
    plt.tight_layout(rect=[0, 0.05, 1, 0.95])
    if min_cutoff is not None and horizon is not None:
        # Show one horizon's worth of training data before the first cutoff.
        date_offset = calculate_date_offset(freq, horizon)
        start_date = min_cutoff - date_offset
        max_date = forecast_df['ds'].max()
        plt.xlim(start_date, max_date)
        plt.annotate('Training | Test',
                     xy=(min_cutoff, plt.ylim()[0]),
                     xytext=(0, -40),
                     textcoords='offset points',
                     horizontalalignment='center',
                     fontsize=10)
    fig = plt.gcf()
    ax = plt.gca()
    fig.autofmt_xdate()
    return fig
# Function to load and validate an uploaded CSV
def load_data(file):
    """Read an uploaded CSV and validate it for forecasting.

    Returns a (DataFrame, message) pair: the frame sorted by series and
    timestamp on success, or (None, error message) on any failure.
    """
    if file is None:
        return None, "Please upload a CSV file"
    try:
        frame = pd.read_csv(file)
        # The Nixtla ecosystem expects long-format data with exactly these
        # three columns; report any that are absent.
        missing = [name for name in ('unique_id', 'ds', 'y') if name not in frame.columns]
        if missing:
            return None, f"Missing required columns: {', '.join(missing)}"
        frame['ds'] = pd.to_datetime(frame['ds'])
        frame = frame.sort_values(['unique_id', 'ds']).reset_index(drop=True)
        # Check for NaN values in the target column.
        if frame['y'].isna().any():
            return None, "Data contains missing values in the 'y' column"
        return frame, "Data loaded successfully!"
    except Exception as e:
        # Boundary handler: surface any parse/convert failure as a message.
        return None, f"Error loading data: {str(e)}"
# Main forecasting function
def run_forecast(
    file, frequency, eval_strategy, horizon, step_size, num_windows,
    use_historical_avg, use_naive, use_seasonal_naive, seasonality,
    use_window_avg, window_size, use_seasonal_window_avg, seasonal_window_size,
    use_autoets, use_autoarima, use_autoces, use_autotheta,
    use_lgbm, use_chronos, use_moirai,
    future_horizon
):
    """
    Main function to run forecasting with all selected models.
    Now includes proper handling of models that don't support predictors.

    Returns a 7-tuple consumed by the Gradio outputs:
    (metrics DataFrame, validation DataFrame, validation Figure,
     future-forecast DataFrame, future Figure, list of export file paths,
     status message). On any error the first five slots are None, the file
     list is empty, and the message carries the traceback.
    """
    try:
        # Load data
        df, message = load_data(file)
        if df is None:
            # Propagate the loader's error message; leave all outputs empty.
            return None, None, None, None, None, [], message
        # Prepare data - only required columns for models without predictors
        df_basic = df[['unique_id', 'ds', 'y']].copy()
        # Initialize models list
        models = []
        models_need_predictors = []
        # Basic models (no predictors needed)
        if use_historical_avg:
            models.append(HistoricAverage())
        if use_naive:
            models.append(Naive())
        if use_seasonal_naive:
            models.append(SeasonalNaive(season_length=int(seasonality)))
        if use_window_avg:
            models.append(WindowAverage(window_size=int(window_size)))
        if use_seasonal_window_avg:
            models.append(SeasonalWindowAverage(season_length=int(seasonality), window_size=int(seasonal_window_size)))
        if use_autoets:
            models.append(AutoETS(season_length=int(seasonality)))
        if use_autoces:
            models.append(AutoCES(season_length=int(seasonality)))
        if use_autotheta:
            models.append(AutoTheta(season_length=int(seasonality)))
        # Models that can use predictors
        if use_autoarima:
            models_need_predictors.append(AutoARIMA(season_length=int(seasonality)))
        # Run cross-validation or fixed window
        if eval_strategy == "Cross Validation":
            h = horizon
            validation_results = []
            # Run models without predictors
            if models:
                sf = StatsForecast(models=models, freq=frequency, n_jobs=-1)
                cv_df = sf.cross_validation(
                    df=df_basic,
                    h=int(h),
                    step_size=int(step_size),
                    n_windows=int(num_windows)
                )
                validation_results.append(cv_df)
            # Run models with predictors (if needed, add predictor handling here)
            # For now, we'll run them without predictors
            if models_need_predictors:
                sf_pred = StatsForecast(models=models_need_predictors, freq=frequency, n_jobs=-1)
                cv_df_pred = sf_pred.cross_validation(
                    df=df_basic,
                    h=int(h),
                    step_size=int(step_size),
                    n_windows=int(num_windows)
                )
                validation_results.append(cv_df_pred)
            # Combine results: column-wise concat, then drop the duplicated
            # key columns ('unique_id'/'ds'/'cutoff'/'y') from the second frame.
            if validation_results:
                validation_df = pd.concat(validation_results, axis=1)
                validation_df = validation_df.loc[:, ~validation_df.columns.duplicated()]
            else:
                return None, None, None, None, None, [], "No models selected"
        else:  # Fixed Window
            # Split data: hold out the last `horizon` rows of every series.
            train_df = []
            for uid in df_basic['unique_id'].unique():
                uid_data = df_basic[df_basic['unique_id'] == uid].iloc[:-int(horizon)]
                train_df.append(uid_data)
            train_df = pd.concat(train_df)
            # Fit and predict
            all_models = models + models_need_predictors
            if all_models:
                sf = StatsForecast(models=all_models, freq=frequency, n_jobs=-1)
                sf.fit(train_df)
                validation_df = sf.predict(h=int(horizon), level=[90, 95])
            else:
                return None, None, None, None, None, [], "No models selected"
        # Add ML model forecasts if selected
        if use_lgbm:
            mlf = MLForecast(
                models={'LightGBM': LGBMRegressor(verbose=-1)},
                freq=frequency,
                lags=[1, 7, 14],
                num_threads=1
            )
            if eval_strategy == "Cross Validation":
                ml_cv = mlf.cross_validation(
                    df=df_basic,
                    h=int(horizon),
                    step_size=int(step_size),
                    n_windows=int(num_windows)
                )
                validation_df = validation_df.merge(ml_cv, on=['unique_id', 'ds', 'cutoff'], how='outer')
            else:
                mlf.fit(train_df)
                ml_pred = mlf.predict(h=int(horizon))
                validation_df = validation_df.merge(ml_pred, on=['unique_id', 'ds'], how='outer')
        # Add foundation model forecasts
        if use_chronos and CHRONOS_AVAILABLE:
            # NOTE(review): `train_df` is only defined on the Fixed Window
            # path; in Cross Validation mode this branch raises NameError,
            # which the except below merely prints — confirm this is intended.
            # NOTE(review): the `ds` range below always steps by 1 day
            # regardless of `frequency`'s period length — verify for
            # hourly/weekly/monthly data.
            try:
                pipeline = ChronosPipeline.from_pretrained(
                    "amazon/chronos-t5-tiny",
                    device_map="auto",
                    torch_dtype=torch.bfloat16,
                )
                chronos_forecasts = []
                for uid in df_basic['unique_id'].unique():
                    uid_data = train_df[train_df['unique_id'] == uid]['y'].values
                    context = torch.tensor(uid_data)
                    forecast = pipeline.predict(context, prediction_length=int(horizon))
                    # Collapse Chronos sample paths to their median trajectory.
                    forecast_median = np.median(forecast[0].numpy(), axis=0)
                    uid_forecast = pd.DataFrame({
                        'unique_id': uid,
                        'ds': pd.date_range(
                            start=train_df[train_df['unique_id'] == uid]['ds'].max() + pd.Timedelta(days=1),
                            periods=int(horizon),
                            freq=frequency
                        ),
                        'Chronos': forecast_median
                    })
                    chronos_forecasts.append(uid_forecast)
                chronos_df = pd.concat(chronos_forecasts)
                validation_df = validation_df.merge(chronos_df, on=['unique_id', 'ds'], how='outer')
            except Exception as e:
                # Best-effort: Chronos failures are logged, not fatal.
                print(f"Chronos error: {e}")
        # Evaluate models: every non-key column is a model forecast.
        eval_cols = [col for col in validation_df.columns if col not in ['unique_id', 'ds', 'cutoff', 'y']]
        if 'y' not in validation_df.columns:
            # Fixed-window predictions lack actuals; merge them back in.
            validation_df = validation_df.merge(
                df_basic[['unique_id', 'ds', 'y']],
                on=['unique_id', 'ds'],
                how='left'
            )
        # Calculate metrics per model over rows where both y and the
        # prediction are present.
        metrics_list = []
        for col in eval_cols:
            if col in validation_df.columns and not validation_df[col].isna().all():
                y_true = validation_df['y'].values
                y_pred = validation_df[col].values
                mask = ~(np.isnan(y_true) | np.isnan(y_pred))
                if mask.sum() > 0:
                    y_true_clean = y_true[mask]
                    y_pred_clean = y_pred[mask]
                    # Calculate RMSE manually (sklearn's squared=False kwarg
                    # is avoided for compatibility)
                    rmse_value = np.sqrt(mean_squared_error(y_true_clean, y_pred_clean))
                    metrics_list.append({
                        'Model': col,
                        'MAE': mean_absolute_error(y_true_clean, y_pred_clean),
                        'RMSE': rmse_value,
                        'MAPE': mean_absolute_percentage_error(y_true_clean, y_pred_clean) * 100
                    })
        eval_metrics = pd.DataFrame(metrics_list)
        # Create validation plot
        validation_plot = create_forecast_plot(
            validation_df.reset_index() if 'index' not in validation_df.columns else validation_df,
            df_basic,
            "Validation Results",
            horizon,
            frequency
        )
        # Future forecast: refit the statistical models on the full history.
        future_models = models + models_need_predictors
        if future_models:
            sf_future = StatsForecast(models=future_models, freq=frequency, n_jobs=-1)
            sf_future.fit(df_basic)
            future_df = sf_future.predict(h=int(future_horizon), level=[90, 95])
        else:
            future_df = pd.DataFrame()
        # Create future forecast plot
        # NOTE(review): when no model is selected an empty DataFrame is
        # passed; create_forecast_plot indexes 'unique_id' and would raise —
        # the outer except turns that into an error message. Confirm intended.
        future_plot = create_forecast_plot(
            future_df.reset_index() if not future_df.empty else pd.DataFrame(),
            df_basic,
            "Future Forecast",
            future_horizon,
            frequency
        )
        # Export files
        export_files = []
        # Save to temp files (delete=False so Gradio can serve them later).
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as f:
            eval_metrics.to_csv(f, index=False)
            export_files.append(f.name)
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as f:
            validation_df.to_csv(f, index=False)
            export_files.append(f.name)
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as f:
            future_df.to_csv(f, index=False)
            export_files.append(f.name)
        return (
            eval_metrics,
            validation_df,
            validation_plot,
            future_df,
            future_plot,
            export_files,
            " Forecasting completed successfully!"
        )
    except Exception as e:
        # Top-level UI boundary: report the full traceback in the status box.
        import traceback
        error_msg = f"Error: {str(e)}\n\n{traceback.format_exc()}"
        return None, None, None, None, None, [], error_msg
# Gradio Interface: left column holds inputs, right column holds results.
with gr.Blocks(title="Duke Energy Forecasting App") as app:
    gr.Markdown("""
    # Time Series Forecasting
    Upload your time series data and select models to generate forecasts.
    Supports StatsForecast, MLForecast, and Foundation Models (Chronos, Moirai).
    """)
    with gr.Row():
        # --- Input column -------------------------------------------------
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload CSV File", file_types=['.csv'])
            with gr.Accordion("Forecast Configuration", open=True):
                # Values are the frequency codes understood by
                # calculate_date_offset / StatsForecast.
                frequency = gr.Dropdown(
                    choices=[
                        ("Hourly", "H"),
                        ("Daily", "D"),
                        ("Business Day", "B"),
                        ("Weekly", "WS"),
                        ("Monthly", "MS"),
                        ("Quarterly", "QS"),
                        ("Yearly", "YS")
                    ],
                    label="Data Frequency",
                    value="D"
                )
                eval_strategy = gr.Radio(
                    choices=["Fixed Window", "Cross Validation"],
                    label="Evaluation Strategy",
                    value="Cross Validation"
                )
                # NOTE(review): both boxes start visible even though the
                # default strategy is Cross Validation; update_eval_boxes
                # only reconciles visibility after the first change event.
                with gr.Group(visible=True) as fixed_window_box:
                    gr.Markdown("### Fixed Window Settings")
                    horizon = gr.Slider(1, 100, value=10, step=1, label="Validation Horizon")
                with gr.Group(visible=True) as cv_box:
                    gr.Markdown("### Cross Validation Settings")
                    with gr.Row():
                        step_size = gr.Slider(1, 50, value=10, step=1, label="Step Size")
                        num_windows = gr.Slider(1, 20, value=5, step=1, label="Number of Windows")
                with gr.Group():
                    gr.Markdown("### Future Forecast Settings")
                    future_horizon = gr.Slider(1, 100, value=10, step=1, label="Future Forecast Horizon")
            with gr.Accordion("Model Configuration", open=True):
                with gr.Tabs():
                    with gr.TabItem("Statistical Models"):
                        gr.Markdown("## Basic Models")
                        with gr.Row():
                            use_historical_avg = gr.Checkbox(label="Historical Average", value=True)
                            use_naive = gr.Checkbox(label="Naive", value=True)
                        with gr.Group():
                            gr.Markdown("### Seasonality Configuration")
                            # Shared season_length for all seasonal models.
                            seasonality = gr.Number(label="Seasonality Period", value=7)
                        gr.Markdown("### Seasonal Models")
                        use_seasonal_naive = gr.Checkbox(label="Seasonal Naive", value=True)
                        gr.Markdown("### Window-based Models")
                        with gr.Row():
                            use_window_avg = gr.Checkbox(label="Window Average", value=False)
                            window_size = gr.Number(label="Window Size", value=10)
                        with gr.Row():
                            use_seasonal_window_avg = gr.Checkbox(label="Seasonal Window Average", value=False)
                            seasonal_window_size = gr.Number(label="Seasonal Window Size", value=2)
                        gr.Markdown("### Advanced Models")
                        with gr.Row():
                            use_autoets = gr.Checkbox(label="AutoETS", value=False)
                            use_autoarima = gr.Checkbox(label="AutoARIMA", value=False)
                        with gr.Row():
                            use_autoces = gr.Checkbox(label="AutoCES", value=False)
                            use_autotheta = gr.Checkbox(label="AutoTheta", value=False)
                    with gr.TabItem("Machine Learning"):
                        gr.Markdown("## Gradient Boosting Models")
                        use_lgbm = gr.Checkbox(label="LightGBM", value=True)
                    with gr.TabItem("Foundation Models"):
                        gr.Markdown("## State-of-the-Art Foundation Models")
                        with gr.Row():
                            # Disabled when the optional dependency is absent.
                            use_chronos = gr.Checkbox(
                                label="Chronos (Amazon)",
                                value=CHRONOS_AVAILABLE,
                                interactive=CHRONOS_AVAILABLE
                            )
                            use_moirai = gr.Checkbox(
                                label="Moirai (Salesforce)",
                                value=False,
                                interactive=MOIRAI_AVAILABLE
                            )
                        if not CHRONOS_AVAILABLE:
                            gr.Markdown(" Chronos not available. Install: `pip install chronos-forecasting`")
                        if not MOIRAI_AVAILABLE:
                            gr.Markdown(" Moirai not available. Install: `pip install uni2ts`")
        # --- Output column ------------------------------------------------
        with gr.Column(scale=3):
            message_output = gr.Textbox(label="Status Message")
            with gr.Tabs():
                with gr.TabItem("Validation Results"):
                    eval_output = gr.Dataframe(label="Evaluation Metrics")
                    validation_plot = gr.Plot(label="Validation Plot")
                    # Raw table hidden by default; toggled by the buttons below.
                    validation_output = gr.Dataframe(label="Validation Data", visible=False)
                    with gr.Row():
                        show_data_btn = gr.Button("Show Validation Data")
                        hide_data_btn = gr.Button("Hide Validation Data", visible=False)
                with gr.TabItem("Future Forecast"):
                    forecast_plot = gr.Plot(label="Future Forecast Plot")
                    forecast_output = gr.Dataframe(label="Future Forecast Data", visible=False)
                    with gr.Row():
                        show_forecast_btn = gr.Button("Show Forecast Data")
                        hide_forecast_btn = gr.Button("Hide Forecast Data", visible=False)
                with gr.TabItem("Export Results"):
                    export_files = gr.Files(label="Download Results")
    with gr.Row():
        submit_btn = gr.Button("Run Validation and Forecast", variant="primary", size="lg")
    # Event handlers
    def update_eval_boxes(strategy):
        # Show only the settings box matching the selected strategy.
        return (
            gr.update(visible=strategy == "Fixed Window"),
            gr.update(visible=strategy == "Cross Validation")
        )
    eval_strategy.change(
        fn=update_eval_boxes,
        inputs=[eval_strategy],
        outputs=[fixed_window_box, cv_box]
    )
    # Toggle helpers shared by both data tables: (table, hide-btn, show-btn).
    def show_data():
        return gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)
    def hide_data():
        return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)
    show_data_btn.click(fn=show_data, outputs=[validation_output, hide_data_btn, show_data_btn])
    hide_data_btn.click(fn=hide_data, outputs=[validation_output, hide_data_btn, show_data_btn])
    show_forecast_btn.click(fn=show_data, outputs=[forecast_output, hide_forecast_btn, show_forecast_btn])
    hide_forecast_btn.click(fn=hide_data, outputs=[forecast_output, hide_forecast_btn, show_forecast_btn])
    # Input order must match run_forecast's parameter order exactly.
    submit_btn.click(
        fn=run_forecast,
        inputs=[
            file_input, frequency, eval_strategy, horizon, step_size, num_windows,
            use_historical_avg, use_naive, use_seasonal_naive, seasonality,
            use_window_avg, window_size, use_seasonal_window_avg, seasonal_window_size,
            use_autoets, use_autoarima, use_autoces, use_autotheta,
            use_lgbm, use_chronos, use_moirai,
            future_horizon
        ],
        outputs=[
            eval_output,
            validation_output,
            validation_plot,
            forecast_output,
            forecast_plot,
            export_files,
            message_output
        ]
    )

if __name__ == "__main__":
    app.launch(share=True)