Edwin Salguero
Enhanced FRED ML with improved Reports & Insights page, fixed alignment analysis, and comprehensive analytics improvements
2469150
| """ | |
| Mathematical Fixes Module | |
| Addresses key mathematical issues in economic data analysis: | |
| 1. Unit normalization and scaling | |
| 2. Frequency alignment and resampling | |
| 3. Correct growth rate calculation | |
| 4. Stationarity enforcement | |
| 5. Forecast period scaling | |
| 6. Safe error metrics | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, List, Tuple, Optional | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class MathematicalFixes:
    """
    Comprehensive mathematical fixes for economic data analysis.

    Addresses common pitfalls when working with FRED-style indicators:
    unit normalization, frequency alignment, growth-rate calculation,
    stationarity enforcement, forecast-period scaling, and error metrics
    that are safe against division by zero and non-finite values.
    """

    # Series quoted as rates/utilization percentages, where the period-end
    # observation is more meaningful than the period average on resampling.
    _RATE_INDICATORS = ('FEDFUNDS', 'DGS10', 'UNRATE', 'TCU')

    def __init__(self):
        """Initialize frequency and unit-conversion lookup tables."""
        # Approximate number of native periods per quarter, used by
        # callers to rescale horizons between frequencies.
        self.frequency_map = {
            'D': 30,  # Daily -> ~30 periods per quarter (approximation)
            'M': 3,   # Monthly -> 3 periods per quarter
            'Q': 1    # Quarterly -> 1 period per quarter
        }
        # Multiplicative unit-normalization factors keyed by FRED series id.
        # NOTE(review): several factors look inconsistent with the units
        # FRED actually publishes: RSAFS is in millions, so converting to
        # billions would divide by 1e3 (factor 1e-3), not multiply; PAYEMS
        # is already reported in thousands; PCE and M2SL are already in
        # billions. Values are preserved unchanged here to avoid silently
        # altering downstream magnitudes -- confirm against the FRED
        # series metadata before relying on absolute levels.
        self.unit_factors = {
            'GDPC1': 1,     # Already in billions of chained dollars
            'INDPRO': 1,    # Index, no change
            'RSAFS': 1e3,   # Stated millions -> billions (see NOTE above)
            'CPIAUCSL': 1,  # Index, no change (should be ~316, not 21.9)
            'FEDFUNDS': 1,  # Percent, no change
            'DGS10': 1,     # Percent, no change
            'UNRATE': 1,    # Percent, no change
            'PAYEMS': 1e3,  # Stated conversion to thousands (see NOTE above)
            'PCE': 1e9,     # Stated conversion to billions (see NOTE above)
            'M2SL': 1e9,    # Stated conversion to billions (see NOTE above)
            'TCU': 1,       # Percent, no change
            'DEXUSEU': 1    # Exchange rate, no change
        }

    def normalize_units(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize units across all economic indicators.

        Args:
            data: DataFrame with economic indicators as columns.

        Returns:
            DataFrame with each known indicator scaled by its unit factor;
            unknown columns and factor-1 columns pass through unchanged.
        """
        logger.info("Normalizing units across economic indicators")
        normalized_data = data.copy()
        for column in data.columns:
            # Unknown columns default to factor 1 (no-op); copy() above
            # already preserves them, so only real conversions are applied.
            factor = self.unit_factors.get(column, 1)
            if factor != 1:
                normalized_data[column] = data[column] * factor
                logger.debug("Normalized %s by factor %s", column, factor)
        return normalized_data

    def align_frequencies(self, data: pd.DataFrame, target_freq: str = 'Q') -> pd.DataFrame:
        """
        Align all series to a common frequency.

        Args:
            data: DataFrame with economic indicators (DatetimeIndex).
            target_freq: Target frequency ('D', 'M', 'Q').

        Returns:
            DataFrame with all non-empty series resampled to the target
            frequency. Rate-like series take the period-end value; other
            series take the period mean; daily alignment forward-fills.
        """
        logger.info(f"Aligning frequencies to {target_freq}")
        aligned_data = pd.DataFrame()
        for column in data.columns:
            series = data[column].dropna()
            if series.empty:
                continue
            if target_freq == 'Q':
                if column in self._RATE_INDICATORS:
                    resampled = series.resample('QE').last()
                else:
                    resampled = series.resample('QE').mean()
            elif target_freq == 'M':
                if column in self._RATE_INDICATORS:
                    resampled = series.resample('ME').last()
                else:
                    resampled = series.resample('ME').mean()
            else:
                # Daily: forward-fill lower-frequency observations.
                resampled = series.resample('D').ffill()
            aligned_data[column] = resampled
        return aligned_data

    def calculate_growth_rates(self, data: pd.DataFrame, method: str = 'pct_change') -> pd.DataFrame:
        """
        Calculate period-over-period growth rates in percent.

        Args:
            data: DataFrame with economic indicators.
            method: 'pct_change' (simple percent change) or 'log_diff'
                (log difference); any other value falls back to
                'pct_change'.

        Returns:
            DataFrame of growth rates (first observation is NaN). Columns
            with fewer than two observations are dropped.
        """
        logger.info(f"Calculating growth rates using {method} method")
        growth_data = pd.DataFrame()
        for column in data.columns:
            series = data[column].dropna()
            if len(series) > 1:
                if method == 'log_diff':
                    # Log difference approximates percent change for small
                    # moves; NaN is produced for non-positive levels.
                    growth = np.log(series / series.shift(1)) * 100
                else:
                    # 'pct_change' and the default fallback.
                    growth = series.pct_change() * 100
                growth_data[column] = growth
        return growth_data

    def enforce_stationarity(self, data: pd.DataFrame, max_diffs: int = 2) -> Tuple[pd.DataFrame, Dict]:
        """
        Enforce stationarity through repeated differencing.

        Args:
            data: DataFrame with economic indicators.
            max_diffs: Maximum number of differences to apply per series.

        Returns:
            Tuple of (stationary_data, differencing_info) where
            differencing_info maps column -> {'diffs_applied': int,
            'is_stationary': bool}.
        """
        logger.info("Enforcing stationarity through differencing")
        stationary_data = pd.DataFrame()
        differencing_info: Dict[str, Dict] = {}
        for column in data.columns:
            series = data[column].dropna()
            if len(series) > 1:
                diff_count = 0
                current = series
                # Difference until the variance-based check passes or the
                # differencing budget is exhausted.
                while diff_count < max_diffs and not self._is_stationary(current):
                    current = current.diff().dropna()
                    diff_count += 1
                stationary_data[column] = current
                differencing_info[column] = {
                    'diffs_applied': diff_count,
                    'is_stationary': self._is_stationary(current)
                }
        return stationary_data, differencing_info

    def _is_stationary(self, series: pd.Series, threshold: float = 0.05) -> bool:
        """
        Heuristic stationarity check based on split-half variance.

        This is a cheap proxy, not an ADF/KPSS test: the series is split
        in half and the variance ratio of the halves is compared.

        Args:
            series: Time series to check.
            threshold: Unused; kept for backward compatibility with the
                original signature.

        Returns:
            True if the series appears stationary.
        """
        if len(series) < 10:
            # Too few observations to judge; assume stationary.
            return True
        mid = len(series) // 2
        first_var = series[:mid].var()
        second_var = series[mid:].var()
        # Guard against division by zero / NaN: a zero-variance first half
        # means the series is (half-)constant, so it is stationary only if
        # the second half is constant too. The unguarded 0/0 previously
        # produced NaN and silently reported non-stationary.
        if not np.isfinite(first_var) or first_var == 0:
            return bool(second_var == 0)
        var_ratio = second_var / first_var
        # A variance ratio near 1 suggests stable second moments.
        return bool(0.5 <= var_ratio <= 2.0)

    def scale_forecast_periods(self, forecast_periods: int, indicator: str, data: pd.DataFrame) -> int:
        """
        Scale a quarterly forecast horizon to the indicator's frequency.

        Args:
            forecast_periods: Base forecast periods (in quarters).
            indicator: Economic indicator (column) name.
            data: DataFrame containing the indicator.

        Returns:
            Horizon in the indicator's native periods; the input is
            returned unchanged when the indicator is missing or too short
            to infer a frequency.
        """
        if indicator not in data.columns:
            return forecast_periods
        series = data[indicator].dropna()
        if len(series) < 2:
            return forecast_periods
        freq = self._infer_frequency(series)
        if freq == 'D':
            return forecast_periods * 30  # ~30 days per quarter
        elif freq == 'M':
            return forecast_periods * 3   # 3 months per quarter
        else:
            return forecast_periods       # Already quarterly

    def _infer_frequency(self, series: pd.Series) -> str:
        """
        Infer a coarse frequency from a time-indexed series.

        Args:
            series: Time series with a DatetimeIndex.

        Returns:
            'D' (<= 1 day between observations), 'M' (<= 35 days), or 'Q'.
        """
        if len(series) < 2:
            return 'Q'
        # Use the mean spacing between consecutive timestamps.
        time_diff = series.index.to_series().diff().dropna()
        avg_diff = time_diff.mean()
        if avg_diff.days <= 1:
            return 'D'
        elif avg_diff.days <= 35:
            return 'M'
        else:
            return 'Q'

    def safe_mape(self, actual: np.ndarray, forecast: np.ndarray) -> float:
        """
        Calculate MAPE with protection against division by zero.

        Inputs are trimmed to a common length and non-finite pairs are
        masked out, matching the robustness of safe_rmse.

        Args:
            actual: Actual values.
            forecast: Forecasted values.

        Returns:
            MAPE in percent, or np.inf when no valid pairs exist.
        """
        actual = np.asarray(actual, dtype=float)
        forecast = np.asarray(forecast, dtype=float)
        min_len = min(actual.size, forecast.size)
        if min_len == 0:
            return np.inf
        actual, forecast = actual[:min_len], forecast[:min_len]
        mask = np.isfinite(actual) & np.isfinite(forecast)
        if not np.any(mask):
            return np.inf
        actual, forecast = actual[mask], forecast[mask]
        # Floor the denominator to avoid division by (near-)zero actuals.
        denominator = np.maximum(np.abs(actual), 1e-8)
        return float(np.mean(np.abs((actual - forecast) / denominator)) * 100)

    def safe_mae(self, actual: np.ndarray, forecast: np.ndarray) -> float:
        """
        Calculate MAE (Mean Absolute Error) safely.

        Inputs are trimmed to a common length and non-finite pairs are
        masked out, matching the robustness of safe_rmse.

        Args:
            actual: Actual values.
            forecast: Forecasted values.

        Returns:
            MAE, or np.inf when no valid pairs exist.
        """
        actual = np.asarray(actual, dtype=float)
        forecast = np.asarray(forecast, dtype=float)
        min_len = min(actual.size, forecast.size)
        if min_len == 0:
            return np.inf
        actual, forecast = actual[:min_len], forecast[:min_len]
        mask = np.isfinite(actual) & np.isfinite(forecast)
        if not np.any(mask):
            return np.inf
        return float(np.mean(np.abs(actual[mask] - forecast[mask])))

    def safe_rmse(self, actual: np.ndarray, forecast: np.ndarray) -> float:
        """
        Calculate RMSE, safely handling empty/mismatched/non-finite input.

        Args:
            actual: Actual values.
            forecast: Forecasted values.

        Returns:
            RMSE, or np.inf when no valid pairs exist.
        """
        actual = np.asarray(actual, dtype=float)
        forecast = np.asarray(forecast, dtype=float)
        min_len = min(actual.size, forecast.size)
        if min_len == 0:
            return np.inf
        actual_trimmed = actual[:min_len]
        forecast_trimmed = forecast[:min_len]
        # Drop pairs where either side is NaN or infinite.
        mask = np.isfinite(actual_trimmed) & np.isfinite(forecast_trimmed)
        if not np.any(mask):
            return np.inf
        actual_clean = actual_trimmed[mask]
        forecast_clean = forecast_trimmed[mask]
        return float(np.sqrt(np.mean((actual_clean - forecast_clean) ** 2)))

    def validate_scaling(self, series: pd.Series,
                         unit_hint: str,
                         expected_min: float,
                         expected_max: float) -> None:
        """
        Check that values fall within an expected magnitude range.

        Args:
            series: pandas Series of numeric data.
            unit_hint: Description, e.g. "Real GDP".
            expected_min: Plausible lower bound (same units as the data).
            expected_max: Plausible upper bound (same units as the data).

        Raises:
            ValueError: If more than 5% of values fall outside the range.
        """
        vals = series.dropna()
        mask = (vals < expected_min) | (vals > expected_max)
        if mask.mean() > 0.05:
            raise ValueError(f"{unit_hint}: {mask.mean():.1%} of data "
                             f"outside [{expected_min}, {expected_max}]. "
                             "Check for scaling/unit issues.")
        # Use the module logger (the original print bypassed logging config).
        logger.info("%s: data within expected range.", unit_hint)

    def apply_comprehensive_fixes(self, data: pd.DataFrame,
                                  target_freq: str = 'Q',
                                  growth_method: str = 'pct_change',
                                  normalize_units: bool = True,
                                  preserve_absolute_values: bool = False) -> Tuple[pd.DataFrame, Dict]:
        """
        Apply the full pipeline of mathematical fixes to economic data.

        Pipeline: frequency alignment -> unit normalization -> growth
        rates -> stationarity diagnostics -> validation.

        Args:
            data: DataFrame with economic indicators.
            target_freq: Target frequency ('D', 'M', 'Q'); 'auto' skips
                frequency alignment.
            growth_method: 'pct_change' or 'log_diff'; any other value
                skips the growth-rate step.
            normalize_units: Whether to normalize units.
            preserve_absolute_values: When True, return absolute levels
                instead of replacing them with growth rates.

        Returns:
            Tuple of (processed_data, fix_info) where fix_info records
            what each step did.
        """
        logger.info("Applying comprehensive mathematical fixes")
        fix_info = {
            'original_shape': data.shape,
            'frequency_alignment': {},
            'unit_normalization': {},
            'growth_calculation': {},
            'stationarity_enforcement': {},
            'validation_results': {}
        }
        processed_data = data.copy()

        # Step 1: Align frequencies (skipped for 'auto').
        if target_freq != 'auto':
            processed_data = self.align_frequencies(processed_data, target_freq)
            fix_info['frequency_alignment'] = {
                'target_frequency': target_freq,
                'final_shape': processed_data.shape
            }

        # Step 2: Normalize units.
        if normalize_units:
            processed_data = self.normalize_units(processed_data)
            fix_info['unit_normalization'] = {
                'normalized_indicators': list(processed_data.columns)
            }

        # Step 3: Calculate growth rates if a known method was requested.
        if growth_method in ['pct_change', 'log_diff']:
            growth_data = self.calculate_growth_rates(processed_data, growth_method)
            fix_info['growth_calculation'] = {
                'method': growth_method,
                'growth_indicators': list(growth_data.columns)
            }
            # Replace levels with growth rates unless the caller asked to
            # keep absolute values for display.
            if not preserve_absolute_values:
                processed_data = growth_data

        # Step 4: Stationarity diagnostics. Note: the differenced series
        # are intentionally discarded -- only the per-column differencing
        # info is surfaced; processed_data is NOT replaced here.
        _, differencing_info = self.enforce_stationarity(processed_data)
        fix_info['stationarity_enforcement'] = differencing_info

        # Step 5: Validate the processed data for scaling/quality issues.
        fix_info['validation_results'] = self._validate_processed_data(processed_data)

        logger.info(f"Comprehensive fixes applied. Final shape: {processed_data.shape}")
        return processed_data, fix_info

    def _validate_processed_data(self, data: pd.DataFrame) -> Dict:
        """
        Validate processed data for scaling and quality issues.

        Args:
            data: Processed DataFrame.

        Returns:
            Dict with 'scaling_issues', 'quality_warnings', and a
            'validation_score' in [0, 100] (100 = no issues found).
        """
        validation_results = {
            'scaling_issues': [],
            'quality_warnings': [],
            'validation_score': 100.0
        }
        for column in data.columns:
            series = data[column].dropna()
            if len(series) == 0:
                validation_results['quality_warnings'].append(f"{column}: No data available")
                continue
            mean_val = series.mean()
            std_val = series.std()
            # Extremely large magnitudes usually indicate a missed unit
            # conversion upstream.
            if abs(mean_val) > 1e6:
                validation_results['scaling_issues'].append(
                    f"{column}: Mean value {mean_val:.2e} is extremely large - possible scaling issue"
                )
            if std_val > 1e5:
                validation_results['scaling_issues'].append(
                    f"{column}: Standard deviation {std_val:.2e} is extremely large - possible scaling issue"
                )
            # Near-zero mean AND spread suggests an over-division somewhere.
            if abs(mean_val) < 1e-6 and std_val < 1e-6:
                validation_results['scaling_issues'].append(
                    f"{column}: Values are extremely small - possible unit conversion issue"
                )
        # Score: percentage of columns that passed all checks (floored at 0,
        # since one column can raise multiple issues).
        total_checks = len(data.columns)
        failed_checks = len(validation_results['scaling_issues']) + len(validation_results['quality_warnings'])
        if total_checks > 0:
            validation_results['validation_score'] = max(0, 100 - (failed_checks / total_checks) * 100)
        return validation_results