import pandas as pd import mlflow import os from mlpipeline.entity import ModelTrainerConfig, ModelTrainerArtifact from mlpipeline.automl import AutoMLFactory from mlpipeline.logging.logger import get_logger from mlpipeline.exception import ModelTrainingException from mlpipeline.constants import AUTOML_CONFIG_FILE_PATH from mlpipeline.utils.common import read_yaml from pathlib import Path import sys logger = get_logger(__name__) class AutoMLTrainer: def __init__(self, config: ModelTrainerConfig): self.config = config def train(self) -> ModelTrainerArtifact: try: logger.info("Starting model training") tracking_uri = os.getenv("MLFLOW_TRACKING_URI", "https://dagshub.com/abheshith7/AutoML-MLOps-PipeLine.mlflow/") dagshub_token = os.getenv("DAGSHUB_TOKEN") mlflow_enabled = False if dagshub_token and "dagshub.com" in tracking_uri: try: os.environ["MLFLOW_TRACKING_USERNAME"] = os.getenv("DAGSHUB_USERNAME", "abheshith7") os.environ["MLFLOW_TRACKING_PASSWORD"] = dagshub_token mlflow.set_tracking_uri(tracking_uri) mlflow.set_experiment("automl_experiment") mlflow_enabled = True logger.info(f"MLflow tracking enabled: {tracking_uri}") except Exception as e: logger.warning(f"MLflow tracking disabled: {str(e)}") mlflow_enabled = False else: logger.warning("MLflow tracking disabled: DAGSHUB_TOKEN not set") if mlflow_enabled: mlflow.start_run() train_df = pd.read_csv(self.config.train_data_path) automl_config = read_yaml(Path(AUTOML_CONFIG_FILE_PATH)) library_config = automl_config[self.config.automl_library] if mlflow_enabled: mlflow.log_param("automl_library", self.config.automl_library) mlflow.log_param("target_column", self.config.target_column) mlflow.log_param("train_samples", len(train_df)) mlflow.log_params(library_config) trainer = AutoMLFactory.create_trainer( self.config.automl_library, library_config ) os.makedirs(self.config.root_dir, exist_ok=True) if self.config.automl_library == 'autogluon': result = trainer.train(train_df, self.config.target_column, self.config.model_path) if isinstance(result, tuple): metrics, feature_importance = result else: metrics = result feature_importance = None else: X_train = train_df.drop(columns=[self.config.target_column]) y_train = train_df[self.config.target_column] metrics = trainer.train(X_train, y_train, self.config.model_path) feature_importance = None if mlflow_enabled: # Separate numeric metrics from string values numeric_metrics = {} string_values = {} for key, value in metrics.items(): if isinstance(value, (int, float)): numeric_metrics[key] = value else: string_values[key] = str(value) # Log numeric metrics only if numeric_metrics: mlflow.log_metrics(numeric_metrics) # Log string values as tags for key, value in string_values.items(): mlflow.set_tag(key, value) # Log feature importance as artifact if feature_importance is not None: import json fi_dict = feature_importance.to_dict() if hasattr(feature_importance, 'to_dict') else {} fi_path = Path(self.config.root_dir) / "feature_importance.json" with open(fi_path, 'w') as f: json.dump(fi_dict, f, indent=2) mlflow.log_artifact(str(fi_path)) # Log model leaderboard try: from autogluon.tabular import TabularPredictor predictor = TabularPredictor.load(str(self.config.model_path)) leaderboard = predictor.leaderboard(silent=True) lb_path = Path(self.config.root_dir) / "leaderboard.csv" leaderboard.to_csv(lb_path, index=False) mlflow.log_artifact(str(lb_path)) except: pass # Set additional tags mlflow.set_tag("model_type", "AutoML") mlflow.set_tag("framework", self.config.automl_library) run_id = mlflow.active_run().info.run_id logger.info(f"MLflow run logged: {run_id}") mlflow.end_run() logger.info(f"Model trained with metrics: {metrics}") return ModelTrainerArtifact( model_path=self.config.model_path, train_metrics=metrics, is_trained=True, message=f"Model trained successfully with score: {metrics.get('score', 0.0):.4f}" ) except Exception as e: if mlflow.active_run(): mlflow.end_run() raise ModelTrainingException(str(e), sys)