|
|
""" |
|
|
encode.py – Traffic data encoder for LSTM traffic flow prediction |
|
|
|
|
|
This module provides TrafficDataEncoder for processing 5-minute traffic sensor data |
|
|
into sequences suitable for LSTM training. Key features: |
|
|
- Sensor-safe windowing (no cross-sensor leakage) |
|
|
- Feature engineering (time, geographic, categorical) |
|
|
- Speed-based class weighting support |
|
|
- Robust missing value handling |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from sklearn.preprocessing import OrdinalEncoder, StandardScaler |
|
|
from sklearn.utils.validation import check_is_fitted |
|
|
from typing import List, Tuple, Dict, Optional |
|
|
import joblib |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
class TrafficDataEncoder:
    """
    Encodes traffic sensor data into sequences for LSTM training.

    Pipeline:
    - Derive a stable ``sensor_id`` from rounded lat/lon when absent.
    - Add cyclical time features (hour-of-day, day-of-week).
    - Coerce numeric columns and impute missing values (train medians).
    - Ordinal-encode categoricals (unknown categories map to -1).
    - Standard-scale numeric features.
    - Slide fixed-length windows *per sensor*, so no sequence mixes rows
      from two sensors (sensor-safe windowing, no cross-sensor leakage).
    """

    def __init__(
        self,
        seq_len: int = 12,
        horizon: int = 1,
        target_col: str = "speed_mph"
    ):
        """
        Args:
            seq_len: Number of past timesteps in each input sequence.
            horizon: Number of future timesteps to predict.
            target_col: Column holding the prediction target.
        """
        self.seq_len = seq_len
        self.horizon = horizon
        self.target_col = target_col

        # Categorical columns are ordinal-encoded; numeric columns are scaled.
        self.cat_cols = ["direction", "weather"]
        self.num_cols = [
            "lanes", "% Observed", "Latitude", "Longitude",
            "hour_sin", "hour_cos", "dow_sin", "dow_cos"
        ]

        # Fitted state -- populated by fit().
        self.ordinal_encoder: Optional[OrdinalEncoder] = None
        self.scaler: Optional[StandardScaler] = None
        self.num_medians: Dict[str, float] = {}
        self.is_fitted = False

    def _ensure_sensor_id_and_sort(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with a ``sensor_id`` column, sorted by sensor then time.

        The sensor id is derived from lat/lon rounded to 6 decimal places
        (sub-meter precision), which is stable for fixed roadside sensors.
        """
        df = df.copy()

        if "sensor_id" not in df.columns:
            df["sensor_id"] = (
                df["Latitude"].round(6).astype(str) + ";" +
                df["Longitude"].round(6).astype(str)
            )

        # Unparseable timestamps become NaT instead of raising.
        df["Time"] = pd.to_datetime(df["Time"], errors="coerce")
        return df.sort_values(["sensor_id", "Time"]).reset_index(drop=True)

    def _add_time_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add cyclical (sin/cos) encodings of hour-of-day and day-of-week.

        Cyclic encoding keeps 23:55 adjacent to 00:00 and Sunday adjacent
        to Monday, unlike raw integer features.  Mutates ``df`` in place
        (callers pass a private copy) and returns it.
        """
        dt = pd.to_datetime(df["Time"], errors="coerce")
        # Fractional hour so 5-minute bins within an hour stay distinct.
        hour = dt.dt.hour + dt.dt.minute / 60.0
        dow = dt.dt.dayofweek

        df["hour_sin"] = np.sin(2 * np.pi * hour / 24)
        df["hour_cos"] = np.cos(2 * np.pi * hour / 24)
        df["dow_sin"] = np.sin(2 * np.pi * dow / 7)
        df["dow_cos"] = np.cos(2 * np.pi * dow / 7)

        return df

    def _clean_numeric(self, df: pd.DataFrame) -> pd.DataFrame:
        """Coerce numeric columns to numbers, defaulting absent columns.

        Mutates ``df`` in place (callers pass a private copy) and returns it.
        """
        # df.get falls back to a scalar default that broadcasts column-wide
        # when the column is missing entirely.
        df["lanes"] = pd.to_numeric(df.get("lanes", 0), errors="coerce")
        df["% Observed"] = pd.to_numeric(df.get("% Observed", 100), errors="coerce")
        return df

    def _compute_speed_weights(self, y: np.ndarray) -> Dict[str, float]:
        """Compute inverse-frequency class weights over three speed bands.

        Bands: low (<=30 mph), medium (30-60), high (>=60).  Weights are
        n_total / (3 * n_band), so a perfectly balanced set yields 1.0 per
        band.  Falls back to uniform weights when any band (or y) is empty.
        """
        y = np.asarray(y)
        n_total = len(y)

        # BUG FIX: the original divided by n_total unconditionally and
        # raised ZeroDivisionError on an empty target array.
        if n_total == 0:
            return {
                "weight_low": 1.0,
                "weight_medium": 1.0,
                "weight_high": 1.0,
                "low_threshold": 30,
                "high_threshold": 60
            }

        low_mask = y <= 30
        high_mask = y >= 60
        medium_mask = ~(low_mask | high_mask)

        n_low = low_mask.sum()
        n_medium = medium_mask.sum()
        n_high = high_mask.sum()

        print("Speed distribution:")
        print(f"  Low (≤30): {n_low} samples ({n_low/n_total*100:.1f}%)")
        print(f"  Medium (30-60): {n_medium} samples ({n_medium/n_total*100:.1f}%)")
        print(f"  High (≥60): {n_high} samples ({n_high/n_total*100:.1f}%)")

        # Inverse-frequency weighting; uniform if any band has no samples.
        if n_low > 0 and n_medium > 0 and n_high > 0:
            weight_low = n_total / (3 * n_low)
            weight_medium = n_total / (3 * n_medium)
            weight_high = n_total / (3 * n_high)
        else:
            weight_low = weight_medium = weight_high = 1.0

        print(f"Class weights: Low={weight_low:.2f}, Medium={weight_medium:.2f}, High={weight_high:.2f}")

        return {
            "weight_low": weight_low,
            "weight_medium": weight_medium,
            "weight_high": weight_high,
            "low_threshold": 30,
            "high_threshold": 60
        }

    def fit(self, df: pd.DataFrame) -> "TrafficDataEncoder":
        """Fit imputation medians, the ordinal encoder, and the scaler.

        Operates on a copy; the caller's DataFrame is not modified.

        Returns:
            self, for chaining.
        """
        print("Fitting encoder...")

        df = self._ensure_sensor_id_and_sort(df)
        df = self._add_time_features(df)
        df = self._clean_numeric(df)

        # Impute: categoricals -> "UNK" sentinel, numerics -> column medians.
        # Medians are stored so transform() imputes consistently later.
        df[self.cat_cols] = df[self.cat_cols].fillna("UNK")
        self.num_medians = df[self.num_cols].median(numeric_only=True).to_dict()
        df[self.num_cols] = df[self.num_cols].fillna(self.num_medians)

        # Categories unseen at fit time are encoded as -1 at transform time.
        self.ordinal_encoder = OrdinalEncoder(
            handle_unknown="use_encoded_value",
            unknown_value=-1
        )
        self.ordinal_encoder.fit(df[self.cat_cols])

        self.scaler = StandardScaler()
        self.scaler.fit(df[self.num_cols])

        self.is_fitted = True
        print("Encoder fitted successfully")
        return self

    def _preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the same preprocessing as fit(), using the stored medians."""
        df = self._ensure_sensor_id_and_sort(df)
        df = self._add_time_features(df)
        df = self._clean_numeric(df)

        # Impute with the statistics learned during fit().
        df[self.cat_cols] = df[self.cat_cols].fillna("UNK")
        df[self.num_cols] = df[self.num_cols].fillna(self.num_medians)

        return df

    def transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Transform data into sliding-window sequences.

        Windows never cross sensor boundaries: each sensor's rows are
        windowed independently after sorting by time.

        Returns:
            X: (N, seq_len, n_features) - input sequences (float32)
            y: (N, horizon) - target values (float32)
            target_indices: (N,) - post-sort row index of each final target
            timestamps: (N,) - timestamps of the final target rows

        Raises:
            ValueError: if the encoder has not been fitted.
        """
        # BUG FIX: check_is_fitted() only verifies attribute *existence*,
        # and __init__ pre-creates ordinal_encoder/scaler/num_medians (as
        # None/{}), so the original check passed even on an unfitted encoder
        # and later died with an opaque AttributeError.  ValueError keeps
        # compatibility: sklearn's NotFittedError is a ValueError subclass.
        if not self.is_fitted or self.ordinal_encoder is None or self.scaler is None:
            raise ValueError("TrafficDataEncoder must be fitted before calling transform()")

        df = self._preprocess(df)

        X_chunks = []
        y_chunks = []
        target_indices = []
        timestamps = []

        # Window per sensor so sequences never mix sensors.
        for sensor_id, group in df.groupby("sensor_id", sort=False):
            if len(group) < self.seq_len + self.horizon:
                continue  # not enough rows for even one window

            cat_features = self.ordinal_encoder.transform(group[self.cat_cols]).astype(np.float32)
            num_features = self.scaler.transform(group[self.num_cols]).astype(np.float32)
            features = np.concatenate([num_features, cat_features], axis=1)

            targets = group[self.target_col].to_numpy(dtype=np.float32)
            group_timestamps = group["Time"].to_numpy()
            group_indices = group.index.to_numpy()

            n_windows = len(group) - self.seq_len - self.horizon + 1
            for i in range(n_windows):
                X_chunks.append(features[i:i + self.seq_len])
                y_chunks.append(targets[i + self.seq_len:i + self.seq_len + self.horizon])
                # Index/timestamp of the *last* predicted step.
                target_indices.append(group_indices[i + self.seq_len + self.horizon - 1])
                timestamps.append(group_timestamps[i + self.seq_len + self.horizon - 1])

        if not X_chunks:
            # Keep shapes/dtypes consistent even when no window fits.
            n_features = len(self.num_cols) + len(self.cat_cols)
            return (
                np.empty((0, self.seq_len, n_features), dtype=np.float32),
                np.empty((0, self.horizon), dtype=np.float32),
                np.empty(0, dtype=int),
                np.empty(0, dtype=object)
            )

        X = np.stack(X_chunks, axis=0)
        y = np.stack(y_chunks, axis=0)
        target_indices = np.array(target_indices, dtype=int)
        timestamps = np.array(timestamps)

        # nunique() replaces len(df.groupby(...)), which built a throwaway
        # groupby object just to count groups; the printed number is the same.
        print(f"Created {len(X)} sequences from {df['sensor_id'].nunique()} sensors")
        return X, y, target_indices, timestamps

    def fit_transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Fit encoder and transform data in one step."""
        return self.fit(df).transform(df)

    def get_speed_weights(self, y: np.ndarray) -> Dict[str, float]:
        """Get speed-based class weights for a weighted loss (see _compute_speed_weights)."""
        return self._compute_speed_weights(y)

    def save(self, filepath: str) -> None:
        """Persist the fitted encoder with joblib.

        Raises:
            ValueError: if the encoder has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Encoder must be fitted before saving")

        joblib.dump(self, filepath)
        print(f"Encoder saved to {filepath}")

    @classmethod
    def load(cls, filepath: str) -> "TrafficDataEncoder":
        """Load a fitted encoder from ``filepath``.

        NOTE(security): joblib.load unpickles arbitrary objects -- only
        load files from trusted sources.

        An encoder pickled from a script run as ``__main__`` records its
        class as ``__main__.TrafficDataEncoder``; loading it from another
        module then raises AttributeError.  The fallback below temporarily
        points ``sys.modules['__main__']`` at a stub module exposing this
        class so the pickle can resolve, then restores the real __main__.
        """
        try:
            encoder = joblib.load(filepath)
            if not isinstance(encoder, cls):
                raise ValueError(f"Loaded object is not a {cls.__name__}")
            return encoder
        except AttributeError as e:
            if "TrafficDataEncoder" in str(e):
                print("Warning: Encoder was saved from different module context. Reconstructing...")

                import sys
                import types

                original_main = sys.modules.get('__main__')
                temp_module = types.ModuleType('temp_encode')
                temp_module.TrafficDataEncoder = cls
                sys.modules['__main__'] = temp_module

                try:
                    encoder = joblib.load(filepath)
                    if not isinstance(encoder, cls):
                        raise ValueError(f"Loaded object is not a {cls.__name__}")
                    return encoder
                finally:
                    # Always restore the real __main__, even on failure.
                    if original_main is not None:
                        sys.modules['__main__'] = original_main
                    else:
                        del sys.modules['__main__']
            else:
                raise  # bare raise preserves the original traceback
|
|
|
|
|
|
|
|
def main():
    """Command-line entry point: encode a traffic CSV for LSTM training."""
    import argparse

    cli = argparse.ArgumentParser(description="Encode traffic data for LSTM training")
    cli.add_argument("csv_file", help="Path to CSV file with traffic data")
    cli.add_argument("--seq_len", type=int, default=12, help="Sequence length (default: 12)")
    cli.add_argument("--horizon", type=int, default=1, help="Prediction horizon (default: 1)")
    cli.add_argument("--target_col", default="speed_mph", help="Target column name")
    cli.add_argument("--save_encoder", help="Path to save fitted encoder")
    cli.add_argument("--output", help="Path to save encoded data (optional)")
    opts = cli.parse_args()

    # Load the raw 5-minute sensor readings.
    print(f"Loading data from {opts.csv_file}")
    frame = pd.read_csv(opts.csv_file)

    # Fit the encoder and build the training sequences in one pass.
    encoder = TrafficDataEncoder(
        seq_len=opts.seq_len,
        horizon=opts.horizon,
        target_col=opts.target_col,
    )
    X, y, target_indices, timestamps = encoder.fit_transform(frame)

    print("Encoded data shapes:")
    print(f"  X: {X.shape}")
    print(f"  y: {y.shape}")
    print(f"  Target indices: {len(target_indices)}")
    print(f"  Timestamps: {len(timestamps)}")

    # Optional artifacts: pickled encoder and/or npz bundle of arrays.
    if opts.save_encoder:
        encoder.save(opts.save_encoder)

    if opts.output:
        np.savez(opts.output, X=X, y=y, target_indices=target_indices, timestamps=timestamps)
        print(f"Encoded data saved to {opts.output}")


if __name__ == "__main__":
    main()
|
|
|
|
|
|