| """ |
| Enhanced FRED Client |
| Advanced data collection for comprehensive economic indicators |
| """ |
|
|
| import logging |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Optional, Union |
|
|
| import pandas as pd |
| from fredapi import Fred |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class EnhancedFREDClient: |
| """ |
| Enhanced FRED API client for comprehensive economic data collection |
| with support for multiple frequencies and advanced data processing |
| """ |
| |
| |
| ECONOMIC_INDICATORS = { |
| |
| 'GDPC1': 'Real Gross Domestic Product (chained 2012 dollars)', |
| 'INDPRO': 'Industrial Production Index', |
| 'RSAFS': 'Retail Sales', |
| 'TCU': 'Capacity Utilization', |
| 'PAYEMS': 'Total Nonfarm Payrolls', |
| |
| |
| 'CPIAUCSL': 'Consumer Price Index for All Urban Consumers', |
| 'PCE': 'Personal Consumption Expenditures', |
| |
| |
| 'FEDFUNDS': 'Federal Funds Rate', |
| 'DGS10': '10-Year Treasury Rate', |
| 'M2SL': 'M2 Money Stock', |
| |
| |
| 'DEXUSEU': 'US/Euro Exchange Rate', |
| |
| |
| 'UNRATE': 'Unemployment Rate' |
| } |
| |
| def __init__(self, api_key: str): |
| """ |
| Initialize enhanced FRED client |
| |
| Args: |
| api_key: FRED API key |
| """ |
| self.fred = Fred(api_key=api_key) |
| self.data_cache = {} |
| |
| def fetch_economic_data(self, indicators: List[str] = None, |
| start_date: str = '1990-01-01', |
| end_date: str = None, |
| frequency: str = 'auto') -> pd.DataFrame: |
| """ |
| Fetch comprehensive economic data |
| |
| Args: |
| indicators: List of indicators to fetch. If None, fetch all available |
| start_date: Start date for data collection |
| end_date: End date for data collection. If None, use current date |
| frequency: Data frequency ('auto', 'M', 'Q', 'A') |
| |
| Returns: |
| DataFrame with economic indicators |
| """ |
| if indicators is None: |
| indicators = list(self.ECONOMIC_INDICATORS.keys()) |
| |
| if end_date is None: |
| end_date = datetime.now().strftime('%Y-%m-%d') |
| |
| logger.info(f"Fetching economic data for {len(indicators)} indicators") |
| logger.info(f"Date range: {start_date} to {end_date}") |
| |
| data_dict = {} |
| |
| for indicator in indicators: |
| try: |
| if indicator in self.ECONOMIC_INDICATORS: |
| series_data = self._fetch_series(indicator, start_date, end_date, frequency) |
| if series_data is not None and not series_data.empty: |
| data_dict[indicator] = series_data |
| logger.info(f"Successfully fetched {indicator}: {len(series_data)} observations") |
| else: |
| logger.warning(f"No data available for {indicator}") |
| else: |
| logger.warning(f"Unknown indicator: {indicator}") |
| |
| except Exception as e: |
| logger.error(f"Failed to fetch {indicator}: {e}") |
| |
| if not data_dict: |
| raise ValueError("No data could be fetched for any indicators") |
| |
| |
| combined_data = pd.concat(data_dict.values(), axis=1) |
| combined_data.columns = list(data_dict.keys()) |
| |
| |
| combined_data = combined_data.sort_index() |
| |
| logger.info(f"Combined data shape: {combined_data.shape}") |
| logger.info(f"Date range: {combined_data.index.min()} to {combined_data.index.max()}") |
| |
| return combined_data |
| |
| def _fetch_series(self, series_id: str, start_date: str, end_date: str, |
| frequency: str) -> Optional[pd.Series]: |
| """ |
| Fetch individual series with frequency handling |
| |
| Args: |
| series_id: FRED series ID |
| start_date: Start date |
| end_date: End date |
| frequency: Data frequency |
| |
| Returns: |
| Series data or None if failed |
| """ |
| try: |
| |
| if frequency == 'auto': |
| freq = self._get_appropriate_frequency(series_id) |
| else: |
| freq = frequency |
| |
| |
| series = self.fred.get_series( |
| series_id, |
| observation_start=start_date, |
| observation_end=end_date, |
| frequency=freq |
| ) |
| |
| if series.empty: |
| logger.warning(f"No data returned for {series_id}") |
| return None |
| |
| |
| if frequency == 'auto': |
| series = self._standardize_frequency(series, series_id) |
| |
| return series |
| |
| except Exception as e: |
| logger.error(f"Error fetching {series_id}: {e}") |
| return None |
| |
| def _get_appropriate_frequency(self, series_id: str) -> str: |
| """ |
| Get appropriate frequency for a series based on its characteristics |
| |
| Args: |
| series_id: FRED series ID |
| |
| Returns: |
| Appropriate frequency string |
| """ |
| |
| quarterly_series = ['GDPC1', 'PCE'] |
| |
| |
| monthly_series = ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', |
| 'FEDFUNDS', 'DGS10', 'M2SL', 'DEXUSEU', 'UNRATE'] |
| |
| if series_id in quarterly_series: |
| return 'Q' |
| elif series_id in monthly_series: |
| return 'M' |
| else: |
| return 'M' |
| |
| def _standardize_frequency(self, series: pd.Series, series_id: str) -> pd.Series: |
| """ |
| Standardize frequency for consistent analysis |
| |
| Args: |
| series: Time series data |
| series_id: Series ID for context |
| |
| Returns: |
| Standardized series |
| """ |
| |
| if series_id in ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', |
| 'FEDFUNDS', 'DGS10', 'M2SL', 'DEXUSEU', 'UNRATE']: |
| |
| if series_id in ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', 'M2SL']: |
| return series.resample('Q').last() |
| else: |
| |
| return series.resample('Q').mean() |
| |
| return series |
| |
| def fetch_quarterly_data(self, indicators: List[str] = None, |
| start_date: str = '1990-01-01', |
| end_date: str = None) -> pd.DataFrame: |
| """ |
| Fetch data standardized to quarterly frequency |
| |
| Args: |
| indicators: List of indicators to fetch |
| start_date: Start date |
| end_date: End date |
| |
| Returns: |
| Quarterly DataFrame |
| """ |
| return self.fetch_economic_data(indicators, start_date, end_date, frequency='Q') |
| |
| def fetch_monthly_data(self, indicators: List[str] = None, |
| start_date: str = '1990-01-01', |
| end_date: str = None) -> pd.DataFrame: |
| """ |
| Fetch data standardized to monthly frequency |
| |
| Args: |
| indicators: List of indicators to fetch |
| start_date: Start date |
| end_date: End date |
| |
| Returns: |
| Monthly DataFrame |
| """ |
| return self.fetch_economic_data(indicators, start_date, end_date, frequency='M') |
| |
| def get_series_info(self, series_id: str) -> Dict: |
| """ |
| Get detailed information about a series |
| |
| Args: |
| series_id: FRED series ID |
| |
| Returns: |
| Dictionary with series information |
| """ |
| try: |
| info = self.fred.get_series_info(series_id) |
| return { |
| 'id': info.id, |
| 'title': info.title, |
| 'units': info.units, |
| 'frequency': info.frequency, |
| 'seasonal_adjustment': info.seasonal_adjustment, |
| 'last_updated': info.last_updated, |
| 'notes': info.notes |
| } |
| except Exception as e: |
| logger.error(f"Failed to get info for {series_id}: {e}") |
| return {'error': str(e)} |
| |
| def get_all_series_info(self, indicators: List[str] = None) -> Dict: |
| """ |
| Get information for all indicators |
| |
| Args: |
| indicators: List of indicators. If None, use all available |
| |
| Returns: |
| Dictionary with series information |
| """ |
| if indicators is None: |
| indicators = list(self.ECONOMIC_INDICATORS.keys()) |
| |
| series_info = {} |
| |
| for indicator in indicators: |
| if indicator in self.ECONOMIC_INDICATORS: |
| info = self.get_series_info(indicator) |
| series_info[indicator] = info |
| logger.info(f"Retrieved info for {indicator}") |
| |
| return series_info |
| |
| def validate_data_quality(self, data: pd.DataFrame) -> Dict: |
| """ |
| Validate data quality and completeness |
| |
| Args: |
| data: Economic data DataFrame |
| |
| Returns: |
| Dictionary with quality metrics |
| """ |
| quality_report = { |
| 'total_series': len(data.columns), |
| 'total_observations': len(data), |
| 'date_range': { |
| 'start': data.index.min().strftime('%Y-%m-%d'), |
| 'end': data.index.max().strftime('%Y-%m-%d') |
| }, |
| 'missing_data': {}, |
| 'data_quality': {} |
| } |
| |
| for column in data.columns: |
| series = data[column] |
| |
| |
| missing_count = series.isna().sum() |
| missing_pct = (missing_count / len(series)) * 100 |
| |
| quality_report['missing_data'][column] = { |
| 'missing_count': missing_count, |
| 'missing_percentage': missing_pct, |
| 'completeness': 100 - missing_pct |
| } |
| |
| |
| if not series.isna().all(): |
| non_null_series = series.dropna() |
| quality_report['data_quality'][column] = { |
| 'mean': non_null_series.mean(), |
| 'std': non_null_series.std(), |
| 'min': non_null_series.min(), |
| 'max': non_null_series.max(), |
| 'skewness': non_null_series.skew(), |
| 'kurtosis': non_null_series.kurtosis() |
| } |
| |
| return quality_report |
| |
| def generate_data_summary(self, data: pd.DataFrame) -> str: |
| """ |
| Generate comprehensive data summary report |
| |
| Args: |
| data: Economic data DataFrame |
| |
| Returns: |
| Formatted summary report |
| """ |
| quality_report = self.validate_data_quality(data) |
| |
| summary = "ECONOMIC DATA SUMMARY\n" |
| summary += "=" * 50 + "\n\n" |
| |
| summary += f"Dataset Overview:\n" |
| summary += f" Total Series: {quality_report['total_series']}\n" |
| summary += f" Total Observations: {quality_report['total_observations']}\n" |
| summary += f" Date Range: {quality_report['date_range']['start']} to {quality_report['date_range']['end']}\n\n" |
| |
| summary += f"Series Information:\n" |
| for indicator in data.columns: |
| if indicator in self.ECONOMIC_INDICATORS: |
| summary += f" {indicator}: {self.ECONOMIC_INDICATORS[indicator]}\n" |
| summary += "\n" |
| |
| summary += f"Data Quality:\n" |
| for series, metrics in quality_report['missing_data'].items(): |
| summary += f" {series}: {metrics['completeness']:.1f}% complete " |
| summary += f"({metrics['missing_count']} missing observations)\n" |
| |
| summary += "\n" |
| |
| return summary |