Abeshith's picture
Add MLflow tracking integration
d463732
import pandas as pd
import mlflow
import os
from mlpipeline.entity import ModelTrainerConfig, ModelTrainerArtifact
from mlpipeline.automl import AutoMLFactory
from mlpipeline.logging.logger import get_logger
from mlpipeline.exception import ModelTrainingException
from mlpipeline.constants import AUTOML_CONFIG_FILE_PATH
from mlpipeline.utils.common import read_yaml
from pathlib import Path
import sys
# Module-level logger, obtained from the project's get_logger helper using this module's name.
logger = get_logger(__name__)
class AutoMLTrainer:
    """Train a model with the configured AutoML library, optionally tracking to MLflow.

    MLflow tracking is enabled only when the ``DAGSHUB_TOKEN`` environment
    variable is set and the tracking URI (``MLFLOW_TRACKING_URI`` or the
    built-in default) contains ``dagshub.com``. When tracking cannot be
    configured, training still runs; only the logging is skipped.
    """

    def __init__(self, config: ModelTrainerConfig):
        """Store the trainer configuration used by :meth:`train`."""
        self.config = config

    def train(self) -> ModelTrainerArtifact:
        """Run training end to end and return the resulting artifact.

        Reads the training CSV from ``config.train_data_path``, loads the
        AutoML configuration section for ``config.automl_library``, builds a
        trainer via ``AutoMLFactory`` and fits it. When MLflow tracking is
        enabled, parameters, metrics, tags and artifacts are logged to a run
        that is opened and closed by this method.

        Returns:
            ModelTrainerArtifact describing the trained model and its metrics.

        Raises:
            ModelTrainingException: wraps any exception raised during
                training or logging; the original error is chained as the
                cause.
        """
        # Tracks whether *this call* opened an MLflow run, so the failure
        # path never closes a run that belongs to an outer caller.
        run_started = False
        try:
            logger.info("Starting model training")

            mlflow_enabled = self._configure_mlflow()
            if mlflow_enabled:
                mlflow.start_run()
                run_started = True

            train_df = pd.read_csv(self.config.train_data_path)
            automl_config = read_yaml(Path(AUTOML_CONFIG_FILE_PATH))
            library_config = automl_config[self.config.automl_library]

            if mlflow_enabled:
                mlflow.log_param("automl_library", self.config.automl_library)
                mlflow.log_param("target_column", self.config.target_column)
                mlflow.log_param("train_samples", len(train_df))
                mlflow.log_params(library_config)

            trainer = AutoMLFactory.create_trainer(
                self.config.automl_library,
                library_config
            )
            os.makedirs(self.config.root_dir, exist_ok=True)

            metrics, feature_importance = self._fit(trainer, train_df)

            if mlflow_enabled:
                self._log_results(metrics, feature_importance)
                run_id = mlflow.active_run().info.run_id
                logger.info(f"MLflow run logged: {run_id}")
                mlflow.end_run()
                run_started = False

            logger.info(f"Model trained with metrics: {metrics}")
            return ModelTrainerArtifact(
                model_path=self.config.model_path,
                train_metrics=metrics,
                is_trained=True,
                message=f"Model trained successfully with score: {metrics.get('score', 0.0):.4f}"
            )
        except Exception as e:
            if run_started:
                self._abort_run()
            raise ModelTrainingException(str(e), sys) from e

    def _configure_mlflow(self) -> bool:
        """Point MLflow at the DagsHub tracking server; return True if tracking is usable."""
        tracking_uri = os.getenv("MLFLOW_TRACKING_URI", "https://dagshub.com/abheshith7/AutoML-MLOps-PipeLine.mlflow/")
        dagshub_token = os.getenv("DAGSHUB_TOKEN")

        if not dagshub_token:
            logger.warning("MLflow tracking disabled: DAGSHUB_TOKEN not set")
            return False
        if "dagshub.com" not in tracking_uri:
            # Distinct message: the token is present, the URI is the problem.
            logger.warning("MLflow tracking disabled: MLFLOW_TRACKING_URI is not a DagsHub URI")
            return False

        try:
            # MLflow reads its HTTP basic-auth credentials from these variables.
            os.environ["MLFLOW_TRACKING_USERNAME"] = os.getenv("DAGSHUB_USERNAME", "abheshith7")
            os.environ["MLFLOW_TRACKING_PASSWORD"] = dagshub_token
            mlflow.set_tracking_uri(tracking_uri)
            mlflow.set_experiment("automl_experiment")
            logger.info(f"MLflow tracking enabled: {tracking_uri}")
        except Exception as e:
            # Tracking is optional: a setup failure must not block training.
            logger.warning(f"MLflow tracking disabled: {str(e)}")
            return False
        return True

    def _fit(self, trainer, train_df: pd.DataFrame) -> tuple:
        """Fit ``trainer`` on ``train_df`` and return ``(metrics, feature_importance)``.

        ``feature_importance`` is ``None`` unless the autogluon trainer
        returns a ``(metrics, feature_importance)`` tuple.
        """
        if self.config.automl_library == 'autogluon':
            # The autogluon trainer takes the full frame plus the target name.
            result = trainer.train(train_df, self.config.target_column, self.config.model_path)
            if isinstance(result, tuple):
                metrics, feature_importance = result
                return metrics, feature_importance
            return result, None

        # Other trainers take features and target separately.
        X_train = train_df.drop(columns=[self.config.target_column])
        y_train = train_df[self.config.target_column]
        metrics = trainer.train(X_train, y_train, self.config.model_path)
        return metrics, None

    def _log_results(self, metrics, feature_importance) -> None:
        """Log metrics, tags and artifacts of a finished training to the active MLflow run."""
        # Numeric values are logged as metrics; everything else is
        # stringified and attached as a tag.
        numeric_metrics = {}
        string_values = {}
        for key, value in metrics.items():
            if isinstance(value, (int, float)):
                numeric_metrics[key] = value
            else:
                string_values[key] = str(value)

        if numeric_metrics:
            mlflow.log_metrics(numeric_metrics)
        for key, value in string_values.items():
            mlflow.set_tag(key, value)

        if feature_importance is not None:
            import json
            # Objects without to_dict() are written as an empty mapping.
            fi_dict = feature_importance.to_dict() if hasattr(feature_importance, 'to_dict') else {}
            fi_path = Path(self.config.root_dir) / "feature_importance.json"
            with open(fi_path, 'w', encoding='utf-8') as f:
                json.dump(fi_dict, f, indent=2)
            mlflow.log_artifact(str(fi_path))

        if self.config.automl_library == 'autogluon':
            self._log_leaderboard()

        mlflow.set_tag("model_type", "AutoML")
        mlflow.set_tag("framework", self.config.automl_library)

    def _log_leaderboard(self) -> None:
        """Best-effort: save the AutoGluon leaderboard as CSV and log it as an artifact."""
        try:
            from autogluon.tabular import TabularPredictor
            predictor = TabularPredictor.load(str(self.config.model_path))
            leaderboard = predictor.leaderboard(silent=True)
            lb_path = Path(self.config.root_dir) / "leaderboard.csv"
            leaderboard.to_csv(lb_path, index=False)
            mlflow.log_artifact(str(lb_path))
        except Exception as e:
            # The leaderboard is a nice-to-have; report the problem instead
            # of hiding it, but do not fail the training run.
            logger.warning(f"Skipping leaderboard artifact: {str(e)}")

    def _abort_run(self) -> None:
        """Best-effort: mark the MLflow run opened by train() as FAILED and close it."""
        try:
            if mlflow.active_run():
                mlflow.end_run(status="FAILED")
        except Exception as e:
            # Never let cleanup mask the original training error.
            logger.warning(f"Could not close MLflow run: {str(e)}")