# ============================================
# CLASS 5: FEATURE ENGINEER
# ============================================
import logging
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from config.config import Config

logger = logging.getLogger(__name__)
class FeatureEngineer:
    """Class for creating new features for time series"""

    def __init__(self, config: Config):
        """
        Initialise feature engineer

        Parameters:
        -----------
        config : Config
            Experiment configuration
        """
        self.config = config
        self.created_features = []
        self.feature_info = {}
        self.feature_importances = {}
        self.transforms_applied = {}
    def create_all_features(
        self,
        data: pd.DataFrame,
        target_col: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Create all types of features

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        target_col : str, optional
            Target variable. If None, uses the configuration value.

        Returns:
        --------
        pd.DataFrame
            Data with all features
        """
        logger.info("\n" + "=" * 80)
        logger.info("CREATING FEATURES FOR TIME SERIES")
        logger.info("=" * 80)

        target_col = target_col or self.config.target_column
        initial_features = len(data.columns)
        initial_rows = len(data)

        # Check and remember the index type
        original_index = data.index
        index_is_datetime = isinstance(original_index, pd.DatetimeIndex)

        logger.info(f"Initial number of features: {initial_features}")
        logger.info(f"Initial number of rows: {initial_rows}")
        logger.info(f"Index is DatetimeIndex: {index_is_datetime}")
        # If the index is not a DatetimeIndex but a 'date' column exists
        if not index_is_datetime and 'date' in data.columns:
            logger.info("Attempting to set DatetimeIndex from 'date' column")
            try:
                # Convert explicitly so a non-datetime 'date' column fails cleanly
                data = data.copy()
                data['date'] = pd.to_datetime(data['date'])
                data = data.set_index('date')
                if isinstance(data.index, pd.DatetimeIndex):
                    index_is_datetime = True
                    original_index = data.index
                    logger.info("✓ DatetimeIndex set from 'date' column")
                else:
                    logger.warning("Failed to set DatetimeIndex")
            except Exception as e:
                logger.warning(f"Error setting DatetimeIndex: {e}")

        # Work on a copy so the caller's DataFrame is left untouched
        data_processed = data.copy()
        # 1. Create basic temporal features (if date exists)
        if index_is_datetime:
            logger.info("\n1. BASIC TEMPORAL FEATURES")
            data_processed = self.create_temporal_features(data_processed)
        else:
            logger.info("\n1. BASIC TEMPORAL FEATURES: skipped (no DatetimeIndex)")

        # 2. Create statistical features
        logger.info("\n2. STATISTICAL FEATURES")
        data_processed = self.create_statistical_features(data_processed, target_col)

        # 3. Create rolling features
        logger.info("\n3. ROLLING FEATURES")
        data_processed = self.create_rolling_features(data_processed, target_col)

        # 4. Create lag features (limited quantity)
        logger.info("\n4. LAG FEATURES")
        data_processed = self.create_lag_features(data_processed, target_col)

        # 5. Create interaction features
        logger.info("\n5. INTERACTION FEATURES")
        data_processed = self.create_interaction_features(data_processed, target_col)

        # 6. Create spectral features (only if sufficient data)
        logger.info("\n6. SPECTRAL FEATURES")
        if len(data_processed) > 100:
            data_processed = self.create_spectral_features(data_processed, target_col)
        else:
            logger.info("   Skipped: insufficient data")

        # 7. Create decomposition features (only if sufficient data and date exists)
        logger.info("\n7. DECOMPOSITION FEATURES")
        if len(data_processed) > 365 and index_is_datetime:
            data_processed = self.create_decomposition_features(data_processed, target_col)
        else:
            logger.info("   Skipped: insufficient data or no DatetimeIndex")
        # Remove rows with NaN introduced by lags and differences
        rows_before_nan = len(data_processed)
        data_processed = data_processed.dropna()
        rows_after_nan = len(data_processed)
        removed_rows = rows_before_nan - rows_after_nan

        # Remove constant features
        constant_features = []
        for col in data_processed.columns:
            if data_processed[col].nunique() <= 1:
                constant_features.append(col)

        if constant_features:
            logger.info(f"\nRemoving constant features: {len(constant_features)} found")
            for feat in constant_features[:10]:
                logger.info(f"  - {feat}")
            if len(constant_features) > 10:
                logger.info(f"  ... and {len(constant_features) - 10} more features")
            data_processed = data_processed.drop(columns=constant_features)
            # Update the created-features list
            self.created_features = [f for f in self.created_features if f not in constant_features]
        # Save information
        self.feature_info = {
            'initial_features': initial_features,
            'final_features': len(data_processed.columns),
            'features_created': len(self.created_features),
            'initial_rows': initial_rows,
            'final_rows': len(data_processed),
            'removed_rows': removed_rows,
            'constant_features_removed': len(constant_features),
            'created_features_list': self.created_features,
            'feature_categories': self.get_feature_categories()
        }

        logger.info("\nFeature creation summary:")
        logger.info(f"  Initial number of features: {initial_features}")
        logger.info(f"  Final number of features: {len(data_processed.columns)}")
        logger.info(f"  New features created: {len(self.created_features)}")
        logger.info(f"  Initial number of rows: {initial_rows}")
        logger.info(f"  Final number of rows: {len(data_processed)}")
        logger.info(f"  Rows removed due to NaN: {removed_rows}")
        logger.info(f"  Constant features removed: {len(constant_features)}")

        return data_processed

    def create_temporal_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Create temporal features

        Parameters:
        -----------
        data : pd.DataFrame
            Input data

        Returns:
        --------
        pd.DataFrame
            Data with temporal features
        """
        data_processed = data.copy()

        if not isinstance(data_processed.index, pd.DatetimeIndex):
            logger.warning("Temporal features not created: index is not a DatetimeIndex")
            return data_processed
        try:
            # Basic temporal features
            data_processed['year'] = data_processed.index.year
            data_processed['month'] = data_processed.index.month
            data_processed['day'] = data_processed.index.day
            data_processed['dayofyear'] = data_processed.index.dayofyear
            data_processed['dayofweek'] = data_processed.index.dayofweek
            data_processed['weekofyear'] = data_processed.index.isocalendar().week.astype(int)
            data_processed['quarter'] = data_processed.index.quarter
            data_processed['is_weekend'] = data_processed['dayofweek'].isin([5, 6]).astype(int)

            # Cyclic (sin/cos) encoding of seasonality, so that adjacent periods
            # such as December and January end up close in feature space
            data_processed['month_sin'] = np.sin(2 * np.pi * data_processed['month'] / 12)
            data_processed['month_cos'] = np.cos(2 * np.pi * data_processed['month'] / 12)
            data_processed['dayofyear_sin'] = np.sin(2 * np.pi * data_processed['dayofyear'] / 365.25)
            data_processed['dayofyear_cos'] = np.cos(2 * np.pi * data_processed['dayofyear'] / 365.25)
            data_processed['dayofweek_sin'] = np.sin(2 * np.pi * data_processed['dayofweek'] / 7)
            data_processed['dayofweek_cos'] = np.cos(2 * np.pi * data_processed['dayofweek'] / 7)

            # Time in days from the start of the series (relative feature)
            min_date = data_processed.index.min()
            data_processed['days_from_start'] = (data_processed.index - min_date).days

            # Register created features
            temporal_features = ['year', 'month', 'day', 'dayofyear', 'dayofweek',
                                 'weekofyear', 'quarter', 'is_weekend', 'month_sin',
                                 'month_cos', 'dayofyear_sin', 'dayofyear_cos',
                                 'dayofweek_sin', 'dayofweek_cos', 'days_from_start']
            self.created_features.extend([f for f in temporal_features if f not in self.created_features])
            logger.info(f"✓ Created {len(temporal_features)} temporal features")
        except Exception as e:
            logger.warning(f"Error creating temporal features: {e}")

        return data_processed

    def create_statistical_features(
        self,
        data: pd.DataFrame,
        target_col: str
    ) -> pd.DataFrame:
        """
        Create statistical features

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        target_col : str
            Target variable

        Returns:
        --------
        pd.DataFrame
            Data with statistical features
        """
        data_processed = data.copy()

        if target_col not in data_processed.columns:
            logger.warning(f"Target variable '{target_col}' not found")
            return data_processed
        # Only if the 'year' feature exists (created by create_temporal_features)
        if 'year' in data_processed.columns:
            # Yearly statistics
            try:
                yearly_stats = data_processed.groupby('year')[target_col].agg([
                    'mean', 'std', 'min', 'max', 'median'
                ])
                yearly_stats.columns = [f'{target_col}_yearly_{col}' for col in yearly_stats.columns]
                # join (rather than merge) keeps the DatetimeIndex intact
                data_processed = data_processed.join(yearly_stats, on='year')

                # Add created features to the list
                self.created_features.extend(yearly_stats.columns)
            except Exception as e:
                logger.debug(f"Yearly statistics not created: {e}")
        # Normalised features (only if there is variation)
        std_val = data_processed[target_col].std()
        if std_val > 0:
            data_processed[f'{target_col}_zscore'] = (
                data_processed[target_col] - data_processed[target_col].mean()
            ) / std_val
            self.created_features.append(f'{target_col}_zscore')

        # Percentile-based binary features
        try:
            for p in [0.25, 0.5, 0.75]:
                quantile_val = data_processed[target_col].quantile(p)
                data_processed[f'{target_col}_above_p{int(p * 100)}'] = (
                    data_processed[target_col] > quantile_val
                ).astype(int)
                self.created_features.append(f'{target_col}_above_p{int(p * 100)}')
        except Exception as e:
            logger.debug(f"Quantile features not created: {e}")

        logger.info(f"✓ Statistical features created: {len([c for c in data_processed.columns if c not in data.columns])}")
        return data_processed

    def create_rolling_features(
        self,
        data: pd.DataFrame,
        target_col: str
    ) -> pd.DataFrame:
        """
        Create rolling statistics

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        target_col : str
            Target variable

        Returns:
        --------
        pd.DataFrame
            Data with rolling features
        """
        data_processed = data.copy()

        if target_col not in data_processed.columns:
            logger.warning(f"Target variable '{target_col}' not found")
            return data_processed
        # Use only the configured windows that are shorter than half the series
        windows = [w for w in self.config.rolling_windows if w < len(data_processed) // 2]
        for window in windows:
            try:
                # Basic statistics over a centred window.
                # Note: center=True uses future values, which is fine for
                # smoothing/EDA but leaks information in a strict forecasting setup.
                roll = data_processed[target_col].rolling(
                    window=window, min_periods=max(1, window // 4), center=True
                )
                data_processed[f'{target_col}_rolling_mean_{window}'] = roll.mean()
                data_processed[f'{target_col}_rolling_std_{window}'] = roll.std()
                data_processed[f'{target_col}_rolling_min_{window}'] = roll.min()
                data_processed[f'{target_col}_rolling_max_{window}'] = roll.max()

                self.created_features.extend([
                    f'{target_col}_rolling_mean_{window}',
                    f'{target_col}_rolling_std_{window}',
                    f'{target_col}_rolling_min_{window}',
                    f'{target_col}_rolling_max_{window}'
                ])
            except Exception as e:
                logger.debug(f"Rolling features for window {window} not created: {e}")
                continue
| logger.info(f"✓ Rolling features created: {len([c for c in data_processed.columns if 'rolling' in c and c not in data.columns])}") | |
| return data_processed | |

    def create_lag_features(
        self,
        data: pd.DataFrame,
        target_col: str
    ) -> pd.DataFrame:
        """
        Create lag features

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        target_col : str
            Target variable

        Returns:
        --------
        pd.DataFrame
            Data with lag features
        """
        data_processed = data.copy()

        if target_col not in data_processed.columns:
            logger.warning(f"Target variable '{target_col}' not found")
            return data_processed
        # Cap the largest lag at 7 steps; note that with this cap the 14- and
        # 30-step candidates below are never created unless the cap is raised
        max_lags = min(self.config.max_lags, 7)
        for lag in [1, 2, 3, 7, 14, 30]:
            if lag <= max_lags:
                data_processed[f'{target_col}_lag_{lag}'] = data_processed[target_col].shift(lag)
                self.created_features.append(f'{target_col}_lag_{lag}')
        # Seasonal lag (only if sufficient data)
        if len(data_processed) > 365:
            try:
                data_processed[f'{target_col}_seasonal_lag_365'] = data_processed[target_col].shift(365)
                self.created_features.append(f'{target_col}_seasonal_lag_365')
            except Exception as e:
                logger.debug(f"Seasonal lag not created: {e}")

        # Differences (towards stationarity)
        data_processed[f'{target_col}_diff_1'] = data_processed[target_col].diff(1)
        self.created_features.append(f'{target_col}_diff_1')

        if len(data_processed) > 7:
            data_processed[f'{target_col}_diff_7'] = data_processed[target_col].diff(7)
            self.created_features.append(f'{target_col}_diff_7')

        logger.info(f"✓ Lag features created: {len([c for c in data_processed.columns if ('lag' in c or 'diff' in c) and c not in data.columns])}")
        return data_processed

    def create_interaction_features(
        self,
        data: pd.DataFrame,
        target_col: str
    ) -> pd.DataFrame:
        """
        Create interaction features

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        target_col : str
            Target variable

        Returns:
        --------
        pd.DataFrame
            Data with interaction features
        """
        data_processed = data.copy()

        if target_col not in data_processed.columns:
            logger.warning(f"Target variable '{target_col}' not found")
            return data_processed
        # Interactions with temperature (only if the columns exist)
        temp_cols = ['tavg', 'tmin', 'tmax']
        available_temp_cols = [col for col in temp_cols if col in data_processed.columns]

        for temp_col in available_temp_cols:
            try:
                # Guard against division by zero: zeros become NaN in the denominator,
                # and features are only created when the column has no zeros or NaN
                temp_data = data_processed[temp_col].replace(0, np.nan)
                if temp_data.notna().all():
                    data_processed[f'{target_col}_{temp_col}_ratio'] = data_processed[target_col] / temp_data
                    self.created_features.append(f'{target_col}_{temp_col}_ratio')

                    # Product
                    data_processed[f'{target_col}_{temp_col}_product'] = data_processed[target_col] * temp_data
                    self.created_features.append(f'{target_col}_{temp_col}_product')
            except Exception as e:
                logger.debug(f"Interaction feature with {temp_col} not created: {e}")

        # Interaction with water level ('urovenvoda' = water level)
        if 'urovenvoda' in data_processed.columns:
            try:
                uroven_data = data_processed['urovenvoda'].replace(0, np.nan)
                if uroven_data.notna().all():
                    data_processed[f'{target_col}_urovenvoda_ratio'] = data_processed[target_col] / uroven_data
                    self.created_features.append(f'{target_col}_urovenvoda_ratio')
            except Exception as e:
                logger.debug(f"Interaction feature with urovenvoda not created: {e}")

        logger.info(f"✓ Interaction features created: {len([c for c in data_processed.columns if ('ratio' in c or 'product' in c) and c not in data.columns])}")
        return data_processed

    def create_spectral_features(
        self,
        data: pd.DataFrame,
        target_col: str
    ) -> pd.DataFrame:
        """
        Create spectral features

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        target_col : str
            Target variable

        Returns:
        --------
        pd.DataFrame
            Data with spectral features
        """
        data_processed = data.copy()

        if target_col not in data_processed.columns:
            logger.warning(f"Target variable '{target_col}' not found")
            return data_processed

        if len(data_processed) < 100:
            logger.info("Insufficient data for creating spectral features")
            return data_processed
        try:
            # Estimate the power spectral density via the periodogram
            series = data_processed[target_col].dropna().values

            if len(series) > 50:
                from scipy.signal import periodogram

                freqs, psd = periodogram(series, fs=1.0)

                # Find dominant frequencies
                if len(psd) > 3:
                    # Top-3 frequencies by power
                    top_indices = np.argsort(psd)[-3:][::-1]
                    for i, idx in enumerate(top_indices, 1):
                        if idx < len(freqs):
                            freq = freqs[idx]
                            if freq > 0:
                                period = 1 / freq
                                # Note: a single scalar broadcast over the column, so
                                # the feature is constant and will be dropped by the
                                # constant-feature filter in create_all_features
                                data_processed[f'{target_col}_dominant_period_{i}'] = period
                                self.created_features.append(f'{target_col}_dominant_period_{i}')
        except Exception as e:
            logger.debug(f"Spectral features creation failed: {e}")

        return data_processed

    def create_decomposition_features(
        self,
        data: pd.DataFrame,
        target_col: str
    ) -> pd.DataFrame:
        """
        Create features based on decomposition

        Parameters:
        -----------
        data : pd.DataFrame
            Input data
        target_col : str
            Target variable

        Returns:
        --------
        pd.DataFrame
            Data with decomposition features
        """
        data_processed = data.copy()

        if target_col not in data_processed.columns:
            logger.warning(f"Target variable '{target_col}' not found")
            return data_processed

        if len(data_processed) < 365:
            logger.info("Insufficient data for decomposition")
            return data_processed
        try:
            # Decomposition requires a DatetimeIndex
            if isinstance(data_processed.index, pd.DatetimeIndex):
                # STL decomposition (needs at least 2 years for yearly seasonality)
                if len(data_processed) > 730:
                    try:
                        from statsmodels.tsa.seasonal import STL

                        stl = STL(
                            data_processed[target_col].ffill(),
                            period=365,
                            robust=True
                        )
                        result = stl.fit()

                        # Add components
                        data_processed[f'{target_col}_trend'] = result.trend
                        data_processed[f'{target_col}_seasonal'] = result.seasonal
                        data_processed[f'{target_col}_residual'] = result.resid

                        self.created_features.extend([
                            f'{target_col}_trend',
                            f'{target_col}_seasonal',
                            f'{target_col}_residual'
                        ])
                        logger.info("✓ STL decomposition successful")
                    except Exception as e:
                        logger.debug(f"STL decomposition failed: {e}")

                        # Fallback: simple seasonal decomposition
                        try:
                            from statsmodels.tsa.seasonal import seasonal_decompose

                            decomposition = seasonal_decompose(
                                data_processed[target_col].ffill(),
                                model='additive',
                                period=365,
                                extrapolate_trend='freq'
                            )

                            data_processed[f'{target_col}_trend'] = decomposition.trend
                            data_processed[f'{target_col}_seasonal'] = decomposition.seasonal

                            self.created_features.extend([
                                f'{target_col}_trend',
                                f'{target_col}_seasonal'
                            ])
                            logger.info("✓ Seasonal decomposition successful")
                        except Exception as e2:
                            logger.debug(f"Seasonal decomposition failed: {e2}")
        except Exception as e:
            logger.debug(f"Decomposition features creation failed: {e}")

        return data_processed

    def get_feature_categories(self) -> Dict[str, List[str]]:
        """Get created features grouped by category"""
        categories = {
            'temporal': [],
            'statistical': [],
            'rolling': [],
            'lag': [],
            'interaction': [],
            'spectral': [],
            'decomposition': [],
            'binary': []
        }

        for feature in self.created_features:
            # Check statistical names first: 'yearly_' would otherwise be
            # swallowed by the 'year' keyword in the temporal branch
            if any(keyword in feature for keyword in ['zscore', 'above_p', 'yearly_']):
                if 'above_p' in feature:
                    categories['binary'].append(feature)
                else:
                    categories['statistical'].append(feature)
            elif any(keyword in feature for keyword in ['year', 'month', 'day', 'week', 'quarter', 'sin', 'cos', 'is_weekend']):
                categories['temporal'].append(feature)
            elif 'rolling' in feature:
                categories['rolling'].append(feature)
            elif any(keyword in feature for keyword in ['lag', 'diff']):
                categories['lag'].append(feature)
            elif 'ratio' in feature or 'product' in feature:
                categories['interaction'].append(feature)
            elif 'dominant' in feature:
                categories['spectral'].append(feature)
            elif any(keyword in feature for keyword in ['trend', 'seasonal', 'residual']):
                categories['decomposition'].append(feature)

        # Remove empty categories
        categories = {k: v for k, v in categories.items() if v}
        return categories
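

if __name__ == "__main__":
    # Minimal smoke-test sketch (not part of the original module): exercises
    # create_all_features end to end on synthetic daily data. A SimpleNamespace
    # stands in for Config here, assuming only the attributes this class
    # actually reads (target_column, rolling_windows, max_lags); the real
    # Config may differ. Requires scipy and statsmodels, as the methods above do.
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)

    rng = np.random.default_rng(42)
    dates = pd.date_range("2015-01-01", periods=1200, freq="D")
    # Synthetic series: linear trend + yearly seasonality + noise
    values = (
        50
        + 0.01 * np.arange(len(dates))
        + 10 * np.sin(2 * np.pi * dates.dayofyear / 365.25)
        + rng.normal(0, 1, len(dates))
    )
    df = pd.DataFrame(
        {"flow": values, "tavg": rng.normal(10, 5, len(dates))},
        index=dates,
    )

    # Hypothetical stand-in for the real Config object
    cfg = SimpleNamespace(target_column="flow", rolling_windows=[7, 30, 90], max_lags=7)

    fe = FeatureEngineer(cfg)  # type: ignore[arg-type]
    enriched = fe.create_all_features(df)
    print(enriched.shape)
    print(list(fe.get_feature_categories().keys()))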