| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """ |
| A unified tracking interface that supports logging data to different backend |
| """ |
|
|
| import dataclasses |
| import json |
| import logging |
| import os |
| from enum import Enum |
| from functools import partial |
| from pathlib import Path |
| from typing import Any |
|
|
| import orjson |
|
|
logger = logging.getLogger(__name__)

# Retry policy for MLflow backend initialization (used in Tracking.__init__):
# number of attempts, and the pause between attempts in seconds.
MLFLOW_MAX_ATTEMPTS = 3
MLFLOW_SLEEP_SECONDS = 5
|
|
|
|
class Tracking:
    """A unified tracking interface for logging experiment data to multiple backends.

    This class provides a centralized way to log experiment metrics, parameters, and artifacts
    to various tracking backends including WandB, MLflow, SwanLab, TensorBoard, and console.

    Attributes:
        supported_backend: List of supported tracking backends.
        logger: Dictionary of initialized logger instances for each backend.
    """

    # Backend names accepted in `default_backend`; anything else (except the
    # deprecated "tracking" alias) fails the assertion in __init__.
    supported_backend = [
        "wandb",
        "mlflow",
        "swanlab",
        "vemlp_wandb",
        "tensorboard",
        "console",
        "clearml",
        "trackio",
        "file",
    ]

    def __init__(self, project_name, experiment_name, default_backend: str | list[str] = "console", config=None):
        """Initialize each requested backend and keep its handle in ``self.logger``.

        Args:
            project_name: Project / experiment-group name passed to each backend.
            experiment_name: Name of this particular run.
            default_backend: A backend name or list of names; see ``supported_backend``.
            config: Run configuration forwarded to the backends.
                NOTE(review): the wandb branch indexes ``config["trainer"]`` —
                presumably a dict-like trainer section; confirm against callers.
        """
        # Accept a bare string for convenience and normalize it to a list.
        if isinstance(default_backend, str):
            default_backend = [default_backend]
        for backend in default_backend:
            if backend == "tracking":
                # Legacy alias kept for backward compatibility; handled as "wandb" below.
                import warnings

                warnings.warn("`tracking` logger is deprecated. use `wandb` instead.", DeprecationWarning, stacklevel=2)
            else:
                assert backend in self.supported_backend, f"{backend} is not supported"

        # backend name -> logger-like object exposing .log(data=..., step=...)
        self.logger = {}

        if "tracking" in default_backend or "wandb" in default_backend:
            import os

            import wandb

            settings = None
            # Optional HTTPS proxy for wandb, read from the trainer config section.
            if config and config["trainer"].get("wandb_proxy", None):
                settings = wandb.Settings(https_proxy=config["trainer"]["wandb_proxy"])
            entity = os.environ.get("WANDB_ENTITY", None)
            wandb.init(project=project_name, name=experiment_name, entity=entity, config=config, settings=settings)
            self.logger["wandb"] = wandb

        if "trackio" in default_backend:
            import trackio

            trackio.init(project=project_name, name=experiment_name, config=config)
            self.logger["trackio"] = trackio

        if "mlflow" in default_backend:
            import os
            import time

            import mlflow

            # MLflow servers can be transiently unavailable: retry a few times,
            # and if every attempt fails, proceed without MLflow rather than crash.
            for _mlflow_attempt in range(1, MLFLOW_MAX_ATTEMPTS + 1):
                try:
                    MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "sqlite:////tmp/mlruns.db")
                    logger.info("Using MLFlow tracking URI: %s", MLFLOW_TRACKING_URI)
                    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

                    # Resume an externally-created run when MLFLOW_RUN_ID is set;
                    # otherwise start a fresh run under the project's experiment.
                    run_id = os.environ.get("MLFLOW_RUN_ID")
                    if run_id:
                        mlflow.start_run(run_id=run_id)
                    else:
                        experiment = mlflow.set_experiment(project_name)
                        mlflow.start_run(experiment_id=experiment.experiment_id, run_name=experiment_name)

                    mlflow.log_params(_compute_mlflow_params_from_objects(config))
                    self.logger["mlflow"] = _MlflowLoggingAdapter()
                    break
                except Exception as e:
                    logger.warning(
                        "MLflow initialization attempt %d/%d failed: %s", _mlflow_attempt, MLFLOW_MAX_ATTEMPTS, e
                    )
                    if _mlflow_attempt < MLFLOW_MAX_ATTEMPTS:
                        time.sleep(MLFLOW_SLEEP_SECONDS)
                    else:
                        logger.warning("All MLflow initialization attempts failed. Proceeding without MLflow tracking.")

        if "swanlab" in default_backend:
            import os

            import swanlab

            # All SwanLab knobs come from the environment; mode defaults to "cloud".
            SWANLAB_API_KEY = os.environ.get("SWANLAB_API_KEY", None)
            SWANLAB_LOG_DIR = os.environ.get("SWANLAB_LOG_DIR", "swanlog")
            SWANLAB_MODE = os.environ.get("SWANLAB_MODE", "cloud")
            if SWANLAB_API_KEY:
                swanlab.login(SWANLAB_API_KEY)

            if config is None:
                config = {}
            swanlab.init(
                project=project_name,
                experiment_name=experiment_name,
                config={"FRAMEWORK": "verl", **config},
                logdir=SWANLAB_LOG_DIR,
                mode=SWANLAB_MODE,
            )
            self.logger["swanlab"] = swanlab

        if "vemlp_wandb" in default_backend:
            import os

            import volcengine_ml_platform
            from volcengine_ml_platform import wandb as vemlp_wandb

            # Volcengine credentials are mandatory; a KeyError here means they are unset.
            volcengine_ml_platform.init(
                ak=os.environ["VOLC_ACCESS_KEY_ID"],
                sk=os.environ["VOLC_SECRET_ACCESS_KEY"],
                region=os.environ["MLP_TRACKING_REGION"],
            )

            vemlp_wandb.init(
                project=project_name,
                name=experiment_name,
                config=config,
                sync_tensorboard=True,
            )
            self.logger["vemlp_wandb"] = vemlp_wandb

        if "tensorboard" in default_backend:
            self.logger["tensorboard"] = _TensorboardAdapter(project_name, experiment_name)

        if "console" in default_backend:
            from verl.utils.logger import LocalLogger

            self.console_logger = LocalLogger(print_to_console=True)
            self.logger["console"] = self.console_logger

        if "clearml" in default_backend:
            self.logger["clearml"] = ClearMLLogger(project_name, experiment_name, config)

        if "file" in default_backend:
            self.logger["file"] = FileLogger(project_name, experiment_name)

    def log(self, data, step, backend=None):
        """Send ``data`` for ``step`` to the initialized backends.

        Args:
            data: Mapping of metric name to value.
            step: Global step associated with the metrics.
            backend: Optional filter; when given, only backends whose name is
                contained in it receive the data (``None`` means all).
                NOTE(review): a string argument does a substring test via ``in``;
                pass a list/set of backend names for exact matching.
        """
        for default_backend, logger_instance in self.logger.items():
            if backend is None or default_backend in backend:
                logger_instance.log(data=data, step=step)

    def __del__(self):
        # Best-effort cleanup at GC / interpreter shutdown: close every backend
        # that needs an explicit finish() to flush its data.
        if "wandb" in self.logger:
            self.logger["wandb"].finish(exit_code=0)
        if "swanlab" in self.logger:
            self.logger["swanlab"].finish()
        if "vemlp_wandb" in self.logger:
            self.logger["vemlp_wandb"].finish(exit_code=0)
        if "tensorboard" in self.logger:
            self.logger["tensorboard"].finish()
        if "clearml" in self.logger:
            self.logger["clearml"].finish()
        if "trackio" in self.logger:
            self.logger["trackio"].finish()
        if "file" in self.logger:
            self.logger["file"].finish()
|
|
|
|
class ClearMLLogger:
    """Adapter that reports metrics and tables to a ClearML task.

    Metric keys of the form ``"<title>/<series>"`` are split into a ClearML
    title/series pair; numeric values become scalar plots and pandas
    DataFrames become tables.
    """

    def __init__(self, project_name: str, experiment_name: str, config):
        """Create (or resume) the ClearML task and attach the run config.

        Args:
            project_name: ClearML project to file the task under.
            experiment_name: Task name; ``continue_last_task=True`` resumes a
                previous task with this name instead of creating a new one.
            config: Hyperparameter mapping attached to the task.
        """
        self.project_name = project_name
        self.experiment_name = experiment_name

        import clearml

        self._task: clearml.Task = clearml.Task.init(
            task_name=experiment_name,
            project_name=project_name,
            continue_last_task=True,
            output_uri=False,  # do not auto-upload artifacts/models
        )

        self._task.connect_configuration(config, name="Hyperparameters")

    def _get_logger(self):
        """Return the ClearML logger bound to this task."""
        return self._task.get_logger()

    def log(self, data, step):
        """Report each entry of ``data`` at iteration ``step``.

        Numeric values are reported as scalars, DataFrames as tables; any
        other type is dropped with a warning.
        """
        import numpy as np
        import pandas as pd

        task_logger = self._get_logger()
        for k, v in data.items():
            # Fix: keys without a "/" previously raised ValueError on tuple
            # unpacking; report them with the key as both title and series.
            if "/" in k:
                title, series = k.split("/", 1)
            else:
                title = series = k

            if isinstance(v, int | float | np.floating | np.integer):
                task_logger.report_scalar(
                    title=title,
                    series=series,
                    value=v,
                    iteration=step,
                )
            elif isinstance(v, pd.DataFrame):
                task_logger.report_table(
                    title=title,
                    series=series,
                    table_plot=v,
                    iteration=step,
                )
            else:
                # Fix: the local variable previously shadowed the module-level
                # stdlib `logger`, so this branch called `.warning` on the
                # ClearML task logger, which does not provide that method.
                logger.warning(
                    f'Trainer is attempting to log a value of "{v}" of type {type(v)} for key "{k}". This '
                    f"invocation of ClearML logger's function is incorrect so this attribute was dropped. "
                )

    def finish(self):
        """Close the ClearML task, flushing any pending reports."""
        self._task.close()
|
|
|
|
class FileLogger:
    """Append-only JSONL logger: one ``{"step": ..., "data": ...}`` record per call.

    The target file is ``$VERL_FILE_LOGGER_PATH`` when set; otherwise
    ``$VERL_FILE_LOGGER_ROOT/<project>/<experiment>.jsonl`` (root defaults to
    the current directory).
    """

    def __init__(self, project_name: str, experiment_name: str):
        self.project_name = project_name
        self.experiment_name = experiment_name

        target = os.getenv("VERL_FILE_LOGGER_PATH", None)
        if target is None:
            # Derive <root>/<project>/<experiment>.jsonl and make sure the
            # project directory exists.
            root_path = os.path.expanduser(os.getenv("VERL_FILE_LOGGER_ROOT", "."))
            project_dir = os.path.join(root_path, self.project_name)
            os.makedirs(project_dir, exist_ok=True)
            target = os.path.join(project_dir, f"{self.experiment_name}.jsonl")
        self.filepath = target
        print(f"Creating file logger at {self.filepath}")
        # Unbuffered binary mode so every record reaches disk immediately.
        self.fp = open(self.filepath, "wb", buffering=0)

    def log(self, data, step):
        """Serialize ``data`` under the given ``step`` and append one JSON line."""
        record = {"step": step, "data": data}
        serialized = orjson.dumps(record, option=orjson.OPT_SERIALIZE_NUMPY)
        self.fp.write(serialized + b"\n")

    def finish(self):
        """Close the underlying file handle."""
        self.fp.close()
|
|
|
|
class _TensorboardAdapter:
    """Writes scalar metrics to a TensorBoard event directory.

    The directory is ``$TENSORBOARD_DIR`` when set, otherwise
    ``tensorboard_log/<project>/<experiment>``.
    """

    def __init__(self, project_name, experiment_name):
        import os

        from torch.utils.tensorboard import SummaryWriter

        default_dir = f"tensorboard_log/{project_name}/{experiment_name}"
        log_dir = os.environ.get("TENSORBOARD_DIR", default_dir)
        os.makedirs(log_dir, exist_ok=True)
        print(f"Saving tensorboard log to {log_dir}.")
        self.writer = SummaryWriter(log_dir)

    def log(self, data, step):
        """Record every entry of ``data`` as a scalar at ``step``."""
        for name, value in data.items():
            self.writer.add_scalar(name, value, step)

    def finish(self):
        """Flush and close the underlying SummaryWriter."""
        self.writer.close()
|
|
|
|
class _MlflowLoggingAdapter:
    """Adapts the Tracking ``log(data, step)`` interface onto ``mlflow.log_metrics``.

    MLflow restricts metric-key characters, so keys are sanitized (and the
    result cached) before being forwarded.
    """

    def __init__(self):
        import logging
        import re

        self.logger = logging.getLogger(__name__)
        # botocore credential resolution is noisy at INFO level; silence it.
        logging.getLogger("botocore.credentials").setLevel(logging.WARNING)

        # MLflow accepts only alphanumerics, underscore, dash, period, space,
        # colon and slash in metric keys; everything else is replaced.
        self._invalid_chars_pattern = re.compile(r"[^/\w.\- :]")
        self._consecutive_slashes_pattern = re.compile(r"/+")
        # key -> sanitized key, with None as the sentinel for "already valid"
        # (avoids storing a duplicate of the key string).
        self._sanitized_key_cache = {}

    def _sanitize_key(self, key):
        """Return an MLflow-safe version of ``key``, memoizing the result."""
        _absent = object()
        cached = self._sanitized_key_cache.get(key, _absent)
        if cached is not _absent:
            return key if cached is None else cached

        # "@" is common in verl metric names; map it to a readable token first,
        # then collapse slash runs and replace any remaining invalid character.
        sanitized = key.replace("@", "_at_")
        sanitized = self._consecutive_slashes_pattern.sub("/", sanitized)
        sanitized = self._invalid_chars_pattern.sub("_", sanitized)

        if sanitized == key:
            self._sanitized_key_cache[key] = None
        else:
            # Warn once per distinct key thanks to the cache.
            self.logger.warning("[MLflow] Metric key '%s' sanitized to '%s' due to invalid characters.", key, sanitized)
            self._sanitized_key_cache[key] = sanitized
        return sanitized

    def log(self, data, step):
        """Forward ``data`` (with sanitized keys) to MLflow at the given step."""
        import mlflow

        metrics = {self._sanitize_key(name): value for name, value in data.items()}
        mlflow.log_metrics(metrics=metrics, step=step)
|
|
|
|
def _compute_mlflow_params_from_objects(params) -> dict[str, Any]:
    """Convert an arbitrary config object into a flat dict of MLflow params.

    Dataclasses, enums and paths are first made JSON-serializable (lists become
    index-keyed dicts), then the nested structure is flattened with "/" joining
    the key path. A ``None`` config yields an empty dict.
    """
    if params is None:
        return {}

    serializable = _transform_params_to_json_serializable(params, convert_list_to_dict=True)
    return _flatten_dict(serializable, sep="/")
|
|
|
|
def _transform_params_to_json_serializable(x, convert_list_to_dict: bool):
    """Recursively convert ``x`` into JSON-serializable primitives.

    Dataclasses become dicts, Paths become strings, Enums become their values.
    When ``convert_list_to_dict`` is true, lists turn into dicts that record
    the length under "list_len" and key each element by its index (this keeps
    flattening well-defined for MLflow params).
    """
    _recurse = partial(_transform_params_to_json_serializable, convert_list_to_dict=convert_list_to_dict)

    if dataclasses.is_dataclass(x):
        return _recurse(dataclasses.asdict(x))
    if isinstance(x, dict):
        return {key: _recurse(value) for key, value in x.items()}
    if isinstance(x, list):
        if convert_list_to_dict:
            converted = {"list_len": len(x)}
            converted.update({str(index): _recurse(item) for index, item in enumerate(x)})
            return converted
        return [_recurse(item) for item in x]
    if isinstance(x, Path):
        return str(x)
    if isinstance(x, Enum):
        return x.value

    # Assumed to already be JSON-serializable (int, float, str, bool, None).
    return x
|
|
|
|
def _flatten_dict(raw: dict[str, Any], *, sep: str) -> dict[str, Any]:
    """Flatten a nested dict into a single level, joining key paths with ``sep``.

    Uses ``pandas.json_normalize`` to do the flattening.

    Fix: an empty input previously raised IndexError, because
    ``json_normalize({})`` produces zero records and ``[0]`` then fails;
    an empty dict is now returned directly.
    """
    import pandas as pd

    if not raw:
        return {}

    ans = pd.json_normalize(raw, sep=sep).to_dict(orient="records")[0]
    assert isinstance(ans, dict)
    return ans
|
|
|
|
@dataclasses.dataclass
class ValidationGenerationsLogger:
    """Logs validation generation samples — (input, output, score) triples —
    to whichever tracking backends the caller names.
    """

    # Used only to derive the default TensorBoard log directory; may be None.
    project_name: str = None
    experiment_name: str = None

    def log(self, loggers, samples, step):
        """Dispatch ``samples`` at ``step`` to every backend named in ``loggers``.

        Args:
            loggers: Collection of backend names (membership-tested with ``in``).
            samples: Sequence of samples; each is expected to be an
                (input, output, score) triple — TODO confirm against callers.
            step: Global step associated with this validation pass.
        """
        if "wandb" in loggers:
            self.log_generations_to_wandb(samples, step)
        if "swanlab" in loggers:
            self.log_generations_to_swanlab(samples, step)
        if "mlflow" in loggers:
            self.log_generations_to_mlflow(samples, step)

        if "clearml" in loggers:
            self.log_generations_to_clearml(samples, step)
        if "tensorboard" in loggers:
            self.log_generations_to_tensorboard(samples, step)

        if "vemlp_wandb" in loggers:
            self.log_generations_to_vemlp_wandb(samples, step)

    def log_generations_to_vemlp_wandb(self, samples, step):
        """Log samples through Volcengine ML platform's wandb wrapper."""
        from volcengine_ml_platform import wandb as vemlp_wandb

        self._log_generations_to_wandb(samples, step, vemlp_wandb)

    def log_generations_to_wandb(self, samples, step):
        """Log samples through the plain wandb module."""
        import wandb

        self._log_generations_to_wandb(samples, step, wandb)

    def _log_generations_to_wandb(self, samples, step, wandb):
        """Log samples to wandb as a table"""

        # One (input, output, score) column triple per sample, plus the step column.
        columns = ["step"] + sum(
            [[f"input_{i + 1}", f"output_{i + 1}", f"score_{i + 1}"] for i in range(len(samples))], []
        )

        if not hasattr(self, "validation_table"):
            # First call: start an empty table with the full column layout.
            self.validation_table = wandb.Table(columns=columns)

        # A wandb Table cannot be mutated once logged, so copy the accumulated
        # rows into a fresh table before appending this step's row.
        new_table = wandb.Table(columns=columns, data=self.validation_table.data)

        # Flatten step + all sample triples into a single row.
        row_data = []
        row_data.append(step)
        for sample in samples:
            row_data.extend(sample)

        new_table.add_data(*row_data)

        # Skip logging if there is no active wandb run.
        if wandb.run is not None:
            wandb.log({"val/generations": new_table}, step=step)
        self.validation_table = new_table

    def log_generations_to_swanlab(self, samples, step):
        """Log samples to swanlab as text"""
        import swanlab

        swanlab_table = swanlab.echarts.Table()

        headers = ["step", "input", "output", "score"]

        # One table row per sample, each prefixed with the current step.
        swanlab_row_list = [[step, *sample] for sample in samples]
        swanlab_table.add(headers=headers, rows=swanlab_row_list)

        swanlab.log({"val/generations": swanlab_table}, step=step)

    def log_generations_to_mlflow(self, samples, step):
        """Log validation generation to mlflow as artifacts"""
        import tempfile

        import mlflow

        # Best-effort: artifact upload failures are reported but never abort
        # the validation run.
        try:
            with tempfile.TemporaryDirectory() as tmp_dir:
                validation_gen_step_file = Path(tmp_dir, f"val_step{step}.json")
                row_data = []
                for sample in samples:
                    data = {"input": sample[0], "output": sample[1], "score": sample[2]}
                    row_data.append(data)
                with open(validation_gen_step_file, "w") as file:
                    json.dump(row_data, file)
                mlflow.log_artifact(validation_gen_step_file)
        except Exception as e:
            print(f"WARNING: save validation generation file to mlflow failed with error {e}")

    def log_generations_to_clearml(self, samples, step):
        """Log validation generation to clearml as table"""

        import clearml
        import pandas as pd

        # Nothing to report to if no ClearML task is active.
        task: clearml.Task | None = clearml.Task.current_task()
        if task is None:
            return

        table = [
            {
                "step": step,
                "input": sample[0],
                "output": sample[1],
                "score": sample[2],
            }
            for sample in samples
        ]

        logger = task.get_logger()
        logger.report_table(
            series="Validation generations",
            title="Validation",
            table_plot=pd.DataFrame.from_records(table),
            iteration=step,
        )

    def log_generations_to_tensorboard(self, samples, step):
        """Log samples to tensorboard as text"""
        # Lazily create the SummaryWriter on first use so this class does not
        # require tensorboard unless the backend is actually selected.
        if not hasattr(self, "writer"):
            from torch.utils.tensorboard import SummaryWriter

            if self.project_name and self.experiment_name:
                default_dir = os.path.join("tensorboard_log", self.project_name, self.experiment_name)
            else:
                default_dir = "tensorboard_log"

            tensorboard_dir = os.environ.get("TENSORBOARD_DIR", default_dir)
            os.makedirs(tensorboard_dir, exist_ok=True)
            self.writer = SummaryWriter(log_dir=tensorboard_dir)

        # Render all samples into one markdown document for add_text.
        text_content = f"**Generation Results - Step {step}**\n\n"

        for i, sample in enumerate(samples):
            text_content += f"### Sample {i + 1}\n"

            if len(sample) >= 3:
                input_text, output_text, score = sample[0], sample[1], sample[2]

                text_content += f"**Input:** {input_text}\n\n"
                text_content += f"**Output:** {output_text}\n\n"
                text_content += f"**Score:** {score}\n\n"
            else:
                # Fallback for samples that are not (input, output, score) triples.
                text_content += f"**Data:** {sample}\n\n"

            text_content += "---\n\n"

        self.writer.add_text("val/generations", text_content, step)
        # Flush so the text appears without waiting for the writer's flush interval.
        self.writer.flush()
|
|