Spaces:
Sleeping
Sleeping
File size: 5,007 Bytes
c65b1a0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
"""
This script is used to train a model and save it to S3.
It will require the following environment variables:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_REGION
- MLFLOW_TRACKING_URI
"""
import pandas as pd
import mlflow
import mlflow.sklearn
import pickle
import boto3
import datetime
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
import os
from dotenv import load_dotenv
def average_columns_with_suffix(df, suffix):
    """Return the row-wise mean of all columns whose name contains *suffix*.

    NOTE(review): despite the name, matching is substring containment —
    the original ``filter(regex=f".*{suffix}")`` used an unanchored
    ``re.search``, so "_T" also matches a name like "A_T_B". That behavior
    is preserved here, but with a literal match instead of a regex: the
    original silently misinterpreted suffixes containing regex
    metacharacters (e.g. "." or "+") as patterns.

    Parameters
    ----------
    df : pandas.DataFrame
        Source frame; matched columns may contain non-numeric values.
    suffix : str
        Literal text to look for in column names.

    Returns
    -------
    pandas.Series
        Row-wise mean of the matched columns. Non-numeric entries are
        coerced to NaN and skipped by the mean; rows with no numeric
        value at all yield NaN.
    """
    # Literal containment — equivalent to the old unanchored regex for
    # metacharacter-free suffixes, but safe for any string.
    matched = df.loc[:, [col for col in df.columns if suffix in col]]
    # Coerce everything to numeric; unparsable values become NaN.
    matched = matched.apply(pd.to_numeric, errors="coerce")
    # mean() skips NaN, so partially-missing rows still average cleanly.
    return matched.mean(axis=1)
def load_and_process_data(parquet_path):
    """Load the raw parquet file and aggregate it into model features.

    Parameters
    ----------
    parquet_path : str
        Path to the merged parquet file.

    Returns
    -------
    pandas.DataFrame
        One averaged column per feature ("Pressure", "Temperature",
        "Wind Speed", "Humidity", "Traffic Status") and per target
        ("NOX", "PM10", "PM25", "O3"); rows containing any NaN dropped.
    """
    df = pd.read_parquet(parquet_path)
    # Turn the "Inconnu" (unknown) marker into a true missing value.
    # BUG FIX: the previous per-column `x.replace("Inconnu", None)` is a
    # pandas footgun — with a scalar to_replace and value=None, older
    # pandas forward-fills (method='pad') instead of inserting NaN.
    df = df.replace("Inconnu", float("nan"))
    # Encode textual traffic states as ordinal severity levels
    # (realtime-API equivalents noted inline).
    traffic_status = {
        None: None,
        "Fluide": 0.,  # freeflow in realtime api
        "Pré-saturé": 1.,  # heavy in realtime api
        "Saturé": 1.,  # heavy in realtime api
        "Bloqué": 2.  # congested in realtime api
    }
    # Map every traffic-state column; values absent from the dict
    # (including NaN) become NaN via Series.map.
    for col in df.columns:
        if col.endswith("Etat trafic"):
            df[col] = df[col].map(traffic_status)
    # Average each sensor group into a single column. The original also
    # copied "Timestamp" in only to drop it immediately afterwards, so it
    # is simply omitted here — the resulting frame is identical.
    dict_of_columns = {
        "Pressure": average_columns_with_suffix(df, "_PSTAT"),
        "Temperature": average_columns_with_suffix(df, "_T"),
        "Wind Speed": average_columns_with_suffix(df, "_FF"),
        "Humidity": average_columns_with_suffix(df, "_U"),
        "Traffic Status": average_columns_with_suffix(df, "_Etat trafic"),
        "NOX": average_columns_with_suffix(df, "NOX"),
        "PM10": average_columns_with_suffix(df, "PM10"),
        "PM25": average_columns_with_suffix(df, "PM25"),
        "O3": average_columns_with_suffix(df, "O3"),
    }
    final_df = pd.concat(dict_of_columns, axis=1)
    # The model cannot handle missing values, so drop incomplete rows.
    final_df.dropna(inplace=True)
    return final_df
def train_and_save_model(
    parquet_path="2024_semester2_merged_v2.parquet",
    bucket="jedha-quality-air",
):
    """Train a random-forest air-quality model and upload it to S3.

    Grid-searches ``n_estimators`` with 3-fold CV (R² scoring), logs the
    run to MLflow via sklearn autologging, pickles the best estimator
    locally, and uploads a timestamped copy to S3.

    Parameters
    ----------
    parquet_path : str, optional
        Path to the training data parquet file. Defaults to the original
        hard-coded file so existing callers are unaffected.
    bucket : str, optional
        Destination S3 bucket. Defaults to the original hard-coded bucket.

    Requires the environment variables listed in the module docstring
    (AWS credentials/region and MLFLOW_TRACKING_URI).
    """
    # Pull credentials/config from a local .env file if present.
    load_dotenv()
    # AWS S3 session built from environment credentials.
    session = boto3.Session(
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region_name=os.getenv("AWS_REGION")
    )
    s3 = session.client('s3')
    # Configure MLflow tracking and enable sklearn autologging so the
    # grid search's params/metrics/model are captured automatically.
    mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))
    mlflow.set_experiment("air_quality_prediction")
    mlflow.sklearn.autolog()
    with mlflow.start_run():
        # Load and prepare data.
        final_df = load_and_process_data(parquet_path)
        # Features are weather + traffic; targets are the four pollutants.
        x_columns = ["Pressure", "Temperature", "Wind Speed", "Humidity", "Traffic Status"]
        y_columns = ["NOX", "PM10", "PM25", "O3"]
        X = final_df[x_columns].copy()
        y = final_df[y_columns].copy()
        # shuffle=False keeps the chronological split for time-ordered data.
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
        # Grid search over forest size only.
        param_grid = {
            "n_estimators": [5, 10, 20, 50, 100, 200, 300],
        }
        base_model = RandomForestRegressor(random_state=42)
        grid_search = GridSearchCV(
            estimator=base_model,
            param_grid=param_grid,
            cv=3,
            n_jobs=-1,
            scoring="r2"
        )
        model_base_name = "random_forest_grid_search"
        grid_search.fit(X_train, y_train)
        model = grid_search.best_estimator_
        # Save the best estimator: a fixed local filename, and a
        # timestamped name on S3 so successive runs never overwrite.
        model_filename = model_base_name + ".pkl"
        model_filename_for_s3 = model_base_name + "_" + datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + ".pkl"
        with open(model_filename, "wb") as f:
            pickle.dump(model, f)
        s3.upload_file(model_filename, bucket, f"models/{model_filename_for_s3}")
        print(f"Model saved to S3 as {model_filename_for_s3}")
        # Evaluate on the held-out (most recent) slice.
        score = model.score(X_test, y_test)
        print(f"\nTest Score: {score:.4f}")
        # Smoke-test prediction on one plausible observation.
        test_values = {
            "Pressure": 999,
            "Temperature": 22,
            "Wind Speed": 10,
            "Humidity": 50,
            "Traffic Status": 0,
        }
        prediction = model.predict(pd.DataFrame([test_values]))
        print("\nTest prediction:", prediction)
        print("\nModel Parameters:", model.get_params())
if __name__ == "__main__":
train_and_save_model()
|