# ============================================
# CLASS 11: FEATURE SELECTION
# ============================================
import logging
import os
from typing import Dict, List, Optional, Tuple

from config.config import Config

logger = logging.getLogger(__name__)

try:
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import seaborn as sns
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler
    from sklearn.feature_selection import mutual_info_regression
    print("✅ All imports working!")
except ImportError as e:
    print(f"❌ Import error: {e}")
class FeatureSelector:
    """Class for selecting the most important features."""

    def __init__(self, config: Config):
        """
        Initialise the feature selector.

        Parameters
        ----------
        config : Config
            Experiment configuration
        """
        self.config = config
        self.selected_features = []
        self.feature_importances = {}
        self.selection_methods = {}
        self.selector_objects = {}
    def select(
        self,
        data: pd.DataFrame,
        target_col: Optional[str] = None,
        method: Optional[str] = None,
        n_features: Optional[int] = None,
        **kwargs
    ) -> pd.DataFrame:
        """
        Select the most important features.

        Parameters
        ----------
        data : pd.DataFrame
            Input data
        target_col : str, optional
            Target variable. If None, uses the configuration value.
        method : str, optional
            Selection method. If None, uses the configuration value.
        n_features : int, optional
            Number of features to select. If None, uses the configuration value.
        **kwargs : dict
            Additional parameters passed to the selection method

        Returns
        -------
        pd.DataFrame
            Data with the selected features (plus the target column)
        """
        logger.info("\n" + "=" * 80)
        logger.info("FEATURE SELECTION")
        logger.info("=" * 80)

        target_col = target_col or self.config.target_column
        method = method or self.config.feature_selection_method
        n_features = n_features or self.config.max_features

        if target_col not in data.columns:
            logger.error(f"Target variable '{target_col}' not found")
            return data

        # Prepare data: keep only numeric predictors
        X = data.drop(columns=[target_col]).select_dtypes(include=[np.number])
        y = data[target_col]

        # Drop rows with missing values in X or y
        mask = X.notna().all(axis=1) & y.notna()
        X_clean = X[mask]
        y_clean = y[mask]

        if len(X_clean) < 10 or len(X_clean.columns) < 2:
            logger.warning("Insufficient data for feature selection")
            return data

        logger.info(f"Selection method: {method}")
        logger.info(f"Target number of features: {n_features}")
        logger.info(f"Initial number of features: {len(X.columns)}")
        logger.info(f"Data for selection: {len(X_clean)} records")

        # Apply the requested selection method
        if method == 'correlation':
            selected_features_list, feature_importance_dict = self._correlation_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'mutual_info':
            selected_features_list, feature_importance_dict = self._mutual_info_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'rf':
            selected_features_list, feature_importance_dict = self._random_forest_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'pca':
            selected_features_list, feature_importance_dict = self._pca_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'rfe':
            selected_features_list, feature_importance_dict = self._rfe_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'lasso':
            selected_features_list, feature_importance_dict = self._lasso_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        elif method == 'hybrid':
            selected_features_list, feature_importance_dict = self._hybrid_selection(
                X_clean, y_clean, n_features, **kwargs
            )
        else:
            logger.warning(f"Method {method} not supported, using correlation")
            selected_features_list, feature_importance_dict = self._correlation_selection(
                X_clean, y_clean, n_features, **kwargs
            )

        # Save the selection results
        self.selected_features = selected_features_list
        self.feature_importances = feature_importance_dict
        self.selection_methods[method] = {
            'selected_features': selected_features_list,
            'n_features': len(selected_features_list),
            'feature_importances': feature_importance_dict
        }

        # Form the final dataset
        features_to_keep = selected_features_list + [target_col]
        features_to_keep = [f for f in features_to_keep if f in data.columns]
        data_selected = data[features_to_keep].copy()

        logger.info(f"✓ Selected {len(selected_features_list)} features")
        logger.info(f"  Total features kept: {len(data_selected.columns)}")

        # Visualisation
        if self.config.save_plots and selected_features_list:
            self._plot_feature_selection(
                X_clean, y_clean, selected_features_list,
                feature_importance_dict, method
            )

        return data_selected
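
    # ------------------------------------------------------------------
    # Selection strategies. Each method below returns a pair of
    # (selected_feature_names, {feature: importance_score}). Every
    # strategy except correlation and hybrid falls back to plain
    # correlation ranking if it raises, so select() always produces a
    # usable result.
    # ------------------------------------------------------------------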
    def _correlation_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection based on correlation with the target."""
        # Rank features by absolute correlation with the target variable
        correlations = X.corrwith(y).abs().sort_values(ascending=False)

        # Keep the top n_features
        selected_features = correlations.head(n_features).index.tolist()
        feature_importance = correlations.to_dict()

        return selected_features, feature_importance
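
    # Note: DataFrame.corrwith() uses Pearson correlation by default, so the
    # filter above only ranks linear relationships; mutual information (next)
    # can also capture nonlinear dependence between a feature and the target.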
    def _mutual_info_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection based on mutual information."""
        try:
            mi_scores = mutual_info_regression(X, y, random_state=kwargs.get('random_state', 42))
            mi_series = pd.Series(mi_scores, index=X.columns).sort_values(ascending=False)

            selected_features = mi_series.head(n_features).index.tolist()
            feature_importance = mi_series.to_dict()

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"Mutual information selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)
    def _random_forest_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection based on Random Forest importances."""
        try:
            rf = RandomForestRegressor(
                n_estimators=kwargs.get('n_estimators', 100),
                max_depth=kwargs.get('max_depth', None),
                random_state=kwargs.get('random_state', 42),
                n_jobs=self.config.n_jobs if self.config.use_multiprocessing else None
            )
            rf.fit(X, y)

            importances = pd.Series(rf.feature_importances_, index=X.columns)
            importances = importances.sort_values(ascending=False)

            selected_features = importances.head(n_features).index.tolist()
            feature_importance = importances.to_dict()

            self.selector_objects['random_forest'] = rf

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"Random Forest selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)
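
    # Note: feature_importances_ is impurity-based and tends to favour
    # high-cardinality numeric features; sklearn.inspection.permutation_importance
    # is a more robust (if slower) alternative when that bias matters.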
    def _pca_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection based on PCA loadings."""
        try:
            # Standardise the data first so all features contribute equally
            scaler = StandardScaler()
            X_scaled = scaler.fit_transform(X)

            # Fit PCA on the standardised data
            pca = PCA(n_components=min(n_features, len(X.columns)))
            pca.fit(X_scaled)

            # Score each original feature by the sum of its absolute loadings
            importance = np.abs(pca.components_).sum(axis=0)
            importance_series = pd.Series(importance, index=X.columns)
            importance_series = importance_series.sort_values(ascending=False)

            selected_features = importance_series.head(n_features).index.tolist()
            feature_importance = importance_series.to_dict()

            self.selector_objects['pca'] = pca
            self.selector_objects['scaler'] = scaler

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"PCA selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)
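
    # Note: PCA is unsupervised, so the loading-based scores above ignore y
    # entirely; a feature can rank highly simply because it varies a lot.
    # Weighting each component's loadings by explained_variance_ratio_ is a
    # common refinement.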
    def _rfe_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Recursive Feature Elimination with a linear model."""
        try:
            from sklearn.feature_selection import RFE
            from sklearn.linear_model import LinearRegression

            estimator = LinearRegression()
            rfe = RFE(
                estimator=estimator,
                n_features_to_select=n_features,
                step=kwargs.get('step', 1)
            )
            rfe.fit(X, y)

            selected_features = X.columns[rfe.support_].tolist()

            # Convert RFE ranking (1 = best) into an importance score
            ranking = pd.Series(rfe.ranking_, index=X.columns)
            feature_importance = (1 / ranking).to_dict()

            self.selector_objects['rfe'] = rfe

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"RFE selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)
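
    # Note: all selected features share RFE rank 1, so they all map to the
    # same importance of 1.0 here; the 1/ranking scores only differentiate
    # the eliminated features by how early they were dropped.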
    def _lasso_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Feature selection using Lasso with cross-validated regularisation."""
        try:
            from sklearn.linear_model import LassoCV

            lasso = LassoCV(
                cv=kwargs.get('cv', 5),
                random_state=kwargs.get('random_state', 42),
                max_iter=kwargs.get('max_iter', 1000)
            )
            lasso.fit(X, y)

            # Keep features with non-zero coefficients, ranked by magnitude
            coefficients = pd.Series(lasso.coef_, index=X.columns)
            non_zero_features = coefficients[coefficients != 0].abs().sort_values(ascending=False)

            # Keep the top n_features
            selected_features = non_zero_features.head(n_features).index.tolist()
            feature_importance = non_zero_features.to_dict()

            self.selector_objects['lasso'] = lasso

            return selected_features, feature_importance
        except Exception as e:
            logger.warning(f"Lasso selection failed: {e}, using correlation")
            return self._correlation_selection(X, y, n_features, **kwargs)
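
    # Note: Lasso may zero out more coefficients than expected, so this method
    # can return fewer than n_features features. Coefficient magnitudes are
    # only comparable across features if X is on a common scale; standardising
    # X before fitting (as in _pca_selection) would make the ranking fairer.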
    def _hybrid_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_features: int,
        **kwargs
    ) -> Tuple[List[str], Dict]:
        """Hybrid feature selection combining several methods."""
        # Combine multiple methods with user-defined weights
        methods = kwargs.get('methods', ['correlation', 'mutual_info', 'rf'])
        weights = kwargs.get('weights', [0.3, 0.3, 0.4])

        all_importances = {}

        for method, weight in zip(methods, weights):
            try:
                if method == 'correlation':
                    _, importance = self._correlation_selection(X, y, n_features, **kwargs)
                elif method == 'mutual_info':
                    _, importance = self._mutual_info_selection(X, y, n_features, **kwargs)
                elif method == 'rf':
                    _, importance = self._random_forest_selection(X, y, n_features, **kwargs)
                else:
                    continue

                # Min-max normalise the scores so methods are comparable
                importance_series = pd.Series(importance)
                if importance_series.max() > importance_series.min():
                    importance_normalized = (importance_series - importance_series.min()) / \
                                            (importance_series.max() - importance_series.min())
                else:
                    importance_normalized = pd.Series(1, index=importance_series.index)

                # Accumulate the weighted scores per feature
                for feature in importance_normalized.index:
                    all_importances[feature] = (
                        all_importances.get(feature, 0)
                        + importance_normalized[feature] * weight
                    )
            except Exception as e:
                logger.debug(f"Method {method} failed in hybrid selection: {e}")

        # Rank by combined importance
        combined_importance = pd.Series(all_importances).sort_values(ascending=False)
        selected_features = combined_importance.head(n_features).index.tolist()

        return selected_features, combined_importance.to_dict()
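
    # Note: the default weights (0.3, 0.3, 0.4) sum to 1, so combined scores
    # stay in [0, 1]; if a method fails and is skipped, the remaining weights
    # are applied as-is and the effective total weight shrinks.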
    def _plot_feature_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        selected_features: List[str],
        feature_importance: Dict,
        method: str
    ) -> None:
        """Visualise feature selection results."""
        # Prepare data for visualisation
        importance_series = pd.Series(feature_importance).sort_values(ascending=False)

        # Limit the number of features shown
        display_features = importance_series.head(20)

        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # 1. Feature importance
        y_pos = np.arange(len(display_features))
        axes[0, 0].barh(y_pos, display_features.values)
        axes[0, 0].set_yticks(y_pos)
        axes[0, 0].set_yticklabels(display_features.index, fontsize=9)
        axes[0, 0].invert_yaxis()
        axes[0, 0].set_xlabel('Importance')
        axes[0, 0].set_title(f'Top-{len(display_features)} features by importance ({method})')
        axes[0, 0].grid(True, alpha=0.3, axis='x')

        # 2. Cumulative importance
        cumulative_importance = importance_series.cumsum() / importance_series.sum()
        axes[0, 1].plot(range(1, len(cumulative_importance) + 1), cumulative_importance.values)
        axes[0, 1].axhline(y=0.8, color='r', linestyle='--', alpha=0.7, label='80% importance')
        axes[0, 1].axhline(y=0.9, color='orange', linestyle='--', alpha=0.7, label='90% importance')
        axes[0, 1].set_xlabel('Number of features')
        axes[0, 1].set_ylabel('Cumulative importance')
        axes[0, 1].set_title('Cumulative feature importance')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)

        # 3. Correlation matrix of the selected features
        if len(selected_features) > 1:
            corr_matrix = X[selected_features].corr()
            mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
            sns.heatmap(
                corr_matrix,
                annot=True,
                fmt='.2f',
                cmap='coolwarm',
                center=0,
                square=True,
                mask=mask,
                cbar_kws={'shrink': 0.8},
                ax=axes[1, 0]
            )
            axes[1, 0].set_title(f'Correlation of selected features ({len(selected_features)})')

        # 4. Importance distribution
        axes[1, 1].hist(importance_series.values, bins=30, edgecolor='black', alpha=0.7)
        axes[1, 1].set_xlabel('Feature importance')
        axes[1, 1].set_ylabel('Frequency')
        axes[1, 1].set_title('Feature importance distribution')
        axes[1, 1].grid(True, alpha=0.3)

        plt.suptitle(f'Feature selection results using {method} method', fontsize=14)
        plt.tight_layout()

        # Make sure the output directory exists before saving
        plots_dir = f'{self.config.results_dir}/plots'
        os.makedirs(plots_dir, exist_ok=True)
        plt.savefig(
            f'{plots_dir}/feature_selection_{method}.png',
            dpi=300,
            bbox_inches='tight'
        )
        plt.show()
    def get_report(self) -> Dict:
        """Get the feature selection report."""
        return {
            'selected_features': self.selected_features,
            'feature_importances': self.feature_importances,
            'selection_methods': self.selection_methods
        }
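

# Minimal usage sketch (not part of the class). It assumes Config exposes the
# attributes referenced above (target_column, feature_selection_method,
# max_features, save_plots, use_multiprocessing, n_jobs, results_dir); the
# SimpleNamespace stand-in below is hypothetical and only for illustration.
if __name__ == "__main__":
    from types import SimpleNamespace

    # Synthetic data: target depends linearly on f0 and f3
    rng = np.random.default_rng(0)
    demo = pd.DataFrame(rng.normal(size=(200, 5)), columns=[f"f{i}" for i in range(5)])
    demo["target"] = 2 * demo["f0"] - demo["f3"] + rng.normal(scale=0.1, size=200)

    cfg = SimpleNamespace(
        target_column="target",
        feature_selection_method="rf",
        max_features=3,
        save_plots=False,          # skip plotting in this sketch
        use_multiprocessing=False,
        n_jobs=1,
        results_dir=".",
    )

    selector = FeatureSelector(cfg)  # type: ignore[arg-type]
    reduced = selector.select(demo)
    print(reduced.columns.tolist())
    print(selector.get_report()["selected_features"])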