| | """ |
| | Statistical Modeling Module |
| | Advanced statistical analysis for economic indicators including regression, correlation, and diagnostics |
| | """ |
| |
|
| | import logging |
| | from typing import Dict, List, Optional, Tuple, Union |
| |
|
| | import numpy as np |
| | import pandas as pd |
| | from scipy import stats |
| | from sklearn.linear_model import LinearRegression |
| | from sklearn.metrics import r2_score, mean_squared_error |
| | from sklearn.preprocessing import StandardScaler |
| | from statsmodels.stats.diagnostic import het_breuschpagan |
| | from statsmodels.stats.outliers_influence import variance_inflation_factor |
| | from statsmodels.stats.stattools import durbin_watson |
| | from statsmodels.tsa.stattools import adfuller, kpss |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
class StatisticalModeling:
    """
    Advanced statistical modeling for economic indicators, including
    regression analysis, correlation analysis, and diagnostic testing.

    All analyses are performed on period-over-period growth rates
    (``pct_change``) of the supplied series, not on raw levels.
    """

    def __init__(self, data: pd.DataFrame):
        """
        Initialize statistical modeling with economic data.

        Args:
            data: DataFrame with economic indicators. Numeric columns are
                treated as candidate variables.
        """
        # Copy so downstream transformations never mutate the caller's frame.
        self.data = data.copy()
        self.models = {}        # fitted model artifacts, keyed by caller
        self.diagnostics = {}   # cached diagnostic results
        self.correlations = {}  # cached correlation results

    def prepare_regression_data(self, target: str, predictors: Optional[List[str]] = None,
                                lag_periods: int = 4) -> Tuple[pd.DataFrame, pd.Series]:
        """
        Prepare data for regression analysis with lagged variables.

        Every series is converted to growth rates before lagging, so levels
        and growth are never mixed in the same design matrix.

        Args:
            target: Target variable name.
            predictors: List of predictor variables. If None, use all other
                numeric columns.
            lag_periods: Number of lag periods to include per variable.

        Returns:
            Tuple of (features DataFrame, target Series) with aligned index.

        Raises:
            ValueError: If ``target`` is not a column of the data.
        """
        if target not in self.data.columns:
            raise ValueError(f"Target variable {target} not found in data")

        if predictors is None:
            predictors = [col for col in self.data.select_dtypes(include=[np.number]).columns
                          if col != target]

        # Growth rates; the first row is NaN by construction and is dropped.
        growth_data = self.data[[target] + predictors].pct_change().dropna()

        feature_data = {}

        for predictor in predictors:
            # Contemporaneous growth of the predictor.
            feature_data[predictor] = growth_data[predictor]

            # Lagged growth of the predictor.
            for lag in range(1, lag_periods + 1):
                feature_data[f"{predictor}_lag{lag}"] = growth_data[predictor].shift(lag)

        # Autoregressive terms: lagged growth of the target itself.
        for lag in range(1, lag_periods + 1):
            feature_data[f"{target}_lag{lag}"] = growth_data[target].shift(lag)

        features_df = pd.DataFrame(feature_data)
        features_df = features_df.dropna()

        # BUG FIX: align by index *label*, not position. The original used
        # ``.iloc[features_df.index]``, which treats labels as positions;
        # after the dropna() above the labels no longer match positions, so
        # that selected the wrong rows (or raised IndexError outright).
        target_series = growth_data[target].loc[features_df.index]

        return features_df, target_series

    def fit_regression_model(self, target: str, predictors: Optional[List[str]] = None,
                             lag_periods: int = 4, include_interactions: bool = False) -> Dict:
        """
        Fit a linear regression model with diagnostic testing.

        Features are standardized (zero mean, unit variance) before fitting,
        so coefficient magnitudes are comparable across variables.

        Args:
            target: Target variable name.
            predictors: List of predictor variables (None = all other numeric).
            lag_periods: Number of lag periods to include.
            include_interactions: Whether to add pairwise interaction terms.

        Returns:
            Dictionary with the fitted model, scaler, features, target,
            predictions, residuals, ranked coefficients, performance metrics,
            and diagnostic test results.
        """
        features_df, target_series = self.prepare_regression_data(target, predictors, lag_periods)

        if include_interactions:
            # Pairwise products of the base features. Iterate over a snapshot
            # of the columns so newly added interactions are not re-crossed.
            interaction_features = []
            feature_cols = features_df.columns.tolist()

            for i, col1 in enumerate(feature_cols):
                for col2 in feature_cols[i + 1:]:
                    interaction_name = f"{col1}_x_{col2}"
                    interaction_features.append(features_df[col1] * features_df[col2])
                    features_df[interaction_name] = interaction_features[-1]

        # Standardize so coefficients are on a common scale.
        scaler = StandardScaler()
        features_scaled = scaler.fit_transform(features_df)
        features_scaled_df = pd.DataFrame(features_scaled,
                                          index=features_df.index,
                                          columns=features_df.columns)

        model = LinearRegression()
        model.fit(features_scaled_df, target_series)

        predictions = model.predict(features_scaled_df)
        residuals = target_series - predictions

        # In-sample performance metrics.
        r2 = r2_score(target_series, predictions)
        mse = mean_squared_error(target_series, predictions)
        rmse = np.sqrt(mse)

        # Coefficients ranked by absolute magnitude (largest first).
        coefficients = pd.DataFrame({
            'variable': features_df.columns,
            'coefficient': model.coef_,
            'abs_coefficient': np.abs(model.coef_)
        }).sort_values('abs_coefficient', ascending=False)

        diagnostics = self.perform_regression_diagnostics(features_scaled_df, target_series,
                                                          predictions, residuals)

        return {
            'model': model,
            'scaler': scaler,
            'features': features_df,
            'target': target_series,
            'predictions': predictions,
            'residuals': residuals,
            'coefficients': coefficients,
            'performance': {
                'r2': r2,
                'mse': mse,
                'rmse': rmse,
                'mae': np.mean(np.abs(residuals))
            },
            'diagnostics': diagnostics
        }

    def perform_regression_diagnostics(self, features: pd.DataFrame, target: pd.Series,
                                       predictions: np.ndarray, residuals: pd.Series) -> Dict:
        """
        Perform comprehensive regression diagnostics.

        Each test is independent: a failure in one is recorded as
        ``{'error': 'Test failed'}`` without aborting the others.

        Args:
            features: Feature matrix (standardized).
            target: Target variable.
            predictions: Model predictions (currently unused by the tests
                themselves but kept for interface stability).
            residuals: Model residuals.

        Returns:
            Dictionary with normality, homoscedasticity, autocorrelation,
            multicollinearity, and stationarity test results.
        """
        diagnostics = {}

        # Residual normality (Shapiro-Wilk); p > 0.05 => cannot reject normality.
        try:
            normality_stat, normality_p = stats.shapiro(residuals)
            diagnostics['normality'] = {
                'statistic': normality_stat,
                'p_value': normality_p,
                'is_normal': normality_p > 0.05
            }
        except Exception:
            diagnostics['normality'] = {'error': 'Test failed'}

        # Breusch-Pagan; p > 0.05 => cannot reject constant residual variance.
        try:
            bp_stat, bp_p, bp_f, bp_f_p = het_breuschpagan(residuals, features)
            diagnostics['homoscedasticity'] = {
                'statistic': bp_stat,
                'p_value': bp_p,
                'f_statistic': bp_f,
                'f_p_value': bp_f_p,
                'is_homoscedastic': bp_p > 0.05
            }
        except Exception:
            diagnostics['homoscedasticity'] = {'error': 'Test failed'}

        # Durbin-Watson; ~2 means no first-order residual autocorrelation.
        try:
            dw_stat = durbin_watson(residuals)
            diagnostics['autocorrelation'] = {
                'statistic': dw_stat,
                'interpretation': self._interpret_durbin_watson(dw_stat)
            }
        except Exception:
            diagnostics['autocorrelation'] = {'error': 'Test failed'}

        # Variance inflation factors; VIF > 10 flags problematic collinearity.
        try:
            vif_scores = {}
            for i, col in enumerate(features.columns):
                vif = variance_inflation_factor(features.values, i)
                vif_scores[col] = vif

            diagnostics['multicollinearity'] = {
                'vif_scores': vif_scores,
                'high_vif_variables': [var for var, vif in vif_scores.items() if vif > 10],
                'mean_vif': np.mean(list(vif_scores.values()))
            }
        except Exception:
            diagnostics['multicollinearity'] = {'error': 'Test failed'}

        # Stationarity of the target: ADF (null = unit root) and KPSS
        # (null = stationary) -- complementary nulls, hence opposite
        # p-value directions for 'is_stationary'.
        try:
            adf_result = adfuller(target)
            diagnostics['stationarity_adf'] = {
                'statistic': adf_result[0],
                'p_value': adf_result[1],
                'is_stationary': adf_result[1] < 0.05
            }

            kpss_result = kpss(target, regression='c')
            diagnostics['stationarity_kpss'] = {
                'statistic': kpss_result[0],
                'p_value': kpss_result[1],
                'is_stationary': kpss_result[1] > 0.05
            }
        except Exception:
            diagnostics['stationarity'] = {'error': 'Test failed'}

        return diagnostics

    def _interpret_durbin_watson(self, dw_stat: float) -> str:
        """Interpret a Durbin-Watson statistic (2 = no autocorrelation)."""
        if dw_stat < 1.5:
            return "Positive autocorrelation"
        elif dw_stat > 2.5:
            return "Negative autocorrelation"
        else:
            return "No significant autocorrelation"

    def analyze_correlations(self, indicators: Optional[List[str]] = None,
                             method: str = 'pearson') -> Dict:
        """
        Perform comprehensive correlation analysis on growth rates.

        Args:
            indicators: List of indicators to analyze (None = all numeric).
            method: Correlation method ('pearson', 'spearman', 'kendall').

        Returns:
            Dictionary with the correlation matrix, statistically significant
            pairs (p < 0.05, sorted by |correlation|), the method used, and a
            PCA summary (or an error entry if PCA fails).
        """
        if indicators is None:
            indicators = self.data.select_dtypes(include=[np.number]).columns.tolist()

        growth_data = self.data[indicators].pct_change().dropna()

        corr_matrix = growth_data.corr(method=method)

        # Significance of each pair via the t-distribution of the sample
        # correlation coefficient (n - 2 degrees of freedom).
        significant_correlations = []
        n = len(growth_data)
        for i in range(len(corr_matrix.columns)):
            for j in range(i + 1, len(corr_matrix.columns)):
                var1 = corr_matrix.columns[i]
                var2 = corr_matrix.columns[j]
                corr_value = corr_matrix.iloc[i, j]

                # BUG FIX: guard the division -- a perfect correlation
                # (|r| == 1) made the original divide by zero.
                denom = 1 - corr_value ** 2
                if denom <= 0:
                    p_value = 0.0
                else:
                    t_stat = corr_value * np.sqrt((n - 2) / denom)
                    p_value = 2 * (1 - stats.t.cdf(abs(t_stat), n - 2))

                # NaN p-values (e.g. constant series) fail this check and
                # are skipped, matching the original behavior.
                if p_value < 0.05:
                    significant_correlations.append({
                        'variable1': var1,
                        'variable2': var2,
                        'correlation': corr_value,
                        'p_value': p_value,
                        'strength': self._interpret_correlation_strength(abs(corr_value))
                    })

        # Strongest relationships first.
        significant_correlations.sort(key=lambda x: abs(x['correlation']), reverse=True)

        try:
            pca = self._perform_pca_analysis(growth_data)
        except Exception as e:
            logger.warning(f"PCA analysis failed: {e}")
            pca = {'error': str(e)}

        return {
            'correlation_matrix': corr_matrix,
            'significant_correlations': significant_correlations,
            'method': method,
            'pca_analysis': pca
        }

    def _interpret_correlation_strength(self, corr_value: float) -> str:
        """Map an absolute correlation value to a verbal strength label."""
        if corr_value >= 0.8:
            return "Very Strong"
        elif corr_value >= 0.6:
            return "Strong"
        elif corr_value >= 0.4:
            return "Moderate"
        elif corr_value >= 0.2:
            return "Weak"
        else:
            return "Very Weak"

    def _perform_pca_analysis(self, data: pd.DataFrame) -> Dict:
        """
        Perform Principal Component Analysis on standardized data.

        Returns:
            Dictionary with explained/cumulative variance ratios, the loading
            matrix (variables x components), the component count, and how
            many components are needed to explain 80% of total variance.
        """
        # Imported lazily so the rest of the class works without sklearn's
        # decomposition module available.
        from sklearn.decomposition import PCA

        # PCA on correlations, not covariances: standardize first.
        scaler = StandardScaler()
        data_scaled = scaler.fit_transform(data)

        pca = PCA()
        pca_result = pca.fit_transform(data_scaled)

        explained_variance = pca.explained_variance_ratio_
        cumulative_variance = np.cumsum(explained_variance)

        # Loadings: rows are original variables, columns are components.
        loadings = pd.DataFrame(
            pca.components_.T,
            columns=[f'PC{i+1}' for i in range(pca.n_components_)],
            index=data.columns
        )

        return {
            'explained_variance': explained_variance,
            'cumulative_variance': cumulative_variance,
            'loadings': loadings,
            'n_components': pca.n_components_,
            # argmax finds the first index where the threshold is crossed.
            'components_to_explain_80_percent': np.argmax(cumulative_variance >= 0.8) + 1
        }

    def perform_granger_causality(self, target: str, predictor: str,
                                  max_lags: int = 4) -> Dict:
        """
        Perform a Granger causality test (does ``predictor`` help forecast
        ``target``?) on growth rates.

        Args:
            target: Target variable.
            predictor: Predictor variable.
            max_lags: Maximum number of lags to test.

        Returns:
            Dictionary with per-lag F-test results, the minimum p-value, an
            overall causality flag, and the best (lowest-p) lag; or
            ``{'error': ...}`` if the test could not be run.
        """
        try:
            from statsmodels.tsa.stattools import grangercausalitytests

            growth_data = self.data[[target, predictor]].pct_change().dropna()

            # grangercausalitytests expects [caused, causing] column order?
            # NOTE(review): statsmodels tests whether column 2 Granger-causes
            # column 1; the original passes [predictor, target] -- confirm
            # intended direction against the statsmodels docs.
            test_data = growth_data[[predictor, target]]
            gc_result = grangercausalitytests(test_data, maxlag=max_lags, verbose=False)

            # Extract the sum-of-squared-residuals F-test for each lag.
            results = {}
            for lag in range(1, max_lags + 1):
                if lag in gc_result:
                    lag_result = gc_result[lag]
                    results[lag] = {
                        'f_statistic': lag_result[0]['ssr_ftest'][0],
                        'p_value': lag_result[0]['ssr_ftest'][1],
                        'is_significant': lag_result[0]['ssr_ftest'][1] < 0.05
                    }

            # BUG FIX: the original called min() on an empty collection when
            # no lag produced results, raising ValueError.
            if not results:
                return {'error': 'No Granger causality results produced'}

            min_p_value = min([result['p_value'] for result in results.values()])
            overall_significant = min_p_value < 0.05

            return {
                'results_by_lag': results,
                'min_p_value': min_p_value,
                'is_causal': overall_significant,
                'optimal_lag': min(results.keys(), key=lambda k: results[k]['p_value'])
            }

        except Exception as e:
            logger.error(f"Granger causality test failed: {e}")
            return {'error': str(e)}

    def generate_statistical_report(self, regression_results: Optional[Dict] = None,
                                    correlation_results: Optional[Dict] = None,
                                    causality_results: Optional[Dict] = None) -> str:
        """
        Generate a comprehensive statistical analysis report.

        Sections are included only for the result dictionaries provided;
        entries carrying an 'error' key are silently skipped.

        Args:
            regression_results: Results from ``fit_regression_model``.
            correlation_results: Results from ``analyze_correlations``.
            causality_results: Mapping of label -> ``perform_granger_causality``
                result.

        Returns:
            Formatted report string.
        """
        report = "STATISTICAL MODELING REPORT\n"
        report += "=" * 50 + "\n\n"

        if regression_results:
            report += "REGRESSION ANALYSIS\n"
            report += "-" * 30 + "\n"

            performance = regression_results['performance']
            report += f"Model Performance:\n"
            report += f"  R²: {performance['r2']:.4f}\n"
            report += f"  RMSE: {performance['rmse']:.4f}\n"
            report += f"  MAE: {performance['mae']:.4f}\n\n"

            # Coefficients are pre-sorted by |coefficient|, so head() is top-5.
            coefficients = regression_results['coefficients']
            report += f"Top 5 Most Important Variables:\n"
            for i, row in coefficients.head().iterrows():
                report += f"  {row['variable']}: {row['coefficient']:.4f}\n"
            report += "\n"

            diagnostics = regression_results['diagnostics']
            report += f"Model Diagnostics:\n"

            if 'normality' in diagnostics and 'error' not in diagnostics['normality']:
                norm = diagnostics['normality']
                report += f"  Normality (Shapiro-Wilk): p={norm['p_value']:.4f} "
                report += f"({'Normal' if norm['is_normal'] else 'Not Normal'})\n"

            if 'homoscedasticity' in diagnostics and 'error' not in diagnostics['homoscedasticity']:
                hom = diagnostics['homoscedasticity']
                report += f"  Homoscedasticity (Breusch-Pagan): p={hom['p_value']:.4f} "
                report += f"({'Homoscedastic' if hom['is_homoscedastic'] else 'Heteroscedastic'})\n"

            if 'autocorrelation' in diagnostics and 'error' not in diagnostics['autocorrelation']:
                autocorr = diagnostics['autocorrelation']
                report += f"  Autocorrelation (Durbin-Watson): {autocorr['statistic']:.4f} "
                report += f"({autocorr['interpretation']})\n"

            if 'multicollinearity' in diagnostics and 'error' not in diagnostics['multicollinearity']:
                mult = diagnostics['multicollinearity']
                report += f"  Multicollinearity (VIF): Mean VIF = {mult['mean_vif']:.2f}\n"
                if mult['high_vif_variables']:
                    report += f"    High VIF variables: {', '.join(mult['high_vif_variables'])}\n"

            report += "\n"

        if correlation_results:
            report += "CORRELATION ANALYSIS\n"
            report += "-" * 30 + "\n"
            report += f"Method: {correlation_results['method'].title()}\n"
            report += f"Significant Correlations: {len(correlation_results['significant_correlations'])}\n\n"

            report += f"Top 5 Strongest Correlations:\n"
            for i, corr in enumerate(correlation_results['significant_correlations'][:5]):
                report += f"  {corr['variable1']} ↔ {corr['variable2']}: "
                report += f"{corr['correlation']:.4f} ({corr['strength']}, p={corr['p_value']:.4f})\n"

            if 'pca_analysis' in correlation_results and 'error' not in correlation_results['pca_analysis']:
                pca = correlation_results['pca_analysis']
                report += f"\nPrincipal Component Analysis:\n"
                report += f"  Components to explain 80% variance: {pca['components_to_explain_80_percent']}\n"
                report += f"  Total components: {pca['n_components']}\n"

            report += "\n"

        if causality_results:
            report += "GRANGER CAUSALITY ANALYSIS\n"
            report += "-" * 30 + "\n"

            for target, results in causality_results.items():
                if 'error' not in results:
                    report += f"{target}:\n"
                    report += f"  Is causal: {results['is_causal']}\n"
                    report += f"  Minimum p-value: {results['min_p_value']:.4f}\n"
                    report += f"  Optimal lag: {results['optimal_lag']}\n\n"

        return report