import sys
import os
import json
import gc
from pathlib import Path
from typing import Any

import pandas as pd
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, roc_auc_score)
import dagshub
import mlflow
from mlflow.xgboost import log_model
from dotenv import load_dotenv

from .data_preprocessing import DataPreprocessing
from src.core.constants import PARAMS_FILE
from src.core.logger import logging
from src.core.exception import AppException
from src.core.configuration import AppConfiguration
from src.utils import create_directory, read_yaml, load_obj
from src.script.model_wrapper import CustomModel

load_dotenv()

# Get environment variables
uri = os.getenv("MLFLOW_URI")
dagshub_token = os.getenv("DAGSHUB_TOKEN")
dagshub_username = os.getenv("OWNER")

if not dagshub_token or not dagshub_username:
    raise EnvironmentError("Dagshub environment variables are not set")

os.environ["MLFLOW_TRACKING_USERNAME"] = dagshub_username
os.environ["MLFLOW_TRACKING_PASSWORD"] = dagshub_token

mlflow.set_tracking_uri(uri)  # type: ignore

# For local use
# =================================================================================
# repo_owner = os.getenv("OWNER")
# repo_name = os.getenv("REPO")
#
# mlflow.set_tracking_uri(uri)
# if repo_owner is None:
#     raise EnvironmentError("Missing dagshub logging environment credentials.")
# dagshub.init(repo_owner=repo_owner, repo_name=repo_name, mlflow=True)
# ==================================================================================


class ModelEvaluation:
    def __init__(self, config=AppConfiguration()):
        """
        Initializes the ModelEvaluation object by creating a model evaluation configuration.

        Args:
            config (AppConfiguration): The configuration object containing the application configuration.
        """
        try:
            self.evaluation_config = config.model_evaluation_config()
        except Exception as e:
            logging.error(f"Failed to create model evaluation configuration: {e}", exc_info=True)
            raise AppException(e, sys)

    def save_report(self, report: dict):
        """
        Saves the model evaluation metrics to a JSON file in the "reports" directory.

        Args:
            report (dict): A dictionary containing the model evaluation metrics.
        """
        try:
            create_directory(Path("reports"))
            with open("reports/metrics.json", 'w') as f:
                json.dump(report, f, indent=4)
        except Exception as e:
            logging.error(f"Failed to save model evaluation report: {e}", exc_info=True)
            raise AppException(e, sys)

    def save_experiment_info(self, model_name: str, run_id: str):
        """
        Saves the model name and the Mlflow run ID to a JSON file for logging purposes.

        Args:
            model_name (str): The name of the model
            run_id (str): The Mlflow run ID
        """
        try:
            experiment_info = {
                'model': model_name,
                'run_id': run_id,
            }
            exp_info_path = Path("reports/experiment.json")
            with open(exp_info_path, 'w') as f:
                json.dump(experiment_info, f, indent=4)
        except Exception as e:
            logging.error(f"Failed to save model experiment info: {e}", exc_info=True)
            raise AppException(e, sys)

    def evaluate(self, model: Any, model_name: str, vectorizer: Any, vectorizer_name: str,
                 eval_threshold: float, df: pd.DataFrame) -> tuple[dict, dict]:
        """
        Evaluates the given model and vectorizer on the given dataset.

        Args:
            model (Any): The model to evaluate.
            model_name (str): The name of the model.
            vectorizer (Any): The vectorizer to use.
            vectorizer_name (str): The name of the vectorizer.
            eval_threshold (float): The probability threshold used to convert predicted probabilities into class labels.
            df (pd.DataFrame): The test dataset to evaluate on.

        Returns:
            tuple: A tuple containing the evaluation report and the model parameters.
""" try: X_test = df.drop(columns='Label') # Perform feature extraction X_test = vectorizer.transform(X_test["Content"]) logging.info(f"Evaluating model: {model_name}") # Predict using the model y_probs = model.predict_proba(X_test)[:, 1] y_pred = (y_probs >= eval_threshold).astype(int) # Get the true labels y_test = df['Label'].values evaluation_report = { "model": model_name, "vectorizer": vectorizer_name, "threshold": eval_threshold, "accuracy": accuracy_score(y_test, y_pred), # type: ignore "precision": precision_score(y_test, y_pred), # type: ignore "recall": recall_score(y_test, y_pred), # type: ignore "f1 score": f1_score(y_test, y_pred), # type: ignore "roc_auc": roc_auc_score(y_test, y_probs) # type: ignore } # Get XGBoost model parameters model_params: dict = model.get_xgb_params() return ( evaluation_report, model_params ) except Exception as e: logging.error(f"Failed in model evaluation process: {e}", exc_info=True) raise AppException(e, sys) def initiate_model_evaluation(): """ Initiates the model evaluation process by creating a ModelEvaluation object, which then evaluates the model using the evaluate method and logs the evaluation metrics in Mlflow. """ eval_obj = ModelEvaluation() preprocessor = DataPreprocessing() logging.info(f"{'='*20}Model Evaluation{'='*20}") # get model name config_params = read_yaml(PARAMS_FILE) model_name = config_params.model_training.model_name vectorizer_name = config_params.feature_engineering.vectorizer eval_threshold = config_params.model_evaluation.threshold test_data_path = eval_obj.evaluation_config.test_data_path model_path = eval_obj.evaluation_config.models_dir test_df = pd.read_parquet(test_data_path) test_df.dropna(how='any', inplace=True) processed_test_df = preprocessor.preprocess(test_df, filename="preprocessed_test_data.feather") mlflow.set_experiment("DVC Pipeline Model Experiments") with mlflow.start_run(run_name=model_name) as run: try: model = load_obj(location_path=model_path, obj_name="model.joblib") vectorizer = load_obj(location_path=model_path, obj_name="vectorizer.joblib") evaluation_report, model_params = eval_obj.evaluate(model=model, model_name=model_name, vectorizer=vectorizer, vectorizer_name=vectorizer_name, eval_threshold=eval_threshold, df=processed_test_df) eval_obj.save_report(evaluation_report) logging.info("Logging model and different parameters in Mlflow") # Log model evaluation metrics in Mlflow # remove the model and vectorizer name from the report evaluation_report.pop("model", None) evaluation_report.pop("vectorizer", None) for metric_name, metric_score in evaluation_report.items(): mlflow.log_metric(metric_name, metric_score) # log model parameters in Mlflow if model_params is not None: for param_name, param_value in model_params.items(): mlflow.log_param(param_name, param_value) mlflow.log_param("model", model_name) mlflow.log_param("vectorizer", vectorizer_name) else: logging.warning("No model parameters found. 
Skipping parameter logging") # Custom Model wraps the classifer and the vectorizer into a single Python Model object final_model = CustomModel(model=model, vectorizer=vectorizer) # Log the model and artifacts mlflow.pyfunc.log_model( artifact_path = model_name, python_model = final_model, artifacts={"vectorizer": os.path.join(model_path, "vectorizer.joblib"), "model": os.path.join(model_path, "model.joblib"), "booster": os.path.join(model_path, "booster.json"), "metrics": os.path.join("reports", "metrics.json")} ) # save model info eval_obj.save_experiment_info(model_name=model_name, run_id=run.info.run_id) logging.info("Model evaluation completed") #free memory del test_df, processed_test_df, model, vectorizer gc.collect() except Exception as e: logging.error(f"Error during model evaluation: {e}", exc_info=True) raise AppException(e, sys) if __name__ == "__main__": initiate_model_evaluation()