Spaces:
Build error
Build error
| """Benchmark all the algorithms in the repo.""" | |
| # Copyright (C) 2020 Intel Corporation | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, | |
| # software distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions | |
| # and limitations under the License. | |
| import functools | |
| import io | |
| import logging | |
| import math | |
| import multiprocessing | |
| import sys | |
| import time | |
| import warnings | |
| from concurrent.futures import ProcessPoolExecutor, as_completed | |
| from pathlib import Path | |
| from tempfile import TemporaryDirectory | |
| from typing import Dict, List, Union, cast | |
| import torch | |
| from omegaconf import DictConfig, ListConfig, OmegaConf | |
| from pytorch_lightning import Trainer, seed_everything | |
| from utils import convert_to_openvino, upload_to_wandb, write_metrics | |
| from anomalib.config import get_configurable_parameters, update_input_size_config | |
| from anomalib.data import get_datamodule | |
| from anomalib.models import get_model | |
| from anomalib.utils.loggers import configure_logger | |
| from anomalib.utils.sweep import ( | |
| get_meta_data, | |
| get_openvino_throughput, | |
| get_run_config, | |
| get_sweep_callbacks, | |
| get_torch_throughput, | |
| set_in_nested_config, | |
| ) | |
# Silence third-party warnings so benchmark progress logs stay readable.
warnings.filterwarnings("ignore")

logger = logging.getLogger(__name__)
configure_logger()
pl_logger = logging.getLogger(__file__)
# Demote chatty libraries to ERROR so only benchmark output reaches the console.
for logger_name in ["pytorch_lightning", "torchmetrics", "os"]:
    logging.getLogger(logger_name).setLevel(logging.ERROR)
def hide_output(func):
    """Decorator to hide stdout output of the wrapped function.

    Args:
        func (function): Hides output of this function.

    Raises:
        Exception: In case the execution of the function fails, re-raises with the
            captured output attached so the failure context is not lost.

    Returns:
        object of the called function
    """

    @functools.wraps(func)  # preserve name/docstring of the wrapped function
    def wrapper(*args, **kwargs):
        # Redirect stdout into an in-memory buffer for the duration of the call.
        std_out = sys.stdout
        sys.stdout = buf = io.StringIO()
        try:
            value = func(*args, **kwargs)
        except Exception as exp:
            # Surface whatever the function printed before failing.
            raise Exception(buf.getvalue()) from exp
        finally:
            # Always restore stdout — the original leaked the buffer on exceptions.
            sys.stdout = std_out
        return value

    return wrapper
def get_single_model_metrics(model_config: Union[DictConfig, ListConfig], openvino_metrics: bool = False) -> Dict:
    """Collects metrics for `model_name` and returns a dict of results.

    Trains and tests the model described by ``model_config``, timing both phases,
    then measures inference throughput (torch, and optionally OpenVINO).

    Args:
        model_config (DictConfig, ListConfig): Configuration for run
        openvino_metrics (bool): If True, converts the model to OpenVINO format and gathers inference metrics.

    Returns:
        Dict: Collection of all the metrics such as time taken, throughput and performance scores.
    """

    # Run inside a temporary directory so per-run artifacts are discarded afterwards.
    with TemporaryDirectory() as project_path:
        model_config.project.path = project_path
        datamodule = get_datamodule(model_config)
        model = get_model(model_config)

        callbacks = get_sweep_callbacks()

        trainer = Trainer(**model_config.trainer, logger=None, callbacks=callbacks)

        start_time = time.time()
        trainer.fit(model=model, datamodule=datamodule)

        # wall-clock training time
        training_time = time.time() - start_time

        # Creating new variable is faster according to https://stackoverflow.com/a/4330829
        start_time = time.time()
        # get test results
        test_results = trainer.test(model=model, datamodule=datamodule)

        # wall-clock testing time
        testing_time = time.time() - start_time

        meta_data = get_meta_data(model, model_config.model.input_size)

        # Torch inference throughput over the test set.
        throughput = get_torch_throughput(model_config, model, datamodule.test_dataloader().dataset, meta_data)

        # OpenVINO throughput defaults to NaN when OpenVINO metrics are not requested.
        openvino_throughput = float("nan")
        if openvino_metrics:
            # Create dirs for openvino model export
            openvino_export_path = project_path / Path("exported_models")
            openvino_export_path.mkdir(parents=True, exist_ok=True)
            convert_to_openvino(model, openvino_export_path, model_config.model.input_size)
            openvino_throughput = get_openvino_throughput(
                model_config, openvino_export_path, datamodule.test_dataloader().dataset, meta_data
            )

        # Arrange timing/throughput data, then merge in the test metrics as floats.
        data = {
            "Training Time (s)": training_time,
            "Testing Time (s)": testing_time,
            "Inference Throughput (fps)": throughput,
            "OpenVINO Inference Throughput (fps)": openvino_throughput,
        }
        for key, val in test_results[0].items():
            data[key] = float(val)

        return data
def compute_on_cpu():
    """Run every configuration of the grid search sequentially on a single CPU."""
    config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    for cfg in get_run_config(config.grid_search):
        # device 0 selects the cpu path; OpenVINO metrics are skipped here.
        metrics = sweep(cfg, 0, config.seed, False)
        write_metrics(metrics, config.writer)
def compute_on_gpu(
    run_configs: Union[DictConfig, ListConfig],
    device: int,
    seed: int,
    writers: List[str],
    compute_openvino: bool = False,
):
    """Benchmark each run configuration on the given GPU and persist the results.

    Args:
        run_configs (Union[DictConfig, ListConfig]): List of run configurations.
        device (int): The GPU id used for running the sweep.
        seed (int): Fix a seed.
        writers (List[str]): Destinations to write to.
        compute_openvino (bool, optional): Compute OpenVINO throughput. Defaults to False.

    Raises:
        ValueError: When an entry is not an OmegaConf container.
    """
    for run_config in run_configs:
        # Guard clause: only OmegaConf containers are valid run configurations.
        if not isinstance(run_config, (DictConfig, ListConfig)):
            raise ValueError(
                f"Expecting `run_config` of type DictConfig or ListConfig. Got {type(run_config)} instead."
            )
        metrics = sweep(run_config, device, seed, compute_openvino)
        write_metrics(metrics, writers)
def distribute_over_gpus():
    """Distribute metric collection over all available GPUs. This is done by splitting the list of configurations."""
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    run_configs = list(get_run_config(sweep_config.grid_search))
    num_gpus = torch.cuda.device_count()
    # Each GPU receives one contiguous slice of roughly equal size.
    chunk_size = math.ceil(len(run_configs) / num_gpus)
    with ProcessPoolExecutor(
        max_workers=num_gpus, mp_context=multiprocessing.get_context("spawn")
    ) as executor:
        jobs = [
            executor.submit(
                compute_on_gpu,
                run_configs[start : start + chunk_size],
                gpu_id + 1,  # device ids are 1-based in `sweep`; 0 denotes cpu
                sweep_config.seed,
                sweep_config.writer,
                sweep_config.compute_openvino,
            )
            for gpu_id, start in enumerate(range(0, len(run_configs), chunk_size))
        ]
        # Propagate the first failure with the offending job attached.
        for job in jobs:
            try:
                job.result()
            except Exception as exc:
                raise Exception(f"Error occurred while computing benchmark on device {job}") from exc
def distribute():
    """Run all cpu experiments on a single process. Distribute gpu experiments over all available gpus.

    Reads the requested hardware list from the sweep config and dispatches
    accordingly: cpu only, gpu only, or both in two parallel spawned processes.
    Finally uploads results to wandb when it is among the configured writers.
    """

    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    devices = sweep_config.hardware
    if not torch.cuda.is_available() and "gpu" in devices:
        # NOTE(review): when gpu is requested but unavailable, this branch also
        # skips the cpu runs even if "cpu" is in the hardware list — confirm intended.
        pl_logger.warning("Config requested GPU benchmarking but torch could not detect any cuda enabled devices")
    elif {"cpu", "gpu"}.issubset(devices):
        # Create process for gpu and cpu
        with ProcessPoolExecutor(max_workers=2, mp_context=multiprocessing.get_context("spawn")) as executor:
            jobs = [executor.submit(compute_on_cpu), executor.submit(distribute_over_gpus)]
            for job in as_completed(jobs):
                try:
                    job.result()
                except Exception as exception:
                    raise Exception(f"Error occurred while computing benchmark on device {job}") from exception
    elif "cpu" in devices:
        compute_on_cpu()
    elif "gpu" in devices:
        distribute_over_gpus()
    if "wandb" in sweep_config.writer:
        upload_to_wandb(team="anomalib")
def sweep(
    run_config: Union[DictConfig, ListConfig], device: int = 0, seed: int = 42, convert_openvino: bool = False
) -> Dict[str, Union[float, str]]:
    """Go over all the values mentioned in `grid_search` parameter of the benchmarking config.

    Args:
        run_config: (Union[DictConfig, ListConfig], optional): Configuration for current run.
        device (int, optional): Device on which the model is trained. 0 selects cpu;
            any value > 0 selects gpu id ``device - 1``. Defaults to 0.
        seed (int, optional): Seed passed to ``seed_everything`` for reproducibility. Defaults to 42.
        convert_openvino (bool, optional): Whether to convert the model to openvino format. Defaults to False.

    Returns:
        Dict[str, Union[float, str]]: Dictionary containing the metrics gathered from the sweep.
    """
    seed_everything(seed, workers=True)
    # This assumes that `model_name` is always present in the sweep config.
    model_config = get_configurable_parameters(model_name=run_config.model_name)
    model_config.project.seed = seed

    model_config = cast(DictConfig, model_config)  # placate mypy
    # Overlay every dotted key from the run config onto the model config.
    for param in run_config.keys():
        # grid search keys are always assumed to be strings
        param = cast(str, param)  # placate mypy
        set_in_nested_config(model_config, param.split("."), run_config[param])  # type: ignore

    # convert image size to tuple in case it was updated by run config
    model_config = update_input_size_config(model_config)

    # Set device in config. 0 - cpu, [0], [1].. - gpu id
    model_config.trainer.gpus = 0 if device == 0 else [device - 1]

    if run_config.model_name in ["patchcore", "cflow"]:
        convert_openvino = False  # `torch.cdist` is not supported by onnx version 11
        # TODO Remove this line when issue #40 is fixed https://github.com/openvinotoolkit/anomalib/issues/40
        if model_config.model.input_size != (224, 224):
            return {}  # go to next run

    # Run benchmarking for current config
    model_metrics = get_single_model_metrics(model_config=model_config, openvino_metrics=convert_openvino)
    output = f"One sweep run complete for model {model_config.model.name}"
    output += f" On category {model_config.dataset.category}" if model_config.dataset.category is not None else ""
    output += str(model_metrics)
    logger.info(output)

    # Append configuration of current run to the collected metrics
    for key, value in run_config.items():
        # Skip adding model name to the dataframe
        if key != "model_name":
            model_metrics[key] = value

    # Add device name to list
    model_metrics["device"] = "gpu" if device > 0 else "cpu"
    model_metrics["model_name"] = run_config.model_name

    return model_metrics
| if __name__ == "__main__": | |
| # Benchmarking entry point. | |
| # Spawn multiple processes one for cpu and rest for the number of gpus available in the system. | |
| # The idea is to distribute metrics collection over all the available devices. | |
| logger.info("Benchmarking started πββοΈ. This will take a while β² depending on your configuration.") | |
| distribute() | |
| logger.info("Finished gathering results β‘") | |