| |
| import logging |
| import numpy as np |
| from itertools import count |
| from typing import List, Tuple |
| import torch |
| import tqdm |
| from fvcore.common.timer import Timer |
|
|
| from detectron2.utils import comm |
|
|
| from .build import build_batch_data_loader |
| from .common import DatasetFromList, MapDataset |
| from .samplers import TrainingSampler |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class _EmptyMapDataset(torch.utils.data.Dataset): |
| """ |
| Map anything to emptiness. |
| """ |
|
|
| def __init__(self, dataset): |
| self.ds = dataset |
|
|
| def __len__(self): |
| return len(self.ds) |
|
|
| def __getitem__(self, idx): |
| _ = self.ds[idx] |
| return [0] |
|
|
|
|
def iter_benchmark(
    iterator, num_iter: int, warmup: int = 5, max_time_seconds: float = 60
) -> Tuple[float, List[float]]:
    """
    Benchmark an iterator/iterable for `num_iter` iterations with an extra
    `warmup` iterations of warmup.
    End early if `max_time_seconds` time is spent on iterations.

    Args:
        iterator: an iterator or iterable to pull items from.
        num_iter (int): number of measured iterations.
        warmup (int): extra unmeasured iterations executed first.
        max_time_seconds (float): time budget for the measured loop.

    Returns:
        float: average time (seconds) per iteration
        list[float]: time spent on each iteration. Sometimes useful for further analysis.
    """
    num_iter, warmup = int(num_iter), int(warmup)

    iterator = iter(iterator)
    # Warmup pulls are not timed; they absorb startup costs (caches, prefetch).
    for _ in range(warmup):
        next(iterator)
    timer = Timer()
    all_times = []
    for curr_iter in tqdm.trange(num_iter):
        start = timer.seconds()
        if start > max_time_seconds:
            # Budget exhausted: only `curr_iter` iterations were measured.
            num_iter = curr_iter
            break
        next(iterator)
        all_times.append(timer.seconds() - start)
    # Guard against ZeroDivisionError: if the budget was already exhausted
    # before the first measured iteration, `num_iter` was set to 0 above.
    avg = timer.seconds() / max(num_iter, 1)
    return avg, all_times
|
|
|
|
class DataLoaderBenchmark:
    """
    Some common benchmarks that help understand perf bottleneck of a standard dataloader
    made of dataset, mapper and sampler.
    """

    def __init__(
        self,
        dataset,
        *,
        mapper,
        sampler=None,
        total_batch_size,
        num_workers=0,
        max_time_seconds: int = 90,
    ):
        """
        Args:
            max_time_seconds (int): maximum time to spent for each benchmark
            other args: same as in `build.py:build_detection_train_loader`
        """
        # A plain list is wrapped to behave like a map-style dataset
        # (copy/serialize semantics are defined by DatasetFromList).
        if isinstance(dataset, list):
            dataset = DatasetFromList(dataset, copy=False, serialize=True)
        if sampler is None:
            sampler = TrainingSampler(len(dataset))

        self.dataset = dataset
        self.mapper = mapper
        self.sampler = sampler
        self.total_batch_size = total_batch_size
        self.num_workers = num_workers
        # Integer division: NOTE(review) assumes total_batch_size is divisible
        # by the world size — confirm against build_batch_data_loader's contract.
        self.per_gpu_batch_size = self.total_batch_size // comm.get_world_size()

        self.max_time_seconds = max_time_seconds

    def _benchmark(self, iterator, num_iter, warmup, msg=None):
        # Thin wrapper around `iter_benchmark` that applies the instance-wide
        # time budget and optionally logs the result under `msg`.
        avg, all_times = iter_benchmark(iterator, num_iter, warmup, self.max_time_seconds)
        if msg is not None:
            self._log_time(msg, avg, all_times)
        return avg, all_times

    def _log_time(self, msg, avg, all_times, distributed=False):
        """
        Log throughput (1/avg it/s) and tail percentiles (p1/p5/p95/p99) of
        `all_times`. With `distributed=True`, results are gathered from all
        ranks (a collective call — every rank must enter this method) and
        printed on rank 0 only, one line per GPU.
        """
        # NOTE(review): `interpolation=` was renamed to `method=` in NumPy 1.22
        # and later removed — confirm the NumPy versions this must support.
        percentiles = [np.percentile(all_times, k, interpolation="nearest") for k in [1, 5, 95, 99]]
        if not distributed:
            logger.info(
                f"{msg}: avg={1.0/avg:.1f} it/s, "
                f"p1={percentiles[0]:.2g}s, p5={percentiles[1]:.2g}s, "
                f"p95={percentiles[2]:.2g}s, p99={percentiles[3]:.2g}s."
            )
            return
        avg_per_gpu = comm.all_gather(avg)
        percentiles_per_gpu = comm.all_gather(percentiles)
        # Non-zero ranks participated in the gather above; only rank 0 logs.
        if comm.get_rank() > 0:
            return
        for idx, avg, percentiles in zip(count(), avg_per_gpu, percentiles_per_gpu):
            logger.info(
                f"GPU{idx} {msg}: avg={1.0/avg:.1f} it/s, "
                f"p1={percentiles[0]:.2g}s, p5={percentiles[1]:.2g}s, "
                f"p95={percentiles[2]:.2g}s, p99={percentiles[3]:.2g}s."
            )

    def benchmark_dataset(self, num_iter, warmup=5):
        """
        Benchmark the speed of taking raw samples from the dataset.
        """

        def loader():
            # Infinite stream of raw (unmapped) samples in sampler order.
            while True:
                for k in self.sampler:
                    yield self.dataset[k]

        self._benchmark(loader(), num_iter, warmup, "Dataset Alone")

    def benchmark_mapper(self, num_iter, warmup=5):
        """
        Benchmark the speed of taking raw samples from the dataset and map
        them in a single process.
        """

        def loader():
            # Same as benchmark_dataset, but applies the mapper in-process,
            # isolating mapper cost from dataloader worker/IPC overhead.
            while True:
                for k in self.sampler:
                    yield self.mapper(self.dataset[k])

        self._benchmark(loader(), num_iter, warmup, "Single Process Mapper (sec/sample)")

    def benchmark_workers(self, num_iter, warmup=10):
        """
        Benchmark the dataloader by tuning num_workers to [0, 1, self.num_workers].
        """
        candidates = [0, 1]
        if self.num_workers not in candidates:
            candidates.append(self.num_workers)

        dataset = MapDataset(self.dataset, self.mapper)
        for n in candidates:
            loader = build_batch_data_loader(
                dataset,
                self.sampler,
                self.total_batch_size,
                num_workers=n,
            )
            # Iteration counts are scaled by worker count (min 1) so each
            # configuration performs a comparable amount of measured work.
            self._benchmark(
                iter(loader),
                num_iter * max(n, 1),
                warmup * max(n, 1),
                f"DataLoader ({n} workers, bs={self.per_gpu_batch_size})",
            )
            # Drop the loader before building the next one (releases workers).
            del loader

    def benchmark_IPC(self, num_iter, warmup=10):
        """
        Benchmark the dataloader where each worker outputs nothing. This
        eliminates the IPC overhead compared to the regular dataloader.

        PyTorch multiprocessing's IPC only optimizes for torch tensors.
        Large numpy arrays or other data structure may incur large IPC overhead.
        """
        n = self.num_workers
        # _EmptyMapDataset still runs dataset + mapper inside the workers, but
        # only a tiny constant list is sent back to the main process.
        dataset = _EmptyMapDataset(MapDataset(self.dataset, self.mapper))
        loader = build_batch_data_loader(
            dataset, self.sampler, self.total_batch_size, num_workers=n
        )
        self._benchmark(
            iter(loader),
            num_iter * max(n, 1),
            warmup * max(n, 1),
            f"DataLoader ({n} workers, bs={self.per_gpu_batch_size}) w/o comm",
        )

    def benchmark_distributed(self, num_iter, warmup=10):
        """
        Benchmark the dataloader in each distributed worker, and log results of
        all workers. This helps understand the final performance as well as
        the variances among workers.

        It also prints startup time (first iter) of the dataloader.
        """
        gpu = comm.get_world_size()
        dataset = MapDataset(self.dataset, self.mapper)
        n = self.num_workers
        loader = build_batch_data_loader(
            dataset, self.sampler, self.total_batch_size, num_workers=n
        )

        # The first `next()` pays the one-time startup cost (e.g. spawning
        # worker processes), which is timed and reported separately.
        timer = Timer()
        loader = iter(loader)
        next(loader)
        startup_time = timer.seconds()
        logger.info("Dataloader startup time: {:.2f} seconds".format(startup_time))

        # Align all ranks so the measured section starts simultaneously.
        comm.synchronize()

        avg, all_times = self._benchmark(loader, num_iter * max(n, 1), warmup * max(n, 1))
        del loader
        # distributed=True -> collective logging; every rank must reach this call.
        self._log_time(
            f"DataLoader ({gpu} GPUs x {n} workers, total bs={self.total_batch_size})",
            avg,
            all_times,
            True,
        )
|
|