import logging
import numpy as np
from itertools import count
from typing import List, Tuple
import torch
import tqdm
from fvcore.common.timer import Timer

from detectron2.utils import comm

from .build import build_batch_data_loader
from .common import DatasetFromList, MapDataset
from .samplers import TrainingSampler

logger = logging.getLogger(__name__)


class _EmptyMapDataset(torch.utils.data.Dataset):
    """
    Map anything to emptiness.
    """

    def __init__(self, dataset):
        self.ds = dataset

    def __len__(self):
        return len(self.ds)

    def __getitem__(self, idx):
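        # The wrapped sample is still read (and mapped), so all of its CPU cost is
        # paid; only a tiny constant is sent back across the process boundary.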
        _ = self.ds[idx]
        return [0]


def iter_benchmark(
    iterator, num_iter: int, warmup: int = 5, max_time_seconds: float = 60
) -> Tuple[float, List[float]]:
| | """ |
| | Benchmark an iterator/iterable for `num_iter` iterations with an extra |
| | `warmup` iterations of warmup. |
| | End early if `max_time_seconds` time is spent on iterations. |
| | |
| | Returns: |
| | float: average time (seconds) per iteration |
| | list[float]: time spent on each iteration. Sometimes useful for further analysis. |
| | """ |
    num_iter, warmup = int(num_iter), int(warmup)

    iterator = iter(iterator)
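    # Warmup iterations run before the timer is created and are excluded from the results.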
    for _ in range(warmup):
        next(iterator)
    timer = Timer()
    all_times = []
    for curr_iter in tqdm.trange(num_iter):
        start = timer.seconds()
        if start > max_time_seconds:
            num_iter = curr_iter
            break
        next(iterator)
        all_times.append(timer.seconds() - start)
    avg = timer.seconds() / num_iter
    return avg, all_times
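
# `iter_benchmark` can also be used on its own. A tiny sketch (the dummy
# generator below is illustrative, not part of detectron2):
#
#   import time
#
#   def slow_stream():
#       while True:
#           time.sleep(0.01)
#           yield 0
#
#   avg, times = iter_benchmark(slow_stream(), num_iter=50, warmup=5)
#   print(f"{avg:.4f} s/iter over {len(times)} timed iterations")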


class DataLoaderBenchmark:
    """
    Some common benchmarks that help understand perf bottlenecks of a standard dataloader
    made of a dataset, a mapper and a sampler.
    """

    def __init__(
        self,
        dataset,
        *,
        mapper,
        sampler=None,
        total_batch_size,
        num_workers=0,
        max_time_seconds: int = 90,
    ):
| | """ |
| | Args: |
| | max_time_seconds (int): maximum time to spent for each benchmark |
| | other args: same as in `build.py:build_detection_train_loader` |
| | """ |
| | if isinstance(dataset, list): |
| | dataset = DatasetFromList(dataset, copy=False, serialize=True) |
| | if sampler is None: |
| | sampler = TrainingSampler(len(dataset)) |

        self.dataset = dataset
        self.mapper = mapper
        self.sampler = sampler
        self.total_batch_size = total_batch_size
        self.num_workers = num_workers
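        # `total_batch_size` is the global batch size across all GPUs; each
        # process only loads its own share.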
        self.per_gpu_batch_size = self.total_batch_size // comm.get_world_size()

        self.max_time_seconds = max_time_seconds

    def _benchmark(self, iterator, num_iter, warmup, msg=None):
        avg, all_times = iter_benchmark(iterator, num_iter, warmup, self.max_time_seconds)
        if msg is not None:
            self._log_time(msg, avg, all_times)
        return avg, all_times

    def _log_time(self, msg, avg, all_times, distributed=False):
        percentiles = [np.percentile(all_times, k, interpolation="nearest") for k in [1, 5, 95, 99]]
        if not distributed:
            logger.info(
                f"{msg}: avg={1.0/avg:.1f} it/s, "
                f"p1={percentiles[0]:.2g}s, p5={percentiles[1]:.2g}s, "
                f"p95={percentiles[2]:.2g}s, p99={percentiles[3]:.2g}s."
            )
            return
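        # Distributed case: gather every rank's statistics and let rank 0 log them all.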
        avg_per_gpu = comm.all_gather(avg)
        percentiles_per_gpu = comm.all_gather(percentiles)
        if comm.get_rank() > 0:
            return
        for idx, avg, percentiles in zip(count(), avg_per_gpu, percentiles_per_gpu):
            logger.info(
                f"GPU{idx} {msg}: avg={1.0/avg:.1f} it/s, "
                f"p1={percentiles[0]:.2g}s, p5={percentiles[1]:.2g}s, "
                f"p95={percentiles[2]:.2g}s, p99={percentiles[3]:.2g}s."
            )

    def benchmark_dataset(self, num_iter, warmup=5):
        """
        Benchmark the speed of taking raw samples from the dataset.
        """

        def loader():
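            # `TrainingSampler` yields indices indefinitely; the extra `while True`
            # keeps the stream infinite even if a finite sampler is used instead.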
            while True:
                for k in self.sampler:
                    yield self.dataset[k]

        self._benchmark(loader(), num_iter, warmup, "Dataset Alone")

    def benchmark_mapper(self, num_iter, warmup=5):
        """
        Benchmark the speed of taking raw samples from the dataset and
        mapping them in a single process.
        """

        def loader():
            while True:
                for k in self.sampler:
                    yield self.mapper(self.dataset[k])

        self._benchmark(loader(), num_iter, warmup, "Single Process Mapper (sec/sample)")

    def benchmark_workers(self, num_iter, warmup=10):
        """
        Benchmark the dataloader by tuning num_workers to [0, 1, self.num_workers].
        """
        candidates = [0, 1]
        if self.num_workers not in candidates:
            candidates.append(self.num_workers)

        dataset = MapDataset(self.dataset, self.mapper)
        for n in candidates:
            loader = build_batch_data_loader(
                dataset,
                self.sampler,
                self.total_batch_size,
                num_workers=n,
            )
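            # With n workers prefetching in parallel, individual iterations are very
            # uneven, so run roughly n times more iterations to capture steady-state
            # throughput rather than a prefetch burst.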
            self._benchmark(
                iter(loader),
                num_iter * max(n, 1),
                warmup * max(n, 1),
                f"DataLoader ({n} workers, bs={self.per_gpu_batch_size})",
            )
            del loader

    def benchmark_IPC(self, num_iter, warmup=10):
        """
        Benchmark the dataloader where each worker outputs nothing. This
        eliminates the IPC overhead compared to the regular dataloader.

        PyTorch multiprocessing's IPC only optimizes for torch tensors.
        Large numpy arrays or other data structures may incur large IPC overhead.
        """
        n = self.num_workers
        dataset = _EmptyMapDataset(MapDataset(self.dataset, self.mapper))
        loader = build_batch_data_loader(
            dataset, self.sampler, self.total_batch_size, num_workers=n
        )
        self._benchmark(
            iter(loader),
            num_iter * max(n, 1),
            warmup * max(n, 1),
            f"DataLoader ({n} workers, bs={self.per_gpu_batch_size}) w/o comm",
        )

    def benchmark_distributed(self, num_iter, warmup=10):
        """
        Benchmark the dataloader in each distributed worker, and log the results of
        all workers. This helps understand the final performance as well as
        the variance among workers.

        It also prints the startup time (first iter) of the dataloader.
        """
        gpu = comm.get_world_size()
        dataset = MapDataset(self.dataset, self.mapper)
        n = self.num_workers
        loader = build_batch_data_loader(
            dataset, self.sampler, self.total_batch_size, num_workers=n
        )

        timer = Timer()
        loader = iter(loader)
        next(loader)
        startup_time = timer.seconds()
        logger.info("Dataloader startup time: {:.2f} seconds".format(startup_time))
        comm.synchronize()

        avg, all_times = self._benchmark(loader, num_iter * max(n, 1), warmup * max(n, 1))
        del loader
        self._log_time(
            f"DataLoader ({gpu} GPUs x {n} workers, total bs={self.total_batch_size})",
            avg,
            all_times,
            True,
        )
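

# A minimal usage sketch (not part of this module; it assumes a registered detectron2
# dataset and the standard DatasetMapper, and the dataset name and iteration counts
# below are only illustrative):
#
#   from detectron2.config import get_cfg
#   from detectron2.data import DatasetCatalog, DatasetMapper
#
#   cfg = get_cfg()
#   dicts = DatasetCatalog.get("coco_2017_val_100")
#   mapper = DatasetMapper(cfg, is_train=True)
#   benchmark = DataLoaderBenchmark(
#       dicts, mapper=mapper, total_batch_size=16, num_workers=4
#   )
#   benchmark.benchmark_dataset(100)
#   benchmark.benchmark_mapper(100)
#   benchmark.benchmark_workers(100)
#   benchmark.benchmark_IPC(100)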