# ============================================
# CLASS 3: MISSING VALUE ANALYSER
# ============================================
import logging
from typing import Dict, Tuple, TYPE_CHECKING

from scipy.interpolate import interp1d

# FIX: the original did `from venv import logger`, which accidentally grabs the
# stdlib venv module's internal logger. Use a proper module-level logger.
logger = logging.getLogger(__name__)

# Config is referenced only in type annotations, so import it lazily to avoid a
# hard import-time dependency on the project package.
if TYPE_CHECKING:
    from config.config import Config

# statsmodels is needed only for STL-based imputation, which already degrades
# gracefully when decomposition fails — treat it as an optional dependency.
try:
    from statsmodels.tsa.seasonal import seasonal_decompose, STL  # noqa: F401
except ImportError:
    seasonal_decompose = None
    STL = None

try:
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    print("✅ All imports working!")
except ImportError as e:
    print(f"❌ Import error: {e}")


class MissingValueAnalyser:
    """Class for analysing and handling missing values"""

    def __init__(self, config: "Config"):
        """
        Initialise missing value analyser

        Parameters:
        -----------
        config : Config
            Experiment configuration (uses .missing_threshold, .save_plots,
            .results_dir, .seasonal_period)
        """
        self.config = config
        self.missing_info = {}       # summary produced by analyse()
        self.handling_methods = {}   # per-column methods recorded by handle()
        self.imputers = {}           # reserved for fitted imputer objects
        self.missing_patterns = {}   # detailed patterns from analyse(detailed=True)

    def analyse(
        self,
        data: pd.DataFrame,
        detailed: bool = True
    ) -> Dict:
        """
        Analyse missing values in data

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        detailed : bool
            Whether to perform detailed pattern analysis

        Returns:
        --------
        Dict
            Information about missing values ('summary' per column,
            'overall' aggregate statistics)
        """
        logger.info("\n" + "="*80)
        logger.info("MISSING VALUE ANALYSIS")
        logger.info("="*80)

        # Calculate missing values per column
        missing_total = data.isnull().sum()
        missing_percent = (missing_total / len(data)) * 100
        missing_df = pd.DataFrame({
            'missing_count': missing_total,
            'missing_percent': missing_percent,
            'dtype': data.dtypes.astype(str)
        })

        # Detailed analysis (row/column/time patterns)
        if detailed:
            self._detailed_missing_analysis(data, missing_df)

        # Save information
        self.missing_info = {
            'summary': {
                col: {
                    'missing_count': int(missing_df.loc[col, 'missing_count']),
                    'missing_percent': float(missing_df.loc[col, 'missing_percent']),
                    'dtype': missing_df.loc[col, 'dtype']
                }
                for col in missing_df.index
            },
            'overall': {
                'total_missing': int(missing_total.sum()),
                'total_rows': int(len(data)),
                'total_cells': int(data.size),
                # Guard against an empty frame (data.size == 0)
                'overall_missing_percentage': float(missing_total.sum() / data.size * 100) if data.size else 0.0,
                'rows_with_any_missing': int(data.isnull().any(axis=1).sum()),
                'rows_all_missing': int(data.isnull().all(axis=1).sum()),
                'columns_with_missing': missing_df[missing_df['missing_count'] > 0].index.tolist(),
                'columns_all_missing': missing_df[missing_df['missing_count'] == len(data)].index.tolist()
            }
        }

        # Visualisation
        if self.config.save_plots:
            self._plot_missing_values(data, missing_df)

        # Output results
        self._log_missing_summary(missing_df)

        return self.missing_info

    def _detailed_missing_analysis(
        self,
        data: pd.DataFrame,
        missing_df: pd.DataFrame
    ) -> None:
        """Detailed missing value analysis: row/column patterns, time patterns
        (for DatetimeIndex data) and inter-column missingness correlation.
        Results are stored in self.missing_patterns."""
        # Analyse missing patterns as 0/1 strings
        missing_matrix = data.isnull()

        # Row missing patterns (top-10 most frequent)
        row_patterns = missing_matrix.apply(lambda x: ''.join(x.astype(int).astype(str)), axis=1)
        row_pattern_counts = row_patterns.value_counts().head(10)

        # Column missing patterns (top-10 most frequent)
        col_patterns = missing_matrix.apply(lambda x: ''.join(x.astype(int).astype(str)), axis=0)
        col_pattern_counts = col_patterns.value_counts().head(10)

        # Time-based missing patterns analysis
        time_patterns = {}
        if isinstance(data.index, pd.DatetimeIndex):
            # Missing values by time (monthly buckets; newer pandas prefers 'ME')
            time_missing = data.isnull().resample('M').sum()
            time_patterns['monthly_missing'] = time_missing.sum(axis=1).to_dict()

            # Missing values by day of week (the helper column itself has no NaN)
            data_with_dow = data.copy()
            data_with_dow['dayofweek'] = data.index.dayofweek
            dow_missing = data_with_dow.groupby('dayofweek').apply(lambda x: x.isnull().sum().sum())
            time_patterns['dayofweek_missing'] = dow_missing.to_dict()

        self.missing_patterns = {
            'row_patterns': row_pattern_counts.to_dict(),
            'col_patterns': col_pattern_counts.to_dict(),
            'time_patterns': time_patterns,
            'missing_correlation': missing_matrix.corr().to_dict()  # Missing value correlation
        }

        logger.debug(f"Found {len(row_pattern_counts)} unique row missing patterns")
        logger.debug(f"Found {len(col_pattern_counts)} unique column missing patterns")

    def _plot_missing_values(
        self,
        data: pd.DataFrame,
        missing_df: pd.DataFrame
    ) -> None:
        """Visualise missing values (6 panels) and save the figure to
        {results_dir}/plots/missing_values_analysis.png."""
        fig, axes = plt.subplots(3, 2, figsize=(16, 12))

        # 1. Missing percentage histogram
        axes[0, 0].barh(missing_df.index, missing_df['missing_percent'])
        axes[0, 0].axvline(self.config.missing_threshold, color='red', linestyle='--')
        axes[0, 0].set_title('Missing Percentage by Column')
        axes[0, 0].set_xlabel('Missing Percentage (%)')
        axes[0, 0].set_ylabel('Columns')
        axes[0, 0].grid(True, alpha=0.3)

        # 2. Missing values heatmap
        missing_matrix = data.isnull()
        # FIX: the original condition was inverted (full matrix for large data,
        # slice for small) and sliced variables instead of observations.
        # Subsample to the first 1000 observations only when data is large.
        heatmap_data = missing_matrix.iloc[:1000].T if len(data) > 1000 else missing_matrix.T
        axes[0, 1].imshow(
            heatmap_data,
            aspect='auto', cmap='binary', interpolation='none'
        )
        axes[0, 1].set_title('Missing Values Matrix')
        axes[0, 1].set_xlabel('Observation Index')
        axes[0, 1].set_ylabel('Variables')
        axes[0, 1].set_yticks(range(len(data.columns)))
        axes[0, 1].set_yticklabels(data.columns, fontsize=8)

        # 3. Missing values over time (if time series)
        if isinstance(data.index, pd.DatetimeIndex):
            time_missing = data.isnull().resample('M').sum()
            axes[1, 0].plot(time_missing.sum(axis=1))
            axes[1, 0].set_title('Missing Values by Month')
            axes[1, 0].set_xlabel('Date')
            axes[1, 0].set_ylabel('Number of Missing Values')
            axes[1, 0].grid(True, alpha=0.3)

            # 4. Missing values by day of week
            data_with_dow = data.copy()
            data_with_dow['dayofweek'] = data.index.dayofweek
            dow_missing = data_with_dow.groupby('dayofweek').apply(lambda x: x.isnull().sum().sum())
            # FIX: guarantee all 7 weekdays exist so the bars align with the ticks
            dow_missing = dow_missing.reindex(range(7), fill_value=0)
            dow_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
            axes[1, 1].bar(range(7), dow_missing)
            axes[1, 1].set_title('Missing Values by Day of Week')
            axes[1, 1].set_xlabel('Day of Week')
            axes[1, 1].set_ylabel('Number of Missing Values')
            axes[1, 1].set_xticks(range(7))
            axes[1, 1].set_xticklabels(dow_names)
            axes[1, 1].grid(True, alpha=0.3)

        # 5. Missing value correlation
        missing_corr = data.isnull().corr()
        im = axes[2, 0].imshow(missing_corr, cmap='coolwarm', vmin=-1, vmax=1, aspect='auto')
        axes[2, 0].set_title('Missing Value Correlation Between Variables')
        axes[2, 0].set_xlabel('Variables')
        axes[2, 0].set_ylabel('Variables')
        plt.colorbar(im, ax=axes[2, 0])

        # 6. Cumulative missing sum
        cumulative_missing = data.isnull().cumsum()
        for col in data.columns[:5]:  # First 5 columns
            if data[col].isnull().any():
                axes[2, 1].plot(
                    cumulative_missing.index,
                    cumulative_missing[col],
                    label=col[:20]
                )
        axes[2, 1].set_title('Cumulative Missing Values')
        axes[2, 1].set_xlabel('Time/Index')
        axes[2, 1].set_ylabel('Cumulative Missing')
        axes[2, 1].legend(fontsize=8)
        axes[2, 1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig(
            f'{self.config.results_dir}/plots/missing_values_analysis.png',
            dpi=300, bbox_inches='tight'
        )
        plt.show()

    def _log_missing_summary(self, missing_df: pd.DataFrame) -> None:
        """Log missing value summary (overall stats + top-10 worst columns)."""
        missing_columns = missing_df[missing_df['missing_count'] > 0]

        if len(missing_columns) > 0:
            logger.info("MISSING VALUES FOUND:")
            logger.info("-" * 50)
            logger.info(f"Total missing values: {self.missing_info['overall']['total_missing']}")
            logger.info(f"Overall missing percentage: {self.missing_info['overall']['overall_missing_percentage']:.2f}%")
            logger.info(f"Rows with missing values: {self.missing_info['overall']['rows_with_any_missing']}")
            logger.info(f"Columns with missing values: {len(self.missing_info['overall']['columns_with_missing'])}")
            logger.info("\nTop-10 columns by missing values:")
            top_missing = missing_df.nlargest(10, 'missing_percent')
            for idx, (col, row) in enumerate(top_missing.iterrows(), 1):
                logger.info(f" {idx:2d}. {col}: {int(row['missing_count'])} missing ({row['missing_percent']:.2f}%)")
        else:
            logger.info("✓ No missing values found")

    def handle(
        self,
        data: pd.DataFrame,
        method: str = 'interpolate',
        strategy: str = 'columnwise',
        **kwargs
    ) -> pd.DataFrame:
        """
        Handle missing values

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        method : str
            Handling method: 'interpolate', 'ffill', 'bfill', 'mean',
            'median', 'mode', 'knn', 'regression'
        strategy : str
            Strategy: 'columnwise', 'rowwise', 'global'
        **kwargs : dict
            Additional parameters for method (e.g. drop_high_missing=True
            drops columns above the configured missing threshold)

        Returns:
        --------
        pd.DataFrame
            Data with handled missing values
        """
        logger.info("\n" + "="*80)
        logger.info("HANDLING MISSING VALUES")
        logger.info("="*80)

        data_processed = data.copy()
        methods_applied = {}

        # Determine columns to process
        if strategy == 'columnwise':
            columns_to_process = data_processed.columns
        elif strategy == 'rowwise':
            # Row-wise handling (for time series)
            return self._handle_rowwise(data_processed, method, **kwargs)
        else:
            columns_to_process = data_processed.select_dtypes(include=[np.number]).columns

        # Process each column
        for col in columns_to_process:
            missing_before = data_processed[col].isnull().sum()
            # FIX: compute the percentage unconditionally — previously it was
            # only assigned when missing_before > 0, so complete columns
            # recorded a stale value (or NameError'd on the first column).
            missing_percent = (missing_before / len(data_processed)) * 100 if len(data_processed) else 0.0

            if missing_before > 0 and missing_percent > self.config.missing_threshold:
                logger.warning(f" {col}: {missing_before} missing ({missing_percent:.1f}%) > threshold {self.config.missing_threshold}%")
                if kwargs.get('drop_high_missing', False):
                    data_processed = data_processed.drop(columns=[col])
                    method_used = f"dropped (>{self.config.missing_threshold}% missing)"
                    missing_after = 0
                else:
                    # Over threshold but not dropping: still impute
                    data_processed[col], method_used = self._apply_imputation_method(
                        data_processed[col], method, **kwargs
                    )
                    missing_after = data_processed[col].isnull().sum()
            else:
                # Use selected method (a no-op for already-complete columns)
                data_processed[col], method_used = self._apply_imputation_method(
                    data_processed[col], method, **kwargs
                )
                missing_after = data_processed[col].isnull().sum()

            methods_applied[col] = {
                'method': method_used,
                'missing_before': int(missing_before),
                'missing_after': int(missing_after),
                'missing_percent_before': float(missing_percent)
            }
            if missing_before > 0:
                logger.info(f" {col}: {missing_before} → {missing_after} missing ({method_used})")

        self.handling_methods = methods_applied

        # Check that all missing values are handled
        remaining_missing = data_processed.isnull().sum().sum()
        if remaining_missing == 0:
            logger.info("✓ All missing values successfully handled")
        else:
            logger.warning(f"⚠ {remaining_missing} missing values remain")
            # Additional handling of remaining missing values.
            # FIX: fillna(method=...) is deprecated; ffill()/bfill() are equivalent.
            data_processed = data_processed.ffill().bfill()
            remaining_after = data_processed.isnull().sum().sum()
            if remaining_after == 0:
                logger.info("✓ Remaining missing values handled with ffill/bfill combination")

        return data_processed

    def _apply_imputation_method(
        self,
        series: pd.Series,
        method: str,
        **kwargs
    ) -> Tuple[pd.Series, str]:
        """
        Apply imputation method to individual series

        Parameters:
        -----------
        series : pd.Series
            Input series
        method : str
            Imputation method
        **kwargs : dict
            Additional parameters (interpolation_method, limit, k, order, ...)

        Returns:
        --------
        Tuple[pd.Series, str]
            Processed series and method description

        Raises:
        -------
        ValueError
            If the method name is unknown
        """
        if method == 'interpolate':
            # Interpolation for time series
            if isinstance(series.index, pd.DatetimeIndex):
                method_name = f"{kwargs.get('interpolation_method', 'linear')} interpolation"
                series_filled = series.interpolate(
                    method=kwargs.get('interpolation_method', 'linear'),
                    limit_direction=kwargs.get('limit_direction', 'both'),
                    limit=kwargs.get('limit', None)
                )
            else:
                method_name = 'linear interpolation'
                series_filled = series.interpolate(method='linear')

        elif method == 'time_weighted':
            # Time-weighted interpolation
            method_name = 'time-weighted interpolation'
            series_filled = self._time_weighted_interpolation(series)

        elif method == 'seasonal':
            # Seasonal interpolation
            method_name = 'seasonal interpolation'
            series_filled = self._seasonal_interpolation(series, **kwargs)

        elif method == 'ffill':
            # Forward fill
            method_name = 'forward fill'
            series_filled = series.ffill(limit=kwargs.get('limit', None))

        elif method == 'bfill':
            # Backward fill
            method_name = 'backward fill'
            series_filled = series.bfill(limit=kwargs.get('limit', None))

        elif method == 'mean':
            # Mean imputation
            method_name = 'mean imputation'
            series_filled = series.fillna(series.mean())

        elif method == 'median':
            # Median imputation
            method_name = 'median imputation'
            series_filled = series.fillna(series.median())

        elif method == 'mode':
            # Mode imputation (falls back to median when no mode exists)
            method_name = 'mode imputation'
            mode_value = series.mode()
            if not mode_value.empty:
                series_filled = series.fillna(mode_value.iloc[0])
            else:
                series_filled = series.fillna(series.median())

        elif method == 'knn':
            # KNN imputation
            method_name = f"KNN imputation (k={kwargs.get('k', 5)})"
            # Simplified version using nearest neighbour mean
            series_filled = self._knn_imputation(series, k=kwargs.get('k', 5))

        elif method == 'regression':
            # Regression imputation
            method_name = 'regression imputation'
            series_filled = self._regression_imputation(series, **kwargs)

        elif method == 'spline':
            # Spline interpolation (requires scipy)
            method_name = 'spline interpolation'
            series_filled = series.interpolate(method='spline', order=kwargs.get('order', 3))

        elif method == 'stl':
            # STL decomposition + interpolation
            method_name = 'STL-based imputation'
            series_filled = self._stl_imputation(series, **kwargs)

        else:
            raise ValueError(f"Unknown method: {method}")

        # If missing values remain (e.g. leading/trailing gaps), fill with ffill/bfill
        if series_filled.isnull().any():
            series_filled = series_filled.ffill().bfill()
            method_name += " + ffill/bfill"

        return series_filled, method_name

    def _time_weighted_interpolation(self, series: pd.Series) -> pd.Series:
        """Time-weighted interpolation.

        NOTE(review): this interpolates over ordinal positions, not actual
        timestamps, so it is positionally rather than truly time weighted —
        confirm whether calendar-time weighting was intended.
        """
        if not isinstance(series.index, pd.DatetimeIndex):
            return series.interpolate()

        # Create ordinal "timestamps"
        time_numeric = pd.Series(range(len(series)), index=series.index)
        # Interpolate timestamps for missing values (identity for a full range)
        time_interpolated = time_numeric.interpolate()

        # Need at least 2 observed points to interpolate
        valid_mask = series.notna()
        if valid_mask.sum() < 2:
            return series.ffill().bfill()

        # Use linear interpolation over the observed points
        valid_times = time_numeric[valid_mask]
        valid_values = series[valid_mask]
        interp_func = interp1d(
            valid_times, valid_values,
            kind='linear', bounds_error=False, fill_value='extrapolate'
        )

        series_filled = series.copy()
        missing_mask = series.isna()
        series_filled[missing_mask] = interp_func(time_interpolated[missing_mask])
        return series_filled

    def _seasonal_interpolation(
        self,
        series: pd.Series,
        **kwargs
    ) -> pd.Series:
        """Seasonal interpolation: fill each gap with the mean of values at the
        same seasonal position in up to 9 neighbouring cycles, then fall back
        to linear interpolation for anything still missing."""
        if not isinstance(series.index, pd.DatetimeIndex):
            return series.interpolate()

        period = kwargs.get('period', self.config.seasonal_period)
        series_filled = series.copy()

        # Interpolation considering seasonality
        for i in range(len(series)):
            if pd.isna(series.iloc[i]):
                # Find observed values at the same seasonal position
                seasonal_indices = []
                for offset in range(1, 10):  # Look in previous/next cycles
                    idx_back = i - offset * period
                    idx_forward = i + offset * period
                    if idx_back >= 0 and not pd.isna(series.iloc[idx_back]):
                        seasonal_indices.append(idx_back)
                    if idx_forward < len(series) and not pd.isna(series.iloc[idx_forward]):
                        seasonal_indices.append(idx_forward)
                if seasonal_indices:
                    # Take mean value from seasonal positions
                    seasonal_values = series.iloc[seasonal_indices]
                    series_filled.iloc[i] = seasonal_values.mean()

        # Fill remaining missing values with regular interpolation
        return series_filled.interpolate()

    def _knn_imputation(
        self,
        series: pd.Series,
        k: int = 5
    ) -> pd.Series:
        """Simplified KNN imputation for time series: each gap is filled with
        the inverse-distance-weighted mean of up to k observed neighbours
        within a +/- k*10 window."""
        series_filled = series.copy()

        for i in range(len(series)):
            if pd.isna(series.iloc[i]):
                # Collect nearby observed values with their distances
                distances = []
                values = []
                for j in range(max(0, i - k * 10), min(len(series), i + k * 10)):
                    if j != i and not pd.isna(series.iloc[j]):
                        distances.append(abs(i - j))
                        values.append(series.iloc[j])
                        if len(values) >= k:
                            break
                if values:
                    # Distance-weighted average (closer points weigh more)
                    weights = [1 / (d + 1) for d in distances]
                    series_filled.iloc[i] = np.average(values, weights=weights)

        return series_filled

    def _regression_imputation(
        self,
        series: pd.Series,
        **kwargs
    ) -> pd.Series:
        """Regression imputation: fit a degree-2 polynomial over the observed
        positions and evaluate it at the missing positions."""
        series_filled = series.copy()

        # Need at least 3 observed values to fit a quadratic
        if series.notna().sum() < 3:
            return series.ffill().bfill()

        x = np.arange(len(series))
        y = series.values

        valid_mask = ~np.isnan(y)
        if valid_mask.sum() < 2:
            return series.ffill().bfill()

        # Polynomial regression degree 2
        coeffs = np.polyfit(x[valid_mask], y[valid_mask], 2)
        poly_func = np.poly1d(coeffs)

        # Fill missing values with fitted values
        missing_mask = np.isnan(y)
        series_filled.iloc[missing_mask] = poly_func(x[missing_mask])
        return series_filled

    def _stl_imputation(
        self,
        series: pd.Series,
        **kwargs
    ) -> pd.Series:
        """STL decomposition-based imputation: replace gaps with
        trend + seasonal from a robust STL fit; falls back to plain
        interpolation when STL is unavailable or fails."""
        try:
            if not isinstance(series.index, pd.DatetimeIndex):
                return series.interpolate()
            if STL is None:
                # statsmodels not installed — degrade gracefully
                logger.warning("statsmodels not available, using interpolation")
                return series.interpolate()

            # STL decomposition on a gap-filled copy (STL requires no NaNs)
            stl = STL(
                series.ffill().bfill(),
                period=kwargs.get('period', self.config.seasonal_period),
                robust=True
            )
            result = stl.fit()

            # Reconstruct series without the residual (noise) component
            reconstructed = result.trend + result.seasonal

            # Replace missing values with reconstructed values
            series_filled = series.copy()
            missing_mask = series.isna()
            series_filled[missing_mask] = reconstructed[missing_mask]
            return series_filled
        except Exception as e:
            logger.warning(f"STL imputation failed: {e}, using interpolation")
            return series.interpolate()

    def _handle_rowwise(
        self,
        data: pd.DataFrame,
        method: str,
        **kwargs
    ) -> pd.DataFrame:
        """Row-wise missing value handling: optionally drop rows whose missing
        share exceeds drop_rows_threshold (%), then impute across each row."""
        data_processed = data.copy()

        # Remove rows with high missing counts
        if kwargs.get('drop_rows_threshold', 0) > 0:
            threshold = kwargs['drop_rows_threshold']
            rows_before = len(data_processed)
            missing_per_row = data_processed.isnull().sum(axis=1) / data_processed.shape[1] * 100
            rows_to_drop = missing_per_row[missing_per_row > threshold].index
            data_processed = data_processed.drop(rows_to_drop)
            rows_after = len(data_processed)
            logger.info(f"Rows removed: {rows_before - rows_after} (missing > {threshold}%)")

        # Row-wise imputation (transpose so fillna aligns per-row statistics)
        if method == 'row_mean':
            data_processed = data_processed.T.fillna(data_processed.mean(axis=1)).T
        elif method == 'row_median':
            data_processed = data_processed.T.fillna(data_processed.median(axis=1)).T
        elif method == 'row_ffill':
            data_processed = data_processed.ffill(axis=1).bfill(axis=1)

        return data_processed

    def create_validation_rules(self) -> Dict:
        """Create per-column validation rules based on the last analyse() run.

        Thresholds: >50% missing → drop; >20% → advanced imputation (knn);
        >5% → standard imputation (interpolate); >0% → simple imputation (ffill).
        """
        rules = {}
        for col, info in self.missing_info['summary'].items():
            missing_percent = info['missing_percent']
            if missing_percent > 50:
                rules[col] = {
                    'action': 'drop_column',
                    'reason': f'Missing > 50%: {missing_percent:.1f}%'
                }
            elif missing_percent > 20:
                rules[col] = {
                    'action': 'advanced_imputation',
                    'reason': f'High missing: {missing_percent:.1f}%',
                    'recommended_method': 'knn'
                }
            elif missing_percent > 5:
                rules[col] = {
                    'action': 'standard_imputation',
                    'reason': f'Moderate missing: {missing_percent:.1f}%',
                    'recommended_method': 'interpolate'
                }
            elif missing_percent > 0:
                rules[col] = {
                    'action': 'simple_imputation',
                    'reason': f'Low missing: {missing_percent:.1f}%',
                    'recommended_method': 'ffill'
                }
        return rules

    def get_report(self) -> Dict:
        """Get missing values report (analysis, handling methods, patterns, rules)."""
        return {
            'missing_info': self.missing_info,
            'handling_methods': self.handling_methods,
            'missing_patterns': self.missing_patterns,
            'validation_rules': self.create_validation_rules()
        }