# ============================================ # CLASS 5: FEATURE ENGINEER # ============================================ from typing import Dict, List, Optional from venv import logger from config.config import Config import pandas as pd import numpy as np class FeatureEngineer: """Class for creating new features for time series""" def __init__(self, config: Config): """ Initialise feature engineer Parameters: ----------- config : Config Experiment configuration """ self.config = config self.created_features = [] self.feature_info = {} self.feature_importances = {} self.transforms_applied = {} def create_all_features( self, data: pd.DataFrame, target_col: Optional[str] = None ) -> pd.DataFrame: """ Create all types of features Parameters: ----------- data : pd.DataFrame Input data target_col : str, optional Target variable. If None, uses configuration value. Returns: -------- pd.DataFrame Data with all features """ logger.info("\n" + "="*80) logger.info("CREATING FEATURES FOR TIME SERIES") logger.info("="*80) target_col = target_col or self.config.target_column initial_features = len(data.columns) initial_rows = len(data) # Check and save index original_index = data.index index_is_datetime = isinstance(original_index, pd.DatetimeIndex) logger.info(f"Initial number of features: {initial_features}") logger.info(f"Initial number of rows: {initial_rows}") logger.info(f"Index is DatetimeIndex: {index_is_datetime}") # If index not DatetimeIndex but 'date' column exists if not index_is_datetime and 'date' in data.columns: logger.info("Attempting to set DatetimeIndex from 'date' column") try: data = data.set_index('date') if isinstance(data.index, pd.DatetimeIndex): index_is_datetime = True original_index = data.index logger.info("✓ DatetimeIndex set from 'date' column") else: logger.warning("Failed to set DatetimeIndex") except Exception as e: logger.warning(f"Error setting DatetimeIndex: {e}") # Save data copy for index restoration later data_processed = data.copy() # 1. Create basic temporal features (if date exists) if index_is_datetime: logger.info("\n1. BASIC TEMPORAL FEATURES") data_processed = self.create_temporal_features(data_processed) else: logger.info("\n1. BASIC TEMPORAL FEATURES: skipped (no DatetimeIndex)") # 2. Create statistical features logger.info("\n2. STATISTICAL FEATURES") data_processed = self.create_statistical_features(data_processed, target_col) # 3. Create rolling features logger.info("\n3. ROLLING FEATURES") data_processed = self.create_rolling_features(data_processed, target_col) # 4. Create lag features (limited quantity) logger.info("\n4. LAG FEATURES") data_processed = self.create_lag_features(data_processed, target_col) # 5. Create interaction features logger.info("\n5. INTERACTION FEATURES") data_processed = self.create_interaction_features(data_processed, target_col) # 6. Create spectral features (only if sufficient data) logger.info("\n6. SPECTRAL FEATURES") if len(data_processed) > 100: data_processed = self.create_spectral_features(data_processed, target_col) else: logger.info(" Skipped: insufficient data") # 7. Create decomposition features (only if sufficient data and date exists) logger.info("\n7. DECOMPOSITION FEATURES") if len(data_processed) > 365 and index_is_datetime: data_processed = self.create_decomposition_features(data_processed, target_col) else: logger.info(" Skipped: insufficient data or no DatetimeIndex") # Remove rows with NaN that appeared due to lags and differences rows_before_nan = len(data_processed) data_processed = data_processed.dropna() rows_after_nan = len(data_processed) removed_rows = rows_before_nan - rows_after_nan # Remove constant features constant_features = [] for col in data_processed.columns: if data_processed[col].nunique() <= 1: constant_features.append(col) if constant_features: logger.info(f"\nRemoving constant features: {len(constant_features)} found") for feat in constant_features[:10]: logger.info(f" - {feat}") if len(constant_features) > 10: logger.info(f" ... and {len(constant_features) - 10} more features") data_processed = data_processed.drop(columns=constant_features) # Update created features list self.created_features = [f for f in self.created_features if f not in constant_features] # Save information self.feature_info = { 'initial_features': initial_features, 'final_features': len(data_processed.columns), 'features_created': len(self.created_features), 'initial_rows': initial_rows, 'final_rows': len(data_processed), 'removed_rows': removed_rows, 'constant_features_removed': len(constant_features), 'created_features_list': self.created_features, 'feature_categories': self.get_feature_categories() } logger.info(f"\nFeature creation summary:") logger.info(f" Initial number of features: {initial_features}") logger.info(f" Final number of features: {len(data_processed.columns)}") logger.info(f" New features created: {len(self.created_features)}") logger.info(f" Initial number of rows: {initial_rows}") logger.info(f" Final number of rows: {len(data_processed)}") logger.info(f" Rows removed due to NaN: {removed_rows}") logger.info(f" Constant features removed: {len(constant_features)}") return data_processed def create_temporal_features(self, data: pd.DataFrame) -> pd.DataFrame: """ Create temporal features Parameters: ----------- data : pd.DataFrame Input data Returns: -------- pd.DataFrame Data with temporal features """ data_processed = data.copy() if not isinstance(data_processed.index, pd.DatetimeIndex): logger.warning("Temporal features not created: index not DatetimeIndex") return data_processed try: # Basic temporal features data_processed['year'] = data_processed.index.year data_processed['month'] = data_processed.index.month data_processed['day'] = data_processed.index.day data_processed['dayofyear'] = data_processed.index.dayofyear data_processed['dayofweek'] = data_processed.index.dayofweek data_processed['weekofyear'] = data_processed.index.isocalendar().week.astype(int) data_processed['quarter'] = data_processed.index.quarter data_processed['is_weekend'] = data_processed['dayofweek'].isin([5, 6]).astype(int) # Cyclic features for seasonality data_processed['month_sin'] = np.sin(2 * np.pi * data_processed['month'] / 12) data_processed['month_cos'] = np.cos(2 * np.pi * data_processed['month'] / 12) data_processed['dayofyear_sin'] = np.sin(2 * np.pi * data_processed['dayofyear'] / 365.25) data_processed['dayofyear_cos'] = np.cos(2 * np.pi * data_processed['dayofyear'] / 365.25) data_processed['dayofweek_sin'] = np.sin(2 * np.pi * data_processed['dayofweek'] / 7) data_processed['dayofweek_cos'] = np.cos(2 * np.pi * data_processed['dayofweek'] / 7) # Time in days from start (relative features) min_date = data_processed.index.min() data_processed['days_from_start'] = (data_processed.index - min_date).days # Register created features temporal_features = ['year', 'month', 'day', 'dayofyear', 'dayofweek', 'weekofyear', 'quarter', 'is_weekend', 'month_sin', 'month_cos', 'dayofyear_sin', 'dayofyear_cos', 'dayofweek_sin', 'dayofweek_cos', 'days_from_start'] self.created_features.extend([f for f in temporal_features if f not in self.created_features]) logger.info(f"✓ Created {len(temporal_features)} temporal features") except Exception as e: logger.warning(f"Error creating temporal features: {e}") return data_processed def create_statistical_features( self, data: pd.DataFrame, target_col: str ) -> pd.DataFrame: """ Create statistical features Parameters: ----------- data : pd.DataFrame Input data target_col : str Target variable Returns: -------- pd.DataFrame Data with statistical features """ data_processed = data.copy() if target_col not in data_processed.columns: logger.warning(f"Target variable '{target_col}' not found") return data_processed # Only if we have year data if 'year' in data_processed.columns: # Yearly statistics try: yearly_stats = data_processed.groupby('year')[target_col].agg([ 'mean', 'std', 'min', 'max', 'median' ]) yearly_stats.columns = [f'{target_col}_yearly_{col}' for col in yearly_stats.columns] data_processed = data_processed.merge(yearly_stats, on='year', how='left') # Add created features to list for col in yearly_stats.columns: self.created_features.append(col) except Exception as e: logger.debug(f"Yearly statistics not created: {e}") # Normalised features (only if there is variation) std_val = data_processed[target_col].std() if std_val > 0: data_processed[f'{target_col}_zscore'] = (data_processed[target_col] - data_processed[target_col].mean()) / std_val self.created_features.append(f'{target_col}_zscore') # Features based on percentiles (binary features) try: for p in [0.25, 0.5, 0.75]: quantile_val = data_processed[target_col].quantile(p) data_processed[f'{target_col}_above_p{int(p*100)}'] = (data_processed[target_col] > quantile_val).astype(int) self.created_features.append(f'{target_col}_above_p{int(p*100)}') except Exception as e: logger.debug(f"Quantile features not created: {e}") logger.info(f"✓ Statistical features created: {len([c for c in data_processed.columns if c not in data.columns])}") return data_processed def create_rolling_features( self, data: pd.DataFrame, target_col: str ) -> pd.DataFrame: """ Create rolling statistics Parameters: ----------- data : pd.DataFrame Input data target_col : str Target variable Returns: -------- pd.DataFrame Data with rolling features """ data_processed = data.copy() if target_col not in data_processed.columns: logger.warning(f"Target variable '{target_col}' not found") return data_processed # Use only main windows from configuration windows = [w for w in self.config.rolling_windows if w < len(data_processed) // 2] for window in windows: try: # Basic statistics data_processed[f'{target_col}_rolling_mean_{window}'] = data_processed[target_col].rolling( window=window, min_periods=max(1, window//4), center=True ).mean() data_processed[f'{target_col}_rolling_std_{window}'] = data_processed[target_col].rolling( window=window, min_periods=max(1, window//4), center=True ).std() data_processed[f'{target_col}_rolling_min_{window}'] = data_processed[target_col].rolling( window=window, min_periods=max(1, window//4), center=True ).min() data_processed[f'{target_col}_rolling_max_{window}'] = data_processed[target_col].rolling( window=window, min_periods=max(1, window//4), center=True ).max() self.created_features.extend([ f'{target_col}_rolling_mean_{window}', f'{target_col}_rolling_std_{window}', f'{target_col}_rolling_min_{window}', f'{target_col}_rolling_max_{window}' ]) except Exception as e: logger.debug(f"Rolling features for window {window} not created: {e}") continue logger.info(f"✓ Rolling features created: {len([c for c in data_processed.columns if 'rolling' in c and c not in data.columns])}") return data_processed def create_lag_features( self, data: pd.DataFrame, target_col: str ) -> pd.DataFrame: """ Create lag features Parameters: ----------- data : pd.DataFrame Input data target_col : str Target variable Returns: -------- pd.DataFrame Data with lag features """ data_processed = data.copy() if target_col not in data_processed.columns: logger.warning(f"Target variable '{target_col}' not found") return data_processed # Limited number of lags max_lags = min(self.config.max_lags, 7) # Maximum 7 lags for lag in [1, 2, 3, 7, 14, 30]: if lag <= max_lags: data_processed[f'{target_col}_lag_{lag}'] = data_processed[target_col].shift(lag) self.created_features.append(f'{target_col}_lag_{lag}') # Seasonal lags (only if sufficient data) if len(data_processed) > 365: try: data_processed[f'{target_col}_seasonal_lag_365'] = data_processed[target_col].shift(365) self.created_features.append(f'{target_col}_seasonal_lag_365') except Exception as e: logger.debug(f"Seasonal lag not created: {e}") # Differences (stationarity) data_processed[f'{target_col}_diff_1'] = data_processed[target_col].diff(1) self.created_features.append(f'{target_col}_diff_1') if len(data_processed) > 7: data_processed[f'{target_col}_diff_7'] = data_processed[target_col].diff(7) self.created_features.append(f'{target_col}_diff_7') logger.info(f"✓ Lag features created: {len([c for c in data_processed.columns if ('lag' in c or 'diff' in c) and c not in data.columns])}") return data_processed def create_interaction_features( self, data: pd.DataFrame, target_col: str ) -> pd.DataFrame: """ Create interaction features Parameters: ----------- data : pd.DataFrame Input data target_col : str Target variable Returns: -------- pd.DataFrame Data with interaction features """ data_processed = data.copy() if target_col not in data_processed.columns: logger.warning(f"Target variable '{target_col}' not found") return data_processed # Interactions with temperature (only if data exists) temp_cols = ['tavg', 'tmin', 'tmax'] available_temp_cols = [col for col in temp_cols if col in data_processed.columns] for temp_col in available_temp_cols: try: # Avoid division by zero temp_data = data_processed[temp_col].replace(0, np.nan) if temp_data.notna().all() and (temp_data != 0).all(): data_processed[f'{target_col}_{temp_col}_ratio'] = data_processed[target_col] / temp_data self.created_features.append(f'{target_col}_{temp_col}_ratio') # Product data_processed[f'{target_col}_{temp_col}_product'] = data_processed[target_col] * temp_data self.created_features.append(f'{target_col}_{temp_col}_product') except Exception as e: logger.debug(f"Interaction feature with {temp_col} not created: {e}") # Interaction with water level if 'urovenvoda' in data_processed.columns: try: uroven_data = data_processed['urovenvoda'].replace(0, np.nan) if uroven_data.notna().all() and (uroven_data != 0).all(): data_processed[f'{target_col}_urovenvoda_ratio'] = data_processed[target_col] / uroven_data self.created_features.append(f'{target_col}_urovenvoda_ratio') except Exception as e: logger.debug(f"Interaction feature with urovenvoda not created: {e}") logger.info(f"✓ Interaction features created: {len([c for c in data_processed.columns if ('ratio' in c or 'product' in c) and c not in data.columns])}") return data_processed def create_spectral_features( self, data: pd.DataFrame, target_col: str ) -> pd.DataFrame: """ Create spectral features Parameters: ----------- data : pd.DataFrame Input data target_col : str Target variable Returns: -------- pd.DataFrame Data with spectral features """ data_processed = data.copy() if target_col not in data_processed.columns: logger.warning(f"Target variable '{target_col}' not found") return data_processed if len(data_processed) < 100: logger.info("Insufficient data for creating spectral features") return data_processed try: # Fast Fourier Transform series = data_processed[target_col].dropna().values if len(series) > 50: # Calculate periodogram from scipy.signal import periodogram freqs, psd = periodogram(series, fs=1.0) # Find dominant frequencies if len(psd) > 3: # Top-3 frequencies by power top_indices = np.argsort(psd)[-3:][::-1] for i, idx in enumerate(top_indices, 1): if idx < len(freqs): freq = freqs[idx] if freq > 0: period = 1 / freq data_processed[f'{target_col}_dominant_period_{i}'] = period self.created_features.append(f'{target_col}_dominant_period_{i}') except Exception as e: logger.debug(f"Spectral features creation failed: {e}") return data_processed def create_decomposition_features( self, data: pd.DataFrame, target_col: str ) -> pd.DataFrame: """ Create features based on decomposition Parameters: ----------- data : pd.DataFrame Input data target_col : str Target variable Returns: -------- pd.DataFrame Data with decomposition features """ data_processed = data.copy() if target_col not in data_processed.columns: logger.warning(f"Target variable '{target_col}' not found") return data_processed if len(data_processed) < 365: logger.info("Insufficient data for decomposition") return data_processed try: # Check for date presence if isinstance(data_processed.index, pd.DatetimeIndex): # STL decomposition if len(data_processed) > 730: # Need at least 2 years for yearly seasonality try: from statsmodels.tsa.seasonal import STL # STL decomposition stl = STL( data_processed[target_col].fillna(method='ffill'), period=365, robust=True ) result = stl.fit() # Add components data_processed[f'{target_col}_trend'] = result.trend data_processed[f'{target_col}_seasonal'] = result.seasonal data_processed[f'{target_col}_residual'] = result.resid self.created_features.extend([ f'{target_col}_trend', f'{target_col}_seasonal', f'{target_col}_residual' ]) logger.info("✓ STL decomposition successful") except Exception as e: logger.debug(f"STL decomposition failed: {e}") # Simple seasonal decomposition try: from statsmodels.tsa.seasonal import seasonal_decompose decomposition = seasonal_decompose( data_processed[target_col].fillna(method='ffill'), model='additive', period=365, extrapolate_trend='freq' ) data_processed[f'{target_col}_trend'] = decomposition.trend data_processed[f'{target_col}_seasonal'] = decomposition.seasonal self.created_features.extend([ f'{target_col}_trend', f'{target_col}_seasonal' ]) logger.info("✓ Seasonal decomposition successful") except Exception as e2: logger.debug(f"Seasonal decomposition failed: {e2}") except Exception as e: logger.debug(f"Decomposition features creation failed: {e}") return data_processed def get_feature_categories(self) -> Dict[str, List[str]]: """Get features by categories""" categories = { 'temporal': [], 'statistical': [], 'rolling': [], 'lag': [], 'interaction': [], 'spectral': [], 'decomposition': [], 'binary': [] } for feature in self.created_features: if any(keyword in feature for keyword in ['year', 'month', 'day', 'week', 'quarter', 'sin', 'cos', 'is_weekend']): categories['temporal'].append(feature) elif any(keyword in feature for keyword in ['zscore', 'above_p', 'yearly_']): if 'above_p' in feature: categories['binary'].append(feature) else: categories['statistical'].append(feature) elif 'rolling' in feature: categories['rolling'].append(feature) elif any(keyword in feature for keyword in ['lag', 'diff']): categories['lag'].append(feature) elif 'ratio' in feature or 'product' in feature: categories['interaction'].append(feature) elif 'dominant' in feature: categories['spectral'].append(feature) elif any(keyword in feature for keyword in ['trend', 'seasonal', 'residual']): categories['decomposition'].append(feature) # Remove empty categories categories = {k: v for k, v in categories.items() if v} return categories