File size: 2,980 Bytes
b5c1e09
a5446fc
 
 
 
69c8b54
a5446fc
b5c1e09
a5446fc
 
1e5badb
b5c1e09
 
 
1e5badb
b5c1e09
 
 
 
 
 
a5446fc
 
 
b5c1e09
1e5badb
b5c1e09
 
1e5badb
b5c1e09
 
1e5badb
b5c1e09
a5446fc
 
b5c1e09
1e5badb
b5c1e09
a5446fc
 
 
1e5badb
a5446fc
 
 
 
 
 
 
 
b5c1e09
 
a5446fc
 
 
 
 
 
b5c1e09
 
a5446fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b5c1e09
a5446fc
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import os
import yaml
import pandas as pd
import joblib
from typing import Dict, Any
from utils.telemetry_state import TelemetryState


class TelemetryInferenceEngineLite:
    """Run per-row telemetry inference with three pre-trained models.

    For every telemetry row the engine predicts a lap time (regression) and a
    gear (classification).  Rows are also fed into a ``TelemetryState``; when
    that state hands back a completed lap, the lap is aggregated into a single
    feature vector, scaled, and labelled by the clustering model.

    Attributes set by ``__init__``:
        features_cfg: Parsed ``config/features.yaml``.
        models: Mapping with the keys ``"lap_time_regressor"``,
            ``"gear_classifier"`` and ``"driving_behavior"``.
        scaler: Scaler loaded from ``models/clustering/scaler.pkl``.
        state: ``TelemetryState`` instance fed with every processed row.
    """

    def __init__(self):
        """Load the feature configuration, the models and the scaler from disk.

        The feature config is looked up in ``config/features.yaml`` next to
        this module; model artifacts are looked up in ``models/`` one
        directory above it.

        Raises:
            FileNotFoundError: If the feature config file does not exist.
        """
        base_dir = os.path.abspath(os.path.dirname(__file__))
        project_root = os.path.abspath(os.path.join(base_dir, ".."))

        feature_path = os.path.join(base_dir, "config", "features.yaml")
        if not os.path.exists(feature_path):
            raise FileNotFoundError(f"Feature config not found at: {feature_path}")

        # Explicit encoding so parsing does not depend on the host locale.
        with open(feature_path, encoding="utf-8") as f:
            self.features_cfg = yaml.safe_load(f)

        models_dir = os.path.join(project_root, "models")

        # NOTE(security): joblib.load is pickle-based and executes arbitrary
        # code on load; these artifacts must only come from a trusted build.
        self.models = {
            "lap_time_regressor": joblib.load(
                os.path.join(models_dir, "regression", "model.pkl")
            ),
            "gear_classifier": joblib.load(
                os.path.join(models_dir, "classification", "model.pkl")
            ),
            "driving_behavior": joblib.load(
                os.path.join(models_dir, "clustering", "model.pkl")
            ),
        }

        self.scaler = joblib.load(
            os.path.join(models_dir, "clustering", "scaler.pkl")
        )

        self.state = TelemetryState()

    def process_row(self, row: pd.Series) -> Dict[str, Any]:
        """Produce predictions for a single telemetry row.

        Args:
            row: One telemetry sample.  It must contain every feature listed
                under ``lap_time_regressor`` and ``gear_classifier`` in the
                feature config; ``lap_number`` and ``race_position`` are
                copied to the output when present (``None`` otherwise).

        Returns:
            A dict with the keys ``lap_number``, ``race_position``,
            ``driving_behavior``, ``predicted_lap_time`` (float) and
            ``predicted_gear`` (int).  ``driving_behavior`` is ``None`` unless
            this row caused the state to return a completed lap, in which case
            it is ``"Aggressive Driving"`` or ``"Smooth Driving"``.

        Raises:
            KeyError: If ``row`` lacks a configured regression or
                classification feature.
        """
        output = {
            "lap_number": row.get("lap_number"),
            "race_position": row.get("race_position"),
            "driving_behavior": None,
        }

        output["predicted_lap_time"] = float(
            self._predict_from_row("lap_time_regressor", row)
        )
        output["predicted_gear"] = int(
            self._predict_from_row("gear_classifier", row)
        )

        # The state returns None for every row that does not complete a lap.
        completed_lap = self.state.update(row)
        if completed_lap is not None:
            output["driving_behavior"] = self._classify_lap(completed_lap)

        return output

    def _predict_from_row(self, model_key: str, row: pd.Series) -> Any:
        """Return the first prediction of ``models[model_key]`` for ``row``."""
        feature_names = self.features_cfg[model_key]["features"]
        # Models receive a single-sample 2-D array: shape (1, n_features).
        sample = row[feature_names].values.reshape(1, -1)
        return self.models[model_key].predict(sample)[0]

    def _classify_lap(self, completed_lap: pd.DataFrame) -> str:
        """Aggregate a completed lap, scale it, and label the driving style.

        ``completed_lap`` is whatever ``TelemetryState.update`` returned; it
        is used as a DataFrame with one row per sample (assumption based on
        the ``reindex``/``agg`` calls — confirm against ``TelemetryState``).
        """
        raw_map = self.features_cfg["driving_behavior"]["aggregation"]

        # Accept both ``feature: [mean, max]`` and ``feature: mean``.  A bare
        # string would otherwise be iterated character by character below and
        # make ``DataFrame.agg`` return a Series instead of a DataFrame.
        agg_map = {
            feature: [stats] if isinstance(stats, str) else list(stats)
            for feature, stats in raw_map.items()
        }

        # Columns missing from the lap are created and filled with 0.0 so the
        # feature vector always has the configured layout.
        lap = completed_lap.reindex(columns=list(agg_map), fill_value=0.0)
        agg_df = lap.agg(agg_map)

        feature_row = {}
        for feature, stats in agg_map.items():
            for stat in stats:
                feature_row[f"{feature}_{stat}"] = agg_df.loc[stat, feature]

        lap_features = pd.DataFrame([feature_row]).fillna(0.0)

        # The scaler shipped next to the clustering model was loaded but never
        # applied before; the clustering model is presumably trained on scaled
        # features — NOTE(review): confirm against the training pipeline.
        scaled_features = self.scaler.transform(lap_features)

        label = self.models["driving_behavior"].predict(scaled_features)[0]
        return "Aggressive Driving" if label == 1 else "Smooth Driving"