# TimeFlowPro / features / feature_engineer.py
# (last update by ArabovMK, commit d8f69a9 — "Update all files")
# ============================================
# CLASS 5: FEATURE ENGINEER
# ============================================
import logging

from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from config.config import Config

# Fix: the original `from venv import logger` borrowed the stdlib venv
# module's *internal* logger by accident. Use this module's own logger so
# log records are attributed correctly and venv internals are not touched.
logger = logging.getLogger(__name__)
class FeatureEngineer:
    """Creates engineered features (temporal, statistical, rolling, lag,
    interaction, spectral, decomposition) for time-series modelling."""

    def __init__(self, config: Config):
        """
        Initialise the feature engineer.

        Parameters
        ----------
        config : Config
            Experiment configuration.
        """
        self.config = config
        # Names of every feature column this engineer has added.
        self.created_features: List[str] = []
        # Run summary populated by create_all_features().
        self.feature_info: Dict = {}
        # Reserved for downstream importance / transform bookkeeping.
        self.feature_importances: Dict = {}
        self.transforms_applied: Dict = {}
def create_all_features(
self,
data: pd.DataFrame,
target_col: Optional[str] = None
) -> pd.DataFrame:
"""
Create all types of features
Parameters:
-----------
data : pd.DataFrame
Input data
target_col : str, optional
Target variable. If None, uses configuration value.
Returns:
--------
pd.DataFrame
Data with all features
"""
logger.info("\n" + "="*80)
logger.info("CREATING FEATURES FOR TIME SERIES")
logger.info("="*80)
target_col = target_col or self.config.target_column
initial_features = len(data.columns)
initial_rows = len(data)
# Check and save index
original_index = data.index
index_is_datetime = isinstance(original_index, pd.DatetimeIndex)
logger.info(f"Initial number of features: {initial_features}")
logger.info(f"Initial number of rows: {initial_rows}")
logger.info(f"Index is DatetimeIndex: {index_is_datetime}")
# If index not DatetimeIndex but 'date' column exists
if not index_is_datetime and 'date' in data.columns:
logger.info("Attempting to set DatetimeIndex from 'date' column")
try:
data = data.set_index('date')
if isinstance(data.index, pd.DatetimeIndex):
index_is_datetime = True
original_index = data.index
logger.info("✓ DatetimeIndex set from 'date' column")
else:
logger.warning("Failed to set DatetimeIndex")
except Exception as e:
logger.warning(f"Error setting DatetimeIndex: {e}")
# Save data copy for index restoration later
data_processed = data.copy()
# 1. Create basic temporal features (if date exists)
if index_is_datetime:
logger.info("\n1. BASIC TEMPORAL FEATURES")
data_processed = self.create_temporal_features(data_processed)
else:
logger.info("\n1. BASIC TEMPORAL FEATURES: skipped (no DatetimeIndex)")
# 2. Create statistical features
logger.info("\n2. STATISTICAL FEATURES")
data_processed = self.create_statistical_features(data_processed, target_col)
# 3. Create rolling features
logger.info("\n3. ROLLING FEATURES")
data_processed = self.create_rolling_features(data_processed, target_col)
# 4. Create lag features (limited quantity)
logger.info("\n4. LAG FEATURES")
data_processed = self.create_lag_features(data_processed, target_col)
# 5. Create interaction features
logger.info("\n5. INTERACTION FEATURES")
data_processed = self.create_interaction_features(data_processed, target_col)
# 6. Create spectral features (only if sufficient data)
logger.info("\n6. SPECTRAL FEATURES")
if len(data_processed) > 100:
data_processed = self.create_spectral_features(data_processed, target_col)
else:
logger.info(" Skipped: insufficient data")
# 7. Create decomposition features (only if sufficient data and date exists)
logger.info("\n7. DECOMPOSITION FEATURES")
if len(data_processed) > 365 and index_is_datetime:
data_processed = self.create_decomposition_features(data_processed, target_col)
else:
logger.info(" Skipped: insufficient data or no DatetimeIndex")
# Remove rows with NaN that appeared due to lags and differences
rows_before_nan = len(data_processed)
data_processed = data_processed.dropna()
rows_after_nan = len(data_processed)
removed_rows = rows_before_nan - rows_after_nan
# Remove constant features
constant_features = []
for col in data_processed.columns:
if data_processed[col].nunique() <= 1:
constant_features.append(col)
if constant_features:
logger.info(f"\nRemoving constant features: {len(constant_features)} found")
for feat in constant_features[:10]:
logger.info(f" - {feat}")
if len(constant_features) > 10:
logger.info(f" ... and {len(constant_features) - 10} more features")
data_processed = data_processed.drop(columns=constant_features)
# Update created features list
self.created_features = [f for f in self.created_features if f not in constant_features]
# Save information
self.feature_info = {
'initial_features': initial_features,
'final_features': len(data_processed.columns),
'features_created': len(self.created_features),
'initial_rows': initial_rows,
'final_rows': len(data_processed),
'removed_rows': removed_rows,
'constant_features_removed': len(constant_features),
'created_features_list': self.created_features,
'feature_categories': self.get_feature_categories()
}
logger.info(f"\nFeature creation summary:")
logger.info(f" Initial number of features: {initial_features}")
logger.info(f" Final number of features: {len(data_processed.columns)}")
logger.info(f" New features created: {len(self.created_features)}")
logger.info(f" Initial number of rows: {initial_rows}")
logger.info(f" Final number of rows: {len(data_processed)}")
logger.info(f" Rows removed due to NaN: {removed_rows}")
logger.info(f" Constant features removed: {len(constant_features)}")
return data_processed
def create_temporal_features(self, data: pd.DataFrame) -> pd.DataFrame:
"""
Create temporal features
Parameters:
-----------
data : pd.DataFrame
Input data
Returns:
--------
pd.DataFrame
Data with temporal features
"""
data_processed = data.copy()
if not isinstance(data_processed.index, pd.DatetimeIndex):
logger.warning("Temporal features not created: index not DatetimeIndex")
return data_processed
try:
# Basic temporal features
data_processed['year'] = data_processed.index.year
data_processed['month'] = data_processed.index.month
data_processed['day'] = data_processed.index.day
data_processed['dayofyear'] = data_processed.index.dayofyear
data_processed['dayofweek'] = data_processed.index.dayofweek
data_processed['weekofyear'] = data_processed.index.isocalendar().week.astype(int)
data_processed['quarter'] = data_processed.index.quarter
data_processed['is_weekend'] = data_processed['dayofweek'].isin([5, 6]).astype(int)
# Cyclic features for seasonality
data_processed['month_sin'] = np.sin(2 * np.pi * data_processed['month'] / 12)
data_processed['month_cos'] = np.cos(2 * np.pi * data_processed['month'] / 12)
data_processed['dayofyear_sin'] = np.sin(2 * np.pi * data_processed['dayofyear'] / 365.25)
data_processed['dayofyear_cos'] = np.cos(2 * np.pi * data_processed['dayofyear'] / 365.25)
data_processed['dayofweek_sin'] = np.sin(2 * np.pi * data_processed['dayofweek'] / 7)
data_processed['dayofweek_cos'] = np.cos(2 * np.pi * data_processed['dayofweek'] / 7)
# Time in days from start (relative features)
min_date = data_processed.index.min()
data_processed['days_from_start'] = (data_processed.index - min_date).days
# Register created features
temporal_features = ['year', 'month', 'day', 'dayofyear', 'dayofweek',
'weekofyear', 'quarter', 'is_weekend', 'month_sin',
'month_cos', 'dayofyear_sin', 'dayofyear_cos',
'dayofweek_sin', 'dayofweek_cos', 'days_from_start']
self.created_features.extend([f for f in temporal_features if f not in self.created_features])
logger.info(f"✓ Created {len(temporal_features)} temporal features")
except Exception as e:
logger.warning(f"Error creating temporal features: {e}")
return data_processed
def create_statistical_features(
self,
data: pd.DataFrame,
target_col: str
) -> pd.DataFrame:
"""
Create statistical features
Parameters:
-----------
data : pd.DataFrame
Input data
target_col : str
Target variable
Returns:
--------
pd.DataFrame
Data with statistical features
"""
data_processed = data.copy()
if target_col not in data_processed.columns:
logger.warning(f"Target variable '{target_col}' not found")
return data_processed
# Only if we have year data
if 'year' in data_processed.columns:
# Yearly statistics
try:
yearly_stats = data_processed.groupby('year')[target_col].agg([
'mean', 'std', 'min', 'max', 'median'
])
yearly_stats.columns = [f'{target_col}_yearly_{col}' for col in yearly_stats.columns]
data_processed = data_processed.merge(yearly_stats, on='year', how='left')
# Add created features to list
for col in yearly_stats.columns:
self.created_features.append(col)
except Exception as e:
logger.debug(f"Yearly statistics not created: {e}")
# Normalised features (only if there is variation)
std_val = data_processed[target_col].std()
if std_val > 0:
data_processed[f'{target_col}_zscore'] = (data_processed[target_col] - data_processed[target_col].mean()) / std_val
self.created_features.append(f'{target_col}_zscore')
# Features based on percentiles (binary features)
try:
for p in [0.25, 0.5, 0.75]:
quantile_val = data_processed[target_col].quantile(p)
data_processed[f'{target_col}_above_p{int(p*100)}'] = (data_processed[target_col] > quantile_val).astype(int)
self.created_features.append(f'{target_col}_above_p{int(p*100)}')
except Exception as e:
logger.debug(f"Quantile features not created: {e}")
logger.info(f"✓ Statistical features created: {len([c for c in data_processed.columns if c not in data.columns])}")
return data_processed
def create_rolling_features(
self,
data: pd.DataFrame,
target_col: str
) -> pd.DataFrame:
"""
Create rolling statistics
Parameters:
-----------
data : pd.DataFrame
Input data
target_col : str
Target variable
Returns:
--------
pd.DataFrame
Data with rolling features
"""
data_processed = data.copy()
if target_col not in data_processed.columns:
logger.warning(f"Target variable '{target_col}' not found")
return data_processed
# Use only main windows from configuration
windows = [w for w in self.config.rolling_windows if w < len(data_processed) // 2]
for window in windows:
try:
# Basic statistics
data_processed[f'{target_col}_rolling_mean_{window}'] = data_processed[target_col].rolling(
window=window, min_periods=max(1, window//4), center=True
).mean()
data_processed[f'{target_col}_rolling_std_{window}'] = data_processed[target_col].rolling(
window=window, min_periods=max(1, window//4), center=True
).std()
data_processed[f'{target_col}_rolling_min_{window}'] = data_processed[target_col].rolling(
window=window, min_periods=max(1, window//4), center=True
).min()
data_processed[f'{target_col}_rolling_max_{window}'] = data_processed[target_col].rolling(
window=window, min_periods=max(1, window//4), center=True
).max()
self.created_features.extend([
f'{target_col}_rolling_mean_{window}',
f'{target_col}_rolling_std_{window}',
f'{target_col}_rolling_min_{window}',
f'{target_col}_rolling_max_{window}'
])
except Exception as e:
logger.debug(f"Rolling features for window {window} not created: {e}")
continue
logger.info(f"✓ Rolling features created: {len([c for c in data_processed.columns if 'rolling' in c and c not in data.columns])}")
return data_processed
def create_lag_features(
self,
data: pd.DataFrame,
target_col: str
) -> pd.DataFrame:
"""
Create lag features
Parameters:
-----------
data : pd.DataFrame
Input data
target_col : str
Target variable
Returns:
--------
pd.DataFrame
Data with lag features
"""
data_processed = data.copy()
if target_col not in data_processed.columns:
logger.warning(f"Target variable '{target_col}' not found")
return data_processed
# Limited number of lags
max_lags = min(self.config.max_lags, 7) # Maximum 7 lags
for lag in [1, 2, 3, 7, 14, 30]:
if lag <= max_lags:
data_processed[f'{target_col}_lag_{lag}'] = data_processed[target_col].shift(lag)
self.created_features.append(f'{target_col}_lag_{lag}')
# Seasonal lags (only if sufficient data)
if len(data_processed) > 365:
try:
data_processed[f'{target_col}_seasonal_lag_365'] = data_processed[target_col].shift(365)
self.created_features.append(f'{target_col}_seasonal_lag_365')
except Exception as e:
logger.debug(f"Seasonal lag not created: {e}")
# Differences (stationarity)
data_processed[f'{target_col}_diff_1'] = data_processed[target_col].diff(1)
self.created_features.append(f'{target_col}_diff_1')
if len(data_processed) > 7:
data_processed[f'{target_col}_diff_7'] = data_processed[target_col].diff(7)
self.created_features.append(f'{target_col}_diff_7')
logger.info(f"✓ Lag features created: {len([c for c in data_processed.columns if ('lag' in c or 'diff' in c) and c not in data.columns])}")
return data_processed
def create_interaction_features(
self,
data: pd.DataFrame,
target_col: str
) -> pd.DataFrame:
"""
Create interaction features
Parameters:
-----------
data : pd.DataFrame
Input data
target_col : str
Target variable
Returns:
--------
pd.DataFrame
Data with interaction features
"""
data_processed = data.copy()
if target_col not in data_processed.columns:
logger.warning(f"Target variable '{target_col}' not found")
return data_processed
# Interactions with temperature (only if data exists)
temp_cols = ['tavg', 'tmin', 'tmax']
available_temp_cols = [col for col in temp_cols if col in data_processed.columns]
for temp_col in available_temp_cols:
try:
# Avoid division by zero
temp_data = data_processed[temp_col].replace(0, np.nan)
if temp_data.notna().all() and (temp_data != 0).all():
data_processed[f'{target_col}_{temp_col}_ratio'] = data_processed[target_col] / temp_data
self.created_features.append(f'{target_col}_{temp_col}_ratio')
# Product
data_processed[f'{target_col}_{temp_col}_product'] = data_processed[target_col] * temp_data
self.created_features.append(f'{target_col}_{temp_col}_product')
except Exception as e:
logger.debug(f"Interaction feature with {temp_col} not created: {e}")
# Interaction with water level
if 'urovenvoda' in data_processed.columns:
try:
uroven_data = data_processed['urovenvoda'].replace(0, np.nan)
if uroven_data.notna().all() and (uroven_data != 0).all():
data_processed[f'{target_col}_urovenvoda_ratio'] = data_processed[target_col] / uroven_data
self.created_features.append(f'{target_col}_urovenvoda_ratio')
except Exception as e:
logger.debug(f"Interaction feature with urovenvoda not created: {e}")
logger.info(f"✓ Interaction features created: {len([c for c in data_processed.columns if ('ratio' in c or 'product' in c) and c not in data.columns])}")
return data_processed
def create_spectral_features(
self,
data: pd.DataFrame,
target_col: str
) -> pd.DataFrame:
"""
Create spectral features
Parameters:
-----------
data : pd.DataFrame
Input data
target_col : str
Target variable
Returns:
--------
pd.DataFrame
Data with spectral features
"""
data_processed = data.copy()
if target_col not in data_processed.columns:
logger.warning(f"Target variable '{target_col}' not found")
return data_processed
if len(data_processed) < 100:
logger.info("Insufficient data for creating spectral features")
return data_processed
try:
# Fast Fourier Transform
series = data_processed[target_col].dropna().values
if len(series) > 50:
# Calculate periodogram
from scipy.signal import periodogram
freqs, psd = periodogram(series, fs=1.0)
# Find dominant frequencies
if len(psd) > 3:
# Top-3 frequencies by power
top_indices = np.argsort(psd)[-3:][::-1]
for i, idx in enumerate(top_indices, 1):
if idx < len(freqs):
freq = freqs[idx]
if freq > 0:
period = 1 / freq
data_processed[f'{target_col}_dominant_period_{i}'] = period
self.created_features.append(f'{target_col}_dominant_period_{i}')
except Exception as e:
logger.debug(f"Spectral features creation failed: {e}")
return data_processed
def create_decomposition_features(
self,
data: pd.DataFrame,
target_col: str
) -> pd.DataFrame:
"""
Create features based on decomposition
Parameters:
-----------
data : pd.DataFrame
Input data
target_col : str
Target variable
Returns:
--------
pd.DataFrame
Data with decomposition features
"""
data_processed = data.copy()
if target_col not in data_processed.columns:
logger.warning(f"Target variable '{target_col}' not found")
return data_processed
if len(data_processed) < 365:
logger.info("Insufficient data for decomposition")
return data_processed
try:
# Check for date presence
if isinstance(data_processed.index, pd.DatetimeIndex):
# STL decomposition
if len(data_processed) > 730: # Need at least 2 years for yearly seasonality
try:
from statsmodels.tsa.seasonal import STL
# STL decomposition
stl = STL(
data_processed[target_col].fillna(method='ffill'),
period=365,
robust=True
)
result = stl.fit()
# Add components
data_processed[f'{target_col}_trend'] = result.trend
data_processed[f'{target_col}_seasonal'] = result.seasonal
data_processed[f'{target_col}_residual'] = result.resid
self.created_features.extend([
f'{target_col}_trend',
f'{target_col}_seasonal',
f'{target_col}_residual'
])
logger.info("✓ STL decomposition successful")
except Exception as e:
logger.debug(f"STL decomposition failed: {e}")
# Simple seasonal decomposition
try:
from statsmodels.tsa.seasonal import seasonal_decompose
decomposition = seasonal_decompose(
data_processed[target_col].fillna(method='ffill'),
model='additive',
period=365,
extrapolate_trend='freq'
)
data_processed[f'{target_col}_trend'] = decomposition.trend
data_processed[f'{target_col}_seasonal'] = decomposition.seasonal
self.created_features.extend([
f'{target_col}_trend',
f'{target_col}_seasonal'
])
logger.info("✓ Seasonal decomposition successful")
except Exception as e2:
logger.debug(f"Seasonal decomposition failed: {e2}")
except Exception as e:
logger.debug(f"Decomposition features creation failed: {e}")
return data_processed
def get_feature_categories(self) -> Dict[str, List[str]]:
"""Get features by categories"""
categories = {
'temporal': [],
'statistical': [],
'rolling': [],
'lag': [],
'interaction': [],
'spectral': [],
'decomposition': [],
'binary': []
}
for feature in self.created_features:
if any(keyword in feature for keyword in ['year', 'month', 'day', 'week', 'quarter', 'sin', 'cos', 'is_weekend']):
categories['temporal'].append(feature)
elif any(keyword in feature for keyword in ['zscore', 'above_p', 'yearly_']):
if 'above_p' in feature:
categories['binary'].append(feature)
else:
categories['statistical'].append(feature)
elif 'rolling' in feature:
categories['rolling'].append(feature)
elif any(keyword in feature for keyword in ['lag', 'diff']):
categories['lag'].append(feature)
elif 'ratio' in feature or 'product' in feature:
categories['interaction'].append(feature)
elif 'dominant' in feature:
categories['spectral'].append(feature)
elif any(keyword in feature for keyword in ['trend', 'seasonal', 'residual']):
categories['decomposition'].append(feature)
# Remove empty categories
categories = {k: v for k, v in categories.items() if v}
return categories