abhaypratapsingh111's picture
Upload folder using huggingface_hub
6b9e3e8 verified
raw
history blame
5.89 kB
"""
Forecast evaluation metrics
"""
import logging
from typing import Dict, Any
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
def calculate_metrics(
    actual: pd.Series,
    forecast: pd.Series,
    include_percentage: bool = True
) -> Dict[str, float]:
    """
    Calculate forecast accuracy metrics

    The two series are compared positionally (index labels are ignored),
    truncated to the shorter length, and any position where either value is
    missing is dropped before the metrics are computed.

    Args:
        actual: Actual values
        forecast: Forecasted values
        include_percentage: Include percentage-based metrics (MAPE, sMAPE)

    Returns:
        Dictionary of metrics. Always contains 'MAE', 'RMSE' and 'ME' on
        success; 'MAPE', 'sMAPE' and 'R2' are only present when they are
        defined for the data. On failure (or when no valid pairs remain) the
        dictionary instead holds a single 'error' key with a message string.
    """
    try:
        # Ensure same length
        min_len = min(len(actual), len(forecast))

        # Coerce to plain float arrays so missing markers (None / pd.NA)
        # become NaN; np.isnan raises TypeError on object-dtype arrays.
        actual = actual.iloc[:min_len].to_numpy(dtype=float, na_value=np.nan)
        forecast = forecast.iloc[:min_len].to_numpy(dtype=float, na_value=np.nan)

        # Remove NaN values
        mask = ~(np.isnan(actual) | np.isnan(forecast))
        actual = actual[mask]
        forecast = forecast[mask]

        if len(actual) == 0:
            return {'error': 'No valid values for metric calculation'}

        errors = actual - forecast
        metrics = {}

        # Mean Absolute Error
        metrics['MAE'] = float(np.mean(np.abs(errors)))

        # Root Mean Squared Error
        metrics['RMSE'] = float(np.sqrt(np.mean(errors ** 2)))

        # Mean Error (bias): positive means the forecast is too high
        metrics['ME'] = float(np.mean(forecast - actual))

        if include_percentage:
            # Mean Absolute Percentage Error
            # Avoid division by zero: positions with actual == 0 are skipped
            mask_nonzero = actual != 0
            if mask_nonzero.any():
                mape = np.mean(np.abs(errors[mask_nonzero] / actual[mask_nonzero])) * 100
                metrics['MAPE'] = float(mape)

            # Symmetric MAPE
            # Skipped where both actual and forecast are zero
            denominator = (np.abs(actual) + np.abs(forecast)) / 2
            mask_nonzero = denominator != 0
            if mask_nonzero.any():
                smape = np.mean(np.abs(errors[mask_nonzero]) / denominator[mask_nonzero]) * 100
                metrics['sMAPE'] = float(smape)

        # R-squared (undefined when the actual values have no variance)
        ss_res = np.sum(errors ** 2)
        ss_tot = np.sum((actual - np.mean(actual)) ** 2)
        if ss_tot != 0:
            metrics['R2'] = float(1 - (ss_res / ss_tot))

        return metrics
    except Exception as e:
        logger.error(f"Error calculating metrics: {str(e)}", exc_info=True)
        return {'error': str(e)}
def calculate_coverage(
    actual: pd.Series,
    lower_bound: pd.Series,
    upper_bound: pd.Series
) -> float:
    """
    Calculate coverage of prediction intervals

    The three series are compared positionally and truncated to the shortest
    length. Positions where the actual value or either bound is missing are
    excluded, so gaps in the data do not count as misses.

    Args:
        actual: Actual values
        lower_bound: Lower bound of prediction interval
        upper_bound: Upper bound of prediction interval

    Returns:
        Coverage percentage (0-100): the share of valid positions where
        lower_bound <= actual <= upper_bound. Returns 0.0 when there are no
        valid positions or when an error occurs.
    """
    try:
        # Ensure same length
        min_len = min(len(actual), len(lower_bound), len(upper_bound))
        observed = actual.iloc[:min_len].to_numpy(dtype=float, na_value=np.nan)
        lower = lower_bound.iloc[:min_len].to_numpy(dtype=float, na_value=np.nan)
        upper = upper_bound.iloc[:min_len].to_numpy(dtype=float, na_value=np.nan)

        # Comparisons against NaN are always False, which would silently
        # count missing data as "outside the interval"; drop those rows.
        valid = ~(np.isnan(observed) | np.isnan(lower) | np.isnan(upper))
        if not valid.any():
            # Nothing to evaluate; avoid the NaN mean of an empty array
            return 0.0

        observed = observed[valid]
        lower = lower[valid]
        upper = upper[valid]

        # Count values within bounds (bounds are inclusive)
        within_bounds = (observed >= lower) & (observed <= upper)
        coverage = np.mean(within_bounds) * 100
        return float(coverage)
    except Exception as e:
        logger.error(f"Error calculating coverage: {str(e)}", exc_info=True)
        return 0.0
def calculate_interval_width(
    lower_bound: pd.Series,
    upper_bound: pd.Series
) -> Dict[str, float]:
    """
    Describe how wide the prediction intervals are

    The width is computed as upper_bound - lower_bound and then summarized.

    Args:
        lower_bound: Lower bound of prediction interval
        upper_bound: Upper bound of prediction interval

    Returns:
        Dictionary with the keys 'mean_width', 'median_width', 'min_width',
        'max_width' and 'std_width'; an empty dictionary if anything fails.
    """
    try:
        spread = upper_bound - lower_bound

        # Pair each output key with the reducer that produces its value
        reducers = (
            ('mean_width', spread.mean),
            ('median_width', spread.median),
            ('min_width', spread.min),
            ('max_width', spread.max),
            ('std_width', spread.std),
        )
        return {key: float(reduce()) for key, reduce in reducers}
    except Exception as e:
        logger.error(f"Error calculating interval width: {str(e)}", exc_info=True)
        return {}
def format_metric(value: float, metric_name: str) -> str:
    """
    Format metric value for display

    Args:
        value: Metric value
        metric_name: Name of the metric

    Returns:
        Formatted string:
        - 'MAPE' / 'sMAPE': two decimals followed by '%'
        - 'MAE' / 'RMSE' / 'ME': thousands-separated with two decimals when
          the magnitude is at least 1000, otherwise four decimals
        - anything else (including 'R2'): four decimals
    """
    # Only true percentages get a '%' suffix. R2 is a unitless ratio
    # (1 - ss_res / ss_tot), so rendering it as e.g. "0.95%" would be wrong;
    # it falls through to the plain four-decimal format below.
    if metric_name in ['MAPE', 'sMAPE']:
        return f"{value:.2f}%"

    # Scale-dependent errors: switch to a compact grouped format once the
    # numbers get large enough that four decimals are just noise.
    if metric_name in ['MAE', 'RMSE', 'ME'] and abs(value) >= 1000:
        return f"{value:,.2f}"

    return f"{value:.4f}"
def summarize_forecast_quality(
    forecast_df: pd.DataFrame,
    confidence_levels: list
) -> Dict[str, Any]:
    """
    Build a compact summary of a forecast

    Args:
        forecast_df: DataFrame with forecast results; the 'forecast' column
            is read, along with 'lower_<cl>' / 'upper_<cl>' column pairs
        confidence_levels: List of confidence levels used to look up the
            'lower_<cl>' / 'upper_<cl>' columns

    Returns:
        Summary dictionary with 'horizon' (row count), 'forecast_range'
        (min / max / mean of the forecast) and 'interval_widths' (mean
        interval width keyed by '<cl>%'). Empty dictionary on any error.
    """
    try:
        horizon = len(forecast_df)
        point_forecast = forecast_df['forecast']
        value_range = {
            'min': float(point_forecast.min()),
            'max': float(point_forecast.max()),
            'mean': float(point_forecast.mean())
        }
        result = {'horizon': horizon, 'forecast_range': value_range}

        # Mean interval width per confidence level; levels whose bound
        # columns are not both present are skipped.
        mean_widths = {}
        for level in confidence_levels:
            lo_name = f'lower_{level}'
            hi_name = f'upper_{level}'
            if lo_name not in forecast_df.columns or hi_name not in forecast_df.columns:
                continue
            gap = forecast_df[hi_name] - forecast_df[lo_name]
            mean_widths[f'{level}%'] = float(gap.mean())

        result['interval_widths'] = mean_widths
        return result
    except Exception as e:
        logger.error(f"Error summarizing forecast: {str(e)}", exc_info=True)
        return {}