Race-Telemetry / src /inference.py
nasim-raj-laskar
Updated
1e5badb
import os
import yaml
import pandas as pd
import joblib
from typing import Dict, Any
from utils.telemetry_state import TelemetryState
class TelemetryInferenceEngineLite:
    """Row-by-row inference over telemetry with three pre-trained models.

    For every telemetry row the engine predicts a lap time (regression) and
    a gear (classification).  Rows are also fed into a ``TelemetryState``;
    whenever that state reports a completed lap, the lap is aggregated into
    one feature vector, scaled, and labelled by the clustering model.

    Attributes set by ``__init__``:
        features_cfg: Parsed ``config/features.yaml``.
        models: Mapping of model name to the object returned by ``joblib.load``.
        scaler: Scaler loaded from ``models/clustering/scaler.pkl``.
        state: ``TelemetryState`` instance used to detect completed laps.
    """

    def __init__(self):
        """Load the feature configuration, the models and the scaler.

        Paths are resolved relative to this module: the feature config lives
        in ``config/`` next to it, the model artifacts in ``models/`` under
        its parent directory.

        Raises:
            FileNotFoundError: If the feature configuration file is missing.
        """
        base_dir = os.path.abspath(os.path.dirname(__file__))  # e.g. /app/src
        project_root = os.path.abspath(os.path.join(base_dir, ".."))  # e.g. /app

        feature_path = os.path.join(base_dir, "config", "features.yaml")
        if not os.path.exists(feature_path):
            raise FileNotFoundError(f"Feature config not found at: {feature_path}")
        with open(feature_path, encoding="utf-8") as f:
            self.features_cfg = yaml.safe_load(f)

        # NOTE: joblib artifacts are pickles; only load files shipped with
        # the project, never user-supplied ones.
        self.models = {
            "lap_time_regressor": joblib.load(
                os.path.join(project_root, "models/regression/model.pkl")
            ),
            "gear_classifier": joblib.load(
                os.path.join(project_root, "models/classification/model.pkl")
            ),
            "driving_behavior": joblib.load(
                os.path.join(project_root, "models/clustering/model.pkl")
            ),
        }
        self.scaler = joblib.load(
            os.path.join(project_root, "models/clustering/scaler.pkl")
        )
        self.state = TelemetryState()

    def process_row(self, row: pd.Series) -> Dict[str, Any]:
        """Run all models on one telemetry row.

        Args:
            row: One telemetry sample.  It must contain every feature listed
                in the config for the regressor and the classifier;
                ``lap_number`` and ``race_position`` are optional.

        Returns:
            A dict with the keys ``lap_number``, ``race_position``,
            ``driving_behavior``, ``predicted_lap_time`` and
            ``predicted_gear``.  ``driving_behavior`` stays ``None`` unless
            this row completed a lap.

        Raises:
            KeyError: If ``row`` lacks a configured feature.
        """
        output: Dict[str, Any] = {
            "lap_number": row.get("lap_number"),
            "race_position": row.get("race_position"),
            "driving_behavior": None,
        }

        # -------- REGRESSION --------
        output["predicted_lap_time"] = float(
            self._predict_from_row("lap_time_regressor", row)
        )

        # -------- CLASSIFICATION --------
        output["predicted_gear"] = int(
            self._predict_from_row("gear_classifier", row)
        )

        # -------- CLUSTERING --------
        # The state is updated only after both per-row predictions succeeded.
        completed_lap = self.state.update(row)
        if completed_lap is not None:
            output["driving_behavior"] = self._classify_lap(completed_lap)

        return output

    def _predict_from_row(self, model_name: str, row: pd.Series) -> Any:
        """Return ``model_name``'s prediction for the features it needs from ``row``."""
        feature_names = self.features_cfg[model_name]["features"]
        # The models are given a single sample as a 2-D (1, n_features) array.
        sample = row[feature_names].to_numpy().reshape(1, -1)
        return self.models[model_name].predict(sample)[0]

    def _classify_lap(self, completed_lap: pd.DataFrame) -> str:
        """Aggregate a finished lap, scale it, and return its behavior label."""
        # presumably each aggregation entry maps a column to a LIST of
        # statistic names (e.g. ["mean", "max"]) - confirm against features.yaml
        agg_map = self.features_cfg["driving_behavior"]["aggregation"]

        # Columns absent from the lap are created and filled with 0.0 so the
        # aggregation never fails on a missing channel.
        lap = completed_lap.reindex(columns=list(agg_map), fill_value=0.0)
        agg_df = lap.agg(agg_map)

        feature_row = {}
        for feature, stats in agg_map.items():
            for stat in stats:
                feature_row[f"{feature}_{stat}"] = agg_df.loc[stat, feature]

        # Statistics not requested for a column come back as NaN; zero them.
        lap_features = pd.DataFrame([feature_row]).fillna(0.0)

        # Fix: the scaler stored next to the clustering model was loaded but
        # never applied, so the model received raw, unscaled features.
        # NOTE(review): assumes model.pkl is not a pipeline that already
        # embeds this scaler - confirm against the training code.
        scaled_features = self.scaler.transform(lap_features)

        label = self.models["driving_behavior"].predict(scaled_features)[0]
        return "Aggressive Driving" if label == 1 else "Smooth Driving"