"""
This script is used to train a model and save it to S3.

It reads the following environment variables (``load_dotenv()`` is called
before they are read):
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_REGION
- MLFLOW_TRACKING_URI
"""
import datetime
import os
import pickle

import pandas as pd

# Defaults used when the script is run without arguments.
DEFAULT_PARQUET_PATH = "2024_semester2_merged_v2.parquet"
DEFAULT_S3_BUCKET = "jedha-quality-air"


def average_columns_with_suffix(df, suffix):
    """Return the row-wise mean of the columns whose name ends with ``suffix``.

    Args:
        df: Source DataFrame.
        suffix: Literal text the column name must end with (not a regex).

    Returns:
        A Series indexed like ``df`` holding, for each row, the mean of the
        selected columns. Values that cannot be parsed as numbers are
        treated as missing and ignored by the mean.
    """
    # Use a literal end-of-name test rather than a regex search: a pattern
    # such as ".*_T" would also select columns like "x_TD" or "x_Temp", and
    # regex metacharacters in the suffix would be interpreted.
    is_match = [str(name).endswith(suffix) for name in df.columns]
    selected = df.loc[:, is_match]

    # Coerce to numeric; anything non-numeric becomes NaN.
    numeric = selected.apply(pd.to_numeric, errors="coerce")

    return numeric.mean(axis=1)


def load_and_process_data(parquet_path):
    """Load the merged parquet file and build the modelling table.

    Args:
        parquet_path: Path of the parquet file to read.

    Returns:
        A DataFrame with one averaged column per feature ("Pressure",
        "Temperature", "Wind Speed", "Humidity", "Traffic Status") and per
        target ("NOX", "PM10", "PM25", "O3"). Rows containing any missing
        value are dropped.
    """
    df = pd.read_parquet(parquet_path)

    # "Inconnu" marks an unknown value in the raw data; treat it as missing.
    df = df.apply(lambda column: column.replace("Inconnu", None))

    # Numeric encoding of the textual traffic status.
    traffic_status = {
        None: None,
        "Fluide": 0.,  # freeflow in realtime api
        "Pré-saturé": 1.,  # heavy in realtime api
        "Saturé": 1.,  # heavy in realtime api
        "Bloqué": 2.,  # congested in realtime api
    }

    # Encode every column ending with 'Etat trafic'; statuses that are not
    # in the mapping become NaN.
    for col in df.columns:
        if col.endswith("Etat trafic"):
            df[col] = df[col].map(traffic_status)

    # One averaged column per feature / target.
    dict_of_columns = {
        "Pressure": average_columns_with_suffix(df, "_PSTAT"),
        "Temperature": average_columns_with_suffix(df, "_T"),
        "Wind Speed": average_columns_with_suffix(df, "_FF"),
        "Humidity": average_columns_with_suffix(df, "_U"),
        "Traffic Status": average_columns_with_suffix(df, "_Etat trafic"),
        "NOX": average_columns_with_suffix(df, "NOX"),
        "PM10": average_columns_with_suffix(df, "PM10"),
        "PM25": average_columns_with_suffix(df, "PM25"),
        "O3": average_columns_with_suffix(df, "O3"),
    }
    final_df = pd.concat(dict_of_columns, axis=1)

    # Keep only rows where every feature and target is available.
    final_df.dropna(inplace=True)

    return final_df


def train_and_save_model(
    parquet_path=DEFAULT_PARQUET_PATH,
    bucket_name=DEFAULT_S3_BUCKET,
):
    """Train a random forest with grid search and upload it to S3.

    The run is tracked in the MLflow experiment "air_quality_prediction"
    with scikit-learn autologging enabled. The best estimator is pickled
    locally, uploaded under ``models/`` with a timestamped name, then scored
    on the held-out split.

    Args:
        parquet_path: Parquet file passed to ``load_and_process_data``.
        bucket_name: S3 bucket receiving the pickled model.
    """
    # Training-only dependencies are imported here so that the data helpers
    # in this module stay importable with only pandas installed.
    import boto3
    import mlflow
    import mlflow.sklearn
    from dotenv import load_dotenv
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import GridSearchCV, train_test_split

    # Load environment variables
    load_dotenv()

    # AWS S3 session
    session = boto3.Session(
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region_name=os.getenv("AWS_REGION"),
    )
    s3 = session.client('s3')

    # Configure MLflow
    mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))
    mlflow.set_experiment("air_quality_prediction")

    # Enable autologging
    mlflow.sklearn.autolog()

    with mlflow.start_run():
        # Load and prepare data
        final_df = load_and_process_data(parquet_path)

        # Define features and target
        x_columns = [
            "Pressure",
            "Temperature",
            "Wind Speed",
            "Humidity",
            "Traffic Status",
        ]
        y_columns = ["NOX", "PM10", "PM25", "O3"]

        X = final_df[x_columns].copy()
        y = final_df[y_columns].copy()

        # Split without shuffling: the last 20% of rows form the test set.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        # Grid search over the number of trees only.
        param_grid = {
            "n_estimators": [5, 10, 20, 50, 100, 200, 300],
        }

        base_model = RandomForestRegressor(random_state=42)
        grid_search = GridSearchCV(
            estimator=base_model,
            param_grid=param_grid,
            cv=3,
            n_jobs=-1,
            scoring="r2",
        )

        model_base_name = "random_forest_grid_search"
        grid_search.fit(X_train, y_train)
        model = grid_search.best_estimator_

        # The local file keeps a fixed name; the S3 object is timestamped.
        model_filename = model_base_name + ".pkl"
        model_filename_for_s3 = (
            model_base_name
            + "_"
            + datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
            + ".pkl"
        )

        # Save locally
        with open(model_filename, "wb") as f:
            pickle.dump(model, f)

        # Upload to S3
        s3.upload_file(
            model_filename, bucket_name, f"models/{model_filename_for_s3}"
        )
        print(f"Model saved to S3 as {model_filename_for_s3}")

        # Evaluate
        score = model.score(X_test, y_test)
        print(f"\nTest Score: {score:.4f}")

        # Test prediction
        test_values = {
            "Pressure": 999,
            "Temperature": 22,
            "Wind Speed": 10,
            "Humidity": 50,
            "Traffic Status": 0,
        }
        prediction = model.predict(pd.DataFrame([test_values]))
        print("\nTest prediction:", prediction)
        print("\nModel Parameters:", model.get_params())


if __name__ == "__main__":
    train_and_save_model()