# TimeFlowPro1/missing_values/missing_analyzer.py
# (source: ArabovMK, commit bd3c428 "Update all files")
# ============================================
# CLASS 3: MISSING VALUE ANALYSER
# ============================================
import logging
from typing import Dict, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from statsmodels.tsa.seasonal import STL, seasonal_decompose

from config.config import Config

# Module-level logger. (Previously this was `from venv import logger`, an
# accidental import of the stdlib venv package's private logger.)
logger = logging.getLogger(__name__)
class MissingValueAnalyser:
    """Class for analysing and handling missing values.

    Typical workflow:
        1. ``analyse``    -- gather per-column and overall missing statistics.
        2. ``handle``     -- impute (or drop) missing data.
        3. ``get_report`` -- collect everything produced so far.

    Reads from ``config``: ``missing_threshold`` (percent), ``seasonal_period``,
    ``save_plots`` and ``results_dir``.
    """

    def __init__(self, config: Config):
        """
        Initialise missing value analyser.

        Parameters:
        -----------
        config : Config
            Experiment configuration
        """
        self.config = config
        self.missing_info = {}       # summary produced by analyse()
        self.handling_methods = {}   # per-column methods applied by handle()
        self.imputers = {}           # reserved for fitted imputer objects
        self.missing_patterns = {}   # patterns from _detailed_missing_analysis()

    def analyse(
        self,
        data: pd.DataFrame,
        detailed: bool = True
    ) -> Dict:
        """
        Analyse missing values in data.

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        detailed : bool
            Whether to perform detailed pattern analysis as well

        Returns:
        --------
        Dict
            Information about missing values (also stored in
            ``self.missing_info``)
        """
        logger.info("\n" + "="*80)
        logger.info("MISSING VALUE ANALYSIS")
        logger.info("="*80)

        # Per-column missing counts / percentages.
        missing_total = data.isnull().sum()
        missing_percent = (missing_total / len(data)) * 100
        missing_df = pd.DataFrame({
            'missing_count': missing_total,
            'missing_percent': missing_percent,
            'dtype': data.dtypes.astype(str)
        })

        # Optional pattern analysis (row/column/time patterns, correlations).
        if detailed:
            self._detailed_missing_analysis(data, missing_df)

        # Save information for later reporting / rule generation.
        self.missing_info = {
            'summary': {
                col: {
                    'missing_count': int(missing_df.loc[col, 'missing_count']),
                    'missing_percent': float(missing_df.loc[col, 'missing_percent']),
                    'dtype': missing_df.loc[col, 'dtype']
                }
                for col in missing_df.index
            },
            'overall': {
                'total_missing': int(missing_total.sum()),
                'total_rows': int(len(data)),
                'total_cells': int(data.size),
                'overall_missing_percentage': float(missing_total.sum() / data.size * 100),
                'rows_with_any_missing': int(data.isnull().any(axis=1).sum()),
                'rows_all_missing': int(data.isnull().all(axis=1).sum()),
                'columns_with_missing': missing_df[missing_df['missing_count'] > 0].index.tolist(),
                'columns_all_missing': missing_df[missing_df['missing_count'] == len(data)].index.tolist()
            }
        }

        # Visualisation (writes a PNG under config.results_dir).
        if self.config.save_plots:
            self._plot_missing_values(data, missing_df)

        # Output results
        self._log_missing_summary(missing_df)
        return self.missing_info

    def _detailed_missing_analysis(
        self,
        data: pd.DataFrame,
        missing_df: pd.DataFrame
    ) -> None:
        """Detailed missing value analysis.

        Populates ``self.missing_patterns`` with:
        - top-10 row/column missingness patterns (encoded as 0/1 strings),
        - time-based patterns (monthly and day-of-week) for datetime indices,
        - pairwise correlation of missingness indicators between columns.
        """
        missing_matrix = data.isnull()

        # Row missing patterns: each row becomes a 0/1 string, e.g. '0101'.
        row_patterns = missing_matrix.apply(lambda x: ''.join(x.astype(int).astype(str)), axis=1)
        row_pattern_counts = row_patterns.value_counts().head(10)

        # Column missing patterns (same encoding, per column).
        col_patterns = missing_matrix.apply(lambda x: ''.join(x.astype(int).astype(str)), axis=0)
        col_pattern_counts = col_patterns.value_counts().head(10)

        # Time-based missing patterns analysis (only for datetime indices).
        time_patterns = {}
        if isinstance(data.index, pd.DatetimeIndex):
            # Missing values aggregated per calendar month.
            time_missing = data.isnull().resample('M').sum()
            time_patterns['monthly_missing'] = time_missing.sum(axis=1).to_dict()

            # Missing values by day of week (0 = Monday).
            data_with_dow = data.copy()
            data_with_dow['dayofweek'] = data.index.dayofweek
            dow_missing = data_with_dow.groupby('dayofweek').apply(lambda x: x.isnull().sum().sum())
            time_patterns['dayofweek_missing'] = dow_missing.to_dict()

        self.missing_patterns = {
            'row_patterns': row_pattern_counts.to_dict(),
            'col_patterns': col_pattern_counts.to_dict(),
            'time_patterns': time_patterns,
            'missing_correlation': missing_matrix.corr().to_dict()  # Missing value correlation
        }

        logger.debug(f"Found {len(row_pattern_counts)} unique row missing patterns")
        logger.debug(f"Found {len(col_pattern_counts)} unique column missing patterns")

    def _plot_missing_values(
        self,
        data: pd.DataFrame,
        missing_df: pd.DataFrame
    ) -> None:
        """Visualise missing values (3x2 grid) and save the figure as PNG."""
        fig, axes = plt.subplots(3, 2, figsize=(16, 12))

        # 1. Missing percentage histogram
        axes[0, 0].barh(
            missing_df.index,
            missing_df['missing_percent']
        )
        axes[0, 0].axvline(self.config.missing_threshold, color='red', linestyle='--')
        axes[0, 0].set_title('Missing Percentage by Column')
        axes[0, 0].set_xlabel('Missing Percentage (%)')
        axes[0, 0].set_ylabel('Columns')
        axes[0, 0].grid(True, alpha=0.3)

        # 2. Missing values heatmap.
        missing_matrix = data.isnull()
        # BUGFIX: the original condition was inverted (it subsampled only
        # *small* datasets) and sliced variables instead of observations.
        # Cap at the first 1000 observations so imshow stays responsive.
        if len(data) > 1000:
            matrix_to_plot = missing_matrix.iloc[:1000].T
        else:
            matrix_to_plot = missing_matrix.T
        axes[0, 1].imshow(
            matrix_to_plot,
            aspect='auto',
            cmap='binary',
            interpolation='none'
        )
        axes[0, 1].set_title('Missing Values Matrix')
        axes[0, 1].set_xlabel('Observation Index')
        axes[0, 1].set_ylabel('Variables')
        axes[0, 1].set_yticks(range(len(data.columns)))
        axes[0, 1].set_yticklabels(data.columns, fontsize=8)

        # 3. Missing values over time (if time series)
        if isinstance(data.index, pd.DatetimeIndex):
            time_missing = data.isnull().resample('M').sum()
            axes[1, 0].plot(time_missing.sum(axis=1))
            axes[1, 0].set_title('Missing Values by Month')
            axes[1, 0].set_xlabel('Date')
            axes[1, 0].set_ylabel('Number of Missing Values')
            axes[1, 0].grid(True, alpha=0.3)

            # 4. Missing values by day of week
            data_with_dow = data.copy()
            data_with_dow['dayofweek'] = data.index.dayofweek
            dow_missing = data_with_dow.groupby('dayofweek').apply(lambda x: x.isnull().sum().sum())
            dow_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
            axes[1, 1].bar(range(7), dow_missing)
            axes[1, 1].set_title('Missing Values by Day of Week')
            axes[1, 1].set_xlabel('Day of Week')
            axes[1, 1].set_ylabel('Number of Missing Values')
            axes[1, 1].set_xticks(range(7))
            axes[1, 1].set_xticklabels(dow_names)
            axes[1, 1].grid(True, alpha=0.3)

        # 5. Missing value correlation between variables
        missing_corr = data.isnull().corr()
        im = axes[2, 0].imshow(
            missing_corr,
            cmap='coolwarm',
            vmin=-1,
            vmax=1,
            aspect='auto'
        )
        axes[2, 0].set_title('Missing Value Correlation Between Variables')
        axes[2, 0].set_xlabel('Variables')
        axes[2, 0].set_ylabel('Variables')
        plt.colorbar(im, ax=axes[2, 0])

        # 6. Cumulative missing sum (first 5 columns only, to keep legend small)
        cumulative_missing = data.isnull().cumsum()
        for col in data.columns[:5]:
            if data[col].isnull().any():
                axes[2, 1].plot(
                    cumulative_missing.index,
                    cumulative_missing[col],
                    label=col[:20]
                )
        axes[2, 1].set_title('Cumulative Missing Values')
        axes[2, 1].set_xlabel('Time/Index')
        axes[2, 1].set_ylabel('Cumulative Missing')
        axes[2, 1].legend(fontsize=8)
        axes[2, 1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig(
            f'{self.config.results_dir}/plots/missing_values_analysis.png',
            dpi=300,
            bbox_inches='tight'
        )
        plt.show()

    def _log_missing_summary(self, missing_df: pd.DataFrame) -> None:
        """Log missing value summary (reads ``self.missing_info['overall']``)."""
        missing_columns = missing_df[missing_df['missing_count'] > 0]
        if len(missing_columns) > 0:
            logger.info("MISSING VALUES FOUND:")
            logger.info("-" * 50)
            logger.info(f"Total missing values: {self.missing_info['overall']['total_missing']}")
            logger.info(f"Overall missing percentage: {self.missing_info['overall']['overall_missing_percentage']:.2f}%")
            logger.info(f"Rows with missing values: {self.missing_info['overall']['rows_with_any_missing']}")
            logger.info(f"Columns with missing values: {len(self.missing_info['overall']['columns_with_missing'])}")
            logger.info("\nTop-10 columns by missing values:")
            top_missing = missing_df.nlargest(10, 'missing_percent')
            for idx, (col, row) in enumerate(top_missing.iterrows(), 1):
                logger.info(f"  {idx:2d}. {col}: {int(row['missing_count'])} missing ({row['missing_percent']:.2f}%)")
        else:
            logger.info("✓ No missing values found")

    def handle(
        self,
        data: pd.DataFrame,
        method: str = 'interpolate',
        strategy: str = 'columnwise',
        **kwargs
    ) -> pd.DataFrame:
        """
        Handle missing values.

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        method : str
            Handling method: 'interpolate', 'ffill', 'bfill', 'mean', 'median',
            'mode', 'knn', 'regression', 'time_weighted', 'seasonal', 'spline',
            'stl'
        strategy : str
            Strategy: 'columnwise', 'rowwise', 'global'
        **kwargs : dict
            Additional parameters for method (e.g. ``drop_high_missing``,
            ``limit``, ``k``, ``period``, ``order``)

        Returns:
        --------
        pd.DataFrame
            Data with handled missing values
        """
        logger.info("\n" + "="*80)
        logger.info("HANDLING MISSING VALUES")
        logger.info("="*80)

        data_processed = data.copy()
        methods_applied = {}

        # Determine columns to process.
        if strategy == 'columnwise':
            columns_to_process = data_processed.columns
        elif strategy == 'rowwise':
            # Row-wise handling (for time series); returns immediately.
            data_processed = self._handle_rowwise(data_processed, method, **kwargs)
            return data_processed
        else:
            # 'global': numeric columns only.
            columns_to_process = data_processed.select_dtypes(include=[np.number]).columns

        n_rows = len(data_processed)
        for col in columns_to_process:
            missing_before = data_processed[col].isnull().sum()
            # BUGFIX: compute the percentage for *every* column up front.
            # Previously it was only set when missing_before > 0, so recording
            # methods_applied for a complete column raised NameError (or reused
            # a stale value from a previous column). Guard against empty frames.
            missing_percent = (missing_before / n_rows) * 100 if n_rows else 0.0

            if missing_before > 0 and missing_percent > self.config.missing_threshold:
                logger.warning(f"  {col}: {missing_before} missing ({missing_percent:.1f}%) > threshold {self.config.missing_threshold}%")
                if kwargs.get('drop_high_missing', False):
                    data_processed = data_processed.drop(columns=[col])
                    method_used = f"dropped (>{self.config.missing_threshold}% missing)"
                    missing_after = 0
                else:
                    # Over threshold but keeping the column: impute anyway.
                    data_processed[col], method_used = self._apply_imputation_method(
                        data_processed[col], method, **kwargs
                    )
                    missing_after = data_processed[col].isnull().sum()
            else:
                # Under threshold (or nothing missing -- the imputation is then
                # a no-op): apply the selected method.
                data_processed[col], method_used = self._apply_imputation_method(
                    data_processed[col], method, **kwargs
                )
                missing_after = data_processed[col].isnull().sum()

            methods_applied[col] = {
                'method': method_used,
                'missing_before': int(missing_before),
                'missing_after': int(missing_after),
                'missing_percent_before': float(missing_percent)
            }
            if missing_before > 0:
                logger.info(f"  {col}: {missing_before} -> {missing_after} missing ({method_used})")

        self.handling_methods = methods_applied

        # Check that all missing values are handled.
        remaining_missing = data_processed.isnull().sum().sum()
        if remaining_missing == 0:
            logger.info("✓ All missing values successfully handled")
        else:
            logger.warning(f"⚠ {remaining_missing} missing values remain")
            # Last-resort pass over anything still missing.
            # (fillna(method=...) is deprecated; use ffill()/bfill() directly.)
            data_processed = data_processed.ffill().bfill()
            remaining_after = data_processed.isnull().sum().sum()
            if remaining_after == 0:
                logger.info("✓ Remaining missing values handled with ffill/bfill combination")

        return data_processed

    def _apply_imputation_method(
        self,
        series: pd.Series,
        method: str,
        **kwargs
    ) -> Tuple[pd.Series, str]:
        """
        Apply imputation method to an individual series.

        Parameters:
        -----------
        series : pd.Series
            Input series
        method : str
            Imputation method (see ``handle`` for the full list)
        **kwargs : dict
            Additional parameters

        Returns:
        --------
        Tuple[pd.Series, str]
            Processed series and a human-readable method description

        Raises:
        -------
        ValueError
            If ``method`` is unknown.
        """
        if method == 'interpolate':
            # Interpolation; datetime-indexed series honour extra kwargs.
            if isinstance(series.index, pd.DatetimeIndex):
                method_name = f"{kwargs.get('interpolation_method', 'linear')} interpolation"
                series_filled = series.interpolate(
                    method=kwargs.get('interpolation_method', 'linear'),
                    limit_direction=kwargs.get('limit_direction', 'both'),
                    limit=kwargs.get('limit', None)
                )
            else:
                method_name = 'linear interpolation'
                series_filled = series.interpolate(method='linear')
        elif method == 'time_weighted':
            method_name = 'time-weighted interpolation'
            series_filled = self._time_weighted_interpolation(series)
        elif method == 'seasonal':
            method_name = 'seasonal interpolation'
            series_filled = self._seasonal_interpolation(series, **kwargs)
        elif method == 'ffill':
            method_name = 'forward fill'
            series_filled = series.ffill(limit=kwargs.get('limit', None))
        elif method == 'bfill':
            method_name = 'backward fill'
            series_filled = series.bfill(limit=kwargs.get('limit', None))
        elif method == 'mean':
            method_name = 'mean imputation'
            series_filled = series.fillna(series.mean())
        elif method == 'median':
            method_name = 'median imputation'
            series_filled = series.fillna(series.median())
        elif method == 'mode':
            method_name = 'mode imputation'
            mode_value = series.mode()
            if not mode_value.empty:
                series_filled = series.fillna(mode_value.iloc[0])
            else:
                # All-NaN series has no mode; median is also NaN here, so this
                # leaves the series unchanged and the ffill/bfill pass below
                # (and the caller's fallback) take over.
                series_filled = series.fillna(series.median())
        elif method == 'knn':
            # Simplified version using a distance-weighted neighbour mean.
            method_name = f"KNN imputation (k={kwargs.get('k', 5)})"
            series_filled = self._knn_imputation(series, k=kwargs.get('k', 5))
        elif method == 'regression':
            method_name = 'regression imputation'
            series_filled = self._regression_imputation(series, **kwargs)
        elif method == 'spline':
            method_name = 'spline interpolation'
            series_filled = series.interpolate(method='spline', order=kwargs.get('order', 3))
        elif method == 'stl':
            method_name = 'STL-based imputation'
            series_filled = self._stl_imputation(series, **kwargs)
        else:
            raise ValueError(f"Unknown method: {method}")

        # If missing values remain (e.g. leading NaNs after interpolation),
        # finish with an ffill/bfill pass and record it in the description.
        if series_filled.isnull().any():
            series_filled = series_filled.ffill().bfill()
            method_name += " + ffill/bfill"

        return series_filled, method_name

    def _time_weighted_interpolation(self, series: pd.Series) -> pd.Series:
        """Time-weighted interpolation via scipy ``interp1d`` over positions.

        Falls back to plain interpolation for non-datetime indices and to
        ffill/bfill when fewer than two valid points exist.
        """
        if not isinstance(series.index, pd.DatetimeIndex):
            return series.interpolate()

        # Positional timestamps (0..n-1) aligned to the series index.
        time_numeric = pd.Series(range(len(series)), index=series.index)
        time_interpolated = time_numeric.interpolate()

        valid_mask = series.notna()
        if valid_mask.sum() < 2:
            # Not enough points for interpolation.
            return series.ffill().bfill()

        valid_times = time_numeric[valid_mask]
        valid_values = series[valid_mask]

        # Linear interpolation with extrapolation at the edges.
        interp_func = interp1d(
            valid_times,
            valid_values,
            kind='linear',
            bounds_error=False,
            fill_value='extrapolate'
        )

        series_filled = series.copy()
        missing_mask = series.isna()
        series_filled[missing_mask] = interp_func(time_interpolated[missing_mask])
        return series_filled

    def _seasonal_interpolation(
        self,
        series: pd.Series,
        **kwargs
    ) -> pd.Series:
        """Seasonal interpolation: fill each gap with the mean of the values
        found at the same seasonal position in up to 9 neighbouring cycles,
        then interpolate whatever is left.
        """
        if not isinstance(series.index, pd.DatetimeIndex):
            return series.interpolate()

        period = kwargs.get('period', self.config.seasonal_period)
        series_filled = series.copy()

        for i in range(len(series)):
            if pd.isna(series.iloc[i]):
                # Collect observed values at the same seasonal position
                # in previous/next cycles.
                seasonal_indices = []
                for offset in range(1, 10):
                    idx_back = i - offset * period
                    idx_forward = i + offset * period
                    if idx_back >= 0 and not pd.isna(series.iloc[idx_back]):
                        seasonal_indices.append(idx_back)
                    if idx_forward < len(series) and not pd.isna(series.iloc[idx_forward]):
                        seasonal_indices.append(idx_forward)
                if seasonal_indices:
                    seasonal_values = series.iloc[seasonal_indices]
                    series_filled.iloc[i] = seasonal_values.mean()

        # Fill remaining missing values with regular interpolation.
        return series_filled.interpolate()

    def _knn_imputation(
        self,
        series: pd.Series,
        k: int = 5
    ) -> pd.Series:
        """Simplified KNN imputation for a 1-D series.

        For each gap, scans up to ``k * 10`` positions on either side, takes
        the first ``k``-or-so observed neighbours and fills with their
        inverse-distance-weighted average.
        """
        series_filled = series.copy()
        for i in range(len(series)):
            if pd.isna(series.iloc[i]):
                distances = []
                values = []
                # Bounded window keeps the scan O(k) per gap.
                for j in range(max(0, i - k * 10), min(len(series), i + k * 10)):
                    if j != i and not pd.isna(series.iloc[j]):
                        distances.append(abs(i - j))
                        values.append(series.iloc[j])
                        if len(values) >= k:
                            break
                if values:
                    # Inverse-distance weights (the +1 avoids division by zero).
                    weights = [1 / (d + 1) for d in distances]
                    series_filled.iloc[i] = np.average(values, weights=weights)
        return series_filled

    def _regression_imputation(
        self,
        series: pd.Series,
        **kwargs
    ) -> pd.Series:
        """Fill gaps with a degree-2 polynomial fitted over the whole series.

        Assumes a numeric series (``np.isnan`` on object dtype would fail --
        callers route numeric data here). Falls back to ffill/bfill when
        fewer than 3 observed points exist.
        """
        series_filled = series.copy()
        if series.notna().sum() < 3:
            return series.ffill().bfill()

        x = np.arange(len(series))
        y = series.values

        valid_mask = ~np.isnan(y)
        if valid_mask.sum() < 2:
            return series.ffill().bfill()

        # Degree-2 polynomial fitted to the observed points.
        coeffs = np.polyfit(x[valid_mask], y[valid_mask], 2)
        poly_func = np.poly1d(coeffs)

        missing_mask = np.isnan(y)
        series_filled.iloc[missing_mask] = poly_func(x[missing_mask])
        return series_filled

    def _stl_imputation(
        self,
        series: pd.Series,
        **kwargs
    ) -> pd.Series:
        """STL decomposition-based imputation.

        Fills gaps with trend + seasonal from a robust STL fit (computed on an
        ffill/bfill pre-filled copy, since STL cannot handle NaNs). Any failure
        degrades gracefully to plain interpolation.
        """
        try:
            if not isinstance(series.index, pd.DatetimeIndex):
                return series.interpolate()

            stl = STL(
                series.ffill().bfill(),  # STL requires a gap-free input
                period=kwargs.get('period', self.config.seasonal_period),
                robust=True
            )
            result = stl.fit()

            # Reconstruct the series without the residual (noise) component.
            reconstructed = result.trend + result.seasonal

            series_filled = series.copy()
            missing_mask = series.isna()
            series_filled[missing_mask] = reconstructed[missing_mask]
            return series_filled
        except Exception as e:
            logger.warning(f"STL imputation failed: {e}, using interpolation")
            return series.interpolate()

    def _handle_rowwise(
        self,
        data: pd.DataFrame,
        method: str,
        **kwargs
    ) -> pd.DataFrame:
        """Row-wise missing value handling.

        Optionally drops rows whose missing percentage exceeds
        ``kwargs['drop_rows_threshold']``, then imputes across each row with
        'row_mean', 'row_median' or 'row_ffill'.
        """
        data_processed = data.copy()

        # Remove rows with high missing counts.
        if kwargs.get('drop_rows_threshold', 0) > 0:
            threshold = kwargs['drop_rows_threshold']
            rows_before = len(data_processed)
            missing_per_row = data_processed.isnull().sum(axis=1) / data_processed.shape[1] * 100
            rows_to_drop = missing_per_row[missing_per_row > threshold].index
            data_processed = data_processed.drop(rows_to_drop)
            rows_after = len(data_processed)
            logger.info(f"Rows removed: {rows_before - rows_after} (missing > {threshold}%)")

        # Row-wise imputation: transpose so fillna aligns the per-row statistic
        # (indexed by row labels) against the transposed columns.
        if method == 'row_mean':
            data_processed = data_processed.T.fillna(data_processed.mean(axis=1)).T
        elif method == 'row_median':
            data_processed = data_processed.T.fillna(data_processed.median(axis=1)).T
        elif method == 'row_ffill':
            data_processed = data_processed.ffill(axis=1).bfill(axis=1)

        return data_processed

    def create_validation_rules(self) -> Dict:
        """Create validation rules based on missing value analysis.

        Returns an action per column with any missing data:
        >50% drop, >20% advanced (knn), >5% standard (interpolate),
        >0% simple (ffill). Returns an empty dict if ``analyse`` has not
        been run yet (previously this raised KeyError).
        """
        rules = {}
        for col, info in self.missing_info.get('summary', {}).items():
            missing_percent = info['missing_percent']
            if missing_percent > 50:
                rules[col] = {
                    'action': 'drop_column',
                    'reason': f'Missing > 50%: {missing_percent:.1f}%'
                }
            elif missing_percent > 20:
                rules[col] = {
                    'action': 'advanced_imputation',
                    'reason': f'High missing: {missing_percent:.1f}%',
                    'recommended_method': 'knn'
                }
            elif missing_percent > 5:
                rules[col] = {
                    'action': 'standard_imputation',
                    'reason': f'Moderate missing: {missing_percent:.1f}%',
                    'recommended_method': 'interpolate'
                }
            elif missing_percent > 0:
                rules[col] = {
                    'action': 'simple_imputation',
                    'reason': f'Low missing: {missing_percent:.1f}%',
                    'recommended_method': 'ffill'
                }
        return rules

    def get_report(self) -> Dict:
        """Get the combined missing values report."""
        return {
            'missing_info': self.missing_info,
            'handling_methods': self.handling_methods,
            'missing_patterns': self.missing_patterns,
            'validation_rules': self.create_validation_rules()
        }