"""
    This script is used to train a model and save it to S3.
    It will require the following environment variables:
    - AWS_ACCESS_KEY_ID
    - AWS_SECRET_ACCESS_KEY
    - AWS_REGION
    - MLFLOW_TRACKING_URI
"""


import datetime
import os
import pickle
import re

import boto3
import mlflow
import mlflow.sklearn
import pandas as pd
from dotenv import load_dotenv
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV, train_test_split

def average_columns_with_suffix(df, suffix):
    """Return the row-wise mean of all columns whose name ends with `suffix`."""
    # Select only the columns ending with the suffix (anchored with $ so that
    # the suffix cannot match in the middle of a column name)
    cols = df.filter(regex=f"{re.escape(suffix)}$")

    # Coerce values to numeric; non-numeric entries become NaN
    cols = cols.apply(pd.to_numeric, errors="coerce")

    # Row-wise mean, ignoring NaN values
    return cols.mean(axis=1)
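
# For example, with columns ["StationA_T", "StationB_T", "StationA_U"] (made-up
# names for illustration), average_columns_with_suffix(df, "_T") returns the
# row-wise mean of "StationA_T" and "StationB_T".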

def load_and_process_data(parquet_path):
    """Load the parquet data and aggregate it into model-ready features."""
    # Load data and mark unknown readings ("Inconnu") as missing.
    # DataFrame.replace with an explicit pd.NA avoids the deprecated
    # pad-filling behaviour of Series.replace(..., None).
    df = pd.read_parquet(parquet_path)
    df = df.replace("Inconnu", pd.NA)

    # Map French traffic-status labels to an ordinal scale
    # (comments give the corresponding realtime-API status names)
    traffic_status = {
        None: None,
        "Fluide": 0.0,      # freeflow
        "Pré-saturé": 1.0,  # heavy
        "Saturé": 1.0,      # heavy
        "Bloqué": 2.0,      # congested
    }

    # Replace values in columns ending with 'Etat trafic'
    for col in df.columns:
        if col.endswith("Etat trafic"):
            df[col] = df[col].map(traffic_status)

    # Aggregate station/sensor columns into one averaged feature per variable
    dict_of_columns = {
        "Pressure": average_columns_with_suffix(df, "_PSTAT"),
        "Temperature": average_columns_with_suffix(df, "_T"),
        "Wind Speed": average_columns_with_suffix(df, "_FF"),
        "Humidity": average_columns_with_suffix(df, "_U"),
        "Traffic Status": average_columns_with_suffix(df, "_Etat trafic"),
        "NOX": average_columns_with_suffix(df, "NOX"),
        "PM10": average_columns_with_suffix(df, "PM10"),
        "PM25": average_columns_with_suffix(df, "PM25"),
        "O3": average_columns_with_suffix(df, "O3"),
    }

    # Assemble the final frame and drop rows with any missing feature or target
    final_df = pd.concat(dict_of_columns, axis=1)
    final_df.dropna(inplace=True)
    
    return final_df

def train_and_save_model():
    """Main function to train and save the model."""
    # Load environment variables from a local .env file, if present
    load_dotenv()

    # AWS S3 session
    session = boto3.Session(
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region_name=os.getenv("AWS_REGION")
    )
    s3 = session.client('s3')

    # Configure MLflow
    mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))
    mlflow.set_experiment("air_quality_prediction")

    # Autologging records estimator parameters, metrics and the fitted model
    # on the active MLflow run
    mlflow.sklearn.autolog()

    with mlflow.start_run():
        # Load and prepare data
        final_df = load_and_process_data("2024_semester2_merged_v2.parquet")

        # Define features and target
        x_columns = ["Pressure", "Temperature", "Wind Speed", "Humidity", "Traffic Status"]
        y_columns = ["NOX", "PM10", "PM25", "O3"]
        
        X = final_df[x_columns].copy()
        y = final_df[y_columns].copy()

        # Chronological split: shuffle=False keeps the last 20% of rows as the test set
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

        # Grid search
        param_grid = {
            "n_estimators": [5, 10, 20, 50, 100, 200, 300],
        }

        base_model = RandomForestRegressor(random_state=42)
        grid_search = GridSearchCV(
            estimator=base_model,
            param_grid=param_grid,
            cv=3,
            n_jobs=-1,
            scoring="r2"
        )
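        # Note: an integer cv=3 uses contiguous, unshuffled folds here. For a
        # stricter forward-chaining evaluation of this time-ordered data, a
        # TimeSeriesSplit (from sklearn.model_selection) could be passed as cv.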

        model_base_name = "random_forest_grid_search"
        grid_search.fit(X_train, y_train)
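        # best_estimator_ has been refit on the full training set (refit=True is the default)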
        model = grid_search.best_estimator_

        # Save model
        model_filename = model_base_name + ".pkl"
        model_filename_for_s3 = model_base_name + "_" + datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + ".pkl"

        # Save locally
        with open(model_filename, "wb") as f:
            pickle.dump(model, f)

        # Upload to S3
        s3.upload_file(model_filename, "jedha-quality-air", f"models/{model_filename_for_s3}")
        print(f"Model saved to S3 as {model_filename_for_s3}")

        # Evaluate on the held-out test set (R², uniformly averaged over the four targets)
        score = model.score(X_test, y_test)
        print(f"\nTest R² score: {score:.4f}")

        # Sanity-check prediction on a single hand-crafted sample
        test_values = {
            "Pressure": 999,
            "Temperature": 22,
            "Wind Speed": 10,
            "Humidity": 50,
            "Traffic Status": 0,
        }
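        # The prediction is a (1, 4) array whose columns follow y_columns:
        # NOX, PM10, PM25, O3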
        prediction = model.predict(pd.DataFrame([test_values]))
        print("\nTest prediction:", prediction)
        print("\nModel Parameters:", model.get_params())

if __name__ == "__main__":
    train_and_save_model()