|
|
"A `Callback` that saves tracked metrics and notebook file into MLflow server." |
|
|
from ..torch_core import * |
|
|
from ..callback import * |
|
|
from ..basic_train import Learner, LearnerCallback |
|
|
|
|
|
# Soft dependency: the callback is unusable without mlflow, but importing this
# module should not hard-fail — just tell the user how to install it.
# Catch only ImportError: a bare `except:` would also swallow SystemExit /
# KeyboardInterrupt and hide real errors raised inside mlflow's own import.
try: import mlflow
except ImportError: print("To use this tracker, please run 'pip install mlflow'")
|
|
|
|
|
class MLFlowTracker(LearnerCallback):
    "A `TrackerCallback` that tracks the loss and metrics into MLFlow"
    def __init__(self, learn:Learner, exp_name: str, params: dict, nb_path: str, uri: str = "http://localhost:5000"):
        """Track `learn`'s losses/metrics under experiment `exp_name` on the MLflow
        server at `uri`, logging `params` once at train start and uploading the
        notebook file at `nb_path` as an artifact when training ends."""
        super().__init__(learn)
        self.learn,self.exp_name,self.params,self.nb_path,self.uri = learn,exp_name,params,nb_path,uri
        # Names for the per-epoch metric stream: smoothed training loss, then the
        # validation loss and every metric attached to the learner, in order.
        self.metrics_names = ['train_loss', 'valid_loss'] + [o.__name__ for o in learn.metrics]

    def on_train_begin(self, **kwargs: Any) -> None:
        "Prepare MLflow experiment and log params"
        self.client = mlflow.tracking.MlflowClient(self.uri)
        exp = self.client.get_experiment_by_name(self.exp_name)
        # Reuse the experiment if it already exists on the server; create it otherwise.
        self.exp_id = self.client.create_experiment(self.exp_name) if exp is None else exp.experiment_id
        run = self.client.create_run(experiment_id=self.exp_id)
        # NOTE(review): `run_uuid` is deprecated in newer mlflow in favor of
        # `run.info.run_id`; kept as-is for compatibility with older clients — confirm
        # the minimum supported mlflow version before switching.
        self.run = run.info.run_uuid
        for k,v in self.params.items():
            self.client.log_param(run_id=self.run, key=k, value=v)

    def on_epoch_end(self, epoch, **kwargs:Any)->None:
        "Send loss and metrics values to MLFlow after each epoch"
        # Nothing to report when no loss/metrics were produced this epoch
        # (e.g. validation disabled leaves `last_metrics` as None).
        if kwargs['smooth_loss'] is None or kwargs["last_metrics"] is None: return
        metrics = [kwargs['smooth_loss']] + kwargs["last_metrics"]
        for name, val in zip(self.metrics_names, metrics):
            # Use the builtin float(): `np.float` was deprecated in NumPy 1.20 and
            # removed in 1.24, so the old `np.float(val)` raises AttributeError on
            # modern NumPy. float() also unwraps 0-d tensors/NumPy scalars.
            self.client.log_metric(self.run, name, float(val), step=epoch)

    def on_train_end(self, **kwargs: Any) -> None:
        "Store the notebook and stop run"
        self.client.log_artifact(run_id=self.run, local_path=self.nb_path)
        self.client.set_terminated(run_id=self.run)
|
|
|