# Hosting-page status lines captured by the scraper (not part of the script):
# Spaces: Sleeping / Sleeping
"""
This script trains a model and saves it to S3.
It requires the following environment variables:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_REGION
- MLFLOW_TRACKING_URI
"""
import datetime
import os
import pickle
import re

import boto3
import mlflow
import mlflow.sklearn
import pandas as pd
from dotenv import load_dotenv
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV, train_test_split
def average_columns_with_suffix(df, suffix):
    """Return the row-wise mean of all columns whose name contains *suffix*.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; matching columns may hold numeric or string values.
    suffix : str
        Literal text searched for in column names (matched anywhere in
        the name, per the original ``.*{suffix}`` regex).

    Returns
    -------
    pandas.Series
        Row-wise mean of the matching columns. Non-numeric entries are
        coerced to NaN first, and NaN values are skipped by ``mean``.
    """
    # re.escape prevents regex metacharacters in the suffix (e.g. ".")
    # from silently matching unintended columns.
    cols = df.filter(regex=f".*{re.escape(suffix)}")
    # Coerce non-numeric values (e.g. leftover strings) to NaN so they
    # are excluded from the mean instead of raising.
    cols = cols.apply(pd.to_numeric, errors='coerce')
    return cols.mean(axis=1)
def load_and_process_data(parquet_path):
    """Load the merged sensor parquet file and build the modeling frame.

    Parameters
    ----------
    parquet_path : str
        Path to the parquet file produced by the data-merge step.

    Returns
    -------
    pandas.DataFrame
        One averaged column per sensor group (weather features, traffic
        status, and the four pollutant targets), with any row containing
        a NaN dropped.
    """
    df = pd.read_parquet(parquet_path)
    # "Inconnu" ("unknown") is the upstream missing-value marker; a single
    # frame-level replace is equivalent to the per-column apply it replaces.
    df = df.replace("Inconnu", None)

    # Map the French traffic-status labels to ordinal severity levels that
    # line up with the realtime API categories.
    traffic_status = {
        None: None,
        "Fluide": 0.,      # freeflow in realtime api
        "Pré-saturé": 1.,  # heavy in realtime api
        "Saturé": 1.,      # heavy in realtime api
        "Bloqué": 2.       # congested in realtime api
    }
    for col in df.columns:
        if col.endswith("Etat trafic"):
            # .map sends any label missing from the dict to NaN.
            df[col] = df[col].map(traffic_status)

    # Average each sensor group into a single feature/target column.
    # (The original also copied "Timestamp" in and immediately dropped it —
    # a no-op round-trip removed here.)
    final_df = pd.concat(
        {
            "Pressure": average_columns_with_suffix(df, "_PSTAT"),
            "Temperature": average_columns_with_suffix(df, "_T"),
            "Wind Speed": average_columns_with_suffix(df, "_FF"),
            "Humidity": average_columns_with_suffix(df, "_U"),
            "Traffic Status": average_columns_with_suffix(df, "_Etat trafic"),
            "NOX": average_columns_with_suffix(df, "NOX"),
            "PM10": average_columns_with_suffix(df, "PM10"),
            "PM25": average_columns_with_suffix(df, "PM25"),
            "O3": average_columns_with_suffix(df, "O3"),
        },
        axis=1,
    )
    # Models require complete rows: drop any row with a missing average.
    final_df.dropna(inplace=True)
    return final_df
def train_and_save_model(parquet_path="2024_semester2_merged_v2.parquet"):
    """Train a multi-output random-forest model and persist it to S3.

    Workflow: load environment variables, open an S3 client, configure
    MLflow autologging, grid-search a RandomForestRegressor over the
    forest size, pickle the best estimator locally, upload it to S3,
    then print a held-out score and a sanity-check prediction.

    Parameters
    ----------
    parquet_path : str, optional
        Location of the merged parquet dataset. The default keeps the
        previously hard-coded file name, so existing callers are
        unaffected.

    Requires the environment variables AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY, AWS_REGION and MLFLOW_TRACKING_URI
    (loaded from a .env file if present).
    """
    load_dotenv()

    # S3 client used to archive the trained model.
    session = boto3.Session(
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region_name=os.getenv("AWS_REGION"),
    )
    s3 = session.client('s3')

    # MLflow tracking + sklearn autologging for the run below.
    mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))
    mlflow.set_experiment("air_quality_prediction")
    mlflow.sklearn.autolog()

    with mlflow.start_run():
        final_df = load_and_process_data(parquet_path)

        # Weather + traffic features predict the four pollutant targets.
        x_columns = ["Pressure", "Temperature", "Wind Speed", "Humidity", "Traffic Status"]
        y_columns = ["NOX", "PM10", "PM25", "O3"]
        X = final_df[x_columns].copy()
        y = final_df[y_columns].copy()

        # shuffle=False preserves row order for the split: train on the
        # earlier 80%, test on the final 20%.
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

        # Only the forest size is searched; other hyperparameters stay at
        # sklearn defaults.
        param_grid = {
            "n_estimators": [5, 10, 20, 50, 100, 200, 300],
        }
        base_model = RandomForestRegressor(random_state=42)
        grid_search = GridSearchCV(
            estimator=base_model,
            param_grid=param_grid,
            cv=3,
            n_jobs=-1,
            scoring="r2",
        )
        model_base_name = "random_forest_grid_search"
        grid_search.fit(X_train, y_train)
        model = grid_search.best_estimator_

        # Persist: fixed name locally, timestamped name in S3 so successive
        # runs do not overwrite each other.
        model_filename = model_base_name + ".pkl"
        model_filename_for_s3 = (
            model_base_name + "_"
            + datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + ".pkl"
        )
        with open(model_filename, "wb") as f:
            pickle.dump(model, f)
        s3.upload_file(model_filename, "jedha-quality-air", f"models/{model_filename_for_s3}")
        print(f"Model saved to S3 as {model_filename_for_s3}")

        # Evaluate on the held-out (latest) slice.
        score = model.score(X_test, y_test)
        print(f"\nTest Score: {score:.4f}")

        # Quick sanity-check prediction on plausible feature values.
        test_values = {
            "Pressure": 999,
            "Temperature": 22,
            "Wind Speed": 10,
            "Humidity": 50,
            "Traffic Status": 0,
        }
        prediction = model.predict(pd.DataFrame([test_values]))
        print("\nTest prediction:", prediction)
        print("\nModel Parameters:", model.get_params())
# Run the full train-and-upload pipeline when executed as a script.
if __name__ == "__main__":
    train_and_save_model()