"""
encode.py – Traffic data encoder for LSTM traffic flow prediction
This module provides TrafficDataEncoder for processing 5-minute traffic sensor data
into sequences suitable for LSTM training. Key features:
- Sensor-safe windowing (no cross-sensor leakage)
- Feature engineering (time, geographic, categorical)
- Speed-based class weighting support
- Robust missing value handling
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder, StandardScaler
from typing import Tuple, Dict, Optional
import joblib
from pathlib import Path
class TrafficDataEncoder:
"""
Encodes traffic sensor data into sequences for LSTM training.
Features:
- Geographic coordinates (standardized latitude/longitude)
- Time features (hour, day of week)
- Categorical encoding (direction, weather)
- Speed-based class weighting
- Sensor-safe windowing
"""
def __init__(
self,
seq_len: int = 12, # 12 * 5min = 1 hour history
horizon: int = 1, # predict 1 step ahead (5 minutes)
target_col: str = "speed_mph"
):
self.seq_len = seq_len
self.horizon = horizon
self.target_col = target_col
# Feature columns
self.cat_cols = ["direction", "weather"]
self.num_cols = [
"lanes", "% Observed", "Latitude", "Longitude",
"hour_sin", "hour_cos", "dow_sin", "dow_cos"
]
# Fitted components
self.ordinal_encoder: Optional[OrdinalEncoder] = None
self.scaler: Optional[StandardScaler] = None
self.num_medians: Dict[str, float] = {}
self.is_fitted = False
def _ensure_sensor_id_and_sort(self, df: pd.DataFrame) -> pd.DataFrame:
"""Create sensor_id and sort by sensor and time."""
df = df.copy()
# Create sensor_id from coordinates
if "sensor_id" not in df.columns:
df["sensor_id"] = (
df["Latitude"].round(6).astype(str) + ";" +
df["Longitude"].round(6).astype(str)
)
# Parse time and sort
df["Time"] = pd.to_datetime(df["Time"], errors="coerce")
return df.sort_values(["sensor_id", "Time"]).reset_index(drop=True)
def _add_time_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add cyclical time features."""
dt = pd.to_datetime(df["Time"], errors="coerce")
hour = dt.dt.hour + dt.dt.minute / 60.0
dow = dt.dt.dayofweek
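        # Sin/cos encoding keeps wrap-around neighbors close: 23:55 and 00:05
        # map to nearby points on the unit circle, unlike the raw hour value.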
df["hour_sin"] = np.sin(2 * np.pi * hour / 24)
df["hour_cos"] = np.cos(2 * np.pi * hour / 24)
df["dow_sin"] = np.sin(2 * np.pi * dow / 7)
df["dow_cos"] = np.cos(2 * np.pi * dow / 7)
return df
    def _clean_numeric(self, df: pd.DataFrame) -> pd.DataFrame:
        """Coerce numeric columns, creating them with defaults when absent."""
        # df.get falls back to a scalar default when a column is missing;
        # pandas broadcasts that scalar across the new column on assignment.
        df["lanes"] = pd.to_numeric(df.get("lanes", 0), errors="coerce")
        df["% Observed"] = pd.to_numeric(df.get("% Observed", 100), errors="coerce")
        return df
def _compute_speed_weights(self, y: np.ndarray) -> Dict[str, float]:
"""Compute class weights for speed-based weighting."""
# Define speed classes based on user's experience
low_mask = y <= 30
high_mask = y >= 60
medium_mask = ~(low_mask | high_mask)
n_low = low_mask.sum()
n_medium = medium_mask.sum()
n_high = high_mask.sum()
n_total = len(y)
print(f"Speed distribution:")
print(f" Low (≤30): {n_low} samples ({n_low/n_total*100:.1f}%)")
print(f" Medium (30-60): {n_medium} samples ({n_medium/n_total*100:.1f}%)")
print(f" High (≥60): {n_high} samples ({n_high/n_total*100:.1f}%)")
        # Inverse-frequency weights, scaled so a perfectly balanced
        # three-way split gives each band a weight of 1.0
if n_low > 0 and n_medium > 0 and n_high > 0:
weight_low = n_total / (3 * n_low)
weight_medium = n_total / (3 * n_medium)
weight_high = n_total / (3 * n_high)
        else:
            # Degenerate case (an empty band): fall back to uniform weights
            weight_low = weight_medium = weight_high = 1.0
print(f"Class weights: Low={weight_low:.2f}, Medium={weight_medium:.2f}, High={weight_high:.2f}")
return {
"weight_low": weight_low,
"weight_medium": weight_medium,
"weight_high": weight_high,
"low_threshold": 30,
"high_threshold": 60
}
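    # Worked example of the weighting above (illustrative numbers, not from
    # any dataset here): n_total = 1000 split 100/700/200 across
    # low/medium/high gives weights 1000/(3*100) ≈ 3.33, 1000/(3*700) ≈ 0.48,
    # and 1000/(3*200) ≈ 1.67, so rare congested samples count roughly 3x.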
def fit(self, df: pd.DataFrame) -> "TrafficDataEncoder":
"""Fit the encoder on training data."""
print("Fitting encoder...")
# Preprocess data
df = self._ensure_sensor_id_and_sort(df)
df = self._add_time_features(df)
df = self._clean_numeric(df)
        # Handle missing values; absent categorical columns default to "UNK"
        for col in self.cat_cols:
            if col not in df.columns:
                df[col] = "UNK"
        df[self.cat_cols] = df[self.cat_cols].fillna("UNK")
self.num_medians = df[self.num_cols].median(numeric_only=True).to_dict()
df[self.num_cols] = df[self.num_cols].fillna(self.num_medians)
# Fit encoders
self.ordinal_encoder = OrdinalEncoder(
handle_unknown="use_encoded_value",
unknown_value=-1
)
self.ordinal_encoder.fit(df[self.cat_cols])
self.scaler = StandardScaler()
self.scaler.fit(df[self.num_cols])
self.is_fitted = True
print("Encoder fitted successfully")
return self
def _preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply preprocessing steps."""
df = self._ensure_sensor_id_and_sort(df)
df = self._add_time_features(df)
df = self._clean_numeric(df)
        # Handle missing values using the medians learned in fit();
        # absent categorical columns default to "UNK"
        for col in self.cat_cols:
            if col not in df.columns:
                df[col] = "UNK"
        df[self.cat_cols] = df[self.cat_cols].fillna("UNK")
        df[self.num_cols] = df[self.num_cols].fillna(self.num_medians)
return df
    def transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Transform data into sequences.
        Returns:
            X: (N, seq_len, n_features) - input sequences
            y: (N, horizon) - target values
            target_indices: (N,) - positions of the target rows in the
                preprocessed (sensor/time-sorted) frame, not the caller's df
            timestamps: (N,) - timestamps of the target rows
        """
        if not self.is_fitted:
            raise ValueError("Encoder must be fitted before transform; call fit() first")
df = self._preprocess(df)
X_chunks = []
y_chunks = []
target_indices = []
timestamps = []
# Process each sensor separately to avoid cross-sensor leakage
for sensor_id, group in df.groupby("sensor_id", sort=False):
if len(group) < self.seq_len + self.horizon:
continue # Not enough data for this sensor
# Encode features
cat_features = self.ordinal_encoder.transform(group[self.cat_cols]).astype(np.float32)
num_features = self.scaler.transform(group[self.num_cols]).astype(np.float32)
features = np.concatenate([num_features, cat_features], axis=1)
# Get targets
targets = group[self.target_col].to_numpy(dtype=np.float32)
group_timestamps = group["Time"].to_numpy()
group_indices = group.index.to_numpy()
# Create sliding windows
n_windows = len(group) - self.seq_len - self.horizon + 1
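            # e.g. one day of 5-minute data (288 rows) at the defaults
            # seq_len=12, horizon=1 yields 288 - 12 - 1 + 1 = 276 windows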
for i in range(n_windows):
X_chunks.append(features[i:i + self.seq_len])
y_chunks.append(targets[i + self.seq_len:i + self.seq_len + self.horizon])
target_indices.append(group_indices[i + self.seq_len + self.horizon - 1])
timestamps.append(group_timestamps[i + self.seq_len + self.horizon - 1])
        if not X_chunks:
            # No sensor had enough rows; return empty arrays with correct shapes
            n_features = len(self.num_cols) + len(self.cat_cols)
            return (
                np.empty((0, self.seq_len, n_features), dtype=np.float32),
                np.empty((0, self.horizon), dtype=np.float32),
                np.empty(0, dtype=int),
                np.empty(0, dtype="datetime64[ns]")  # matches the non-empty path
            )
X = np.stack(X_chunks, axis=0)
y = np.stack(y_chunks, axis=0)
target_indices = np.array(target_indices, dtype=int)
timestamps = np.array(timestamps)
print(f"Created {len(X)} sequences from {len(df.groupby('sensor_id'))} sensors")
return X, y, target_indices, timestamps
def fit_transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Fit encoder and transform data in one step."""
return self.fit(df).transform(df)
def get_speed_weights(self, y: np.ndarray) -> Dict[str, float]:
"""Get speed-based class weights for weighted loss."""
return self._compute_speed_weights(y)
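    # Sketch of how the weight dict might feed a weighted regression loss
    # (illustrative only; the training loop lives outside this module).
    # `weights` is the dict returned by get_speed_weights(y):
    #
    #   w = np.where(y <= weights["low_threshold"], weights["weight_low"],
    #       np.where(y >= weights["high_threshold"], weights["weight_high"],
    #                weights["weight_medium"]))
    #   weighted_mse = float(np.mean(w * (y_pred - y) ** 2))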
def save(self, filepath: str) -> None:
"""Save the fitted encoder."""
if not self.is_fitted:
raise ValueError("Encoder must be fitted before saving")
joblib.dump(self, filepath)
print(f"Encoder saved to {filepath}")
    @classmethod
    def load(cls, filepath: str) -> "TrafficDataEncoder":
        """Load a fitted encoder."""
        try:
            encoder = joblib.load(filepath)
        except AttributeError as e:
            if cls.__name__ not in str(e):
                raise
            # The encoder was pickled while this class lived under a different
            # module (typically __main__), so unpickling cannot resolve it.
            # Temporarily expose the class there, retry, then restore.
            print("Warning: encoder was saved from a different module context; retrying...")
            import sys
            import types
            original_main = sys.modules.get("__main__")
            temp_module = types.ModuleType("temp_encode")
            temp_module.TrafficDataEncoder = cls
            sys.modules["__main__"] = temp_module
            try:
                encoder = joblib.load(filepath)
            finally:
                # Restore the original __main__ module
                if original_main is not None:
                    sys.modules["__main__"] = original_main
                else:
                    del sys.modules["__main__"]
        if not isinstance(encoder, cls):
            raise ValueError(f"Loaded object is not a {cls.__name__}")
        return encoder
def main():
"""CLI interface for encoding data."""
import argparse
parser = argparse.ArgumentParser(description="Encode traffic data for LSTM training")
parser.add_argument("csv_file", help="Path to CSV file with traffic data")
parser.add_argument("--seq_len", type=int, default=12, help="Sequence length (default: 12)")
parser.add_argument("--horizon", type=int, default=1, help="Prediction horizon (default: 1)")
parser.add_argument("--target_col", default="speed_mph", help="Target column name")
parser.add_argument("--save_encoder", help="Path to save fitted encoder")
parser.add_argument("--output", help="Path to save encoded data (optional)")
args = parser.parse_args()
# Load data
print(f"Loading data from {args.csv_file}")
df = pd.read_csv(args.csv_file)
# Create and fit encoder
encoder = TrafficDataEncoder(
seq_len=args.seq_len,
horizon=args.horizon,
target_col=args.target_col
)
X, y, target_indices, timestamps = encoder.fit_transform(df)
print(f"Encoded data shapes:")
print(f" X: {X.shape}")
print(f" y: {y.shape}")
print(f" Target indices: {len(target_indices)}")
print(f" Timestamps: {len(timestamps)}")
# Save encoder if requested
if args.save_encoder:
encoder.save(args.save_encoder)
# Save encoded data if requested
if args.output:
np.savez(args.output, X=X, y=y, target_indices=target_indices, timestamps=timestamps)
print(f"Encoded data saved to {args.output}")
if __name__ == "__main__":
main()
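# Example invocation (file names are illustrative):
#   python encode.py traffic.csv --seq_len 12 --horizon 1 \
#       --save_encoder encoder.joblib --output encoded.npz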