import os
from typing import Any, Dict

import joblib
import pandas as pd
import yaml

from utils.telemetry_state import TelemetryState


class TelemetryInferenceEngineLite:
    """Row-by-row telemetry inference over three pre-trained models.

    For every telemetry row the engine predicts a lap time (regression) and
    a gear (classification).  Whenever the telemetry state tracker hands
    back a lap (``TelemetryState.update`` returns a non-``None`` value), the
    lap is aggregated into one feature row and labelled by the
    driving-behavior model.
    """

    def __init__(self, *, scale_behavior_features: bool = False) -> None:
        """Load the feature configuration, the models and the scaler.

        Args:
            scale_behavior_features: When true, the aggregated lap features
                are passed through the loaded clustering scaler before the
                driving-behavior model sees them.  Defaults to ``False``,
                which matches the historical behavior (the scaler was
                loaded but never applied).

        Raises:
            FileNotFoundError: If ``config/features.yaml`` does not exist
                next to this module.
            ValueError: If the feature configuration is not a mapping
                (for example, the YAML file is empty).
        """
        base_dir = os.path.abspath(os.path.dirname(__file__))  # /app/src
        project_root = os.path.abspath(os.path.join(base_dir, ".."))  # /app

        feature_path = os.path.join(base_dir, "config", "features.yaml")
        if not os.path.exists(feature_path):
            raise FileNotFoundError(
                f"Feature config not found at: {feature_path}"
            )

        # Explicit encoding so parsing does not depend on the host locale.
        with open(feature_path, encoding="utf-8") as f:
            self.features_cfg = yaml.safe_load(f)

        # safe_load returns None for an empty file; fail here instead of
        # with an opaque TypeError on the first processed row.
        if not isinstance(self.features_cfg, dict):
            raise ValueError(
                f"Feature config at {feature_path} must be a mapping, "
                f"got {type(self.features_cfg).__name__}"
            )

        # NOTE: joblib.load is pickle-based and can execute arbitrary code;
        # these artifacts must come from a trusted build.
        self.models = {
            "lap_time_regressor": joblib.load(
                os.path.join(project_root, "models/regression/model.pkl")
            ),
            "gear_classifier": joblib.load(
                os.path.join(project_root, "models/classification/model.pkl")
            ),
            "driving_behavior": joblib.load(
                os.path.join(project_root, "models/clustering/model.pkl")
            ),
        }

        self.scaler = joblib.load(
            os.path.join(project_root, "models/clustering/scaler.pkl")
        )

        # NOTE(review): the scaler was historically loaded but never used.
        # If the clustering model was trained on scaled features, construct
        # the engine with scale_behavior_features=True -- confirm against
        # the training pipeline before flipping the default.
        self._scale_behavior_features = scale_behavior_features

        self.state = TelemetryState()

    def process_row(self, row: pd.Series) -> Dict[str, Any]:
        """Run all models on one telemetry row.

        Args:
            row: A single telemetry sample.  It must contain every feature
                listed under ``lap_time_regressor.features`` and
                ``gear_classifier.features`` in the feature configuration.

        Returns:
            A dict with ``lap_number`` and ``race_position`` (``None`` when
            absent from ``row``), ``predicted_lap_time`` (float),
            ``predicted_gear`` (int) and ``driving_behavior``, which is
            ``None`` unless the state tracker returned a lap for this row.

        Raises:
            KeyError: If ``row`` lacks a configured feature.
        """
        output = {
            "lap_number": row.get("lap_number"),
            "race_position": row.get("race_position"),
            "driving_behavior": None,
        }

        # -------- REGRESSION --------
        output["predicted_lap_time"] = float(
            self._predict_single("lap_time_regressor", row)
        )

        # -------- CLASSIFICATION --------
        output["predicted_gear"] = int(
            self._predict_single("gear_classifier", row)
        )

        # -------- CLUSTERING --------
        # The tracker is only updated after both per-row predictions
        # succeeded, same as before the refactor.
        completed_lap = self.state.update(row)
        if completed_lap is not None:
            output["driving_behavior"] = self._classify_lap(completed_lap)

        return output

    def _predict_single(self, model_name: str, row: pd.Series) -> Any:
        """Return the model's prediction for the configured features of row."""
        feature_names = self.features_cfg[model_name]["features"]
        # Models receive a single sample shaped (1, n_features).
        sample = row[feature_names].values.reshape(1, -1)
        return self.models[model_name].predict(sample)[0]

    def _classify_lap(self, completed_lap: Any) -> str:
        """Aggregate a lap into one feature row and label its driving style.

        ``completed_lap`` is whatever ``TelemetryState.update`` returned;
        it is used through ``reindex(columns=...)`` and ``agg``, so it is
        presumably a ``pandas.DataFrame`` -- TODO confirm.
        """
        agg_map = self.features_cfg["driving_behavior"]["aggregation"]

        # Columns missing from the lap are created and filled with 0.0 so
        # the aggregation below never fails on an absent feature.
        lap = completed_lap.reindex(columns=list(agg_map), fill_value=0.0)
        agg_df = lap.agg(agg_map)

        # Flatten the (stat x feature) table into "<feature>_<stat>" columns.
        feature_row = {
            f"{feature}_{stat}": agg_df.loc[stat, feature]
            for feature, stats in agg_map.items()
            for stat in stats
        }
        lap_features = pd.DataFrame([feature_row]).fillna(0.0)

        if self._scale_behavior_features:
            model_input = self.scaler.transform(lap_features)
        else:
            model_input = lap_features

        label = self.models["driving_behavior"].predict(model_input)[0]
        return "Aggressive Driving" if label == 1 else "Smooth Driving"