Edwin Salguero
Enhanced FRED ML with improved Reports & Insights page, fixed alignment analysis, and comprehensive analytics improvements
2469150
| """ | |
| Enhanced FRED Client | |
| Advanced data collection for comprehensive economic indicators | |
| """ | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Union | |
| import pandas as pd | |
| from fredapi import Fred | |
| logger = logging.getLogger(__name__) | |
| class EnhancedFREDClient: | |
| """ | |
| Enhanced FRED API client for comprehensive economic data collection | |
| with support for multiple frequencies and advanced data processing | |
| """ | |
| # Economic indicators mapping | |
| ECONOMIC_INDICATORS = { | |
| # Output & Activity | |
| 'GDPC1': 'Real Gross Domestic Product (chained 2012 dollars)', | |
| 'INDPRO': 'Industrial Production Index', | |
| 'RSAFS': 'Retail Sales', | |
| 'TCU': 'Capacity Utilization', | |
| 'PAYEMS': 'Total Nonfarm Payrolls', | |
| # Prices & Inflation | |
| 'CPIAUCSL': 'Consumer Price Index for All Urban Consumers', | |
| 'PCE': 'Personal Consumption Expenditures', | |
| # Financial & Monetary | |
| 'FEDFUNDS': 'Federal Funds Rate', | |
| 'DGS10': '10-Year Treasury Rate', | |
| 'M2SL': 'M2 Money Stock', | |
| # International | |
| 'DEXUSEU': 'US/Euro Exchange Rate', | |
| # Labor | |
| 'UNRATE': 'Unemployment Rate' | |
| } | |
| def __init__(self, api_key: str): | |
| """ | |
| Initialize enhanced FRED client | |
| Args: | |
| api_key: FRED API key | |
| """ | |
| self.fred = Fred(api_key=api_key) | |
| self.data_cache = {} | |
| def fetch_economic_data(self, indicators: List[str] = None, | |
| start_date: str = '1990-01-01', | |
| end_date: str = None, | |
| frequency: str = 'auto') -> pd.DataFrame: | |
| """ | |
| Fetch comprehensive economic data | |
| Args: | |
| indicators: List of indicators to fetch. If None, fetch all available | |
| start_date: Start date for data collection | |
| end_date: End date for data collection. If None, use current date | |
| frequency: Data frequency ('auto', 'M', 'Q', 'A') | |
| Returns: | |
| DataFrame with economic indicators | |
| """ | |
| if indicators is None: | |
| indicators = list(self.ECONOMIC_INDICATORS.keys()) | |
| if end_date is None: | |
| end_date = datetime.now().strftime('%Y-%m-%d') | |
| logger.info(f"Fetching economic data for {len(indicators)} indicators") | |
| logger.info(f"Date range: {start_date} to {end_date}") | |
| data_dict = {} | |
| for indicator in indicators: | |
| try: | |
| if indicator in self.ECONOMIC_INDICATORS: | |
| series_data = self._fetch_series(indicator, start_date, end_date, frequency) | |
| if series_data is not None and not series_data.empty: | |
| data_dict[indicator] = series_data | |
| logger.info(f"Successfully fetched {indicator}: {len(series_data)} observations") | |
| else: | |
| logger.warning(f"No data available for {indicator}") | |
| else: | |
| logger.warning(f"Unknown indicator: {indicator}") | |
| except Exception as e: | |
| logger.error(f"Failed to fetch {indicator}: {e}") | |
| if not data_dict: | |
| raise ValueError("No data could be fetched for any indicators") | |
| # Combine all series into a single DataFrame | |
| combined_data = pd.concat(data_dict.values(), axis=1) | |
| combined_data.columns = list(data_dict.keys()) | |
| # Sort by date | |
| combined_data = combined_data.sort_index() | |
| logger.info(f"Combined data shape: {combined_data.shape}") | |
| logger.info(f"Date range: {combined_data.index.min()} to {combined_data.index.max()}") | |
| return combined_data | |
| def _fetch_series(self, series_id: str, start_date: str, end_date: str, | |
| frequency: str) -> Optional[pd.Series]: | |
| """ | |
| Fetch individual series with frequency handling | |
| Args: | |
| series_id: FRED series ID | |
| start_date: Start date | |
| end_date: End date | |
| frequency: Data frequency (for post-processing) | |
| Returns: | |
| Series data or None if failed | |
| """ | |
| try: | |
| # Fetch data without frequency parameter (FRED API doesn't support it) | |
| series = self.fred.get_series( | |
| series_id, | |
| observation_start=start_date, | |
| observation_end=end_date | |
| ) | |
| if series.empty: | |
| logger.warning(f"No data returned for {series_id}") | |
| return None | |
| # Handle frequency conversion if needed | |
| if frequency == 'auto': | |
| series = self._standardize_frequency(series, series_id) | |
| elif frequency == 'Q': | |
| # Convert to quarterly if requested | |
| series = self._convert_to_quarterly(series, series_id) | |
| elif frequency == 'M': | |
| # Convert to monthly if requested | |
| series = self._convert_to_monthly(series, series_id) | |
| return series | |
| except Exception as e: | |
| logger.error(f"Error fetching {series_id}: {e}") | |
| return None | |
| def _convert_to_quarterly(self, series: pd.Series, series_id: str) -> pd.Series: | |
| """Convert series to quarterly frequency""" | |
| if series_id in ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', 'M2SL']: | |
| return series.resample('Q').last() | |
| else: | |
| return series.resample('Q').mean() | |
| def _convert_to_monthly(self, series: pd.Series, series_id: str) -> pd.Series: | |
| """Convert series to monthly frequency""" | |
| return series.resample('M').last() | |
| def _get_appropriate_frequency(self, series_id: str) -> str: | |
| """ | |
| Get appropriate frequency for a series based on its characteristics | |
| Args: | |
| series_id: FRED series ID | |
| Returns: | |
| Appropriate frequency string | |
| """ | |
| # Quarterly series | |
| quarterly_series = ['GDPC1', 'PCE'] | |
| # Monthly series (most common) | |
| monthly_series = ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', | |
| 'FEDFUNDS', 'DGS10', 'M2SL', 'DEXUSEU', 'UNRATE'] | |
| if series_id in quarterly_series: | |
| return 'Q' | |
| elif series_id in monthly_series: | |
| return 'M' | |
| else: | |
| return 'M' # Default to monthly | |
| def _standardize_frequency(self, series: pd.Series, series_id: str) -> pd.Series: | |
| """ | |
| Standardize frequency for consistent analysis | |
| Args: | |
| series: Time series data | |
| series_id: Series ID for context | |
| Returns: | |
| Standardized series | |
| """ | |
| # For quarterly analysis, convert monthly to quarterly | |
| if series_id in ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', | |
| 'FEDFUNDS', 'DGS10', 'M2SL', 'DEXUSEU', 'UNRATE']: | |
| # Use end-of-quarter values for most series | |
| if series_id in ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', 'M2SL']: | |
| return series.resample('Q').last() | |
| else: | |
| # For rates, use mean | |
| return series.resample('Q').mean() | |
| return series | |
| def fetch_quarterly_data(self, indicators: List[str] = None, | |
| start_date: str = '1990-01-01', | |
| end_date: str = None) -> pd.DataFrame: | |
| """ | |
| Fetch data standardized to quarterly frequency | |
| Args: | |
| indicators: List of indicators to fetch | |
| start_date: Start date | |
| end_date: End date | |
| Returns: | |
| Quarterly DataFrame | |
| """ | |
| return self.fetch_economic_data(indicators, start_date, end_date, frequency='Q') | |
| def fetch_monthly_data(self, indicators: List[str] = None, | |
| start_date: str = '1990-01-01', | |
| end_date: str = None) -> pd.DataFrame: | |
| """ | |
| Fetch data standardized to monthly frequency | |
| Args: | |
| indicators: List of indicators to fetch | |
| start_date: Start date | |
| end_date: End date | |
| Returns: | |
| Monthly DataFrame | |
| """ | |
| return self.fetch_economic_data(indicators, start_date, end_date, frequency='M') | |
| def get_series_info(self, series_id: str) -> Dict: | |
| """ | |
| Get detailed information about a series | |
| Args: | |
| series_id: FRED series ID | |
| Returns: | |
| Dictionary with series information | |
| """ | |
| try: | |
| info = self.fred.get_series_info(series_id) | |
| return { | |
| 'id': info.id, | |
| 'title': info.title, | |
| 'units': info.units, | |
| 'frequency': info.frequency, | |
| 'seasonal_adjustment': info.seasonal_adjustment, | |
| 'last_updated': info.last_updated, | |
| 'notes': info.notes | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get info for {series_id}: {e}") | |
| return {'error': str(e)} | |
| def get_all_series_info(self, indicators: List[str] = None) -> Dict: | |
| """ | |
| Get information for all indicators | |
| Args: | |
| indicators: List of indicators. If None, use all available | |
| Returns: | |
| Dictionary with series information | |
| """ | |
| if indicators is None: | |
| indicators = list(self.ECONOMIC_INDICATORS.keys()) | |
| series_info = {} | |
| for indicator in indicators: | |
| if indicator in self.ECONOMIC_INDICATORS: | |
| info = self.get_series_info(indicator) | |
| series_info[indicator] = info | |
| logger.info(f"Retrieved info for {indicator}") | |
| return series_info | |
| def validate_data_quality(self, data: pd.DataFrame) -> Dict: | |
| """ | |
| Validate data quality and check for common issues | |
| Args: | |
| data: DataFrame with economic indicators | |
| Returns: | |
| Dictionary with validation results | |
| """ | |
| validation_results = { | |
| 'missing_data': {}, | |
| 'outliers': {}, | |
| 'data_quality_score': 0.0, | |
| 'warnings': [], | |
| 'errors': [] | |
| } | |
| total_series = len(data.columns) | |
| valid_series = 0 | |
| for column in data.columns: | |
| series = data[column].dropna() | |
| if len(series) == 0: | |
| validation_results['missing_data'][column] = 'No data available' | |
| validation_results['errors'].append(f"{column}: No data available") | |
| continue | |
| # Check for missing data | |
| missing_pct = (data[column].isna().sum() / len(data)) * 100 | |
| if missing_pct > 20: | |
| validation_results['missing_data'][column] = f"{missing_pct:.1f}% missing" | |
| validation_results['warnings'].append(f"{column}: {missing_pct:.1f}% missing data") | |
| # Check for outliers using IQR method | |
| Q1 = series.quantile(0.25) | |
| Q3 = series.quantile(0.75) | |
| IQR = Q3 - Q1 | |
| lower_bound = Q1 - 1.5 * IQR | |
| upper_bound = Q3 + 1.5 * IQR | |
| outliers = series[(series < lower_bound) | (series > upper_bound)] | |
| outlier_pct = (len(outliers) / len(series)) * 100 | |
| if outlier_pct > 5: | |
| validation_results['outliers'][column] = f"{outlier_pct:.1f}% outliers" | |
| validation_results['warnings'].append(f"{column}: {outlier_pct:.1f}% outliers detected") | |
| # Validate scaling for known indicators | |
| self._validate_economic_scaling(series, column, validation_results) | |
| valid_series += 1 | |
| # Calculate overall data quality score | |
| if total_series > 0: | |
| validation_results['data_quality_score'] = (valid_series / total_series) * 100 | |
| return validation_results | |
| def _validate_economic_scaling(self, series: pd.Series, indicator: str, validation_results: Dict): | |
| """ | |
| Validate economic indicator scaling using expected ranges | |
| Args: | |
| series: Time series data | |
| indicator: Indicator name | |
| validation_results: Validation results dictionary to update | |
| """ | |
| # Expected ranges for common economic indicators | |
| scaling_ranges = { | |
| 'GDPC1': (15000, 25000), # Real GDP in billions (2020-2024 range) | |
| 'INDPRO': (90, 110), # Industrial Production Index | |
| 'CPIAUCSL': (250, 350), # Consumer Price Index | |
| 'FEDFUNDS': (0, 10), # Federal Funds Rate (%) | |
| 'DGS10': (0, 8), # 10-Year Treasury Rate (%) | |
| 'UNRATE': (3, 15), # Unemployment Rate (%) | |
| 'PAYEMS': (140000, 160000), # Total Nonfarm Payrolls (thousands) | |
| 'PCE': (15000, 25000), # Personal Consumption Expenditures (billions) | |
| 'M2SL': (20000, 25000), # M2 Money Stock (billions) | |
| 'TCU': (60, 90), # Capacity Utilization (%) | |
| 'DEXUSEU': (0.8, 1.2), # US/Euro Exchange Rate | |
| 'RSAFS': (400000, 600000) # Retail Sales (millions) | |
| } | |
| if indicator in scaling_ranges: | |
| expected_min, expected_max = scaling_ranges[indicator] | |
| # Check if values fall within expected range | |
| vals = series.dropna() | |
| if len(vals) > 0: | |
| mask = (vals < expected_min) | (vals > expected_max) | |
| outlier_pct = mask.mean() * 100 | |
| if outlier_pct > 5: | |
| validation_results['warnings'].append( | |
| f"{indicator}: {outlier_pct:.1f}% of data outside expected range " | |
| f"[{expected_min}, {expected_max}]. Check for scaling/unit issues." | |
| ) | |
| else: | |
| logger.debug(f"{indicator}: data within expected range [{expected_min}, {expected_max}]") | |
| def generate_data_summary(self, data: pd.DataFrame) -> str: | |
| """ | |
| Generate comprehensive data summary report | |
| Args: | |
| data: Economic data DataFrame | |
| Returns: | |
| Formatted summary report | |
| """ | |
| quality_report = self.validate_data_quality(data) | |
| summary = "ECONOMIC DATA SUMMARY\n" | |
| summary += "=" * 50 + "\n\n" | |
| summary += f"Dataset Overview:\n" | |
| summary += f" Total Series: {quality_report['total_series']}\n" | |
| summary += f" Total Observations: {quality_report['total_observations']}\n" | |
| summary += f" Date Range: {quality_report['date_range']['start']} to {quality_report['date_range']['end']}\n\n" | |
| summary += f"Series Information:\n" | |
| for indicator in data.columns: | |
| if indicator in self.ECONOMIC_INDICATORS: | |
| summary += f" {indicator}: {self.ECONOMIC_INDICATORS[indicator]}\n" | |
| summary += "\n" | |
| summary += f"Data Quality:\n" | |
| for series, metrics in quality_report['missing_data'].items(): | |
| summary += f" {series}: {metrics['completeness']:.1f}% complete " | |
| summary += f"({metrics['missing_count']} missing observations)\n" | |
| summary += "\n" | |
| return summary |