# ============================================
# CLASS 12: DATA VALIDATION
# ============================================
from datetime import datetime
import json
import logging
import math
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    # Needed for annotations only; guarding it keeps this module importable
    # without pulling in the configuration package at runtime.
    from config.config import Config

# Module-level logger (previously borrowed from the unrelated stdlib ``venv``
# module, which only worked by accident).
logger = logging.getLogger(__name__)


class DataValidator:
    """Class for data quality validation.

    The validator runs a set of basic checks, computes quality metrics,
    collects issues by severity, derives recommendations and folds everything
    into a 0-100 score with a PASS / WARNING / FAIL status. Results are kept
    per validation stage in ``validation_results``, ``quality_metrics`` and
    ``issues_found``.

    The configuration object is read for ``target_column``,
    ``validation_rules``, ``results_dir`` and ``to_dict()``.
    """

    def __init__(self, config: "Config"):
        """
        Initialise data validator

        Parameters:
        -----------
        config : Config
            Experiment configuration
        """
        self.config = config
        self.validation_results = {}
        self.quality_metrics = {}
        self.issues_found = {}

    def validate(
        self,
        data: pd.DataFrame,
        stage: str = 'final',
        rules: Optional[Dict] = None,
        detailed: bool = True
    ) -> Dict:
        """
        Validate data quality

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        stage : str
            Validation stage: 'raw', 'processed', 'final'
        rules : Dict, optional
            Validation rules. If None, uses configuration defaults. An
            explicitly passed empty dict is respected (built-in thresholds
            are then used).
        detailed : bool
            Whether to perform detailed validation (issue detection)

        Returns:
        --------
        Dict
            Validation results
        """
        logger.info("\n" + "="*80)
        logger.info(f"DATA VALIDATION ({stage.upper()})")
        logger.info("="*80)

        # Only fall back to the configuration when the caller passed nothing;
        # ``rules or ...`` would silently discard an explicit empty dict.
        if rules is None:
            rules = self.config.validation_rules or {}

        validation_results = {
            'stage': stage,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'data_shape': list(data.shape),
            'basic_checks': {},
            'quality_metrics': {},
            'issues': {},
            'recommendations': [],
            'overall_score': 0,
            'status': 'PASS'
        }

        # Basic checks
        validation_results['basic_checks'] = self._basic_checks(data, rules)

        # Quality checks
        validation_results['quality_metrics'] = self._quality_metrics(data, rules)

        # Problem detection
        if detailed:
            validation_results['issues'] = self._find_issues(data, rules)

        # Recommendation generation
        validation_results['recommendations'] = self._generate_recommendations(
            validation_results['basic_checks'],
            validation_results['quality_metrics'],
            validation_results['issues']
        )

        # Overall score calculation
        score = self._calculate_overall_score(validation_results)
        validation_results['overall_score'] = score

        # Status determination
        if score >= 80:
            validation_results['status'] = 'PASS'
        elif score >= 60:
            validation_results['status'] = 'WARNING'
        else:
            validation_results['status'] = 'FAIL'

        # Save results (issues_found was previously never populated)
        self.validation_results[stage] = validation_results
        self.quality_metrics[stage] = validation_results['quality_metrics']
        self.issues_found[stage] = validation_results['issues']

        # Log results
        self._log_validation_results(validation_results)

        return validation_results

    def _basic_checks(self, data: pd.DataFrame, rules: Dict) -> Dict:
        """Basic data checks.

        Each entry holds ``value``, ``passed`` and, where applicable,
        ``threshold``. Thresholds come from ``rules`` keys ``min_rows``
        (default 100), ``max_missing_percentage`` (default 30) and
        ``max_duplicate_percentage`` (default 5). An empty frame yields 0%
        missing / duplicates instead of dividing by zero.
        """
        checks = {}
        n_rows = len(data)

        # 1. Data size check
        min_rows = rules.get('min_rows', 100)
        checks['min_rows'] = {
            'value': n_rows,
            'threshold': min_rows,
            'passed': n_rows >= min_rows
        }

        # 2. Target variable presence check
        has_target = self.config.target_column in data.columns
        checks['has_target'] = {
            'value': has_target,
            'passed': has_target
        }

        # 3. Missing values check (guarded: data.size is 0 for empty frames)
        total_cells = data.size
        if total_cells:
            missing_percentage = float(data.isnull().sum().sum() / total_cells * 100)
        else:
            missing_percentage = 0.0
        max_missing = rules.get('max_missing_percentage', 30)
        checks['missing_percentage'] = {
            'value': missing_percentage,
            'threshold': max_missing,
            'passed': bool(missing_percentage <= max_missing)
        }

        # 4. Duplicates check
        if n_rows:
            duplicate_percentage = float(data.duplicated().sum() / n_rows * 100)
        else:
            duplicate_percentage = 0.0
        max_duplicates = rules.get('max_duplicate_percentage', 5)
        checks['duplicates'] = {
            'value': duplicate_percentage,
            'threshold': max_duplicates,
            'passed': bool(duplicate_percentage <= max_duplicates)
        }

        # 5. Data types check
        numeric_count = len(data.select_dtypes(include=[np.number]).columns)
        checks['numeric_features'] = {
            'value': numeric_count,
            'passed': numeric_count >= 1  # At least one numeric feature required
        }

        return checks

    def _quality_metrics(self, data: pd.DataFrame, rules: Dict) -> Dict:
        """Data quality metrics.

        Produces, when applicable: ``numeric_statistics`` (per numeric
        column), ``temporal_stability`` (DatetimeIndex only),
        ``feature_informativeness`` and ``target_quality`` (target present),
        plus a ``class_balance`` placeholder. ``rules`` is accepted for
        signature symmetry and is not read here.
        """
        metrics = {}
        target = self.config.target_column

        # 1. Numeric features statistics
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            numeric_stats = {}
            for col in numeric_cols:
                col_data = data[col].dropna()
                n_values = len(col_data)
                if n_values == 0:
                    continue
                numeric_stats[col] = {
                    'mean': float(col_data.mean()),
                    'std': float(col_data.std()),
                    'skewness': float(col_data.skew()),
                    'kurtosis': float(col_data.kurtosis()),
                    'zeros_percentage': float((col_data == 0).sum() / n_values * 100),
                    'unique_percentage': float(col_data.nunique() / n_values * 100)
                }
            metrics['numeric_statistics'] = numeric_stats

        # 2. Data stability (for time series)
        if isinstance(data.index, pd.DatetimeIndex):
            metrics['temporal_stability'] = self._calculate_temporal_stability(data)

        if target in data.columns:
            # 3. Feature informativeness
            metrics['feature_informativeness'] = self._calculate_feature_informativeness(data)

            # 4. Target variable quality
            target_raw = data[target]
            target_data = target_raw.dropna()
            if len(target_data) > 0:
                metrics['target_quality'] = {
                    # Measured on the raw column: after dropna() it is always 0.
                    'missing_percentage': float(target_raw.isnull().sum() / len(data) * 100),
                    'unique_values': int(target_data.nunique()),
                    'is_constant': bool(target_data.nunique() <= 1),
                    'has_outliers': self._check_target_outliers(target_data),
                    'distribution_type': self._identify_distribution(target_data)
                }

        # 5. Class balance (for classification) - not applicable here, but kept as placeholder
        metrics['class_balance'] = {'note': 'Not applicable for regression'}

        return metrics

    @staticmethod
    def _timedelta_days(delta: Any) -> float:
        """Convert a Timedelta to fractional days (NaN for NaT)."""
        if pd.isna(delta):
            return float('nan')
        return float(delta / pd.Timedelta(days=1))

    def _calculate_temporal_stability(self, data: pd.DataFrame) -> Dict:
        """Calculate time series stability metrics.

        Returns an empty dict unless ``data`` has a DatetimeIndex. Otherwise
        may contain ``yearly_statistics`` (first five numeric columns, only
        when more than one year is present), ``time_gaps`` (in fractional
        days) and ``seasonal_stability`` (more than 365 rows, best effort).
        """
        stability = {}

        if not isinstance(data.index, pd.DatetimeIndex):
            return stability

        # Group by an existing 'year' column if there is one, otherwise by the
        # index year. Grouping by an array avoids copying the whole frame.
        if 'year' in data.columns:
            year_key = data['year'].to_numpy()
        else:
            year_key = data.index.year.to_numpy()

        if len(pd.unique(year_key)) > 1:
            # Statistics by years for numeric columns
            grouped = data.groupby(year_key)
            year_stats = {}

            for col in data.select_dtypes(include=[np.number]).columns[:5]:  # First 5 columns
                yearly_means = grouped[col].mean()
                yearly_stds = grouped[col].std()

                # Coefficient of variation between years. |mean| keeps the CV
                # non-negative; a zero mean leaves it undefined (NaN).
                spread = yearly_means.std()
                centre = yearly_means.mean()
                if spread > 0:
                    cv_between_years = spread / abs(centre) if centre != 0 else float('nan')
                else:
                    cv_between_years = 0.0

                year_stats[col] = {
                    'yearly_means': yearly_means.to_dict(),
                    'yearly_stds': yearly_stds.to_dict(),
                    'cv_between_years': float(cv_between_years),
                    'mean_stability': float(1 - cv_between_years)  # 1 - CV, closer to 1 means more stable
                }

            stability['yearly_statistics'] = year_stats

        # Check for time gaps
        time_diff = pd.Series(data.index).diff().dropna()
        if len(time_diff) > 0:
            max_gap = time_diff.max()
            avg_gap = time_diff.mean()
            gap_std = time_diff.std()

            # Fractional days: ``.days`` truncates, which reported 0 for any
            # sub-daily series.
            stability['time_gaps'] = {
                'max_gap_days': self._timedelta_days(max_gap),
                'avg_gap_days': self._timedelta_days(avg_gap),
                'gap_std': self._timedelta_days(gap_std),
                # If standard deviation > 50% of mean (False when std is NaT)
                'has_irregular_gaps': bool(gap_std > avg_gap * 0.5)
            }

        # Seasonal stability (best effort: a failure here must not abort validation)
        if len(data) > 365:
            try:
                stability['seasonal_stability'] = self._analyse_seasonal_stability(data)
            except Exception as exc:
                logger.warning("Seasonal stability analysis skipped: %s", exc)

        return stability

    def _analyse_seasonal_stability(self, data: pd.DataFrame) -> Dict:
        """Analyse seasonal patterns stability of the target variable.

        Builds a year x month table of target means, then reports the mean
        correlation between the yearly profiles, the average per-month
        coefficient of variation across years, and a high/medium/low label.
        Returns an empty dict when there is no DatetimeIndex, no target
        column, or no data.
        """
        target = self.config.target_column
        if not isinstance(data.index, pd.DatetimeIndex) or target not in data.columns:
            return {}

        # Small helper frame instead of copying every column of ``data``.
        frame = pd.DataFrame({
            'year': np.asarray(data.index.year),
            'month': np.asarray(data.index.month),
            'value': data[target].to_numpy()
        })
        # Rows: years, columns: months
        monthly_means = frame.groupby(['year', 'month'])['value'].mean().unstack()
        if monthly_means.empty:
            return {}

        # Correlation between years: DataFrame.corr() correlates COLUMNS, so
        # the table is transposed to put one year per column.
        yearly_corr = monthly_means.T.corr().mean().mean()

        # Variation between years, per month
        monthly_cv = monthly_means.std() / monthly_means.mean()
        avg_monthly_cv = monthly_cv.mean()

        if yearly_corr > 0.8 and avg_monthly_cv < 0.3:
            consistency = 'high'
        elif yearly_corr > 0.6:
            consistency = 'medium'
        else:
            consistency = 'low'

        return {
            'yearly_correlation': float(yearly_corr),
            'average_monthly_cv': float(avg_monthly_cv),
            'seasonal_consistency': consistency
        }

    def _calculate_feature_informativeness(self, data: pd.DataFrame) -> Dict:
        """Calculate feature informativeness.

        Uses the absolute Pearson correlation with the target as a proxy, for
        at most the first 20 numeric features. Features whose correlation
        cannot be computed are skipped.
        """
        informativeness = {}
        target = self.config.target_column

        if target not in data.columns:
            return informativeness

        target_series = data[target]
        candidates = [
            col for col in data.select_dtypes(include=[np.number]).columns
            if col != target
        ]

        for col in candidates[:20]:  # Limit number of features for analysis
            try:
                correlation = float(data[col].corr(target_series))
            except Exception as exc:
                # Best effort per feature; record why it was skipped.
                logger.debug("Skipping informativeness for column %r: %s", col, exc)
                continue

            strength = abs(correlation)
            if strength > 0.5:
                level = 'high'
            elif strength > 0.3:
                level = 'medium'
            else:
                level = 'low'  # also reached when the correlation is NaN

            informativeness[col] = {
                'correlation_with_target': correlation,
                'abs_correlation': strength,
                'informativeness': level
            }

        return informativeness

    def _check_target_outliers(self, target_series: pd.Series) -> Dict:
        """Check target variable for outliers using the 1.5 * IQR rule.

        Series shorter than 10 values, or with a zero IQR, are reported as
        having no outliers.
        """
        no_outliers = {'has_outliers': False, 'outlier_percentage': 0}

        if len(target_series) < 10:
            return no_outliers

        q1 = target_series.quantile(0.25)
        q3 = target_series.quantile(0.75)
        iqr = q3 - q1
        if not iqr > 0:
            return no_outliers

        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        outlier_count = int(
            ((target_series < lower_bound) | (target_series > upper_bound)).sum()
        )

        return {
            'has_outliers': outlier_count > 0,
            'outlier_count': outlier_count,
            'outlier_percentage': float(outlier_count / len(target_series) * 100),
            'outlier_bounds': {'lower': float(lower_bound), 'upper': float(upper_bound)}
        }

    def _identify_distribution(self, series: pd.Series) -> str:
        """Identify distribution type from skewness and kurtosis.

        Returns 'insufficient_data' for fewer than 30 values.
        """
        if len(series) < 30:
            return 'insufficient_data'

        skewness = series.skew()
        kurtosis = series.kurtosis()

        # NOTE(review): pandas returns EXCESS kurtosis (normal == 0). The
        # '> 3' / '< 2' thresholds below look like they were meant for
        # Pearson kurtosis (normal == 3) - confirm before relying on the
        # 'heavy_tailed' / 'light_tailed' labels. 'unknown' is only reached
        # for kurtosis in [2, 3] (or NaN).
        if abs(skewness) < 0.5 and abs(kurtosis) < 1:
            return 'normal_like'
        if skewness > 1:
            return 'right_skewed'
        if skewness < -1:
            return 'left_skewed'
        if kurtosis > 3:
            return 'heavy_tailed'
        if kurtosis < 2:
            return 'light_tailed'
        return 'unknown'

    def _find_issues(self, data: pd.DataFrame, rules: Dict) -> Dict:
        """Find data problems, grouped into 'critical', 'warning' and 'info'.

        ``rules`` is accepted for signature symmetry and is not read here.
        """
        issues = {
            'critical': [],
            'warning': [],
            'info': []
        }

        # 1. Check missing values in important features
        missing_pct_by_col = data.isnull().mean() * 100
        for col, missing_pct in missing_pct_by_col.items():
            if missing_pct > 50:
                issues['critical'].append(f"Column '{col}': {missing_pct:.1f}% missing values (critical)")
            elif missing_pct > 20:
                issues['warning'].append(f"Column '{col}': {missing_pct:.1f}% missing values")

        # 2. Check constant features
        for col in data.columns:
            if data[col].nunique() <= 1:
                issues['critical'].append(f"Column '{col}': constant value")

        # 3. Check feature correlation with itself (lags)
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            # Column labels may be non-strings (e.g. ints); those cannot be
            # lag/diff features by naming convention.
            if not isinstance(col, str):
                continue
            if '_lag_' in col:
                base_col = col.split('_lag_')[0]
            elif '_diff_' in col:
                base_col = col.split('_diff_')[0]
            else:
                continue
            if base_col in numeric_cols:
                correlation = data[col].corr(data[base_col])
                if pd.notna(correlation) and abs(correlation) > 0.95:
                    issues['info'].append(f"Column '{col}': high correlation with '{base_col}' ({correlation:.3f})")

        # 4. Check time gaps
        if isinstance(data.index, pd.DatetimeIndex):
            time_diff = pd.Series(data.index).diff().dropna()
            if len(time_diff) > 0:
                max_gap = time_diff.max()
                if hasattr(max_gap, 'days') and max_gap.days > 30:
                    issues['warning'].append(f"Detected time gap: {max_gap.days} days")

        # 5. Check target variable
        target = self.config.target_column
        if target in data.columns:
            target_data = data[target].dropna()
            if len(target_data) > 0:
                if target_data.nunique() <= 1:
                    issues['critical'].append(f"Target variable '{target}': constant value")

                # Check for outliers
                outlier_check = self._check_target_outliers(target_data)
                if outlier_check.get('has_outliers', False) and outlier_check.get('outlier_percentage', 0) > 10:
                    issues['warning'].append(f"Target variable '{target}': {outlier_check['outlier_percentage']:.1f}% outliers")

        # 6. Check multicollinearity (simplified): count pairs in the strict
        # upper triangle of the absolute correlation matrix.
        if len(numeric_cols) > 5:
            corr_values = data[numeric_cols].corr().abs().to_numpy()
            upper = np.triu_indices_from(corr_values, k=1)
            high_corr_count = int((corr_values[upper] > 0.9).sum())

            if high_corr_count > 5:
                issues['warning'].append(f"Detected multicollinearity: {high_corr_count} pairs with correlation > 0.9")

        return issues

    def _generate_recommendations(
        self,
        basic_checks: Dict,
        quality_metrics: Dict,
        issues: Dict
    ) -> List[str]:
        """Generate data improvement recommendations.

        Combines failed basic checks, the presence of critical/warning issues
        and selected quality metrics into human-readable advice.
        """
        recommendations = []

        # Recommendations based on basic checks
        for check_name, check_info in basic_checks.items():
            if check_info.get('passed', True):
                continue
            if check_name == 'min_rows':
                recommendations.append(f"Increase data volume: current row count ({check_info['value']}) below minimum threshold ({check_info['threshold']})")
            elif check_name == 'has_target':
                recommendations.append(f"Add target variable '{self.config.target_column}' to data")
            elif check_name == 'missing_percentage':
                recommendations.append(f"Handle missing values: {check_info['value']:.1f}% missing exceeds threshold {check_info['threshold']}%")
            elif check_name == 'duplicates':
                recommendations.append(f"Remove duplicates: {check_info['value']:.1f}% duplicate rows")

        # Recommendations based on issues
        if issues.get('critical'):
            recommendations.append("Resolve critical issues before using data")
        if issues.get('warning'):
            recommendations.append("Consider addressing warnings to improve data quality")

        # Recommendations based on quality metrics
        target_metrics = quality_metrics.get('target_quality', {})
        if target_metrics.get('is_constant', False):
            recommendations.append(f"Target variable '{self.config.target_column}' is constant, different target variable needed")

        outlier_info = target_metrics.get('has_outliers', {})
        if outlier_info.get('has_outliers', False):
            outlier_pct = outlier_info.get('outlier_percentage', 0)
            if outlier_pct > 5:
                recommendations.append(f"Handle outliers in target variable: {outlier_pct:.1f}% outliers")

        # Time series stability recommendations
        temporal_stability = quality_metrics.get('temporal_stability', {})
        if temporal_stability.get('time_gaps', {}).get('has_irregular_gaps', False):
            recommendations.append("Detected irregular time intervals, consider resampling")

        return recommendations

    def _calculate_overall_score(self, validation_results: Dict) -> float:
        """Calculate overall data quality score in the 0-100 range.

        Starts at 100, subtracts penalties for failed basic checks and for
        each critical (20) / warning (5) issue, adds small bonuses for a
        non-constant, almost complete target, then clamps the result.
        """
        score = 100

        # Penalties for basic checks
        for check_name, check_info in validation_results.get('basic_checks', {}).items():
            if check_info.get('passed', True):
                continue

            if check_name == 'min_rows':
                score -= 30
            elif check_name == 'has_target':
                score -= 50
            elif check_name == 'missing_percentage':
                missing_pct = check_info.get('value', 0)
                if missing_pct > 50:
                    score -= 40
                elif missing_pct > 20:
                    score -= 20
                elif missing_pct > 5:
                    score -= 10
            elif check_name == 'duplicates':
                duplicate_pct = check_info.get('value', 0)
                if duplicate_pct > 20:
                    score -= 30
                elif duplicate_pct > 10:
                    score -= 15
                elif duplicate_pct > 5:
                    score -= 5

        # Penalties for issues
        issues = validation_results.get('issues', {})
        score -= len(issues.get('critical') or []) * 20
        score -= len(issues.get('warning') or []) * 5

        # Bonuses for good metrics (defaults make a missing target earn nothing)
        target_metrics = validation_results.get('quality_metrics', {}).get('target_quality', {})
        if not target_metrics.get('is_constant', True):
            score += 10
        if target_metrics.get('missing_percentage', 100) < 1:
            score += 5

        # Limit score to 0-100 range
        return max(0, min(100, score))

    def _log_validation_results(self, validation_results: Dict) -> None:
        """Log validation results: summary, checks, issues, recommendations."""
        stage = validation_results['stage']
        status = validation_results['status']
        score = validation_results['overall_score']
        n_rows, n_cols = validation_results['data_shape'][0], validation_results['data_shape'][1]

        logger.info(f"VALIDATION RESULTS ({stage}):")
        logger.info(f" Status: {status}")
        logger.info(f" Overall score: {score}/100")
        logger.info(f" Data shape: {n_rows}x{n_cols}")

        # Basic checks
        logger.info("\nBASIC CHECKS:")
        for check_name, check_info in validation_results['basic_checks'].items():
            status_icon = "✓" if check_info.get('passed', True) else "✗"
            logger.info(f" {status_icon} {check_name}: {check_info.get('value', 'N/A')}")

        # Issues
        issues = validation_results['issues']
        if any(issues.values()):
            logger.info("\nDETECTED ISSUES:")
            for severity, issue_list in issues.items():
                if not issue_list:
                    continue
                logger.info(f" {severity.upper()}:")
                for issue in issue_list[:5]:  # Show only first 5 issues of each type
                    logger.info(f" - {issue}")
                if len(issue_list) > 5:
                    logger.info(f" ... and {len(issue_list) - 5} more issues")
        else:
            logger.info("\n✓ No issues detected")

        # Recommendations
        recommendations = validation_results['recommendations']
        if recommendations:
            logger.info("\nRECOMMENDATIONS:")
            for i, rec in enumerate(recommendations, 1):
                logger.info(f" {i}. {rec}")

        # Conclusion
        if status == 'PASS':
            logger.info("\n✓ Data passed validation and is ready for use")
        elif status == 'WARNING':
            logger.info("\n⚠ Data requires attention, there are issues to address")
        else:
            logger.info("\n✗ Data requires significant improvement before use")

    def generate_report(self, stage: str = 'final') -> Dict:
        """Generate detailed validation report.

        Returns an empty dict when ``stage`` has not been validated. The
        stored results are shallow-copied, so the added top-level keys do not
        leak back into ``validation_results``.
        """
        if stage not in self.validation_results:
            return {}

        report = self.validation_results[stage].copy()

        # Add metadata
        report['config'] = self.config.to_dict()
        report['validator_version'] = '1.0'
        report['generation_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Add detailed metrics
        numeric_stats = report.get('quality_metrics', {}).get('numeric_statistics')
        if numeric_stats is not None:
            # Numeric features summary
            per_feature = list(numeric_stats.values())
            report['numeric_summary'] = {
                'total_numeric_features': len(numeric_stats),
                'features_with_high_skewness': sum(1 for s in per_feature if abs(s.get('skewness', 0)) > 1),
                'features_with_high_kurtosis': sum(1 for s in per_feature if abs(s.get('kurtosis', 0)) > 3),
                'features_with_many_zeros': sum(1 for s in per_feature if s.get('zeros_percentage', 0) > 50)
            }

        return report

    @staticmethod
    def _json_key(key: Any) -> Any:
        """Convert a numpy / pandas dict key to a type ``json`` accepts."""
        if isinstance(key, np.bool_):
            return bool(key)
        if isinstance(key, np.integer):
            return int(key)
        if isinstance(key, np.floating):
            return float(key)
        if isinstance(key, pd.Timestamp):
            return key.strftime('%Y-%m-%d %H:%M:%S')
        return key

    @staticmethod
    def _json_safe(obj: Any) -> Any:
        """Recursively convert ``obj`` into strictly valid JSON material.

        numpy scalars and arrays become native values, timestamps become
        strings, and NaN / infinity become None. The conversion happens up
        front because a ``JSONEncoder.default`` hook is never consulted for
        native floats, so NaN would otherwise be written as the invalid
        token ``NaN``. Unknown objects are returned unchanged, so
        ``json.dump`` still raises TypeError for them.
        """
        if isinstance(obj, dict):
            return {
                DataValidator._json_key(key): DataValidator._json_safe(value)
                for key, value in obj.items()
            }
        if isinstance(obj, (list, tuple)):
            return [DataValidator._json_safe(item) for item in obj]
        if isinstance(obj, np.ndarray):
            return DataValidator._json_safe(obj.tolist())
        if isinstance(obj, (bool, np.bool_)):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, (float, np.floating)):
            value = float(obj)
            return value if math.isfinite(value) else None
        if isinstance(obj, pd.Timestamp):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        return obj

    def save_report(self, stage: str = 'final', path: Optional[str] = None) -> None:
        """Save validation report to a JSON file.

        Parameters:
        -----------
        stage : str
            Validation stage whose report is written. A warning is logged and
            nothing is written when the stage has not been validated.
        path : str, optional
            Destination file. Defaults to
            ``<results_dir>/reports/validation_report_<stage>.json``.
        """
        if stage not in self.validation_results:
            logger.warning(f"Report for stage '{stage}' not found")
            return

        report = self.generate_report(stage)

        if path is None:
            path = f'{self.config.results_dir}/reports/validation_report_{stage}.json'

        # Create directory if needed
        Path(path).parent.mkdir(parents=True, exist_ok=True)

        with open(path, 'w', encoding='utf-8') as f:
            json.dump(self._json_safe(report), f, indent=4, ensure_ascii=False)

        logger.info(f"✓ Validation report saved: {path}")