# TrafCast / model_v3 / predict_road.py
# amitom's picture
# Minimal app for HF Space
# 73e9c25
"""
predict_road.py – Predict traffic speeds for all sensors on a specific road and direction
This module provides functions to predict traffic speeds for all sensors on a given road
and direction at a specific time. Designed for map visualization and real-time prediction.
"""
import pandas as pd
import numpy as np
import torch
from typing import List, Dict, Tuple, Optional
from datetime import datetime, timedelta
import joblib
import sys
import os
# Add current directory to path for local imports
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
from encode import TrafficDataEncoder
from train_lstm import LSTMRegressor
class RoadPredictor:
    """
    Predictor for traffic speeds on specific roads and directions.

    Loads a trained model and its fitted encoder once, then predicts a speed
    for every sensor on a given road/direction at a requested time.

    NOTE(review): the encoder is expected to expose ``num_cols``, ``cat_cols``,
    ``seq_len``, ``target_col``, ``scaler``, ``ordinal_encoder`` and
    ``_add_time_features`` -- these are the attributes this class reads.
    """

    def __init__(self, model_path: str, encoder_path: str, device: str = "auto"):
        """
        Initialize the road predictor.

        Args:
            model_path: Path to trained model (.pt file) holding a state dict
            encoder_path: Path to fitted encoder (.pkl file)
            device: Device to use (auto, cpu, cuda, mps)
        """
        # Device selection: "auto" prefers MPS, then CUDA, then CPU.
        if device == "auto":
            # Older torch builds have no ``torch.backends.mps`` attribute.
            mps_backend = getattr(torch.backends, "mps", None)
            if mps_backend is not None and mps_backend.is_available():
                self.device = torch.device("mps")
            elif torch.cuda.is_available():
                self.device = torch.device("cuda")
            else:
                self.device = torch.device("cpu")
        else:
            self.device = torch.device(device)
        print(f"Using device: {self.device}")

        # Load encoder
        print(f"Loading encoder from {encoder_path}")
        self.encoder = TrafficDataEncoder.load(encoder_path)

        # Load model
        print(f"Loading model from {model_path}")
        model_state = torch.load(model_path, map_location=self.device)

        # Infer model architecture from the saved state dict.
        n_features = len(self.encoder.num_cols) + len(self.encoder.cat_cols)

        # An LSTM stacks its four gate matrices along dim 0 of weight_ih,
        # so the first dimension is 4 * hidden_size.
        hidden_size = model_state['lstm.weight_ih_l0'].shape[0] // 4

        # A reverse-direction weight only exists for bidirectional LSTMs.
        bidirectional = 'lstm.weight_ih_l0_reverse' in model_state

        # Count distinct layer indices among keys such as
        # "lstm.weight_ih_l1" / "lstm.weight_ih_l1_reverse".
        layer_keys = [k for k in model_state if k.startswith('lstm.weight_ih_l')]
        n_layers = len({k.split('_l')[1].split('_')[0] for k in layer_keys})

        print(f"Model architecture: hidden_size={hidden_size}, n_layers={n_layers}, bidirectional={bidirectional}")

        # Create and load model.
        # NOTE(review): dropout is hard-coded; presumably irrelevant here
        # because the model is switched to eval mode below -- confirm.
        self.model = LSTMRegressor(
            n_features=n_features,
            hidden_size=hidden_size,
            n_layers=n_layers,
            dropout=0.3,
            bidirectional=bidirectional
        ).to(self.device)
        self.model.load_state_dict(model_state)
        self.model.eval()
        print("Model and encoder loaded successfully")

    def get_road_sensors(self, df: pd.DataFrame, road_name: str, direction: str) -> pd.DataFrame:
        """
        Get all sensors for a specific road and direction.

        Args:
            df: DataFrame with traffic data
            road_name: Name of the road (e.g., "I 405")
            direction: Direction (e.g., "North", "South", "East", "West")

        Returns:
            DataFrame with one row per unique sensor on the road/direction

        Raises:
            ValueError: If no row matches the road/direction
        """
        # Filter for the specific road and direction
        on_road = (df['road_name'] == road_name) & (df['direction'] == direction)
        road_data = df[on_road]
        if road_data.empty:
            raise ValueError(f"No data found for road '{road_name}' direction '{direction}'")

        # One row per sensor, taking the first observed value of each attribute
        sensors = road_data.groupby('sensor_id').agg({
            'Latitude': 'first',
            'Longitude': 'first',
            'road_name': 'first',
            'direction': 'first',
            'lanes': 'first'
        }).reset_index()
        print(f"Found {len(sensors)} sensors on {road_name} {direction}")
        return sensors

    def prepare_prediction_data(
        self,
        df: pd.DataFrame,
        road_name: str,
        direction: str,
        target_time: datetime,
        seq_len: int = 12
    ) -> Tuple[pd.DataFrame, List[str]]:
        """
        Prepare data for prediction by getting historical sequences for each sensor.

        For every sensor on the road, the row whose ``Time`` is closest to
        ``target_time`` is located and the ``seq_len`` rows ending at that row
        are kept. Sensors with fewer than ``seq_len`` rows up to that point
        are skipped with a warning.

        Args:
            df: DataFrame with traffic data
            road_name: Name of the road
            direction: Direction
            target_time: Time to predict for
            seq_len: Length of historical sequence needed

        Returns:
            Tuple of (prepared_data, sensor_ids); prepared_data is sorted by
            sensor_id and Time and has the encoder's time features added

        Raises:
            ValueError: If the road has no data, or no sensor has enough history
        """
        # Get sensors for this road/direction
        sensors = self.get_road_sensors(df, road_name, direction)
        road_sensor_ids = sensors['sensor_id'].tolist()

        # Parse timestamps BEFORE sorting: sorting the raw strings is only
        # chronological for ISO-formatted values.
        history = df[df['sensor_id'].isin(road_sensor_ids)].copy()
        history['Time'] = pd.to_datetime(history['Time'])
        history = history.sort_values('Time', kind='stable')

        # Split once per sensor instead of re-scanning the frame per sensor.
        # groupby keeps the (time-sorted) row order inside each group.
        rows_by_sensor = {
            sensor_id: rows for sensor_id, rows in history.groupby('sensor_id', sort=False)
        }

        prepared_data = []
        sensor_ids = []
        for sensor_id in road_sensor_ids:
            sensor_data = rows_by_sensor.get(sensor_id)
            if sensor_data is None or sensor_data.empty:
                print(f"Warning: No data found for sensor {sensor_id}")
                continue
            # Positional index so idxmin() below is directly usable with iloc
            sensor_data = sensor_data.reset_index(drop=True)

            # Find the closest time to target_time
            time_diffs = (sensor_data['Time'] - target_time).abs()
            if time_diffs.isna().all():
                print(f"Warning: No time data for sensor {sensor_id}")
                continue
            closest_idx = int(time_diffs.idxmin())

            # Historical window of seq_len rows ending at the closest row
            start_idx = closest_idx - seq_len + 1
            if start_idx < 0:
                # Not enough historical data, skip this sensor
                print(f"Warning: Not enough historical data for sensor {sensor_id} (need {seq_len}, have {closest_idx + 1})")
                continue

            prepared_data.append(sensor_data.iloc[start_idx:closest_idx + 1].copy())
            sensor_ids.append(sensor_id)

        if not prepared_data:
            raise ValueError(f"No sensors with sufficient historical data for {road_name} {direction}")

        # Combine all sensor data, sorted by sensor_id and Time (required by encoder)
        combined_data = pd.concat(prepared_data, ignore_index=True)
        combined_data = combined_data.sort_values(['sensor_id', 'Time']).reset_index(drop=True)

        # Add time features that the encoder expects
        combined_data = self.encoder._add_time_features(combined_data)

        print(f"Prepared data for {len(sensor_ids)} sensors")
        print(f"Combined data shape: {combined_data.shape}")
        print(f"Unique sensors in prepared data: {combined_data['sensor_id'].nunique()}")
        return combined_data, sensor_ids

    @staticmethod
    def _lookup_real_speed(row: pd.Series):
        """Return the observed speed from the first known speed column, else None."""
        for column in ('speed_mph', 'speed', 'Speed'):
            value = row.get(column, None)
            if value is not None:
                return value
        return None

    def predict_road_speeds(
        self,
        df: pd.DataFrame,
        road_name: str,
        direction: str,
        target_time: datetime
    ) -> pd.DataFrame:
        """
        Predict speeds for all sensors on a specific road and direction.

        Args:
            df: DataFrame with traffic data
            road_name: Name of the road (e.g., "I 405")
            direction: Direction (e.g., "North", "South", "East", "West")
            target_time: Time to predict for

        Returns:
            DataFrame with one row per sensor: sensor_id, Latitude, Longitude,
            road_name, direction, lanes, predicted_speed, real_speed (observed
            speed at the most recent row of the input window) and target_time

        Raises:
            ValueError: If no sensor yields a usable input sequence
        """
        print(f"Predicting speeds for {road_name} {direction} at {target_time}")

        # The history window must match the length the encoder/model expects,
        # not the generic default of prepare_prediction_data.
        seq_len = self.encoder.seq_len
        prepared_data, sensor_ids = self.prepare_prediction_data(
            df, road_name, direction, target_time, seq_len=seq_len
        )

        # The sequences are already cut to size, so features are built directly
        # instead of going through the encoder's own sequence generation.
        print(f"Creating sequences manually from {len(prepared_data)} rows...")
        rows_by_sensor = {
            sensor_id: rows for sensor_id, rows in prepared_data.groupby('sensor_id', sort=False)
        }

        X_sequences = []
        latest_rows = []
        sensor_mapping = []
        for sensor_id in sensor_ids:
            sensor_data = rows_by_sensor.get(sensor_id)
            if sensor_data is None or len(sensor_data) < seq_len:
                continue
            # Last seq_len points, oldest first, form the input sequence
            sequence_data = sensor_data.sort_values('Time').tail(seq_len)

            # Feature layout: scaled numeric columns followed by encoded categoricals
            cat_features = self.encoder.ordinal_encoder.transform(sequence_data[self.encoder.cat_cols]).astype(np.float32)
            num_features = self.encoder.scaler.transform(sequence_data[self.encoder.num_cols]).astype(np.float32)
            X_sequences.append(np.concatenate([num_features, cat_features], axis=1))

            # Most recent row of the window: source of sensor info and real speed
            latest_rows.append(sequence_data.iloc[-1])
            sensor_mapping.append(sensor_id)

        if not X_sequences:
            raise ValueError("No valid sequences found for prediction")

        X = np.stack(X_sequences, axis=0)
        print(f"Created {len(X_sequences)} sequences with shape {X.shape}")

        # One batched forward pass; column 0 of the output is the predicted speed
        with torch.no_grad():
            batch = torch.from_numpy(X).float().to(self.device)
            predictions = self.model(batch).cpu().numpy()[:, 0]

        sensor_info = [
            {
                'sensor_id': sensor_id,
                'Latitude': latest['Latitude'],
                'Longitude': latest['Longitude'],
                'road_name': latest['road_name'],
                'direction': latest['direction'],
                'lanes': latest['lanes'],
                'predicted_speed': prediction,
                'real_speed': self._lookup_real_speed(latest),
                'target_time': target_time
            }
            for sensor_id, latest, prediction in zip(sensor_mapping, latest_rows, predictions)
        ]

        # Create results DataFrame
        results_df = pd.DataFrame(sensor_info)
        print(f"Generated predictions for {len(results_df)} sensors")
        print(f"Predicted speed range: {results_df['predicted_speed'].min():.1f} - {results_df['predicted_speed'].max():.1f} mph")

        # Print real speed statistics if available
        real_speeds = results_df['real_speed'].dropna()
        if len(real_speeds) > 0:
            print(f"Real speed range: {real_speeds.min():.1f} - {real_speeds.max():.1f} mph")
            print(f"Real speed available for {len(real_speeds)}/{len(results_df)} sensors")
        else:
            print("No real speed data available")
        return results_df

    def predict_multiple_times(
        self,
        df: pd.DataFrame,
        road_name: str,
        direction: str,
        target_times: List[datetime]
    ) -> pd.DataFrame:
        """
        Predict speeds for multiple time points.

        A failure at one time point is reported and skipped so the remaining
        time points are still predicted.

        Args:
            df: DataFrame with traffic data
            road_name: Name of the road
            direction: Direction
            target_times: List of times to predict for

        Returns:
            DataFrame with predictions for all sensors at all times

        Raises:
            ValueError: If no time point produced predictions
        """
        all_predictions = []
        for target_time in target_times:
            try:
                predictions = self.predict_road_speeds(df, road_name, direction, target_time)
            except Exception as e:
                # Deliberate best-effort: report the failure and keep going
                print(f"Error predicting for {target_time}: {e}")
                continue
            all_predictions.append(predictions)

        if not all_predictions:
            raise ValueError("No successful predictions generated")

        # Combine all predictions
        return pd.concat(all_predictions, ignore_index=True)
def predict_road_speeds(
    df: pd.DataFrame,
    road_name: str,
    direction: str,
    target_time: datetime,
    model_path: str,
    encoder_path: str,
    device: str = "auto"
) -> pd.DataFrame:
    """
    One-shot helper: build a RoadPredictor and predict a single road/direction.

    A new predictor (model and encoder load) is created on every call; reuse a
    RoadPredictor instance directly when predicting repeatedly.

    Args:
        df: DataFrame with traffic data
        road_name: Name of the road (e.g., "I 405")
        direction: Direction (e.g., "North", "South", "East", "West")
        target_time: Time to predict for
        model_path: Path to trained model (.pt file)
        encoder_path: Path to fitted encoder (.pkl file)
        device: Device to use (auto, cpu, cuda, mps)

    Returns:
        DataFrame with predictions for each sensor
    """
    road_predictor = RoadPredictor(
        model_path=model_path,
        encoder_path=encoder_path,
        device=device,
    )
    speeds = road_predictor.predict_road_speeds(
        df=df,
        road_name=road_name,
        direction=direction,
        target_time=target_time,
    )
    return speeds
def main():
    """Command-line entry point: load a CSV, predict one road/direction, report."""
    import argparse

    # Every flag except --output is mandatory.
    required_flags = (
        ("--csv", "Path to CSV file with traffic data"),
        ("--model", "Path to trained model (.pt file)"),
        ("--encoder", "Path to fitted encoder (.pkl file)"),
        ("--road", "Road name (e.g., 'I 405')"),
        ("--direction", "Direction (e.g., 'North', 'South', 'East', 'West')"),
        ("--time", "Target time (YYYY-MM-DD HH:MM:SS)"),
    )
    parser = argparse.ArgumentParser(description="Predict traffic speeds for a specific road and direction")
    for flag, help_text in required_flags:
        parser.add_argument(flag, required=True, help=help_text)
    parser.add_argument("--output", help="Path to save predictions CSV")
    args = parser.parse_args()

    # Read the traffic data
    print(f"Loading data from {args.csv}")
    traffic = pd.read_csv(args.csv)

    # Turn the --time string into a datetime
    when = datetime.strptime(args.time, "%Y-%m-%d %H:%M:%S")

    # Run the prediction for the requested road/direction
    results = predict_road_speeds(
        traffic,
        args.road,
        args.direction,
        when,
        args.model,
        args.encoder,
    )

    # Report one line per sensor
    print(f"\nPredictions for {args.road} {args.direction} at {when}:")
    print("=" * 60)
    for _, sensor_row in results.iterrows():
        print(f"Sensor {sensor_row['sensor_id']}: {sensor_row['predicted_speed']:.1f} mph")

    # Persist only when an output path was given
    if args.output:
        results.to_csv(args.output, index=False)
        print(f"\nPredictions saved to {args.output}")


if __name__ == "__main__":
    main()