# ============================================
# CLASS 11: FEATURE SELECTION
# ============================================
import logging

from typing import Dict, List, Optional, Tuple

from config.config import Config

logger = logging.getLogger(__name__)

try:
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import seaborn as sns
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler
    print("✅ All imports working!")
except ImportError as e:
    print(f"❌ Import error: {e}")

from sklearn.feature_selection import RFE, mutual_info_regression


class FeatureSelector:
    """Class for selecting the most important features."""

    def __init__(self, config: Config):
        """
        Initialise the feature selector.

        Parameters
        ----------
        config : Config
            Experiment configuration.
        """
        self.config = config
        self.selected_features = []
        self.feature_importances = {}
        self.selection_methods = {}
        self.selector_objects = {}

    def select(
        self,
        data: pd.DataFrame,
        target_col: Optional[str] = None,
        method: Optional[str] = None,
        n_features: Optional[int] = None,
        **kwargs
    ) -> pd.DataFrame:
        """
        Select the most important features.

        Parameters
        ----------
        data : pd.DataFrame
            Input data.
        target_col : str, optional
            Target variable. If None, uses the configuration value.
        method : str, optional
            Selection method. If None, uses the configuration value.
        n_features : int, optional
            Number of features to select. If None, uses the configuration value.
        **kwargs : dict
            Additional parameters for the method.

        Returns
        -------
        pd.DataFrame
            Data with the selected features.
        """
        logger.info("\n" + "=" * 80)
        logger.info("FEATURE SELECTION")
        logger.info("=" * 80)

        target_col = target_col or self.config.target_column
        method = method or self.config.feature_selection_method
        n_features = n_features or self.config.max_features

        if target_col not in data.columns:
            logger.error(f"Target variable '{target_col}' not found")
            return data

        # Prepare data: keep only numeric feature columns
        X = data.drop(columns=[target_col]).select_dtypes(include=[np.number])
        y = data[target_col]

        # Remove rows with missing values
        mask = X.notna().all(axis=1) & y.notna()
        X_clean = X[mask]
        y_clean = y[mask]

        if len(X_clean) < 10 or len(X_clean.columns) < 2:
            logger.warning("Insufficient data for feature selection")
            return data

        logger.info(f"Selection method: {method}")
        logger.info(f"Target number of features: {n_features}")
        logger.info(f"Initial number of features: {len(X.columns)}")
        logger.info(f"Data for selection: {len(X_clean)} records")

        # Apply the selection method
        if method == 'correlation':
            selected_features_list, feature_importance_dict = self._correlation_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'mutual_info':
            selected_features_list, feature_importance_dict = self._mutual_info_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'rf':
            selected_features_list, feature_importance_dict = self._random_forest_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'pca':
            selected_features_list, feature_importance_dict = self._pca_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'rfe':
            selected_features_list, feature_importance_dict = self._rfe_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'lasso':
            selected_features_list, feature_importance_dict = self._lasso_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'hybrid':
            selected_features_list, feature_importance_dict = self._hybrid_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        else:
            logger.warning(f"Method {method} not supported, using correlation")
            selected_features_list, feature_importance_dict = self._correlation_selection(
                X_clean, y_clean, n_features, **kwargs
            )

        # Save the selected features
        self.selected_features = selected_features_list
        self.feature_importances = feature_importance_dict
        self.selection_methods[method] = {
            'selected_features': selected_features_list,
            'n_features': len(selected_features_list),
            'feature_importances': feature_importance_dict
        }

        # Form the final dataset
        features_to_keep = selected_features_list + [target_col]
        features_to_keep = [f for f in features_to_keep if f in data.columns]
        data_selected = data[features_to_keep].copy()

        logger.info(f"✓ Selected {len(selected_features_list)} features")
        logger.info(f"  Total features kept: {len(data_selected.columns)}")

        # Visualisation
        if self.config.save_plots and selected_features_list:
            self._plot_feature_selection(
                X_clean, y_clean, selected_features_list,
                feature_importance_dict, method
            )

        return data_selected
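    # Usage sketch (illustrative only; assumes `config` is a Config instance
    # providing target_column, feature_selection_method, max_features and
    # save_plots). On a frame `df` with a numeric target column "y":
    #
    #     selector = FeatureSelector(config)
    #     reduced = selector.select(df, target_col="y", method="rf", n_features=10)
    #     print(selector.selected_features)   # ranked top-10 feature names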
    def _correlation_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection based on correlation."""
        # Absolute Pearson correlation with the target, strongest first
        correlations = X.corrwith(y).abs().sort_values(ascending=False)

        # Select the top n_features
        selected_features = correlations.head(n_features).index.tolist()
        feature_importance = correlations.to_dict()

        return selected_features, feature_importance

    def _mutual_info_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection based on mutual information."""
        try:
            mi_scores = mutual_info_regression(X, y, random_state=kwargs.get('random_state', 42))
            mi_series = pd.Series(mi_scores, index=X.columns).sort_values(ascending=False)

            selected_features = mi_series.head(n_features).index.tolist()
            feature_importance = mi_series.to_dict()

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"Mutual information selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)

    def _random_forest_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection based on Random Forest importances."""
        try:
            rf = RandomForestRegressor(
                n_estimators=kwargs.get('n_estimators', 100),
                max_depth=kwargs.get('max_depth', None),
                random_state=kwargs.get('random_state', 42),
                n_jobs=self.config.n_jobs if self.config.use_multiprocessing else None
            )
            rf.fit(X, y)

            importances = pd.Series(rf.feature_importances_, index=X.columns)
            importances = importances.sort_values(ascending=False)

            selected_features = importances.head(n_features).index.tolist()
            feature_importance = importances.to_dict()

            self.selector_objects['random_forest'] = rf

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"Random Forest selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)

    def _pca_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection based on PCA loadings."""
        try:
            # Standardise the data first so all features share one scale
            scaler = StandardScaler()
            X_scaled = scaler.fit_transform(X)

            # Fit PCA on the standardised data
            pca = PCA(n_components=min(n_features, len(X.columns)))
            pca.fit(X_scaled)

            # Score each feature by the sum of its absolute component loadings
            importance = np.abs(pca.components_).sum(axis=0)
            importance_series = pd.Series(importance, index=X.columns)
            importance_series = importance_series.sort_values(ascending=False)

            selected_features = importance_series.head(n_features).index.tolist()
            feature_importance = importance_series.to_dict()

            self.selector_objects['pca'] = pca
            self.selector_objects['scaler'] = scaler

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"PCA selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)
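    # Note on the PCA heuristic above: each feature j is scored by the sum of
    # absolute loadings across the retained components,
    #     score_j = sum_k |components_[k, j]|,
    # which favours features that load strongly on any component. A common
    # variant (not used here) weights each component by its
    # explained_variance_ratio_ before summing, so low-variance components
    # contribute less to the score.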
    def _rfe_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Recursive Feature Elimination."""
        try:
            from sklearn.linear_model import LinearRegression

            estimator = LinearRegression()
            rfe = RFE(
                estimator=estimator,
                n_features_to_select=n_features,
                step=kwargs.get('step', 1)
            )
            rfe.fit(X, y)

            selected_mask = rfe.support_
            selected_features = X.columns[selected_mask].tolist()

            # Convert the RFE ranking (1 = best) into an importance score
            ranking = pd.Series(rfe.ranking_, index=X.columns)
            feature_importance = (1 / ranking).to_dict()

            self.selector_objects['rfe'] = rfe

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"RFE selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)

    def _lasso_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection using Lasso."""
        try:
            from sklearn.linear_model import LassoCV

            lasso = LassoCV(
                cv=kwargs.get('cv', 5),
                random_state=kwargs.get('random_state', 42),
                max_iter=kwargs.get('max_iter', 1000)
            )
            lasso.fit(X, y)

            # Keep features with non-zero coefficients, largest magnitude first
            coefficients = pd.Series(lasso.coef_, index=X.columns)
            non_zero_features = coefficients[coefficients != 0].abs().sort_values(ascending=False)

            # Select the top n_features
            selected_features = non_zero_features.head(n_features).index.tolist()
            feature_importance = non_zero_features.to_dict()

            self.selector_objects['lasso'] = lasso

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"Lasso selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)

    def _hybrid_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Hybrid feature selection combining several methods."""
        # Combine multiple methods
        methods = kwargs.get('methods', ['correlation', 'mutual_info', 'rf'])
        weights = kwargs.get('weights', [0.3, 0.3, 0.4])

        all_importances = {}

        for method, weight in zip(methods, weights):
            try:
                if method == 'correlation':
                    _, importance = self._correlation_selection(X, y, n_features, **kwargs)
                elif method == 'mutual_info':
                    _, importance = self._mutual_info_selection(X, y, n_features, **kwargs)
                elif method == 'rf':
                    _, importance = self._random_forest_selection(X, y, n_features, **kwargs)
                else:
                    continue

                # Min-max normalise the importances before weighting
                importance_series = pd.Series(importance)
                if importance_series.max() > importance_series.min():
                    importance_normalized = (importance_series - importance_series.min()) / \
                                            (importance_series.max() - importance_series.min())
                else:
                    importance_normalized = pd.Series(1.0, index=importance_series.index)

                # Accumulate the weighted importances
                for feature in importance_normalized.index:
                    if feature not in all_importances:
                        all_importances[feature] = 0.0
                    all_importances[feature] += importance_normalized[feature] * weight
            except Exception as e:
                logger.debug(f"Method {method} failed in hybrid selection: {e}")

        # Fall back to correlation if every sub-method failed
        if not all_importances:
            logger.warning("All hybrid sub-methods failed, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)

        # Sort by combined importance
        combined_importance = pd.Series(all_importances).sort_values(ascending=False)
        selected_features = combined_importance.head(n_features).index.tolist()

        return selected_features, combined_importance.to_dict()
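    # The hybrid score rescales each method's importances to [0, 1] before
    # weighting, so scores on different scales (|correlation| in [0, 1], raw
    # mutual-information values, impurity-based RF shares) combine fairly:
    #
    #     combined_j = sum_m  w_m * (s_mj - min(s_m)) / (max(s_m) - min(s_m))
    #
    # With the default weights [0.3, 0.3, 0.4], Random Forest gets the
    # largest say in the final ranking.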
    def _plot_feature_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        selected_features: List[str],
        feature_importance: Dict,
        method: str
    ) -> None:
        """Visualise feature selection results."""
        # Prepare data for visualisation
        importance_series = pd.Series(feature_importance).sort_values(ascending=False)

        # Limit the number of features displayed
        display_features = importance_series.head(20)

        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # 1. Feature importance
        y_pos = np.arange(len(display_features))
        axes[0, 0].barh(y_pos, display_features.values)
        axes[0, 0].set_yticks(y_pos)
        axes[0, 0].set_yticklabels(display_features.index, fontsize=9)
        axes[0, 0].invert_yaxis()
        axes[0, 0].set_xlabel('Importance')
        axes[0, 0].set_title(f'Top-{len(display_features)} features by importance ({method})')
        axes[0, 0].grid(True, alpha=0.3, axis='x')

        # 2. Cumulative importance
        cumulative_importance = importance_series.cumsum() / importance_series.sum()
        axes[0, 1].plot(range(1, len(cumulative_importance) + 1), cumulative_importance.values)
        axes[0, 1].axhline(y=0.8, color='r', linestyle='--', alpha=0.7, label='80% importance')
        axes[0, 1].axhline(y=0.9, color='orange', linestyle='--', alpha=0.7, label='90% importance')
        axes[0, 1].set_xlabel('Number of features')
        axes[0, 1].set_ylabel('Cumulative importance')
        axes[0, 1].set_title('Cumulative feature importance')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)

        # 3. Correlation matrix of selected features
        if len(selected_features) > 1:
            selected_X = X[selected_features]
            corr_matrix = selected_X.corr()

            mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
            sns.heatmap(
                corr_matrix,
                annot=True,
                fmt='.2f',
                cmap='coolwarm',
                center=0,
                square=True,
                mask=mask,
                cbar_kws={'shrink': 0.8},
                ax=axes[1, 0]
            )
            axes[1, 0].set_title(f'Correlation of selected features ({len(selected_features)})')

        # 4. Importance distribution
        axes[1, 1].hist(importance_series.values, bins=30, edgecolor='black', alpha=0.7)
        axes[1, 1].set_xlabel('Feature importance')
        axes[1, 1].set_ylabel('Frequency')
        axes[1, 1].set_title('Feature importance distribution')
        axes[1, 1].grid(True, alpha=0.3)

        plt.suptitle(f'Feature selection results using the {method} method', fontsize=14)
        plt.tight_layout()
        plt.savefig(
            f'{self.config.results_dir}/plots/feature_selection_{method}.png',
            dpi=300,
            bbox_inches='tight'
        )
        plt.show()

    def get_report(self) -> Dict:
        """Get the feature selection report."""
        return {
            'selected_features': self.selected_features,
            'feature_importances': self.feature_importances,
            'selection_methods': self.selection_methods
        }
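
# Minimal smoke test, assuming only the Config attributes accessed above.
# The real Config lives in config.config; the SimpleNamespace stand-in here
# is purely illustrative and not part of the pipeline.
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    demo_config = SimpleNamespace(
        target_column="target",
        feature_selection_method="correlation",
        max_features=3,
        save_plots=False,          # skip plotting in the smoke test
        use_multiprocessing=False,
        n_jobs=1,
        results_dir="results",
    )

    # Synthetic data: the target depends only on columns "a" and "b"
    rng = np.random.default_rng(42)
    demo = pd.DataFrame(rng.normal(size=(200, 6)), columns=list("abcdef"))
    demo["target"] = 3 * demo["a"] - 2 * demo["b"] + rng.normal(scale=0.1, size=200)

    selector = FeatureSelector(demo_config)  # type: ignore[arg-type]
    reduced = selector.select(demo, method="hybrid")
    print("Kept columns:", reduced.columns.tolist())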