"""
This script trains an air-quality prediction model and uploads it to S3.
It requires the following environment variables (taken from the environment
or from a local .env file loaded with python-dotenv):
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_REGION
- MLFLOW_TRACKING_URI
"""
import datetime
import os
import pickle

import boto3
import mlflow
import mlflow.sklearn
import pandas as pd
from dotenv import load_dotenv
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV, train_test_split
def average_columns_with_suffix(df, suffix):
    """Return the row-wise mean of all columns whose name contains the given suffix."""
    # Select the columns whose name matches the suffix pattern
    cols = df.filter(regex=f".*{suffix}")
    # Coerce everything to numeric; non-numeric entries become NaN and are ignored by mean()
    cols = cols.apply(pd.to_numeric, errors='coerce')
    # Row-wise mean across the selected columns
    return cols.mean(axis=1)
def load_and_process_data(parquet_path):
"""Load and process the parquet data."""
# Load data
df = pd.read_parquet(parquet_path)
    # Treat the French placeholder "Inconnu" ("unknown") as missing; the dict form is
    # needed because replace("Inconnu", None) would pad-fill rather than insert missing values.
    df = df.replace({"Inconnu": None})
# Traffic status dictionary
traffic_status = {
None: None,
"Fluide": 0., # freeflow in realtime api
"Pré-saturé": 1., # heavy in realtime api
"Saturé": 1., # heavy in realtime api
"Bloqué": 2. # congested in realtime api
}
# Replace values in columns ending with 'Etat trafic'
for col in df.columns:
if col.endswith("Etat trafic"):
df[col] = df[col].map(traffic_status)
# Create final dataframe
dict_of_columns = {
"Timestamp": df["Timestamp"].copy(),
"Pressure": average_columns_with_suffix(df, "_PSTAT"),
"Temperature": average_columns_with_suffix(df, "_T"),
"Wind Speed": average_columns_with_suffix(df, "_FF"),
"Humidity": average_columns_with_suffix(df, "_U"),
"Traffic Status": average_columns_with_suffix(df, "_Etat trafic"),
"NOX": average_columns_with_suffix(df, "NOX"),
"PM10": average_columns_with_suffix(df, "PM10"),
"PM25": average_columns_with_suffix(df, "PM25"),
"O3": average_columns_with_suffix(df, "O3"),
}
final_df = pd.concat(dict_of_columns, axis=1)
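    # Timestamp is not used as a model feature, so drop it, then discard any row
    # left with a missing feature or target after the averaging step.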
final_df.drop(columns=["Timestamp"], inplace=True)
final_df.dropna(inplace=True)
return final_df
def train_and_save_model():
"""Main function to train and save the model."""
# Load environment variables
load_dotenv()
# AWS S3 session
session = boto3.Session(
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
region_name=os.getenv("AWS_REGION")
)
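    # S3 client used below to upload the trained model to the jedha-quality-air bucket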
s3 = session.client('s3')
# Configure MLflow
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))
mlflow.set_experiment("air_quality_prediction")
# Enable autologging
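    # autolog() records estimator parameters, training metrics and the fitted model
    # on the MLflow run started below.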
mlflow.sklearn.autolog()
with mlflow.start_run():
# Load and prepare data
final_df = load_and_process_data("2024_semester2_merged_v2.parquet")
# Define features and target
x_columns = ["Pressure", "Temperature", "Wind Speed", "Humidity", "Traffic Status"]
y_columns = ["NOX", "PM10", "PM25", "O3"]
X = final_df[x_columns].copy()
y = final_df[y_columns].copy()
# Split data
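        # shuffle=False keeps the chronological row order, so the test set is the last 20%
        # of rows (the most recent data, assuming the parquet is sorted by timestamp)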
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
# Grid search
param_grid = {
"n_estimators": [5, 10, 20, 50, 100, 200, 300],
}
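        # Only the forest size is tuned; RandomForestRegressor fits all four pollutant
        # targets jointly (multi-output regression is supported natively).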
base_model = RandomForestRegressor(random_state=42)
grid_search = GridSearchCV(
estimator=base_model,
param_grid=param_grid,
cv=3,
n_jobs=-1,
scoring="r2"
)
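        # With an integer cv, scikit-learn uses an unshuffled KFold here, so each of the
        # 3 folds is a contiguous block of the training data.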
model_base_name = "random_forest_grid_search"
grid_search.fit(X_train, y_train)
model = grid_search.best_estimator_
# Save model
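        # The local file keeps a fixed name; the S3 key is timestamped so repeated runs
        # do not overwrite earlier models.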
model_filename = model_base_name + ".pkl"
model_filename_for_s3 = model_base_name + "_" + datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + ".pkl"
# Save locally
with open(model_filename, "wb") as f:
pickle.dump(model, f)
# Upload to S3
s3.upload_file(model_filename, "jedha-quality-air", f"models/{model_filename_for_s3}")
print(f"Model saved to S3 as {model_filename_for_s3}")
# Evaluate
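        # score() returns R² averaged uniformly over the four targets for this multi-output regressor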
score = model.score(X_test, y_test)
print(f"\nTest Score: {score:.4f}")
# Test prediction
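        # A single hand-written sample as a sanity check (units are assumed to match the
        # raw data, e.g. pressure in hPa and temperature in °C)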
test_values = {
"Pressure": 999,
"Temperature": 22,
"Wind Speed": 10,
"Humidity": 50,
"Traffic Status": 0,
}
prediction = model.predict(pd.DataFrame([test_values]))
print("\nTest prediction:", prediction)
print("\nModel Parameters:", model.get_params())
if __name__ == "__main__":
train_and_save_model()