import os
import yaml
import pandas as pd
import joblib
from typing import Dict, Any
from utils.telemetry_state import TelemetryState
class TelemetryInferenceEngineLite:
    """Run the regression, classification and clustering models on telemetry rows.

    The constructor loads the feature configuration (``config/features.yaml``
    next to this file) and four joblib artifacts from ``<project root>/models``.
    ``process_row`` then produces per-row predictions and, whenever the
    telemetry state reports a completed lap, a driving-behaviour label.
    """

    # Cluster id that is reported as "Aggressive Driving"; every other id is
    # reported as "Smooth Driving".
    _AGGRESSIVE_LABEL = 1

    def __init__(self):
        """Load the feature config, the models and the scaler from disk.

        Raises:
            FileNotFoundError: if ``config/features.yaml`` does not exist
                next to this file.
        """
        # Directory containing this file; the project root is its parent.
        base_dir = os.path.abspath(os.path.dirname(__file__))
        project_root = os.path.abspath(os.path.join(base_dir, ".."))

        feature_path = os.path.join(base_dir, "config", "features.yaml")
        if not os.path.exists(feature_path):
            raise FileNotFoundError(f"Feature config not found at: {feature_path}")

        with open(feature_path, encoding="utf-8") as f:
            self.features_cfg = yaml.safe_load(f)

        # NOTE: joblib artifacts are pickle-based; only load trusted files.
        self.models = {
            "lap_time_regressor": joblib.load(
                os.path.join(project_root, "models/regression/model.pkl")
            ),
            "gear_classifier": joblib.load(
                os.path.join(project_root, "models/classification/model.pkl")
            ),
            "driving_behavior": joblib.load(
                os.path.join(project_root, "models/clustering/model.pkl")
            ),
        }
        self.scaler = joblib.load(
            os.path.join(project_root, "models/clustering/scaler.pkl")
        )
        self.state = TelemetryState()

    def process_row(self, row: pd.Series) -> Dict[str, Any]:
        """Produce predictions for a single telemetry row.

        Args:
            row: one telemetry sample; it must contain every feature listed
                in the config for the regressor and the classifier.

        Returns:
            A dict with ``lap_number`` and ``race_position`` (``None`` when
            absent from ``row``), ``predicted_lap_time`` (float),
            ``predicted_gear`` (int) and ``driving_behavior``, which stays
            ``None`` unless ``self.state.update(row)`` returned a completed
            lap for this row.
        """
        output: Dict[str, Any] = {
            "lap_number": row.get("lap_number"),
            "race_position": row.get("race_position"),
            "driving_behavior": None,
        }

        # -------- REGRESSION --------
        output["predicted_lap_time"] = float(
            self._predict_row("lap_time_regressor", row)
        )

        # -------- CLASSIFICATION --------
        output["predicted_gear"] = int(self._predict_row("gear_classifier", row))

        # -------- CLUSTERING --------
        # The state is updated on every row; it hands back lap data only
        # when it considers a lap complete.
        completed_lap = self.state.update(row)
        if completed_lap is not None:
            output["driving_behavior"] = self._classify_lap(completed_lap)

        return output

    def _predict_row(self, model_name: str, row: pd.Series) -> Any:
        """Return the named model's prediction for the configured features of ``row``."""
        feature_names = self.features_cfg[model_name]["features"]
        # Models receive a single sample shaped (1, n_features).
        sample = row[feature_names].values.reshape(1, -1)
        return self.models[model_name].predict(sample)[0]

    def _classify_lap(self, completed_lap: pd.DataFrame) -> str:
        """Aggregate a completed lap and map its cluster to a behaviour label."""
        raw_map = self.features_cfg["driving_behavior"]["aggregation"]
        # Accept both ``feature: mean`` and ``feature: [mean, max]`` so a bare
        # string is not iterated character by character below.
        agg_map = {
            feature: [stats] if isinstance(stats, str) else list(stats)
            for feature, stats in raw_map.items()
        }

        # Columns missing from the lap data are added and filled with 0.0.
        lap = completed_lap.reindex(columns=list(agg_map), fill_value=0.0)
        agg_df = lap.agg(agg_map)

        feature_row = {
            f"{feature}_{stat}": agg_df.loc[stat, feature]
            for feature, stats in agg_map.items()
            for stat in stats
        }
        lap_features = pd.DataFrame([feature_row]).fillna(0.0)

        # The scaler loaded in __init__ was previously never applied.
        # NOTE(review): assumes the clustering model was fitted on features
        # transformed by this scaler -- confirm against the training pipeline.
        scaled_features = self.scaler.transform(lap_features)
        label = self.models["driving_behavior"].predict(scaled_features)[0]

        if label == self._AGGRESSIVE_LABEL:
            return "Aggressive Driving"
        return "Smooth Driving"