# NOTE: removed non-source residue ("Spaces: Sleeping Sleeping" — web page header captured along with the file)
"""
encode.py – Traffic data encoder for LSTM traffic-flow prediction.

This module provides TrafficDataEncoder for processing 5-minute traffic sensor
data into sequences suitable for LSTM training. Key features:
- Sensor-safe windowing (no cross-sensor leakage)
- Feature engineering (time, geographic, categorical)
- Speed-based class weighting support
- Robust missing-value handling
"""
| from __future__ import annotations | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.preprocessing import OrdinalEncoder, StandardScaler | |
| from sklearn.utils.validation import check_is_fitted | |
| from typing import List, Tuple, Dict, Optional | |
| import joblib | |
| from pathlib import Path | |
class TrafficDataEncoder:
    """Encode 5-minute traffic sensor data into sequences for LSTM training.

    Pipeline:
      - builds a ``sensor_id`` from rounded lat/lon and sorts by sensor & time
      - adds cyclical time features (hour of day, day of week)
      - ordinal-encodes categoricals and standardizes numerics
      - windows each sensor independently (no cross-sensor leakage)
      - exposes inverse-frequency speed-class weights for a weighted loss
    """

    def __init__(
        self,
        seq_len: int = 12,       # 12 * 5 min = 1 hour of history
        horizon: int = 1,        # predict 1 step ahead (5 minutes)
        target_col: str = "speed_mph",
    ):
        """
        Args:
            seq_len: number of past timesteps per input window (>= 1).
            horizon: number of future steps to predict (>= 1).
            target_col: name of the column holding the prediction target.

        Raises:
            ValueError: if ``seq_len`` or ``horizon`` is < 1.
        """
        if seq_len < 1 or horizon < 1:
            raise ValueError("seq_len and horizon must be >= 1")
        self.seq_len = seq_len
        self.horizon = horizon
        self.target_col = target_col
        # Feature columns
        self.cat_cols = ["direction", "weather"]
        self.num_cols = [
            "lanes", "% Observed", "Latitude", "Longitude",
            "hour_sin", "hour_cos", "dow_sin", "dow_cos",
        ]
        # Fitted components (populated by fit())
        self.ordinal_encoder: Optional["OrdinalEncoder"] = None
        self.scaler: Optional["StandardScaler"] = None
        self.num_medians: Dict[str, float] = {}
        self.is_fitted = False

    def _ensure_sensor_id_and_sort(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create sensor_id (from rounded coordinates) and sort by sensor and time."""
        df = df.copy()
        if "sensor_id" not in df.columns:
            # Rounding to 6 decimals keeps co-located readings on one id.
            df["sensor_id"] = (
                df["Latitude"].round(6).astype(str) + ";" +
                df["Longitude"].round(6).astype(str)
            )
        # Unparseable timestamps become NaT rather than raising.
        df["Time"] = pd.to_datetime(df["Time"], errors="coerce")
        return df.sort_values(["sensor_id", "Time"]).reset_index(drop=True)

    def _add_time_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add cyclical (sin/cos) hour-of-day and day-of-week features."""
        dt = pd.to_datetime(df["Time"], errors="coerce")
        hour = dt.dt.hour + dt.dt.minute / 60.0  # fractional hour in [0, 24)
        dow = dt.dt.dayofweek                    # Monday=0 .. Sunday=6
        df["hour_sin"] = np.sin(2 * np.pi * hour / 24)
        df["hour_cos"] = np.cos(2 * np.pi * hour / 24)
        df["dow_sin"] = np.sin(2 * np.pi * dow / 7)
        df["dow_cos"] = np.cos(2 * np.pi * dow / 7)
        return df

    def _clean_numeric(self, df: pd.DataFrame) -> pd.DataFrame:
        """Coerce lane-count and observation-percentage columns to numeric."""
        # df.get returns the scalar default when the column is missing; the
        # scalar broadcasts to a full column on assignment.
        df["lanes"] = pd.to_numeric(df.get("lanes", 0), errors="coerce")
        df["% Observed"] = pd.to_numeric(df.get("% Observed", 100), errors="coerce")
        return df

    def _compute_speed_weights(self, y: np.ndarray) -> Dict[str, float]:
        """Compute inverse-frequency weights for low/medium/high speed classes.

        Returns unit weights when ``y`` is empty or any class has no samples,
        so a weighted loss degrades gracefully to an unweighted one.
        """
        n_total = len(y)
        if n_total == 0:
            # Guard: the percentage prints and weights below divide by n_total.
            print("Speed distribution: no samples; using unit weights")
            return {
                "weight_low": 1.0,
                "weight_medium": 1.0,
                "weight_high": 1.0,
                "low_threshold": 30,
                "high_threshold": 60,
            }
        low_mask = y <= 30
        high_mask = y >= 60
        medium_mask = ~(low_mask | high_mask)
        n_low = int(low_mask.sum())
        n_medium = int(medium_mask.sum())
        n_high = int(high_mask.sum())
        print("Speed distribution:")
        print(f" Low (≤30): {n_low} samples ({n_low/n_total*100:.1f}%)")
        print(f" Medium (30-60): {n_medium} samples ({n_medium/n_total*100:.1f}%)")
        print(f" High (≥60): {n_high} samples ({n_high/n_total*100:.1f}%)")
        # Inverse-frequency weights; fall back to 1.0 if any class is empty.
        if n_low > 0 and n_medium > 0 and n_high > 0:
            weight_low = n_total / (3 * n_low)
            weight_medium = n_total / (3 * n_medium)
            weight_high = n_total / (3 * n_high)
        else:
            weight_low = weight_medium = weight_high = 1.0
        print(f"Class weights: Low={weight_low:.2f}, Medium={weight_medium:.2f}, High={weight_high:.2f}")
        return {
            "weight_low": weight_low,
            "weight_medium": weight_medium,
            "weight_high": weight_high,
            "low_threshold": 30,
            "high_threshold": 60,
        }

    def fit(self, df: pd.DataFrame) -> "TrafficDataEncoder":
        """Fit encoders/scaler and store imputation medians from training data.

        Returns:
            self, for chaining (e.g. ``encoder.fit(df).transform(df)``).
        """
        print("Fitting encoder...")
        df = self._ensure_sensor_id_and_sort(df)
        df = self._add_time_features(df)
        df = self._clean_numeric(df)
        # Impute: "UNK" for categoricals, per-column medians for numerics.
        df[self.cat_cols] = df[self.cat_cols].fillna("UNK")
        self.num_medians = df[self.num_cols].median(numeric_only=True).to_dict()
        df[self.num_cols] = df[self.num_cols].fillna(self.num_medians)
        # Categories unseen at fit time map to -1 at transform time
        # instead of raising.
        self.ordinal_encoder = OrdinalEncoder(
            handle_unknown="use_encoded_value",
            unknown_value=-1,
        )
        self.ordinal_encoder.fit(df[self.cat_cols])
        self.scaler = StandardScaler()
        self.scaler.fit(df[self.num_cols])
        self.is_fitted = True
        print("Encoder fitted successfully")
        return self

    def _preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the same preprocessing as fit(), imputing with fitted medians."""
        df = self._ensure_sensor_id_and_sort(df)
        df = self._add_time_features(df)
        df = self._clean_numeric(df)
        df[self.cat_cols] = df[self.cat_cols].fillna("UNK")
        df[self.num_cols] = df[self.num_cols].fillna(self.num_medians)
        return df

    def transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Transform data into per-sensor sliding-window sequences.

        Returns:
            X: (N, seq_len, n_features) input sequences
            y: (N, horizon) target values
            target_indices: (N,) row indices of the final target step in the
                preprocessed (sorted, reindexed) frame
            timestamps: (N,) timestamps of the final target step

        Raises:
            ValueError: if the encoder has not been fitted.
        """
        # NOTE: check_is_fitted() on attribute names always passed here
        # because __init__ pre-creates them (as None); the explicit flag is
        # the reliable signal, and matches the check in save().
        if not self.is_fitted:
            raise ValueError("Encoder must be fitted before transform")
        df = self._preprocess(df)
        X_chunks = []
        y_chunks = []
        target_indices = []
        timestamps = []
        # Window each sensor separately to avoid cross-sensor leakage.
        for sensor_id, group in df.groupby("sensor_id", sort=False):
            if len(group) < self.seq_len + self.horizon:
                continue  # not enough data for this sensor
            cat_features = self.ordinal_encoder.transform(group[self.cat_cols]).astype(np.float32)
            num_features = self.scaler.transform(group[self.num_cols]).astype(np.float32)
            features = np.concatenate([num_features, cat_features], axis=1)
            targets = group[self.target_col].to_numpy(dtype=np.float32)
            group_timestamps = group["Time"].to_numpy()
            group_indices = group.index.to_numpy()
            n_windows = len(group) - self.seq_len - self.horizon + 1
            for i in range(n_windows):
                X_chunks.append(features[i:i + self.seq_len])
                y_chunks.append(targets[i + self.seq_len:i + self.seq_len + self.horizon])
                # Index/timestamp of the final predicted step of this window.
                target_indices.append(group_indices[i + self.seq_len + self.horizon - 1])
                timestamps.append(group_timestamps[i + self.seq_len + self.horizon - 1])
        if not X_chunks:
            # Return empty arrays with the correct rank/shapes so downstream
            # code can rely on array dimensions.
            n_features = len(self.num_cols) + len(self.cat_cols)
            return (
                np.empty((0, self.seq_len, n_features), dtype=np.float32),
                np.empty((0, self.horizon), dtype=np.float32),
                np.empty(0, dtype=int),
                np.empty(0, dtype=object),
            )
        X = np.stack(X_chunks, axis=0)
        y = np.stack(y_chunks, axis=0)
        target_indices = np.array(target_indices, dtype=int)
        timestamps = np.array(timestamps)
        # nunique() avoids re-running a groupby just to count sensors.
        print(f"Created {len(X)} sequences from {df['sensor_id'].nunique()} sensors")
        return X, y, target_indices, timestamps

    def fit_transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Fit encoder and transform data in one step."""
        return self.fit(df).transform(df)

    def get_speed_weights(self, y: np.ndarray) -> Dict[str, float]:
        """Get speed-based class weights for a weighted loss."""
        return self._compute_speed_weights(y)

    def save(self, filepath: str) -> None:
        """Persist the fitted encoder with joblib.

        Raises:
            ValueError: if called before fit().
        """
        if not self.is_fitted:
            raise ValueError("Encoder must be fitted before saving")
        joblib.dump(self, filepath)
        print(f"Encoder saved to {filepath}")

    @classmethod  # was missing: the method takes `cls`, not `self`
    def load(cls, filepath: str) -> "TrafficDataEncoder":
        """Load a fitted encoder from ``filepath``.

        Falls back to temporarily aliasing ``__main__`` to a module exposing
        this class, to unpickle encoders that were saved while the class
        lived in ``__main__`` (a different module context).

        Raises:
            ValueError: if the loaded object is not a TrafficDataEncoder.
        """
        try:
            encoder = joblib.load(filepath)
            if not isinstance(encoder, cls):
                raise ValueError(f"Loaded object is not a {cls.__name__}")
            return encoder
        except AttributeError as e:
            if "TrafficDataEncoder" not in str(e):
                raise  # unrelated unpickling failure; keep the traceback
            # Encoder was saved from a different module context; retry with
            # __main__ temporarily pointing at a module that has this class.
            print("Warning: Encoder was saved from different module context. Reconstructing...")
            import sys
            import types
            original_main = sys.modules.get('__main__')
            temp_module = types.ModuleType('temp_encode')
            temp_module.TrafficDataEncoder = cls
            sys.modules['__main__'] = temp_module
            try:
                encoder = joblib.load(filepath)
                if not isinstance(encoder, cls):
                    raise ValueError(f"Loaded object is not a {cls.__name__}")
                return encoder
            finally:
                # Always restore the real __main__ module.
                if original_main is not None:
                    sys.modules['__main__'] = original_main
                else:
                    del sys.modules['__main__']
def main():
    """CLI interface for encoding data."""
    import argparse

    arg_parser = argparse.ArgumentParser(description="Encode traffic data for LSTM training")
    arg_parser.add_argument("csv_file", help="Path to CSV file with traffic data")
    arg_parser.add_argument("--seq_len", type=int, default=12, help="Sequence length (default: 12)")
    arg_parser.add_argument("--horizon", type=int, default=1, help="Prediction horizon (default: 1)")
    arg_parser.add_argument("--target_col", default="speed_mph", help="Target column name")
    arg_parser.add_argument("--save_encoder", help="Path to save fitted encoder")
    arg_parser.add_argument("--output", help="Path to save encoded data (optional)")
    opts = arg_parser.parse_args()

    # Read the raw sensor readings.
    print(f"Loading data from {opts.csv_file}")
    frame = pd.read_csv(opts.csv_file)

    # Build the encoder and produce the training sequences in one pass.
    encoder = TrafficDataEncoder(
        seq_len=opts.seq_len,
        horizon=opts.horizon,
        target_col=opts.target_col,
    )
    X, y, target_indices, timestamps = encoder.fit_transform(frame)

    print(f"Encoded data shapes:")
    print(f" X: {X.shape}")
    print(f" y: {y.shape}")
    print(f" Target indices: {len(target_indices)}")
    print(f" Timestamps: {len(timestamps)}")

    # Persist artifacts only when the corresponding flags were given.
    if opts.save_encoder:
        encoder.save(opts.save_encoder)
    if opts.output:
        np.savez(opts.output, X=X, y=y, target_indices=target_indices, timestamps=timestamps)
        print(f"Encoded data saved to {opts.output}")


if __name__ == "__main__":
    main()