Spaces:
Runtime error
Runtime error
# ============================================
# CLASS 12: DATA VALIDATION
# ============================================
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from venv import logger  # noqa: F401 -- accidental IDE auto-import; overridden below

import numpy as np
import pandas as pd

from config.config import Config

# FIX: this module previously logged through `venv`'s internal module logger
# (an IDE auto-import accident). Bind a proper per-module logger under the
# same name so all existing call sites keep working.
logger = logging.getLogger(__name__)
class DataValidator:
    """Class for data quality validation"""

    def __init__(self, config: Config):
        """
        Initialise data validator

        Parameters:
        -----------
        config : Config
            Experiment configuration
        """
        self.config = config
        # Full validation result dicts, keyed by stage ('raw'/'processed'/'final').
        self.validation_results = {}
        # Per-stage quality metrics (mirror of validation_results[stage]['quality_metrics']).
        self.quality_metrics = {}
        # Reserved for per-stage issue tracking; not populated anywhere in this class yet.
        self.issues_found = {}
    def validate(
        self,
        data: pd.DataFrame,
        stage: str = 'final',
        rules: Optional[Dict] = None,
        detailed: bool = True
    ) -> Dict:
        """
        Validate data quality

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        stage : str
            Validation stage: 'raw', 'processed', 'final'
        rules : Dict, optional
            Validation rules. If None, uses configuration defaults.
        detailed : bool
            Whether to perform detailed validation

        Returns:
        --------
        Dict
            Validation results with keys: 'stage', 'timestamp', 'data_shape',
            'basic_checks', 'quality_metrics', 'issues', 'recommendations',
            'overall_score' (0-100) and 'status' ('PASS'/'WARNING'/'FAIL').
        """
        logger.info("\n" + "="*80)
        logger.info(f"DATA VALIDATION ({stage.upper()})")
        logger.info("="*80)
        # Fall back to the rule set from the experiment configuration.
        rules = rules or self.config.validation_rules
        validation_results = {
            'stage': stage,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'data_shape': list(data.shape),
            'basic_checks': {},
            'quality_metrics': {},
            'issues': {},
            'recommendations': [],
            'overall_score': 0,
            'status': 'PASS'
        }
        # Basic checks
        validation_results['basic_checks'] = self._basic_checks(data, rules)
        # Quality checks
        validation_results['quality_metrics'] = self._quality_metrics(data, rules)
        # Problem detection (skipped in quick mode; 'issues' stays empty then)
        if detailed:
            validation_results['issues'] = self._find_issues(data, rules)
        # Recommendation generation
        validation_results['recommendations'] = self._generate_recommendations(
            validation_results['basic_checks'],
            validation_results['quality_metrics'],
            validation_results['issues']
        )
        # Overall score calculation
        validation_results['overall_score'] = self._calculate_overall_score(validation_results)
        # Status determination: PASS >= 80, WARNING >= 60, FAIL otherwise
        if validation_results['overall_score'] >= 80:
            validation_results['status'] = 'PASS'
        elif validation_results['overall_score'] >= 60:
            validation_results['status'] = 'WARNING'
        else:
            validation_results['status'] = 'FAIL'
        # Save results per stage for later report generation
        self.validation_results[stage] = validation_results
        self.quality_metrics[stage] = validation_results['quality_metrics']
        # Log results
        self._log_validation_results(validation_results)
        return validation_results
| def _basic_checks(self, data: pd.DataFrame, rules: Dict) -> Dict: | |
| """Basic data checks""" | |
| checks = {} | |
| # 1. Data size check | |
| checks['min_rows'] = { | |
| 'value': len(data), | |
| 'threshold': rules.get('min_rows', 100), | |
| 'passed': len(data) >= rules.get('min_rows', 100) | |
| } | |
| # 2. Target variable presence check | |
| target = self.config.target_column | |
| checks['has_target'] = { | |
| 'value': target in data.columns, | |
| 'passed': target in data.columns | |
| } | |
| # 3. Missing values check | |
| missing_percentage = (data.isnull().sum().sum() / data.size) * 100 | |
| checks['missing_percentage'] = { | |
| 'value': missing_percentage, | |
| 'threshold': rules.get('max_missing_percentage', 30), | |
| 'passed': missing_percentage <= rules.get('max_missing_percentage', 30) | |
| } | |
| # 4. Duplicates check | |
| duplicate_count = data.duplicated().sum() | |
| duplicate_percentage = (duplicate_count / len(data)) * 100 | |
| checks['duplicates'] = { | |
| 'value': duplicate_percentage, | |
| 'threshold': 5, # Maximum 5% duplicates | |
| 'passed': duplicate_percentage <= 5 | |
| } | |
| # 5. Data types check | |
| numeric_count = len(data.select_dtypes(include=[np.number]).columns) | |
| checks['numeric_features'] = { | |
| 'value': numeric_count, | |
| 'passed': numeric_count >= 1 # At least one numeric feature required | |
| } | |
| return checks | |
| def _quality_metrics(self, data: pd.DataFrame, rules: Dict) -> Dict: | |
| """Data quality metrics""" | |
| metrics = {} | |
| # 1. Numeric features statistics | |
| numeric_cols = data.select_dtypes(include=[np.number]).columns | |
| if len(numeric_cols) > 0: | |
| numeric_stats = {} | |
| for col in numeric_cols: | |
| col_data = data[col].dropna() | |
| if len(col_data) > 0: | |
| numeric_stats[col] = { | |
| 'mean': float(col_data.mean()), | |
| 'std': float(col_data.std()), | |
| 'skewness': float(col_data.skew()), | |
| 'kurtosis': float(col_data.kurtosis()), | |
| 'zeros_percentage': float((col_data == 0).sum() / len(col_data) * 100), | |
| 'unique_percentage': float(col_data.nunique() / len(col_data) * 100) | |
| } | |
| metrics['numeric_statistics'] = numeric_stats | |
| # 2. Data stability (for time series) | |
| if isinstance(data.index, pd.DatetimeIndex): | |
| stability_metrics = self._calculate_temporal_stability(data) | |
| metrics['temporal_stability'] = stability_metrics | |
| # 3. Feature informativeness | |
| if self.config.target_column in data.columns: | |
| informativeness = self._calculate_feature_informativeness(data) | |
| metrics['feature_informativeness'] = informativeness | |
| # 4. Target variable quality | |
| target = self.config.target_column | |
| if target in data.columns: | |
| target_data = data[target].dropna() | |
| if len(target_data) > 0: | |
| target_metrics = { | |
| 'missing_percentage': float(target_data.isnull().sum() / len(data) * 100), | |
| 'unique_values': int(target_data.nunique()), | |
| 'is_constant': bool(target_data.nunique() <= 1), | |
| 'has_outliers': self._check_target_outliers(target_data), | |
| 'distribution_type': self._identify_distribution(target_data) | |
| } | |
| metrics['target_quality'] = target_metrics | |
| # 5. Class balance (for classification) - not applicable here, but kept as placeholder | |
| metrics['class_balance'] = {'note': 'Not applicable for regression'} | |
| return metrics | |
| def _calculate_temporal_stability(self, data: pd.DataFrame) -> Dict: | |
| """Calculate time series stability metrics""" | |
| stability = {} | |
| if not isinstance(data.index, pd.DatetimeIndex): | |
| return stability | |
| # Split into periods (e.g., by years) | |
| if 'year' not in data.columns: | |
| data_copy = data.copy() | |
| data_copy['year'] = data_copy.index.year | |
| else: | |
| data_copy = data | |
| years = sorted(data_copy['year'].unique()) | |
| if len(years) > 1: | |
| # Statistics by years for numeric columns | |
| year_stats = {} | |
| for col in data.select_dtypes(include=[np.number]).columns[:5]: # First 5 columns | |
| yearly_means = data_copy.groupby('year')[col].mean() | |
| yearly_stds = data_copy.groupby('year')[col].std() | |
| # Coefficient of variation between years | |
| if yearly_means.std() > 0: | |
| cv_between_years = yearly_means.std() / yearly_means.mean() | |
| else: | |
| cv_between_years = 0 | |
| year_stats[col] = { | |
| 'yearly_means': yearly_means.to_dict(), | |
| 'yearly_stds': yearly_stds.to_dict(), | |
| 'cv_between_years': float(cv_between_years), | |
| 'mean_stability': float(1 - cv_between_years) # 1 - CV, closer to 1 means more stable | |
| } | |
| stability['yearly_statistics'] = year_stats | |
| # Check for time gaps | |
| time_diff = pd.Series(data.index).diff().dropna() | |
| if len(time_diff) > 0: | |
| max_gap = time_diff.max() | |
| avg_gap = time_diff.mean() | |
| gap_std = time_diff.std() | |
| stability['time_gaps'] = { | |
| 'max_gap_days': float(max_gap.days if hasattr(max_gap, 'days') else max_gap), | |
| 'avg_gap_days': float(avg_gap.days if hasattr(avg_gap, 'days') else avg_gap), | |
| 'gap_std': float(gap_std.days if hasattr(gap_std, 'days') else gap_std), | |
| 'has_irregular_gaps': gap_std > avg_gap * 0.5 # If standard deviation > 50% of mean | |
| } | |
| # Seasonal stability | |
| if len(data) > 365: | |
| try: | |
| # Analyse seasonal patterns | |
| seasonal_stability = self._analyse_seasonal_stability(data) | |
| stability['seasonal_stability'] = seasonal_stability | |
| except: | |
| pass | |
| return stability | |
| def _analyse_seasonal_stability(self, data: pd.DataFrame) -> Dict: | |
| """Analyse seasonal patterns stability""" | |
| if not isinstance(data.index, pd.DatetimeIndex): | |
| return {} | |
| # For simplicity, analyse only target variable | |
| target = self.config.target_column | |
| if target not in data.columns: | |
| return {} | |
| series = data[target] | |
| # Split by years and compare seasonal patterns | |
| data_copy = data.copy() | |
| data_copy['year'] = data_copy.index.year | |
| data_copy['month'] = data_copy.index.month | |
| if 'year' in data_copy.columns and 'month' in data_copy.columns: | |
| monthly_means = data_copy.groupby(['year', 'month'])[target].mean().unstack() | |
| if not monthly_means.empty: | |
| # Correlation between years | |
| yearly_corr = monthly_means.corr().mean().mean() | |
| # Variation between years | |
| monthly_cv = monthly_means.std() / monthly_means.mean() | |
| avg_monthly_cv = monthly_cv.mean() | |
| return { | |
| 'yearly_correlation': float(yearly_corr), | |
| 'average_monthly_cv': float(avg_monthly_cv), | |
| 'seasonal_consistency': 'high' if yearly_corr > 0.8 and avg_monthly_cv < 0.3 else | |
| 'medium' if yearly_corr > 0.6 else 'low' | |
| } | |
| return {} | |
| def _calculate_feature_informativeness(self, data: pd.DataFrame) -> Dict: | |
| """Calculate feature informativeness""" | |
| informativeness = {} | |
| target = self.config.target_column | |
| if target not in data.columns: | |
| return informativeness | |
| numeric_cols = data.select_dtypes(include=[np.number]).columns | |
| numeric_cols = [col for col in numeric_cols if col != target] | |
| for col in numeric_cols[:20]: # Limit number of features for analysis | |
| try: | |
| # Correlation with target variable | |
| correlation = data[col].corr(data[target]) | |
| # Mutual information (approximated) | |
| # For simplicity, use absolute correlation as informativeness measure | |
| informativeness[col] = { | |
| 'correlation_with_target': float(correlation), | |
| 'abs_correlation': float(abs(correlation)), | |
| 'informativeness': 'high' if abs(correlation) > 0.5 else | |
| 'medium' if abs(correlation) > 0.3 else 'low' | |
| } | |
| except: | |
| continue | |
| return informativeness | |
| def _check_target_outliers(self, target_series: pd.Series) -> Dict: | |
| """Check target variable for outliers""" | |
| if len(target_series) < 10: | |
| return {'has_outliers': False, 'outlier_percentage': 0} | |
| q1 = target_series.quantile(0.25) | |
| q3 = target_series.quantile(0.75) | |
| iqr = q3 - q1 | |
| if iqr > 0: | |
| lower_bound = q1 - 1.5 * iqr | |
| upper_bound = q3 + 1.5 * iqr | |
| outliers = target_series[(target_series < lower_bound) | (target_series > upper_bound)] | |
| outlier_percentage = len(outliers) / len(target_series) * 100 | |
| return { | |
| 'has_outliers': len(outliers) > 0, | |
| 'outlier_count': int(len(outliers)), | |
| 'outlier_percentage': float(outlier_percentage), | |
| 'outlier_bounds': {'lower': float(lower_bound), 'upper': float(upper_bound)} | |
| } | |
| return {'has_outliers': False, 'outlier_percentage': 0} | |
| def _identify_distribution(self, series: pd.Series) -> str: | |
| """Identify distribution type""" | |
| if len(series) < 30: | |
| return 'insufficient_data' | |
| skewness = series.skew() | |
| kurtosis = series.kurtosis() | |
| if abs(skewness) < 0.5 and abs(kurtosis) < 1: | |
| return 'normal_like' | |
| elif skewness > 1: | |
| return 'right_skewed' | |
| elif skewness < -1: | |
| return 'left_skewed' | |
| elif kurtosis > 3: | |
| return 'heavy_tailed' | |
| elif kurtosis < 2: | |
| return 'light_tailed' | |
| else: | |
| return 'unknown' | |
    def _find_issues(self, data: pd.DataFrame, rules: Dict) -> Dict:
        """
        Find data problems, grouped by severity.

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        rules : Dict
            Validation rules (currently unused here; all thresholds are hard-coded)

        Returns:
        --------
        Dict
            {'critical': [...], 'warning': [...], 'info': [...]} message lists
        """
        issues = {
            'critical': [],
            'warning': [],
            'info': []
        }
        # 1. Check missing values in important features
        # > 50% missing is critical, 20-50% is a warning.
        missing_info = data.isnull().sum()
        high_missing_cols = missing_info[missing_info / len(data) * 100 > 20].index.tolist()
        for col in high_missing_cols:
            missing_pct = missing_info[col] / len(data) * 100
            if missing_pct > 50:
                issues['critical'].append(f"Column '{col}': {missing_pct:.1f}% missing values (critical)")
            elif missing_pct > 20:
                issues['warning'].append(f"Column '{col}': {missing_pct:.1f}% missing values")
        # 2. Check constant features
        for col in data.columns:
            if data[col].nunique() <= 1:
                issues['critical'].append(f"Column '{col}': constant value")
        # 3. Check feature correlation with itself (lags)
        # Derived columns named '<base>_lag_*' / '<base>_diff_*' are compared
        # against their base column; near-duplicates (|r| > 0.95) are informational only.
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if '_lag_' in col or '_diff_' in col:
                base_col = col.split('_lag_')[0] if '_lag_' in col else col.split('_diff_')[0]
                if base_col in numeric_cols:
                    correlation = data[col].corr(data[base_col])
                    if pd.notna(correlation) and abs(correlation) > 0.95:
                        issues['info'].append(f"Column '{col}': high correlation with '{base_col}' ({correlation:.3f})")
        # 4. Check time gaps (only meaningful for a datetime index)
        if isinstance(data.index, pd.DatetimeIndex):
            time_diff = pd.Series(data.index).diff().dropna()
            if len(time_diff) > 0:
                max_gap = time_diff.max()
                # Gaps longer than a month are flagged.
                if hasattr(max_gap, 'days') and max_gap.days > 30:
                    issues['warning'].append(f"Detected time gap: {max_gap.days} days")
        # 5. Check target variable (constant target is critical; > 10% outliers warns)
        target = self.config.target_column
        if target in data.columns:
            target_data = data[target].dropna()
            if len(target_data) > 0:
                if target_data.nunique() <= 1:
                    issues['critical'].append(f"Target variable '{target}': constant value")
                # Check for outliers
                outlier_check = self._check_target_outliers(target_data)
                if outlier_check.get('has_outliers', False) and outlier_check.get('outlier_percentage', 0) > 10:
                    issues['warning'].append(f"Target variable '{target}': {outlier_check['outlier_percentage']:.1f}% outliers")
        # 6. Check multicollinearity (simplified)
        # Only flagged when MANY (> 5) pairs exceed |r| = 0.9.
        if len(numeric_cols) > 5:
            corr_matrix = data[numeric_cols].corr().abs()
            high_corr_pairs = []
            for i in range(len(corr_matrix.columns)):
                for j in range(i+1, len(corr_matrix.columns)):
                    if corr_matrix.iloc[i, j] > 0.9:
                        col1 = corr_matrix.columns[i]
                        col2 = corr_matrix.columns[j]
                        high_corr_pairs.append((col1, col2, corr_matrix.iloc[i, j]))
            if len(high_corr_pairs) > 5:
                issues['warning'].append(f"Detected multicollinearity: {len(high_corr_pairs)} pairs with correlation > 0.9")
        return issues
| def _generate_recommendations( | |
| self, | |
| basic_checks: Dict, | |
| quality_metrics: Dict, | |
| issues: Dict | |
| ) -> List[str]: | |
| """Generate data improvement recommendations""" | |
| recommendations = [] | |
| # Recommendations based on basic checks | |
| for check_name, check_info in basic_checks.items(): | |
| if not check_info.get('passed', True): | |
| if check_name == 'min_rows': | |
| recommendations.append(f"Increase data volume: current row count ({check_info['value']}) below minimum threshold ({check_info['threshold']})") | |
| elif check_name == 'has_target': | |
| recommendations.append(f"Add target variable '{self.config.target_column}' to data") | |
| elif check_name == 'missing_percentage': | |
| recommendations.append(f"Handle missing values: {check_info['value']:.1f}% missing exceeds threshold {check_info['threshold']}%") | |
| elif check_name == 'duplicates': | |
| recommendations.append(f"Remove duplicates: {check_info['value']:.1f}% duplicate rows") | |
| # Recommendations based on issues | |
| if issues.get('critical'): | |
| recommendations.append("Resolve critical issues before using data") | |
| if issues.get('warning'): | |
| recommendations.append("Consider addressing warnings to improve data quality") | |
| # Recommendations based on quality metrics | |
| target_metrics = quality_metrics.get('target_quality', {}) | |
| if target_metrics.get('is_constant', False): | |
| recommendations.append(f"Target variable '{self.config.target_column}' is constant, different target variable needed") | |
| if target_metrics.get('has_outliers', {}).get('has_outliers', False): | |
| outlier_pct = target_metrics['has_outliers'].get('outlier_percentage', 0) | |
| if outlier_pct > 5: | |
| recommendations.append(f"Handle outliers in target variable: {outlier_pct:.1f}% outliers") | |
| # Time series stability recommendations | |
| temporal_stability = quality_metrics.get('temporal_stability', {}) | |
| if temporal_stability.get('time_gaps', {}).get('has_irregular_gaps', False): | |
| recommendations.append("Detected irregular time intervals, consider resampling") | |
| return recommendations | |
| def _calculate_overall_score(self, validation_results: Dict) -> float: | |
| """Calculate overall data quality score""" | |
| score = 100 | |
| # Penalties for basic checks | |
| basic_checks = validation_results.get('basic_checks', {}) | |
| for check_name, check_info in basic_checks.items(): | |
| if not check_info.get('passed', True): | |
| if check_name == 'min_rows': | |
| score -= 30 | |
| elif check_name == 'has_target': | |
| score -= 50 | |
| elif check_name == 'missing_percentage': | |
| missing_pct = check_info.get('value', 0) | |
| if missing_pct > 50: | |
| score -= 40 | |
| elif missing_pct > 20: | |
| score -= 20 | |
| elif missing_pct > 5: | |
| score -= 10 | |
| elif check_name == 'duplicates': | |
| duplicate_pct = check_info.get('value', 0) | |
| if duplicate_pct > 20: | |
| score -= 30 | |
| elif duplicate_pct > 10: | |
| score -= 15 | |
| elif duplicate_pct > 5: | |
| score -= 5 | |
| # Penalties for issues | |
| issues = validation_results.get('issues', {}) | |
| if issues.get('critical'): | |
| score -= len(issues['critical']) * 20 | |
| if issues.get('warning'): | |
| score -= len(issues['warning']) * 5 | |
| # Bonuses for good metrics | |
| quality_metrics = validation_results.get('quality_metrics', {}) | |
| target_metrics = quality_metrics.get('target_quality', {}) | |
| if not target_metrics.get('is_constant', True): | |
| score += 10 | |
| if target_metrics.get('missing_percentage', 100) < 1: | |
| score += 5 | |
| # Limit score to 0-100 range | |
| return max(0, min(100, score)) | |
    def _log_validation_results(self, validation_results: Dict) -> None:
        """
        Log validation results as a human-readable summary.

        Parameters:
        -----------
        validation_results : Dict
            A result dict as produced by `validate()`; must contain the keys
            'stage', 'status', 'overall_score', 'data_shape', 'basic_checks',
            'issues' and 'recommendations'.
        """
        stage = validation_results['stage']
        status = validation_results['status']
        score = validation_results['overall_score']
        logger.info(f"VALIDATION RESULTS ({stage}):")
        logger.info(f" Status: {status}")
        logger.info(f" Overall score: {score}/100")
        logger.info(f" Data shape: {validation_results['data_shape'][0]}x{validation_results['data_shape'][1]}")
        # Basic checks: one line per check with a pass/fail icon
        logger.info("\nBASIC CHECKS:")
        for check_name, check_info in validation_results['basic_checks'].items():
            status_icon = "✓" if check_info.get('passed', True) else "✗"
            logger.info(f" {status_icon} {check_name}: {check_info.get('value', 'N/A')}")
        # Issues, grouped by severity; long lists are truncated
        issues = validation_results['issues']
        if any(issues.values()):
            logger.info("\nDETECTED ISSUES:")
            for severity, issue_list in issues.items():
                if issue_list:
                    logger.info(f" {severity.upper()}:")
                    for issue in issue_list[:5]:  # Show only first 5 issues of each type
                        logger.info(f" - {issue}")
                    if len(issue_list) > 5:
                        logger.info(f" ... and {len(issue_list) - 5} more issues")
        else:
            logger.info("\n✓ No issues detected")
        # Recommendations, numbered
        recommendations = validation_results['recommendations']
        if recommendations:
            logger.info("\nRECOMMENDATIONS:")
            for i, rec in enumerate(recommendations, 1):
                logger.info(f" {i}. {rec}")
        # Conclusion line keyed off the overall status
        if status == 'PASS':
            logger.info("\n✓ Data passed validation and is ready for use")
        elif status == 'WARNING':
            logger.info("\n⚠ Data requires attention, there are issues to address")
        else:
            logger.info("\n✗ Data requires significant improvement before use")
| def generate_report(self, stage: str = 'final') -> Dict: | |
| """Generate detailed validation report""" | |
| if stage not in self.validation_results: | |
| return {} | |
| report = self.validation_results[stage].copy() | |
| # Add metadata | |
| report['config'] = self.config.to_dict() | |
| report['validator_version'] = '1.0' | |
| report['generation_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| # Add detailed metrics | |
| quality_metrics = report.get('quality_metrics', {}) | |
| if 'numeric_statistics' in quality_metrics: | |
| # Numeric features summary | |
| numeric_stats = quality_metrics['numeric_statistics'] | |
| report['numeric_summary'] = { | |
| 'total_numeric_features': len(numeric_stats), | |
| 'features_with_high_skewness': sum(1 for s in numeric_stats.values() if abs(s.get('skewness', 0)) > 1), | |
| 'features_with_high_kurtosis': sum(1 for s in numeric_stats.values() if abs(s.get('kurtosis', 0)) > 3), | |
| 'features_with_many_zeros': sum(1 for s in numeric_stats.values() if s.get('zeros_percentage', 0) > 50) | |
| } | |
| return report | |
    def save_report(self, stage: str = 'final', path: Optional[str] = None) -> None:
        """
        Save validation report to a JSON file.

        Parameters:
        -----------
        stage : str
            Validation stage whose report should be saved. Logs a warning and
            returns if the stage was never validated.
        path : str, optional
            Output file path. Defaults to
            '<results_dir>/reports/validation_report_<stage>.json'.
        """
        if stage not in self.validation_results:
            logger.warning(f"Report for stage '{stage}' not found")
            return
        report = self.generate_report(stage)
        if path is None:
            path = f'{self.config.results_dir}/reports/validation_report_{stage}.json'
        # Create directory if needed
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        # Custom JSON encoder: numpy scalars/arrays and pandas Timestamps are not
        # natively JSON-serialisable; NaN is mapped to null for strict-JSON consumers.
        class NumpyEncoder(json.JSONEncoder):
            def default(self, obj):
                if isinstance(obj, (np.integer, np.floating)):
                    if np.isnan(obj):
                        return None
                    return float(obj)
                elif isinstance(obj, np.bool_):
                    return bool(obj)
                elif isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, pd.Timestamp):
                    return obj.strftime('%Y-%m-%d %H:%M:%S')
                return super().default(obj)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=4, ensure_ascii=False, cls=NumpyEncoder)
        logger.info(f"✓ Validation report saved: {path}")