| | """ |
| | Comprehensive Analytics Pipeline |
| | Orchestrates advanced analytics including forecasting, segmentation, statistical modeling, and insights |
| | """ |
| |
|
| | import logging |
| | import os |
| | from datetime import datetime |
| | from typing import Dict, List, Optional, Tuple |
| |
|
| | import matplotlib.pyplot as plt |
| | import numpy as np |
| | import pandas as pd |
| | import seaborn as sns |
| | from pathlib import Path |
| |
|
| | from src.analysis.economic_forecasting import EconomicForecaster |
| | from src.analysis.economic_segmentation import EconomicSegmentation |
| | from src.analysis.statistical_modeling import StatisticalModeling |
| | from src.core.enhanced_fred_client import EnhancedFREDClient |
| |
|
logger = logging.getLogger(__name__)


class ComprehensiveAnalytics:
    """
    Comprehensive analytics pipeline for economic data analysis
    combining forecasting, segmentation, statistical modeling, and insights extraction

    The pipeline fetches data through ``EnhancedFREDClient``, hands the same
    frame to the forecasting, segmentation and statistical-modeling modules,
    collects their outputs in ``self.results`` and writes plots and text
    reports below ``output_dir``.
    """

    # Series that get regression models, causality tests, forecasts and
    # time-period clustering.
    KEY_INDICATORS = ('GDPC1', 'INDPRO', 'RSAFS')
    # Wider set used for series clustering and for the time-series overview plot.
    EXTENDED_INDICATORS = KEY_INDICATORS + ('CPIAUCSL', 'FEDFUNDS', 'DGS10')

    def __init__(self, api_key: str, output_dir: str = "data/exports"):
        """
        Initialize comprehensive analytics pipeline

        Creates ``output_dir`` (including parents) if it does not exist.

        Args:
            api_key: FRED API key
            output_dir: Output directory for results
        """
        self.client = EnhancedFREDClient(api_key)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Analytics modules; built in run_complete_analysis once data is loaded.
        self.forecaster = None
        self.segmentation = None
        self.statistical_modeling = None

        # Fetched data plus everything derived from it.
        self.data = None
        self.results = {}
        self.reports = {}

    def run_complete_analysis(self, indicators: Optional[List[str]] = None,
                              start_date: str = '1990-01-01',
                              end_date: Optional[str] = None,
                              forecast_periods: int = 4,
                              include_visualizations: bool = True) -> Dict:
        """
        Run complete advanced analytics pipeline

        Args:
            indicators: List of economic indicators to analyze
            start_date: Start date for analysis
            end_date: End date for analysis
            forecast_periods: Number of periods to forecast
            include_visualizations: Whether to generate visualizations

        Returns:
            Dictionary with all analysis results, keyed by ``data_quality``,
            ``statistical_modeling``, ``forecasting``, ``segmentation`` and
            ``insights``
        """
        logger.info("Starting comprehensive economic analytics pipeline")

        logger.info("Step 1: Collecting economic data")
        self.data = self.client.fetch_economic_data(
            indicators=indicators,
            start_date=start_date,
            end_date=end_date,
            frequency='auto'
        )

        logger.info("Step 2: Assessing data quality")
        self.results['data_quality'] = self.client.validate_data_quality(self.data)

        logger.info("Step 3: Initializing analytics modules")
        self.forecaster = EconomicForecaster(self.data)
        self.segmentation = EconomicSegmentation(self.data)
        self.statistical_modeling = StatisticalModeling(self.data)

        logger.info("Step 4: Performing statistical modeling")
        self.results['statistical_modeling'] = self._run_statistical_analysis()

        logger.info("Step 5: Performing economic forecasting")
        self.results['forecasting'] = self._run_forecasting_analysis(forecast_periods)

        logger.info("Step 6: Performing economic segmentation")
        self.results['segmentation'] = self._run_segmentation_analysis()

        # Insights read the three result sections stored above, so this step
        # has to come after them.
        logger.info("Step 7: Extracting insights")
        self.results['insights'] = self._extract_insights()

        logger.info("Step 8: Generating reports and visualizations")
        if include_visualizations:
            self._generate_visualizations()

        self._generate_comprehensive_report()

        logger.info("Comprehensive analytics pipeline completed successfully")
        return self.results

    @staticmethod
    def _successful(results) -> List[Tuple[str, Dict]]:
        """
        Return the ``(name, result)`` pairs of *results* that succeeded

        A result counts as successful when it is a dict without an ``'error'``
        key. Non-dict values are dropped on purpose: the forecasting step can
        return ``{'error': '<message>'}``, whose value is a plain string, and
        testing ``'error' not in <string>`` would be a substring check.
        """
        if not isinstance(results, dict):
            return []
        return [
            (name, result)
            for name, result in results.items()
            if isinstance(result, dict) and 'error' not in result
        ]

    def _data_period(self) -> Tuple[str, str]:
        """
        Return the first and last index labels of ``self.data`` as ``YYYY-MM``

        Falls back to ``'N/A'`` when there is no data (or the label is NaT) and
        to ``str(label)`` when the index labels have no ``strftime``.
        """
        def _fmt(label) -> str:
            try:
                return label.strftime('%Y-%m')
            except AttributeError:
                # Index is not datetime-like; show the raw label instead.
                return str(label)
            except ValueError:
                # pandas.NaT defines strftime but refuses to format.
                return 'N/A'

        if self.data is None or len(self.data.index) == 0:
            return 'N/A', 'N/A'
        return _fmt(self.data.index.min()), _fmt(self.data.index.max())

    def _run_statistical_analysis(self) -> Dict:
        """
        Run comprehensive statistical analysis

        Returns:
            Dict with ``correlation``, ``regression`` and ``causality``
            entries. Regression and causality failures are recorded as
            ``{'error': <message>}`` instead of aborting the pipeline.
        """
        results = {}

        logger.info(" - Performing correlation analysis")
        results['correlation'] = self.statistical_modeling.analyze_correlations()

        targets = [ind for ind in self.KEY_INDICATORS if ind in self.data.columns]

        regression_results = {}
        for target in targets:
            logger.info(" - Fitting regression model for %s", target)
            try:
                regression_results[target] = self.statistical_modeling.fit_regression_model(
                    target=target,
                    lag_periods=4,
                    include_interactions=False
                )
            except Exception as e:
                # Best effort: one failing model must not stop the others.
                logger.warning("Regression failed for %s: %s", target, e)
                regression_results[target] = {'error': str(e)}
        results['regression'] = regression_results

        logger.info(" - Performing Granger causality analysis")
        causality_results = {}
        for target in targets:
            per_target = {}
            for predictor in self.data.columns:
                if predictor == target:
                    continue
                try:
                    per_target[predictor] = self.statistical_modeling.perform_granger_causality(
                        target=target,
                        predictor=predictor,
                        max_lags=4
                    )
                except Exception as e:
                    # The test asks whether predictor leads target, so the
                    # arrow in the message points from predictor to target.
                    logger.warning("Causality test failed for %s -> %s: %s",
                                   predictor, target, e)
                    per_target[predictor] = {'error': str(e)}
            causality_results[target] = per_target
        results['causality'] = causality_results

        return results

    def _run_forecasting_analysis(self, forecast_periods: int) -> Dict:
        """
        Run comprehensive forecasting analysis

        Args:
            forecast_periods: Number of periods to forecast

        Returns:
            Whatever the forecaster returns for the available key indicators,
            or ``{'error': <message>}`` when none of them is in the data.
        """
        logger.info(" - Forecasting economic indicators")

        available_indicators = [ind for ind in self.KEY_INDICATORS if ind in self.data.columns]
        if not available_indicators:
            logger.warning("No key indicators available for forecasting")
            return {'error': 'No suitable indicators for forecasting'}

        # NOTE(review): forecast_periods is accepted but not forwarded; the
        # forecaster's signature is not visible here. Confirm whether
        # forecast_economic_indicators takes a horizon argument.
        return self.forecaster.forecast_economic_indicators(available_indicators)

    def _run_segmentation_analysis(self) -> Dict:
        """
        Run comprehensive segmentation analysis

        Returns:
            Dict with ``time_period_clusters`` and ``series_clusters``; a
            failed clustering is stored as ``{'error': <message>}``.
        """
        results = {}

        logger.info(" - Clustering time periods")
        try:
            results['time_period_clusters'] = self.segmentation.cluster_time_periods(
                indicators=list(self.KEY_INDICATORS),
                method='kmeans'
            )
        except Exception as e:
            logger.warning("Time period clustering failed: %s", e)
            results['time_period_clusters'] = {'error': str(e)}

        logger.info(" - Clustering economic series")
        try:
            results['series_clusters'] = self.segmentation.cluster_economic_series(
                indicators=list(self.EXTENDED_INDICATORS),
                method='kmeans'
            )
        except Exception as e:
            logger.warning("Series clustering failed: %s", e)
            results['series_clusters'] = {'error': str(e)}

        return results

    def _forecasting_insights(self) -> List[str]:
        """Build accuracy and stationarity statements for each successful forecast"""
        statements = []
        for indicator, result in self._successful(self.results.get('forecasting')):
            backtest = result.get('backtest', {})
            if isinstance(backtest, dict) and 'error' not in backtest:
                # No default: a missing MAPE must not be reported as 0.00%.
                mape = backtest.get('mape')
                if mape is not None:
                    if mape < 5:
                        quality = 'excellent'
                    elif mape < 10:
                        quality = 'good'
                    else:
                        quality = 'moderate'
                    statements.append(
                        f"{indicator} forecasting shows {quality} accuracy (MAPE: {mape:.2f}%)"
                    )

            stationarity = result.get('stationarity', {})
            if 'is_stationary' in stationarity:
                if stationarity['is_stationary']:
                    statements.append(
                        f"{indicator} series is stationary, suitable for time series modeling"
                    )
                else:
                    statements.append(
                        f"{indicator} series is non-stationary, may require differencing"
                    )
        return statements

    def _segmentation_insights(self) -> List[str]:
        """Build one statement per clustering that completed without error"""
        segmentation_results = self.results.get('segmentation', {})
        templates = (
            ('time_period_clusters',
             "Time periods clustered into {n} distinct economic regimes"),
            ('series_clusters',
             "Economic series clustered into {n} groups based on behavior patterns"),
        )
        statements = []
        for key, template in templates:
            clusters = segmentation_results.get(key)
            if isinstance(clusters, dict) and 'error' not in clusters:
                statements.append(template.format(n=clusters.get('n_clusters', 0)))
        return statements

    def _statistical_insights(self) -> List[str]:
        """Build statements about the leading correlation and the regression fits"""
        stat_results = self.results.get('statistical_modeling', {})
        statements = []

        significant = stat_results.get('correlation', {}).get('significant_correlations', [])
        if significant:
            # NOTE(review): takes the first entry as the strongest; presumably
            # the list arrives sorted by strength - confirm in StatisticalModeling.
            strongest = significant[0]
            statements.append(
                f"Strongest correlation: {strongest['variable1']} ↔ {strongest['variable2']} "
                f"(r={strongest['correlation']:.3f})"
            )

        for target, result in self._successful(stat_results.get('regression')):
            r2 = result.get('performance', {}).get('r2', 0)
            if r2 > 0.7:
                statements.append(
                    f"{target} regression model shows strong explanatory power (R² = {r2:.3f})"
                )
            elif r2 > 0.5:
                statements.append(
                    f"{target} regression model shows moderate explanatory power (R² = {r2:.3f})"
                )
        return statements

    def _extract_insights(self) -> Dict:
        """
        Extract key insights from all analyses

        Reads ``self.results`` and ``self.data``.

        Returns:
            Dict with ``key_findings``, ``economic_indicators`` (always empty
            here), ``forecasting_insights``, ``segmentation_insights`` and
            ``statistical_insights``.
        """
        insights = {
            'key_findings': [],
            'economic_indicators': {},
            'forecasting_insights': self._forecasting_insights(),
            'segmentation_insights': self._segmentation_insights(),
            'statistical_insights': self._statistical_insights()
        }

        period_start, period_end = self._data_period()
        insights['key_findings'] = [
            f"Analysis covers {len(self.data.columns)} economic indicators from {period_start} to {period_end}",
            f"Dataset contains {len(self.data)} observations with {self.data.size} total data points",
            f"Generated {len(insights['forecasting_insights'])} forecasting insights",
            f"Generated {len(insights['segmentation_insights'])} segmentation insights",
            f"Generated {len(insights['statistical_insights'])} statistical insights"
        ]

        return insights

    def _generate_visualizations(self):
        """Generate comprehensive visualizations"""
        logger.info("Generating visualizations")

        # The seaborn style sheets were renamed to 'seaborn-v0_8*' in
        # matplotlib 3.6; older releases only know 'seaborn'.
        for style in ('seaborn-v0_8', 'seaborn'):
            if style in plt.style.available:
                plt.style.use(style)
                break
        else:
            logger.warning("No seaborn matplotlib style available; keeping current style")
        sns.set_palette("husl")

        self._plot_time_series()
        self._plot_correlation_heatmap()
        self._plot_forecasting_results()
        self._plot_segmentation_results()
        self._plot_statistical_diagnostics()

        logger.info("Visualizations generated successfully")

    def _plot_time_series(self):
        """Plot time series of economic indicators"""
        fig, axes = plt.subplots(3, 2, figsize=(15, 12))
        try:
            for ax, indicator in zip(axes.flatten(), self.EXTENDED_INDICATORS):
                if indicator not in self.data.columns:
                    # Hide the panel instead of leaving an empty frame behind.
                    ax.set_visible(False)
                    continue
                series = self.data[indicator].dropna()
                ax.plot(series.index, series.values, linewidth=1.5)
                ax.set_title(f'{indicator} - {self.client.ECONOMIC_INDICATORS.get(indicator, indicator)}')
                ax.set_xlabel('Date')
                ax.set_ylabel('Value')
                ax.grid(True, alpha=0.3)

            fig.tight_layout()
            fig.savefig(self.output_dir / 'economic_indicators_time_series.png',
                        dpi=300, bbox_inches='tight')
        finally:
            # Release the figure even when plotting or saving fails.
            plt.close(fig)

    def _plot_correlation_heatmap(self):
        """Plot correlation heatmap"""
        corr_results = self.results.get('statistical_modeling', {}).get('correlation', {})
        if 'correlation_matrix' not in corr_results:
            return
        corr_matrix = corr_results['correlation_matrix']

        fig, ax = plt.subplots(figsize=(12, 10))
        try:
            # Mask the upper triangle (diagonal included) so each pair is shown once.
            mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
            sns.heatmap(corr_matrix, mask=mask, annot=True, cmap='RdBu_r', center=0,
                        square=True, linewidths=0.5, cbar_kws={"shrink": .8}, ax=ax)
            ax.set_title('Economic Indicators Correlation Matrix')
            fig.tight_layout()
            fig.savefig(self.output_dir / 'correlation_heatmap.png', dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)

    @staticmethod
    def _quarterly_index(start, periods: int):
        """Return a quarter-end ``DatetimeIndex`` of *periods* entries from *start*"""
        try:
            # 'QE' is the quarter-end alias in newer pandas.
            return pd.date_range(start=start, periods=periods, freq='QE')
        except ValueError:
            # Older pandas only understands the legacy alias 'Q'.
            return pd.date_range(start=start, periods=periods, freq='Q')

    def _plot_forecasting_results(self):
        """Plot forecasting results"""
        # Collect what can actually be drawn first, so the subplot grid has
        # exactly one row per plotted indicator.
        plottable = []
        for indicator, result in self._successful(self.results.get('forecasting')):
            series = result.get('series')
            forecast = result.get('forecast', {})
            if series is None or series.empty or 'forecast' not in forecast:
                continue
            plottable.append((indicator, series, forecast['forecast']))

        if not plottable:
            return

        # squeeze=False keeps axes two-dimensional even for a single row.
        fig, axes = plt.subplots(len(plottable), 1,
                                 figsize=(15, 5 * len(plottable)), squeeze=False)
        try:
            for ax, (indicator, series, forecast_values) in zip(axes[:, 0], plottable):
                ax.plot(series.index, series.values, label='Historical', linewidth=2)

                if hasattr(forecast_values, 'index'):
                    # The forecast's own index is ignored; the x-axis is rebuilt
                    # as quarter ends starting three months after the last
                    # historical observation.
                    forecast_index = self._quarterly_index(
                        series.index[-1] + pd.DateOffset(months=3),
                        len(forecast_values)
                    )
                    ax.plot(forecast_index, forecast_values, 'r--',
                            label='Forecast', linewidth=2)

                ax.set_title(f'{indicator} - Forecast')
                ax.set_xlabel('Date')
                ax.set_ylabel('Growth Rate')
                ax.legend()
                ax.grid(True, alpha=0.3)

            fig.tight_layout()
            fig.savefig(self.output_dir / 'forecasting_results.png', dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)

    def _plot_segmentation_results(self):
        """Plot segmentation results"""
        time_clusters = self.results.get('segmentation', {}).get('time_period_clusters')
        if not isinstance(time_clusters, dict):
            return
        if 'error' in time_clusters or 'pca_data' not in time_clusters:
            return

        pca_data = time_clusters['pca_data']
        cluster_labels = time_clusters['cluster_labels']

        fig, ax = plt.subplots(figsize=(10, 8))
        try:
            # Only the first two columns of pca_data are drawn.
            scatter = ax.scatter(pca_data[:, 0], pca_data[:, 1],
                                 c=cluster_labels, cmap='viridis', alpha=0.7)
            fig.colorbar(scatter, ax=ax)
            ax.set_title('Time Period Clustering (PCA)')
            ax.set_xlabel('Principal Component 1')
            ax.set_ylabel('Principal Component 2')
            fig.tight_layout()
            fig.savefig(self.output_dir / 'time_period_clustering.png', dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)

    def _plot_statistical_diagnostics(self):
        """Plot statistical diagnostics"""
        # Imported here (once, not per model) so scipy is only required when
        # diagnostics are actually drawn.
        from scipy import stats

        reg_results = self.results.get('statistical_modeling', {}).get('regression')
        for target, result in self._successful(reg_results):
            if 'residuals' not in result:
                continue
            # residuals is read through .index/.values below, i.e. it is
            # expected to be a pandas Series.
            residuals = result['residuals']
            predictions = result.get('predictions', [])

            fig, axes = plt.subplots(2, 2, figsize=(12, 10))
            try:
                # Residuals vs fitted; skipped when the lengths do not line up.
                if len(predictions) == len(residuals):
                    axes[0, 0].scatter(predictions, residuals, alpha=0.6)
                    axes[0, 0].axhline(y=0, color='r', linestyle='--')
                    axes[0, 0].set_title('Residuals vs Fitted')
                    axes[0, 0].set_xlabel('Fitted Values')
                    axes[0, 0].set_ylabel('Residuals')

                # Q-Q plot against the normal distribution.
                stats.probplot(residuals, dist="norm", plot=axes[0, 1])
                axes[0, 1].set_title('Q-Q Plot')

                # Histogram of residuals.
                axes[1, 0].hist(residuals, bins=20, alpha=0.7, edgecolor='black')
                axes[1, 0].set_title('Residuals Distribution')
                axes[1, 0].set_xlabel('Residuals')
                axes[1, 0].set_ylabel('Frequency')

                # Residuals over their index.
                axes[1, 1].plot(residuals.index, residuals.values)
                axes[1, 1].axhline(y=0, color='r', linestyle='--')
                axes[1, 1].set_title('Residuals Time Series')
                axes[1, 1].set_xlabel('Time')
                axes[1, 1].set_ylabel('Residuals')

                fig.suptitle(f'Regression Diagnostics - {target}')
                fig.tight_layout()
                fig.savefig(self.output_dir / f'regression_diagnostics_{target}.png',
                            dpi=300, bbox_inches='tight')
            finally:
                plt.close(fig)

    def _generate_comprehensive_report(self):
        """
        Generate comprehensive analysis report

        Collects the per-module reports into ``self.reports`` and writes them,
        together with the overall summary, as timestamped ``.txt`` files in
        ``self.output_dir``.
        """
        logger.info("Generating comprehensive report")

        if 'statistical_modeling' in self.results:
            stat_results = self.results['statistical_modeling']
            self.reports['statistical'] = self.statistical_modeling.generate_statistical_report(
                regression_results=stat_results.get('regression'),
                correlation_results=stat_results.get('correlation'),
                causality_results=stat_results.get('causality')
            )

        if 'forecasting' in self.results:
            self.reports['forecasting'] = self.forecaster.generate_forecast_report(
                self.results['forecasting']
            )

        if 'segmentation' in self.results:
            segmentation_results = self.results['segmentation']
            self.reports['segmentation'] = self.segmentation.generate_segmentation_report(
                time_period_clusters=segmentation_results.get('time_period_clusters'),
                series_clusters=segmentation_results.get('series_clusters')
            )

        comprehensive_report = self._generate_comprehensive_summary()

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Explicit UTF-8: the reports contain characters such as '↔' that the
        # platform default encoding (e.g. cp1252) cannot represent.
        summary_path = self.output_dir / f'comprehensive_analysis_report_{timestamp}.txt'
        summary_path.write_text(comprehensive_report, encoding='utf-8')

        for report_name, report_content in self.reports.items():
            report_path = self.output_dir / f'{report_name}_report_{timestamp}.txt'
            report_path.write_text(report_content, encoding='utf-8')

        logger.info("Reports saved to %s", self.output_dir)

    def _generate_comprehensive_summary(self) -> str:
        """
        Generate comprehensive summary report

        Returns:
            The report text: executive summary, data overview, per-analysis
            result lines, up to three insights per category and a footer with
            the generation time and analysis period.
        """
        rule = "-" * 30 + "\n"
        parts = [
            "COMPREHENSIVE ECONOMIC ANALYTICS REPORT\n",
            "=" * 60 + "\n\n",
            "EXECUTIVE SUMMARY\n",
            rule,
        ]

        insights = self.results.get('insights')
        if 'insights' in self.results:
            parts.append("Key Findings:\n")
            parts.extend(f" • {finding}\n" for finding in insights.get('key_findings', []))
            parts.append("\n")

        parts.append("DATA OVERVIEW\n")
        parts.append(rule)
        parts.append(self.client.generate_data_summary(self.data))

        parts.append("ANALYSIS RESULTS SUMMARY\n")
        parts.append(rule)

        if 'forecasting' in self.results:
            parts.append("Forecasting Results:\n")
            for indicator, result in self._successful(self.results['forecasting']):
                backtest = result.get('backtest', {})
                if isinstance(backtest, dict) and 'error' not in backtest:
                    # Skip indicators without a backtest score rather than
                    # printing a made-up 0.00%.
                    mape = backtest.get('mape')
                    if mape is not None:
                        parts.append(f" • {indicator}: MAPE = {mape:.2f}%\n")
            parts.append("\n")

        if 'segmentation' in self.results:
            parts.append("Segmentation Results:\n")
            segmentation_results = self.results['segmentation']
            templates = (
                ('time_period_clusters', " • Time periods clustered into {n} economic regimes\n"),
                ('series_clusters', " • Economic series clustered into {n} groups\n"),
            )
            for key, template in templates:
                clusters = segmentation_results.get(key)
                if isinstance(clusters, dict) and 'error' not in clusters:
                    parts.append(template.format(n=clusters.get('n_clusters', 0)))
            parts.append("\n")

        if 'statistical_modeling' in self.results:
            parts.append("Statistical Analysis Results:\n")
            stat_results = self.results['statistical_modeling']

            if 'correlation' in stat_results:
                significant = stat_results['correlation'].get('significant_correlations', [])
                parts.append(f" • {len(significant)} significant correlations identified\n")

            if 'regression' in stat_results:
                fitted = self._successful(stat_results['regression'])
                parts.append(f" • {len(fitted)} regression models successfully fitted\n")
            parts.append("\n")

        if 'insights' in self.results:
            parts.append("KEY INSIGHTS\n")
            parts.append(rule)
            for insight_type, insight_list in insights.items():
                # key_findings already appear in the executive summary; only
                # non-empty list-valued categories can be sliced and printed.
                if insight_type == 'key_findings' or not isinstance(insight_list, list):
                    continue
                if not insight_list:
                    continue
                parts.append(f"{insight_type.replace('_', ' ').title()}:\n")
                parts.extend(f" • {insight}\n" for insight in insight_list[:3])
                parts.append("\n")

        period_start, period_end = self._data_period()
        parts.append("=" * 60 + "\n")
        parts.append(f"Report generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        parts.append(f"Analysis period: {period_start} to {period_end}\n")

        return "".join(parts)