"""
Feature Engineering Tools

Tools for creating new features from existing data.
"""

import polars as pl
import numpy as np
from typing import Dict, Any, List, Optional
from pathlib import Path
import sys
import os

# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from ..utils.polars_helpers import (
    load_dataframe,
    save_dataframe,
    get_numeric_columns,
    get_categorical_columns,
)
from ..utils.validation import (
    validate_file_exists,
    validate_file_format,
    validate_dataframe,
    validate_column_exists,
    validate_datetime_column,
)

# Polars' ``dt.weekday()`` returns the ISO weekday: Monday == 1 ... Sunday == 7.
_ISO_SATURDAY = 6

# One-hot encoding is skipped for columns with more distinct values than this.
_ONE_HOT_MAX_CATEGORIES = 50

# Values accepted for the ``method`` argument of ``encode_categorical``.
_ENCODING_METHODS = ("one_hot", "target", "frequency", "auto")


def _map_values(column: str, mapping: Dict[Any, Any], default: Any) -> pl.Expr:
    """Build an expression mapping ``column`` through ``mapping``.

    Values without an entry in ``mapping`` become ``default``. The method used
    depends on what the installed Polars version offers: ``map_dict`` when it
    is available (the call this module was written against), otherwise the
    ``replace_strict`` / ``replace`` API that superseded it.
    """
    expr = pl.col(column)
    if hasattr(expr, "map_dict"):
        return expr.map_dict(mapping, default=default)
    if hasattr(expr, "replace_strict"):
        return expr.replace_strict(mapping, default=default)
    return expr.replace(mapping, default=default)


def create_time_features(file_path: str, date_col: str, output_path: str) -> Dict[str, Any]:
    """
    Extract comprehensive time-based features from datetime column.

    Adds year, month, day, day-of-week (ISO numbering, Monday == 1),
    quarter, an is-weekend flag and a sin/cos encoding of the month. For
    Datetime columns the hour and its sin/cos encoding are added as well.
    String columns are parsed to Datetime first.

    Args:
        file_path: Path to CSV or Parquet file
        date_col: Name of datetime column
        output_path: Path to save dataset with new features

    Returns:
        Dictionary with feature engineering report. On success it contains
        ``status`` ("success"), ``features_created``, ``num_features``,
        ``output_path`` and ``warnings`` (messages about optional features
        that were skipped). When the column cannot be used as a datetime it
        contains ``status`` ("error") and ``message``.

    Raises:
        Whatever the ``validate_*`` / ``load_dataframe`` / ``save_dataframe``
        helpers raise for invalid input; those errors are not caught here.
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)
    validate_column_exists(df, date_col)

    parse_error = {
        "status": "error",
        "message": f"Could not parse column '{date_col}' as datetime"
    }

    # Try to parse datetime if it's a string
    if df[date_col].dtype == pl.Utf8:
        non_null_before = len(df) - df[date_col].null_count()
        try:
            df = df.with_columns(
                pl.col(date_col).str.strptime(pl.Datetime, strict=False).alias(date_col)
            )
        except Exception:
            return parse_error

        # With strict=False unparseable values become nulls instead of raising,
        # so a column that lost every value was not parsed at all.
        if non_null_before > 0 and df[date_col].null_count() == len(df):
            return parse_error

    # Validate it's now a datetime
    if df[date_col].dtype not in [pl.Date, pl.Datetime]:
        return {
            "status": "error",
            "message": f"Column '{date_col}' is not a datetime type (dtype: {df[date_col].dtype})"
        }

    features_created: List[str] = []
    warnings: List[str] = []

    year_col = f"{date_col}_year"
    month_col = f"{date_col}_month"
    day_col = f"{date_col}_day"
    dow_col = f"{date_col}_dayofweek"
    quarter_col = f"{date_col}_quarter"

    # Extract basic time features
    df = df.with_columns([
        pl.col(date_col).dt.year().alias(year_col),
        pl.col(date_col).dt.month().alias(month_col),
        pl.col(date_col).dt.day().alias(day_col),
        pl.col(date_col).dt.weekday().alias(dow_col),
        pl.col(date_col).dt.quarter().alias(quarter_col),
    ])
    features_created.extend([year_col, month_col, day_col, dow_col, quarter_col])

    # Create is_weekend feature: Saturday (6) and Sunday (7) in ISO numbering
    weekend_col = f"{date_col}_is_weekend"
    df = df.with_columns(
        (pl.col(dow_col) >= _ISO_SATURDAY).cast(pl.Int8).alias(weekend_col)
    )
    features_created.append(weekend_col)

    # Cyclical encoding for month (sin/cos)
    month_sin_col = f"{date_col}_month_sin"
    month_cos_col = f"{date_col}_month_cos"
    df = df.with_columns([
        (2 * np.pi * pl.col(month_col) / 12).sin().alias(month_sin_col),
        (2 * np.pi * pl.col(month_col) / 12).cos().alias(month_cos_col),
    ])
    features_created.extend([month_sin_col, month_cos_col])

    # If datetime has time component, extract hour
    if df[date_col].dtype == pl.Datetime:
        hour_col = f"{date_col}_hour"
        hour_sin_col = f"{date_col}_hour_sin"
        hour_cos_col = f"{date_col}_hour_cos"
        try:
            with_hours = df.with_columns([
                pl.col(date_col).dt.hour().alias(hour_col),
            ])
            # Cyclical encoding for hour
            with_hours = with_hours.with_columns([
                (2 * np.pi * pl.col(hour_col) / 24).sin().alias(hour_sin_col),
                (2 * np.pi * pl.col(hour_col) / 24).cos().alias(hour_cos_col),
            ])
        except Exception as exc:
            # Hour features are best-effort: keep the frame as it was and
            # report the reason instead of failing the whole run.
            warnings.append(f"Hour features skipped: {exc}")
        else:
            df = with_hours
            features_created.extend([hour_col, hour_sin_col, hour_cos_col])

    # Save dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)

    return {
        "status": "success",
        "features_created": features_created,
        "num_features": len(features_created),
        "output_path": output_path,
        "warnings": warnings,
    }


def encode_categorical(file_path: str, method: str = "auto", columns: Optional[List[str]] = None,
                       target_col: Optional[str] = None,
                       output_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Encode categorical variables.

    Every encoded column is removed from the dataset and replaced by its
    encoded form: one indicator column per category (``one_hot``), a
    ``<col>_freq`` column holding the share of rows with that value
    (``frequency``), or a ``<col>_target_enc`` column holding the mean of
    ``target_col`` for that value (``target``).

    Args:
        file_path: Path to CSV or Parquet file
        method: Encoding method ('one_hot', 'target', 'frequency', 'auto').
            'auto' currently always resolves to 'frequency'.
        columns: List of columns to encode, or ['all'] for all categorical.
            If None, defaults to all categorical columns
        target_col: Required for target encoding - name of target column
        output_path: Path to save dataset with encoded features. Required,
            despite the None default.

    Returns:
        Dictionary with encoding report. On success it contains ``status``
        ("success"), ``method``, ``columns_processed`` (per-column status),
        ``features_created``, ``total_features_created`` and ``output_path``.
        Invalid arguments (unknown method, missing ``output_path``, missing
        ``target_col`` for target encoding) yield ``status`` ("error") and
        ``message`` and nothing is written.

    Raises:
        ValueError: If an entry of ``columns`` is not a column of the dataset.
        Errors raised by the ``validate_*`` / ``load_dataframe`` /
        ``save_dataframe`` helpers are not caught here.
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    if method not in _ENCODING_METHODS:
        return {
            "status": "error",
            "message": (
                f"Unknown encoding method '{method}'. "
                f"Expected one of: {', '.join(_ENCODING_METHODS)}"
            )
        }

    # Fail before doing any work rather than when the result is written.
    if output_path is None:
        return {
            "status": "error",
            "message": "output_path is required to save the encoded dataset"
        }

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Determine which columns to process
    categorical_cols = get_categorical_columns(df)

    # Default to all categorical columns if not specified
    if columns is None or columns == ["all"]:
        target_cols = categorical_cols
    else:
        # Validate columns exist
        for col in columns:
            if col not in df.columns:
                raise ValueError(f"Column '{col}' not found")
        target_cols = columns

    # 'auto' always resolves to frequency encoding, which adds exactly one
    # column per input column regardless of cardinality.
    if method == "auto":
        method = "frequency"

    # For target encoding, validate target column
    if method == "target":
        if target_col is None:
            return {
                "status": "error",
                "message": "target_col is required for target encoding"
            }
        validate_column_exists(df, target_col)

    report: Dict[str, Any] = {
        "status": "success",
        "method": method,
        "columns_processed": {},
        "features_created": []
    }

    # Process each column
    for col in target_cols:
        # A column may already be gone, e.g. when it is listed twice.
        if col not in df.columns:
            report["columns_processed"][col] = {
                "status": "error",
                "message": "Column not found"
            }
            continue

        # Encoding the target against itself is meaningless and would drop
        # the target column from the output.
        if method == "target" and col == target_col:
            report["columns_processed"][col] = {
                "status": "warning",
                "message": "Skipped: the target column cannot be target-encoded against itself"
            }
            continue

        try:
            if method == "one_hot":
                # One-hot encoding; refuse columns with too many categories
                n_unique = df[col].n_unique()
                if n_unique > _ONE_HOT_MAX_CATEGORIES:
                    report["columns_processed"][col] = {
                        "status": "warning",
                        "message": f"Column has {n_unique} unique values. Consider using frequency or target encoding instead."
                    }
                    continue

                # Get dummies
                encoded = df.select(pl.col(col)).to_dummies(columns=[col])

                # Add encoded columns to dataframe and drop original column
                df = df.with_columns(encoded.get_columns()).drop(col)
                report["features_created"].extend(encoded.columns)

                report["columns_processed"][col] = {
                    "status": "success",
                    "num_features_created": len(encoded.columns)
                }

            elif method == "frequency":
                # Frequency encoding: share of rows holding each value
                total_rows = len(df)
                freq_map = {
                    value: count / total_rows
                    for value, count in df[col].value_counts().iter_rows()
                }

                # Create new column with frequencies
                new_col_name = f"{col}_freq"
                df = df.with_columns(
                    _map_values(col, freq_map, 0.0).alias(new_col_name)
                )

                # Drop original column
                df = df.drop(col)

                report["features_created"].append(new_col_name)
                report["columns_processed"][col] = {
                    "status": "success",
                    "num_features_created": 1
                }

            elif method == "target":
                # Target encoding (mean encoding)
                # Calculate mean target value for each category
                target_means = (
                    df.group_by(col)
                    .agg(pl.col(target_col).mean().alias("target_mean"))
                )

                # Create dictionary for mapping
                target_map = dict(target_means.iter_rows())

                # Global mean for unseen categories
                global_mean = df[target_col].mean()

                # Create new column with target encoding
                new_col_name = f"{col}_target_enc"
                df = df.with_columns(
                    _map_values(col, target_map, global_mean).alias(new_col_name)
                )

                # Drop original column
                df = df.drop(col)

                report["features_created"].append(new_col_name)
                report["columns_processed"][col] = {
                    "status": "success",
                    "num_features_created": 1
                }

        except Exception as e:
            # A failure in one column is reported and must not stop the rest.
            report["columns_processed"][col] = {
                "status": "error",
                "message": str(e)
            }

    report["total_features_created"] = len(report["features_created"])

    # Save dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)

    report["output_path"] = output_path

    return report