Spaces:
Sleeping
Sleeping
| import os | |
| import yaml | |
| import pandas as pd | |
| import joblib | |
| from typing import Dict, Any | |
| from utils.telemetry_state import TelemetryState | |
class TelemetryInferenceEngineLite:
    """Run per-row telemetry inference with three pre-trained models.

    On construction the engine loads the feature configuration from
    ``config/features.yaml`` (next to this file) and four joblib artifacts
    from ``<parent directory>/models``: a lap-time regressor, a gear
    classifier, a driving-behavior clustering model and its scaler.
    """

    # Artifact paths relative to the project root (the parent of this
    # file's directory), keyed by the name used in ``self.models`` and in
    # the feature configuration.
    _MODEL_PATHS = {
        "lap_time_regressor": "models/regression/model.pkl",
        "gear_classifier": "models/classification/model.pkl",
        "driving_behavior": "models/clustering/model.pkl",
    }
    _SCALER_PATH = "models/clustering/scaler.pkl"

    def __init__(self):
        """Load the feature config, the models, the scaler and a fresh state.

        Raises:
            FileNotFoundError: If ``config/features.yaml`` does not exist.
        """
        base_dir = os.path.abspath(os.path.dirname(__file__))
        project_root = os.path.abspath(os.path.join(base_dir, ".."))

        feature_path = os.path.join(base_dir, "config", "features.yaml")
        if not os.path.exists(feature_path):
            raise FileNotFoundError(f"Feature config not found at: {feature_path}")
        # Explicit encoding so the config parses the same on every platform.
        with open(feature_path, encoding="utf-8") as f:
            self.features_cfg = yaml.safe_load(f)

        # NOTE: joblib artifacts are pickles; only load files shipped with
        # the application, never user-supplied ones.
        self.models = {
            name: joblib.load(os.path.join(project_root, rel_path))
            for name, rel_path in self._MODEL_PATHS.items()
        }
        self.scaler = joblib.load(os.path.join(project_root, self._SCALER_PATH))

        self.state = TelemetryState()

    def process_row(self, row: pd.Series) -> Dict[str, Any]:
        """Run all models on one telemetry row.

        Args:
            row: One telemetry sample. It must contain every feature listed
                for the regressor and the classifier in the feature config.

        Returns:
            A dict with ``lap_number`` and ``race_position`` (copied from
            ``row``, ``None`` when absent), ``predicted_lap_time`` (float),
            ``predicted_gear`` (int) and ``driving_behavior``, which is
            ``None`` unless ``self.state.update(row)`` returned a completed
            lap, in which case it is ``"Aggressive Driving"`` or
            ``"Smooth Driving"``.

        Raises:
            KeyError: If ``row`` lacks a configured regressor/classifier
                feature.
        """
        output = {
            "lap_number": row.get("lap_number"),
            "race_position": row.get("race_position"),
            "driving_behavior": None,
        }

        # -------- REGRESSION --------
        output["predicted_lap_time"] = float(
            self._predict_one("lap_time_regressor", row)
        )

        # -------- CLASSIFICATION --------
        output["predicted_gear"] = int(self._predict_one("gear_classifier", row))

        # -------- CLUSTERING --------
        # The state object decides when a lap is complete; until then it
        # returns None and no behavior label is produced.
        completed_lap = self.state.update(row)
        if completed_lap is not None:
            lap_features = self._aggregate_lap(completed_lap)
            # The scaler is stored alongside the clustering model, so the
            # model is presumed to have been fitted on scaled features.
            # NOTE(review): confirm against the training pipeline.
            scaled = self.scaler.transform(lap_features)
            label = self.models["driving_behavior"].predict(scaled)[0]
            output["driving_behavior"] = (
                "Aggressive Driving" if label == 1 else "Smooth Driving"
            )

        return output

    def _predict_one(self, model_name: str, row: pd.Series) -> Any:
        """Return the single prediction of ``model_name`` for ``row``."""
        feats = self.features_cfg[model_name]["features"]
        # Models expect a 2-D (1 sample x n features) input.
        X = row[feats].values.reshape(1, -1)
        return self.models[model_name].predict(X)[0]

    def _aggregate_lap(self, completed_lap: pd.DataFrame) -> pd.DataFrame:
        """Collapse a completed lap into one row of ``<feature>_<stat>`` columns."""
        raw_map = self.features_cfg["driving_behavior"]["aggregation"]
        # Accept both ``feature: [mean, max]`` and ``feature: mean``; a bare
        # string would otherwise be iterated character by character.
        agg_map = {
            feature: [stats] if isinstance(stats, str) else list(stats)
            for feature, stats in raw_map.items()
        }

        # Columns missing from the lap are created and filled with 0.0 so
        # the aggregation always sees every configured feature.
        lap = completed_lap.reindex(columns=list(agg_map), fill_value=0.0)
        agg_df = lap.agg(agg_map)

        feature_row = {}
        for feature, stats in agg_map.items():
            for stat in stats:
                feature_row[f"{feature}_{stat}"] = agg_df.loc[stat, feature]

        # Statistics that came out as NaN are replaced with 0.0.
        return pd.DataFrame([feature_row]).fillna(0.0)