# TimeFlowPro / feature_selection / feature_selector.py
# ============================================
# CLASS 11: FEATURE SELECTION
# ============================================
from typing import Dict, List, Optional, Tuple
import logging
import os

from config.config import Config

logger = logging.getLogger(__name__)
try:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
print("✅ All imports working!")
except ImportError as e:
print(f"❌ Import error: {e}")
from sklearn.feature_selection import mutual_info_regression
class FeatureSelector:
"""Class for selecting the most important features"""
def __init__(self, config: Config):
"""
Initialise feature selector
Parameters:
-----------
config : Config
Experiment configuration
"""
self.config = config
self.selected_features = []
self.feature_importances = {}
self.selection_methods = {}
self.selector_objects = {}
def select(
self,
data: pd.DataFrame,
target_col: Optional[str] = None,
        method: Optional[str] = None,
        n_features: Optional[int] = None,
**kwargs
) -> pd.DataFrame:
"""
Select the most important features
Parameters:
-----------
data : pd.DataFrame
Input data
target_col : str, optional
Target variable. If None, uses configuration value.
method : str, optional
Selection method. If None, uses configuration value.
n_features : int, optional
Number of features to select. If None, uses configuration value.
**kwargs : dict
Additional parameters for method
Returns:
--------
pd.DataFrame
Data with selected features
"""
logger.info("\n" + "="*80)
logger.info("FEATURE SELECTION")
logger.info("="*80)
target_col = target_col or self.config.target_column
method = method or self.config.feature_selection_method
n_features = n_features or self.config.max_features
if target_col not in data.columns:
logger.error(f"Target variable '{target_col}' not found")
return data
# Prepare data
X = data.drop(columns=[target_col]).select_dtypes(include=[np.number])
y = data[target_col]
# Remove missing values
mask = X.notna().all(axis=1) & y.notna()
X_clean = X[mask]
y_clean = y[mask]
if len(X_clean) < 10 or len(X_clean.columns) < 2:
logger.warning("Insufficient data for feature selection")
return data
logger.info(f"Selection method: {method}")
logger.info(f"Target number of features: {n_features}")
logger.info(f"Initial number of features: {len(X.columns)}")
logger.info(f"Data for selection: {len(X_clean)} records")
# Apply selection method
selected_features_list = []
feature_importance_dict = {}
if method == 'correlation':
selected_features_list, feature_importance_dict = self._correlation_selection(
X_clean, y_clean, n_features, **kwargs
)
elif method == 'mutual_info':
selected_features_list, feature_importance_dict = self._mutual_info_selection(
X_clean, y_clean, n_features, **kwargs
)
elif method == 'rf':
selected_features_list, feature_importance_dict = self._random_forest_selection(
X_clean, y_clean, n_features, **kwargs
)
elif method == 'pca':
selected_features_list, feature_importance_dict = self._pca_selection(
X_clean, y_clean, n_features, **kwargs
)
elif method == 'rfe':
selected_features_list, feature_importance_dict = self._rfe_selection(
X_clean, y_clean, n_features, **kwargs
)
elif method == 'lasso':
selected_features_list, feature_importance_dict = self._lasso_selection(
X_clean, y_clean, n_features, **kwargs
)
elif method == 'hybrid':
selected_features_list, feature_importance_dict = self._hybrid_selection(
X_clean, y_clean, n_features, **kwargs
)
else:
logger.warning(f"Method {method} not supported, using correlation")
selected_features_list, feature_importance_dict = self._correlation_selection(
X_clean, y_clean, n_features, **kwargs
)
# Save selected features
self.selected_features = selected_features_list
self.feature_importances = feature_importance_dict
self.selection_methods[method] = {
'selected_features': selected_features_list,
'n_features': len(selected_features_list),
'feature_importances': feature_importance_dict
}
# Form final dataset
features_to_keep = selected_features_list + [target_col]
features_to_keep = [f for f in features_to_keep if f in data.columns]
data_selected = data[features_to_keep].copy()
logger.info(f"✓ Selected {len(selected_features_list)} features")
logger.info(f" Total features kept: {len(data_selected.columns)}")
# Visualisation
if self.config.save_plots and selected_features_list:
self._plot_feature_selection(
X_clean, y_clean, selected_features_list,
feature_importance_dict, method
)
return data_selected
def _correlation_selection(
self,
X: pd.DataFrame,
y: pd.Series,
n_features: int,
**kwargs
) -> Tuple[List[str], Dict]:
"""Feature selection based on correlation"""
# Calculate correlations with target variable
correlations = X.corrwith(y).abs().sort_values(ascending=False)
# Select top-n_features
selected_features = correlations.head(n_features).index.tolist()
feature_importance = correlations.to_dict()
return selected_features, feature_importance
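    # Worked example (hypothetical numbers): with X columns ['lag_1',
    # 'rolling_mean_7', 'noise'] and n_features=2, X.corrwith(y).abs() might give
    #   lag_1 -> 0.92, rolling_mean_7 -> 0.85, noise -> 0.04
    # so head(2) keeps ['lag_1', 'rolling_mean_7']. Note this scores each feature
    # marginally, so it can keep highly redundant (mutually correlated) features.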
def _mutual_info_selection(
self,
X: pd.DataFrame,
y: pd.Series,
n_features: int,
**kwargs
) -> Tuple[List[str], Dict]:
"""Feature selection based on mutual information"""
try:
mi_scores = mutual_info_regression(X, y, random_state=kwargs.get('random_state', 42))
mi_series = pd.Series(mi_scores, index=X.columns)
mi_series = mi_series.sort_values(ascending=False)
selected_features = mi_series.head(n_features).index.tolist()
feature_importance = mi_series.to_dict()
return selected_features, feature_importance
except Exception as e:
logger.warning(f"Mutual information selection failed: {e}, using correlation")
return self._correlation_selection(X, y, n_features, **kwargs)
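    # Unlike Pearson correlation, mutual information also detects nonlinear
    # dependence. A minimal sketch (hypothetical, not part of this module):
    #   x = np.random.uniform(-1, 1, 1000)
    #   y = x ** 2            # Pearson corr(x, y) is near 0 by symmetry
    #   mutual_info_regression(x.reshape(-1, 1), y)  # clearly positive score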
def _random_forest_selection(
self,
X: pd.DataFrame,
y: pd.Series,
n_features: int,
**kwargs
) -> Tuple[List[str], Dict]:
"""Feature selection based on Random Forest"""
try:
rf = RandomForestRegressor(
n_estimators=kwargs.get('n_estimators', 100),
max_depth=kwargs.get('max_depth', None),
random_state=kwargs.get('random_state', 42),
n_jobs=self.config.n_jobs if self.config.use_multiprocessing else None
)
rf.fit(X, y)
importances = pd.Series(rf.feature_importances_, index=X.columns)
importances = importances.sort_values(ascending=False)
selected_features = importances.head(n_features).index.tolist()
feature_importance = importances.to_dict()
self.selector_objects['random_forest'] = rf
return selected_features, feature_importance
except Exception as e:
logger.warning(f"Random Forest selection failed: {e}, using correlation")
return self._correlation_selection(X, y, n_features, **kwargs)
def _pca_selection(
self,
X: pd.DataFrame,
y: pd.Series,
n_features: int,
**kwargs
) -> Tuple[List[str], Dict]:
"""Feature selection based on PCA"""
try:
            # Standardise the data first so all features contribute comparably
            scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Apply PCA
            pca = PCA(n_components=min(n_features, X.shape[1], X.shape[0]))
X_pca = pca.fit_transform(X_scaled)
# Get feature importance via absolute component values
importance = np.abs(pca.components_).sum(axis=0)
importance_series = pd.Series(importance, index=X.columns)
importance_series = importance_series.sort_values(ascending=False)
selected_features = importance_series.head(n_features).index.tolist()
feature_importance = importance_series.to_dict()
self.selector_objects['pca'] = pca
self.selector_objects['scaler'] = scaler
return selected_features, feature_importance
except Exception as e:
logger.warning(f"PCA selection failed: {e}, using correlation")
return self._correlation_selection(X, y, n_features, **kwargs)
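    # Note on the PCA heuristic: np.abs(pca.components_).sum(axis=0) treats every
    # component equally. A common refinement (an alternative, not what the code
    # above does) is to weight loadings by explained variance, e.g.:
    #   importance = np.abs(pca.components_).T @ pca.explained_variance_ratio_
    # which emphasises features that load on the dominant components.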
def _rfe_selection(
self,
X: pd.DataFrame,
y: pd.Series,
n_features: int,
**kwargs
) -> Tuple[List[str], Dict]:
"""Recursive Feature Elimination"""
try:
from sklearn.feature_selection import RFE
from sklearn.linear_model import LinearRegression
estimator = LinearRegression()
rfe = RFE(
estimator=estimator,
n_features_to_select=n_features,
step=kwargs.get('step', 1)
)
rfe.fit(X, y)
selected_mask = rfe.support_
selected_features = X.columns[selected_mask].tolist()
# Feature importance via ranking
ranking = pd.Series(rfe.ranking_, index=X.columns)
feature_importance = (1 / ranking).to_dict() # Convert ranking to importance
self.selector_objects['rfe'] = rfe
return selected_features, feature_importance
except Exception as e:
logger.warning(f"RFE selection failed: {e}, using correlation")
return self._correlation_selection(X, y, n_features, **kwargs)
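    # RFE ranking semantics: rank 1 marks a selected feature, higher ranks mark
    # the elimination order. The 1/ranking conversion above is a simple monotone
    # mapping, e.g. ranks [1, 1, 2, 3] become importances [1.0, 1.0, 0.5, 0.33],
    # so all selected features share the top score.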
def _lasso_selection(
self,
X: pd.DataFrame,
y: pd.Series,
n_features: int,
**kwargs
) -> Tuple[List[str], Dict]:
"""Feature selection using Lasso"""
try:
from sklearn.linear_model import LassoCV
lasso = LassoCV(
cv=kwargs.get('cv', 5),
random_state=kwargs.get('random_state', 42),
max_iter=kwargs.get('max_iter', 1000)
)
lasso.fit(X, y)
# Features with non-zero coefficients
coefficients = pd.Series(lasso.coef_, index=X.columns)
non_zero_features = coefficients[coefficients != 0].abs().sort_values(ascending=False)
# Select top-n_features
selected_features = non_zero_features.head(n_features).index.tolist()
feature_importance = non_zero_features.to_dict()
self.selector_objects['lasso'] = lasso
return selected_features, feature_importance
except Exception as e:
logger.warning(f"Lasso selection failed: {e}, using correlation")
return self._correlation_selection(X, y, n_features, **kwargs)
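    # Caveat: Lasso coefficients are scale-sensitive, so ranking by |coef_| on
    # unscaled X can favour features measured in small units. A hedged sketch of
    # the usual remedy (standardise inside a pipeline; not applied above):
    #   from sklearn.pipeline import make_pipeline
    #   model = make_pipeline(StandardScaler(), LassoCV(cv=5))
    #   model.fit(X, y)
    #   coefs = pd.Series(model[-1].coef_, index=X.columns)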
def _hybrid_selection(
self,
X: pd.DataFrame,
y: pd.Series,
n_features: int,
**kwargs
) -> Tuple[List[str], Dict]:
"""Hybrid feature selection method"""
# Combine multiple methods
methods = kwargs.get('methods', ['correlation', 'mutual_info', 'rf'])
weights = kwargs.get('weights', [0.3, 0.3, 0.4])
all_importances = {}
for method, weight in zip(methods, weights):
try:
if method == 'correlation':
_, importance = self._correlation_selection(X, y, n_features, **kwargs)
elif method == 'mutual_info':
_, importance = self._mutual_info_selection(X, y, n_features, **kwargs)
elif method == 'rf':
_, importance = self._random_forest_selection(X, y, n_features, **kwargs)
else:
continue
# Normalise importances and weight them
importance_series = pd.Series(importance)
if importance_series.max() > importance_series.min():
importance_normalized = (importance_series - importance_series.min()) / \
(importance_series.max() - importance_series.min())
else:
importance_normalized = pd.Series(1, index=importance_series.index)
# Add weighted importances
for feature in importance_normalized.index:
if feature not in all_importances:
all_importances[feature] = 0
all_importances[feature] += importance_normalized[feature] * weight
except Exception as e:
logger.debug(f"Method {method} failed in hybrid selection: {e}")
# Sort by total importance
combined_importance = pd.Series(all_importances).sort_values(ascending=False)
selected_features = combined_importance.head(n_features).index.tolist()
return selected_features, combined_importance.to_dict()
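    # Worked example of the weighting (hypothetical scores): suppose, after
    # min-max normalisation, feature 'lag_1' scores 1.0 (correlation), 0.8
    # (mutual_info) and 0.9 (rf). With weights [0.3, 0.3, 0.4] its combined
    # importance is 0.3*1.0 + 0.3*0.8 + 0.4*0.9 = 0.90. Features are then
    # ranked by this weighted sum and the top n_features are kept.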
def _plot_feature_selection(
self,
X: pd.DataFrame,
y: pd.Series,
selected_features: List[str],
feature_importance: Dict,
method: str
) -> None:
"""Visualise feature selection results"""
# Prepare data for visualisation
importance_series = pd.Series(feature_importance).sort_values(ascending=False)
# Limit number of features for display
display_features = importance_series.head(20)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 1. Feature importance
y_pos = np.arange(len(display_features))
axes[0, 0].barh(y_pos, display_features.values)
axes[0, 0].set_yticks(y_pos)
axes[0, 0].set_yticklabels(display_features.index, fontsize=9)
axes[0, 0].invert_yaxis()
axes[0, 0].set_xlabel('Importance')
axes[0, 0].set_title(f'Top-{len(display_features)} features by importance ({method})')
axes[0, 0].grid(True, alpha=0.3, axis='x')
# 2. Cumulative importance
cumulative_importance = importance_series.cumsum() / importance_series.sum()
axes[0, 1].plot(range(1, len(cumulative_importance) + 1), cumulative_importance.values)
axes[0, 1].axhline(y=0.8, color='r', linestyle='--', alpha=0.7, label='80% importance')
axes[0, 1].axhline(y=0.9, color='orange', linestyle='--', alpha=0.7, label='90% importance')
axes[0, 1].set_xlabel('Number of features')
axes[0, 1].set_ylabel('Cumulative importance')
axes[0, 1].set_title('Cumulative feature importance')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 3. Correlation matrix of selected features
if len(selected_features) > 1:
selected_X = X[selected_features]
corr_matrix = selected_X.corr()
mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
sns.heatmap(
corr_matrix,
annot=True,
fmt='.2f',
cmap='coolwarm',
center=0,
square=True,
mask=mask,
cbar_kws={'shrink': 0.8},
ax=axes[1, 0]
)
axes[1, 0].set_title(f'Correlation of selected features ({len(selected_features)})')
# 4. Importance distribution
axes[1, 1].hist(importance_series.values, bins=30, edgecolor='black', alpha=0.7)
axes[1, 1].set_xlabel('Feature importance')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].set_title('Feature importance distribution')
axes[1, 1].grid(True, alpha=0.3)
plt.suptitle(f'Feature selection results using {method} method', fontsize=14)
plt.tight_layout()
        plots_dir = f'{self.config.results_dir}/plots'
        os.makedirs(plots_dir, exist_ok=True)
        plt.savefig(
            f'{plots_dir}/feature_selection_{method}.png',
            dpi=300,
            bbox_inches='tight'
        )
        plt.show()
        plt.close(fig)
def get_report(self) -> Dict:
"""Get feature selection report"""
return {
'selected_features': self.selected_features,
'feature_importances': self.feature_importances,
'selection_methods': self.selection_methods
}
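# Minimal usage sketch. Assumption for illustration: a SimpleNamespace stand-in
# with the config fields this class reads replaces the real config.config.Config,
# whose constructor is not shown in this file. Runs on synthetic data only.
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)

    rng = np.random.default_rng(42)
    n = 200
    demo = pd.DataFrame({
        'lag_1': rng.normal(size=n),
        'lag_7': rng.normal(size=n),
        'noise_a': rng.normal(size=n),
        'noise_b': rng.normal(size=n),
    })
    # Target depends on lag_1 and lag_7 plus small noise
    demo['target'] = 2.0 * demo['lag_1'] - 1.0 * demo['lag_7'] \
        + rng.normal(scale=0.1, size=n)

    # Hypothetical stand-in for Config; field names match those used above
    cfg = SimpleNamespace(
        target_column='target',
        feature_selection_method='rf',
        max_features=2,
        save_plots=False,          # skip plotting in this sketch
        results_dir='results',
        use_multiprocessing=False,
        n_jobs=1,
    )

    selector = FeatureSelector(cfg)  # type: ignore[arg-type]
    selected = selector.select(demo)
    print(selector.get_report()['selected_features'])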