| """ |
| Economic Forecasting Module |
| Advanced time series forecasting for economic indicators using ARIMA/ETS models |
| """ |
|
|
| import logging |
| import warnings |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Optional, Tuple, Union |
|
|
| import numpy as np |
| import pandas as pd |
| from scipy import stats |
| from sklearn.metrics import mean_absolute_error, mean_squared_error |
| from statsmodels.tsa.arima.model import ARIMA |
| from statsmodels.tsa.holtwinters import ExponentialSmoothing |
| from statsmodels.tsa.seasonal import seasonal_decompose |
| from statsmodels.tsa.stattools import adfuller |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class EconomicForecaster: |
| """ |
| Advanced economic forecasting using ARIMA and ETS models |
| with comprehensive backtesting and performance evaluation |
| """ |
| |
| def __init__(self, data: pd.DataFrame): |
| """ |
| Initialize forecaster with economic data |
| |
| Args: |
| data: DataFrame with economic indicators (GDPC1, INDPRO, RSAFS, etc.) |
| """ |
| self.data = data.copy() |
| self.forecasts = {} |
| self.backtest_results = {} |
| self.model_performance = {} |
| |
| def prepare_data(self, target_series: str, frequency: str = 'Q') -> pd.Series: |
| """ |
| Prepare time series data for forecasting |
| |
| Args: |
| target_series: Series name to forecast |
| frequency: Data frequency ('Q' for quarterly, 'M' for monthly) |
| |
| Returns: |
| Prepared time series |
| """ |
| if target_series not in self.data.columns: |
| raise ValueError(f"Series {target_series} not found in data") |
| |
| series = self.data[target_series].dropna() |
| |
| |
| if frequency == 'Q': |
| series = series.resample('Q').mean() |
| elif frequency == 'M': |
| series = series.resample('M').mean() |
| |
| |
| if target_series in ['GDPC1', 'INDPRO', 'RSAFS']: |
| series = series.pct_change().dropna() |
| |
| return series |
| |
| def check_stationarity(self, series: pd.Series) -> Dict: |
| """ |
| Perform Augmented Dickey-Fuller test for stationarity |
| |
| Args: |
| series: Time series to test |
| |
| Returns: |
| Dictionary with test results |
| """ |
| result = adfuller(series.dropna()) |
| |
| return { |
| 'adf_statistic': result[0], |
| 'p_value': result[1], |
| 'critical_values': result[4], |
| 'is_stationary': result[1] < 0.05 |
| } |
| |
| def decompose_series(self, series: pd.Series, period: int = 4) -> Dict: |
| """ |
| Decompose time series into trend, seasonal, and residual components |
| |
| Args: |
| series: Time series to decompose |
| period: Seasonal period (4 for quarterly, 12 for monthly) |
| |
| Returns: |
| Dictionary with decomposition components |
| """ |
| decomposition = seasonal_decompose(series.dropna(), period=period, extrapolate_trend='freq') |
| |
| return { |
| 'trend': decomposition.trend, |
| 'seasonal': decomposition.seasonal, |
| 'residual': decomposition.resid, |
| 'observed': decomposition.observed |
| } |
| |
| def fit_arima_model(self, series: pd.Series, order: Tuple[int, int, int] = None) -> ARIMA: |
| """ |
| Fit ARIMA model to time series |
| |
| Args: |
| series: Time series data |
| order: ARIMA order (p, d, q). If None, auto-detect |
| |
| Returns: |
| Fitted ARIMA model |
| """ |
| if order is None: |
| |
| best_aic = np.inf |
| best_order = (1, 1, 1) |
| |
| for p in range(0, 3): |
| for d in range(0, 2): |
| for q in range(0, 3): |
| try: |
| model = ARIMA(series, order=(p, d, q)) |
| fitted_model = model.fit() |
| if fitted_model.aic < best_aic: |
| best_aic = fitted_model.aic |
| best_order = (p, d, q) |
| except: |
| continue |
| |
| order = best_order |
| logger.info(f"Auto-detected ARIMA order: {order}") |
| |
| model = ARIMA(series, order=order) |
| fitted_model = model.fit() |
| |
| return fitted_model |
| |
| def fit_ets_model(self, series: pd.Series, seasonal_periods: int = 4) -> ExponentialSmoothing: |
| """ |
| Fit ETS (Exponential Smoothing) model to time series |
| |
| Args: |
| series: Time series data |
| seasonal_periods: Number of seasonal periods |
| |
| Returns: |
| Fitted ETS model |
| """ |
| model = ExponentialSmoothing( |
| series, |
| seasonal_periods=seasonal_periods, |
| trend='add', |
| seasonal='add' |
| ) |
| fitted_model = model.fit() |
| |
| return fitted_model |
| |
| def forecast_series(self, series: pd.Series, model_type: str = 'auto', |
| forecast_periods: int = 4) -> Dict: |
| """ |
| Forecast time series using specified model |
| |
| Args: |
| series: Time series to forecast |
| model_type: 'arima', 'ets', or 'auto' |
| forecast_periods: Number of periods to forecast |
| |
| Returns: |
| Dictionary with forecast results |
| """ |
| if model_type == 'auto': |
| |
| try: |
| arima_model = self.fit_arima_model(series) |
| arima_aic = arima_model.aic |
| except: |
| arima_aic = np.inf |
| |
| try: |
| ets_model = self.fit_ets_model(series) |
| ets_aic = ets_model.aic |
| except: |
| ets_aic = np.inf |
| |
| if arima_aic < ets_aic: |
| model_type = 'arima' |
| model = arima_model |
| else: |
| model_type = 'ets' |
| model = ets_model |
| elif model_type == 'arima': |
| model = self.fit_arima_model(series) |
| elif model_type == 'ets': |
| model = self.fit_ets_model(series) |
| else: |
| raise ValueError("model_type must be 'arima', 'ets', or 'auto'") |
| |
| |
| forecast = model.forecast(steps=forecast_periods) |
| |
| |
| if model_type == 'arima': |
| forecast_ci = model.get_forecast(steps=forecast_periods).conf_int() |
| else: |
| |
| forecast_std = series.std() |
| forecast_ci = pd.DataFrame({ |
| 'lower': forecast - 1.96 * forecast_std, |
| 'upper': forecast + 1.96 * forecast_std |
| }) |
| |
| return { |
| 'model': model, |
| 'model_type': model_type, |
| 'forecast': forecast, |
| 'confidence_intervals': forecast_ci, |
| 'aic': model.aic if hasattr(model, 'aic') else None |
| } |
| |
| def backtest_forecast(self, series: pd.Series, model_type: str = 'auto', |
| train_size: float = 0.8, test_periods: int = 8) -> Dict: |
| """ |
| Perform backtesting of forecasting models |
| |
| Args: |
| series: Time series to backtest |
| model_type: Model type to use |
| train_size: Proportion of data for training |
| test_periods: Number of periods to test |
| |
| Returns: |
| Dictionary with backtest results |
| """ |
| n = len(series) |
| train_end = int(n * train_size) |
| |
| actual_values = [] |
| predicted_values = [] |
| errors = [] |
| |
| for i in range(test_periods): |
| if train_end + i >= n: |
| break |
| |
| |
| train_data = series.iloc[:train_end + i] |
| test_value = series.iloc[train_end + i] |
| |
| try: |
| forecast_result = self.forecast_series(train_data, model_type, 1) |
| prediction = forecast_result['forecast'].iloc[0] |
| |
| actual_values.append(test_value) |
| predicted_values.append(prediction) |
| errors.append(test_value - prediction) |
| |
| except Exception as e: |
| logger.warning(f"Forecast failed at step {i}: {e}") |
| continue |
| |
| if not actual_values: |
| return {'error': 'No successful forecasts generated'} |
| |
| |
| mae = mean_absolute_error(actual_values, predicted_values) |
| mse = mean_squared_error(actual_values, predicted_values) |
| rmse = np.sqrt(mse) |
| mape = np.mean(np.abs(np.array(actual_values) - np.array(predicted_values)) / np.abs(actual_values)) * 100 |
| |
| return { |
| 'actual_values': actual_values, |
| 'predicted_values': predicted_values, |
| 'errors': errors, |
| 'mae': mae, |
| 'mse': mse, |
| 'rmse': rmse, |
| 'mape': mape, |
| 'test_periods': len(actual_values) |
| } |
| |
| def forecast_economic_indicators(self, indicators: List[str] = None) -> Dict: |
| """ |
| Forecast multiple economic indicators |
| |
| Args: |
| indicators: List of indicators to forecast. If None, use default set |
| |
| Returns: |
| Dictionary with forecasts for all indicators |
| """ |
| if indicators is None: |
| indicators = ['GDPC1', 'INDPRO', 'RSAFS'] |
| |
| results = {} |
| |
| for indicator in indicators: |
| try: |
| |
| series = self.prepare_data(indicator) |
| |
| |
| stationarity = self.check_stationarity(series) |
| |
| |
| decomposition = self.decompose_series(series) |
| |
| |
| forecast_result = self.forecast_series(series) |
| |
| |
| backtest_result = self.backtest_forecast(series) |
| |
| results[indicator] = { |
| 'stationarity': stationarity, |
| 'decomposition': decomposition, |
| 'forecast': forecast_result, |
| 'backtest': backtest_result, |
| 'series': series |
| } |
| |
| logger.info(f"Successfully forecasted {indicator}") |
| |
| except Exception as e: |
| logger.error(f"Failed to forecast {indicator}: {e}") |
| results[indicator] = {'error': str(e)} |
| |
| return results |
| |
| def generate_forecast_report(self, forecasts: Dict) -> str: |
| """ |
| Generate comprehensive forecast report |
| |
| Args: |
| forecasts: Dictionary with forecast results |
| |
| Returns: |
| Formatted report string |
| """ |
| report = "ECONOMIC FORECASTING REPORT\n" |
| report += "=" * 50 + "\n\n" |
| |
| for indicator, result in forecasts.items(): |
| if 'error' in result: |
| report += f"{indicator}: ERROR - {result['error']}\n\n" |
| continue |
| |
| report += f"INDICATOR: {indicator}\n" |
| report += "-" * 30 + "\n" |
| |
| |
| stationarity = result['stationarity'] |
| report += f"Stationarity Test (ADF):\n" |
| report += f" ADF Statistic: {stationarity['adf_statistic']:.4f}\n" |
| report += f" P-value: {stationarity['p_value']:.4f}\n" |
| report += f" Is Stationary: {stationarity['is_stationary']}\n\n" |
| |
| |
| forecast = result['forecast'] |
| report += f"Model: {forecast['model_type'].upper()}\n" |
| if forecast['aic']: |
| report += f"AIC: {forecast['aic']:.4f}\n" |
| report += f"Forecast Periods: {len(forecast['forecast'])}\n\n" |
| |
| |
| backtest = result['backtest'] |
| if 'error' not in backtest: |
| report += f"Backtest Performance:\n" |
| report += f" MAE: {backtest['mae']:.4f}\n" |
| report += f" RMSE: {backtest['rmse']:.4f}\n" |
| report += f" MAPE: {backtest['mape']:.2f}%\n" |
| report += f" Test Periods: {backtest['test_periods']}\n\n" |
| |
| |
| report += f"Forecast Values:\n" |
| for i, value in enumerate(forecast['forecast']): |
| ci = forecast['confidence_intervals'] |
| lower = ci.iloc[i]['lower'] if 'lower' in ci.columns else 'N/A' |
| upper = ci.iloc[i]['upper'] if 'upper' in ci.columns else 'N/A' |
| report += f" Period {i+1}: {value:.4f} [{lower:.4f}, {upper:.4f}]\n" |
| |
| report += "\n" + "=" * 50 + "\n\n" |
| |
| return report |