File size: 10,177 Bytes
226ac39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
227cb22
226ac39
 
 
 
 
227cb22
226ac39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
"""
Feature Engineering Tools
Tools for creating new features from existing data.
"""

import polars as pl
import numpy as np
from typing import Dict, Any, List, Optional
from pathlib import Path
import sys
import os

# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from ..utils.polars_helpers import (
    load_dataframe,
    save_dataframe,
    get_numeric_columns,
    get_categorical_columns,
)
from ..utils.validation import (
    validate_file_exists,
    validate_file_format,
    validate_dataframe,
    validate_column_exists,
    validate_datetime_column,
)


def create_time_features(file_path: str, date_col: str,
                         output_path: str) -> Dict[str, Any]:
    """
    Extract time-based features from a datetime column and save the result.

    The following columns are appended to the dataset, each prefixed with
    ``date_col``: ``_year``, ``_month``, ``_day``, ``_dayofweek``,
    ``_quarter``, ``_is_weekend``, ``_month_sin``, ``_month_cos`` and, when
    the column is a ``pl.Datetime``, ``_hour``, ``_hour_sin``, ``_hour_cos``.

    Args:
        file_path: Path to CSV or Parquet file
        date_col: Name of datetime column (string columns are parsed first)
        output_path: Path to save dataset with new features

    Returns:
        Dictionary with feature engineering report. On success it contains
        ``status``, ``features_created``, ``num_features`` and
        ``output_path`` (plus ``warnings`` if the hour features had to be
        skipped). On failure it contains ``status`` = ``"error"`` and a
        ``message``; nothing is written in that case.
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)
    validate_column_exists(df, date_col)

    parse_error = {
        "status": "error",
        "message": f"Could not parse column '{date_col}' as datetime"
    }

    # Try to parse datetime if it's a string
    if df[date_col].dtype == pl.Utf8:
        nulls_before = df[date_col].null_count()
        try:
            parsed = df.with_columns(
                pl.col(date_col).str.strptime(pl.Datetime, strict=False).alias(date_col)
            )
        except Exception:
            return parse_error

        # strict=False converts unparseable values to null instead of
        # raising, so a column that became entirely null (and was not
        # entirely null before) did not actually parse.
        nulls_after = parsed[date_col].null_count()
        if nulls_after > nulls_before and nulls_after == parsed.height:
            return parse_error
        df = parsed

    # Validate it's now a datetime
    if df[date_col].dtype not in [pl.Date, pl.Datetime]:
        return {
            "status": "error",
            "message": f"Column '{date_col}' is not a datetime type (dtype: {df[date_col].dtype})"
        }

    year_col = f"{date_col}_year"
    month_col = f"{date_col}_month"
    day_col = f"{date_col}_day"
    dow_col = f"{date_col}_dayofweek"
    quarter_col = f"{date_col}_quarter"
    weekend_col = f"{date_col}_is_weekend"
    month_sin_col = f"{date_col}_month_sin"
    month_cos_col = f"{date_col}_month_cos"

    # Extract basic time features
    source = pl.col(date_col)
    df = df.with_columns([
        source.dt.year().alias(year_col),
        source.dt.month().alias(month_col),
        source.dt.day().alias(day_col),
        source.dt.weekday().alias(dow_col),
        source.dt.quarter().alias(quarter_col),
    ])

    # polars' dt.weekday() is ISO-numbered (Monday=1 ... Sunday=7), so the
    # weekend is Saturday (6) and Sunday (7).
    df = df.with_columns(
        (pl.col(dow_col) >= 6).cast(pl.Int8).alias(weekend_col)
    )

    # Cyclical encoding for month (sin/cos) so December and January are close
    month_angle = 2 * np.pi * pl.col(month_col) / 12
    df = df.with_columns([
        month_angle.sin().alias(month_sin_col),
        month_angle.cos().alias(month_cos_col),
    ])

    features_created = [
        year_col,
        month_col,
        day_col,
        dow_col,
        quarter_col,
        weekend_col,
        month_sin_col,
        month_cos_col,
    ]
    warnings: List[str] = []

    # If datetime has time component, extract hour
    if df[date_col].dtype == pl.Datetime:
        hour_col = f"{date_col}_hour"
        hour_sin_col = f"{date_col}_hour_sin"
        hour_cos_col = f"{date_col}_hour_cos"
        hour_angle = 2 * np.pi * pl.col(hour_col) / 24
        try:
            with_hour = df.with_columns(
                source.dt.hour().alias(hour_col)
            ).with_columns([
                hour_angle.sin().alias(hour_sin_col),
                hour_angle.cos().alias(hour_cos_col),
            ])
        except Exception as exc:
            # Hour features are best-effort: keep the other features and
            # report why the hour ones are missing instead of hiding it.
            warnings.append(f"Hour features skipped: {exc}")
        else:
            # Commit only when all three hour columns were built, so the
            # report never lists a partially-applied set.
            df = with_hour
            features_created.extend([hour_col, hour_sin_col, hour_cos_col])

    # Save dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)

    result: Dict[str, Any] = {
        "status": "success",
        "features_created": features_created,
        "num_features": len(features_created),
        "output_path": output_path
    }
    if warnings:
        result["warnings"] = warnings
    return result


# One-hot encoding is refused above this many distinct values per column.
_ONE_HOT_MAX_CATEGORIES = 50

_SUPPORTED_ENCODINGS = ("one_hot", "target", "frequency", "auto")


def _one_hot_encode(df: "pl.DataFrame", col: str):
    """Replace ``col`` with one indicator column per category."""
    encoded = df.select(pl.col(col)).to_dummies(columns=[col])
    # Attach every indicator column in a single call, then drop the source.
    df = df.with_columns(encoded.get_columns()).drop(col)
    return df, list(encoded.columns)


def _frequency_encode(df: "pl.DataFrame", col: str):
    """Replace ``col`` with ``<col>_freq``: each value's share of all rows."""
    n_rows = len(df)
    freq_map = {
        row[0]: row[1] / n_rows
        for row in df[col].value_counts().iter_rows()
    }
    new_col_name = f"{col}_freq"
    df = df.with_columns(
        pl.col(col).map_dict(freq_map, default=0.0).alias(new_col_name)
    ).drop(col)
    return df, [new_col_name]


def _target_encode(df: "pl.DataFrame", col: str, target_col: str):
    """Replace ``col`` with ``<col>_target_enc``: mean target per category."""
    target_means = (
        df.group_by(col)
        .agg(pl.col(target_col).mean().alias("target_mean"))
    )
    target_map = {row[0]: row[1] for row in target_means.iter_rows()}

    # Global mean is the fallback for values missing from the mapping
    global_mean = df[target_col].mean()

    new_col_name = f"{col}_target_enc"
    df = df.with_columns(
        pl.col(col).map_dict(target_map, default=global_mean).alias(new_col_name)
    ).drop(col)
    return df, [new_col_name]


def encode_categorical(file_path: str, method: str = "auto", columns: Optional[List[str]] = None,
                      target_col: Optional[str] = None,
                      output_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Encode categorical variables and save the encoded dataset.

    Each encoded column is dropped and replaced by its encoded column(s).

    Args:
        file_path: Path to CSV or Parquet file
        method: Encoding method ('one_hot', 'target', 'frequency', 'auto').
            'auto' currently resolves to 'frequency'.
        columns: List of columns to encode, or ['all'] for all categorical.
            If None, defaults to all categorical columns. Duplicate names
            are processed once. When columns are auto-selected, target_col
            is never encoded.
        target_col: Required for target encoding - name of target column
        output_path: Path to save dataset with encoded features (required)

    Returns:
        Dictionary with encoding report: ``status``, ``method``,
        ``columns_processed`` (per-column status), ``features_created``,
        ``total_features_created`` and ``output_path``. For invalid
        arguments a dictionary with ``status`` = ``"error"`` and a
        ``message`` is returned and nothing is written.

    Raises:
        ValueError: If an explicitly requested column is not in the dataset.
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Reject bad arguments before loading anything, rather than failing
    # (or silently doing nothing) after the work is done.
    if output_path is None:
        return {
            "status": "error",
            "message": "output_path is required to save the encoded dataset"
        }
    if method not in _SUPPORTED_ENCODINGS:
        return {
            "status": "error",
            "message": (
                f"Unknown encoding method '{method}'. "
                f"Supported methods: {', '.join(_SUPPORTED_ENCODINGS)}"
            )
        }

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Determine which columns to process
    if columns is None or columns == ["all"]:
        # Auto-selection must not consume the target column itself.
        target_cols = [
            col for col in get_categorical_columns(df) if col != target_col
        ]
    else:
        # Validate columns exist
        for col in columns:
            if col not in df.columns:
                raise ValueError(f"Column '{col}' not found")
        # De-duplicate (order-preserving): a repeated name would otherwise
        # be looked up again after the first pass already dropped it.
        target_cols = list(dict.fromkeys(columns))

    # 'auto' resolves to frequency encoding, which stays at one output
    # column per input regardless of cardinality.
    if method == "auto":
        method = "frequency"

    # For target encoding, validate target column
    if method == "target":
        if target_col is None:
            return {
                "status": "error",
                "message": "target_col is required for target encoding"
            }
        validate_column_exists(df, target_col)

    report: Dict[str, Any] = {
        "status": "success",
        "method": method,
        "columns_processed": {},
        "features_created": []
    }

    # Process each column; a failure on one column is recorded and does not
    # stop the remaining columns from being encoded.
    for col in target_cols:
        if method == "one_hot":
            n_unique = df[col].n_unique()
            if n_unique > _ONE_HOT_MAX_CATEGORIES:
                report["columns_processed"][col] = {
                    "status": "warning",
                    "message": f"Column has {n_unique} unique values. Consider using frequency or target encoding instead."
                }
                continue

        try:
            if method == "one_hot":
                encoded_df, new_names = _one_hot_encode(df, col)
            elif method == "frequency":
                encoded_df, new_names = _frequency_encode(df, col)
            else:  # method == "target"
                encoded_df, new_names = _target_encode(df, col, target_col)
        except Exception as e:
            report["columns_processed"][col] = {
                "status": "error",
                "message": str(e)
            }
            continue

        # Commit only after the whole column encoded successfully.
        df = encoded_df
        report["features_created"].extend(new_names)
        report["columns_processed"][col] = {
            "status": "success",
            "num_features_created": len(new_names)
        }

    report["total_features_created"] = len(report["features_created"])

    # Save dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)
    report["output_path"] = output_path

    return report