import logging
from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Sequence, Union

from torch import Tensor

from mmengine.dist import (broadcast_object_list, collect_results,
                           is_main_process)
from mmengine.fileio import dump
from mmengine.logging import print_log
from mmengine.registry import METRICS
from mmengine.structures import BaseDataElement


class BaseMetric(metaclass=ABCMeta):
    """Base class for a metric.

    The metric first processes each batch of data_samples and predictions,
    and appends the processed results to the results list. Then it
    collects all results together from all ranks if distributed training
    is used. Finally, it computes the metrics of the entire dataset.

    A subclass of :class:`BaseMetric` should assign a meaningful value to the
    class attribute `default_prefix`. See the argument `prefix` for details.

    Args:
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
        collect_dir (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured when
            ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
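
    Examples:
        A minimal sketch of a subclass (``ToyAccuracy`` and the sample
        fields it reads are illustrative only, not part of this module):

        >>> class ToyAccuracy(BaseMetric):
        ...     default_prefix = 'toy'
        ...
        ...     def process(self, data_batch, data_samples):
        ...         for sample in data_samples:
        ...             # store only what compute_metrics needs
        ...             correct = sample['pred_label'] == sample['gt_label']
        ...             self.results.append(dict(correct=correct))
        ...
        ...     def compute_metrics(self, results):
        ...         correct = sum(r['correct'] for r in results)
        ...         return dict(accuracy=correct / len(results))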
    """

    default_prefix: Optional[str] = None

    def __init__(self,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None,
                 collect_dir: Optional[str] = None) -> None:
        if collect_dir is not None and collect_device != 'cpu':
            raise ValueError('`collect_dir` could only be configured when '
                             "`collect_device='cpu'`")

        self._dataset_meta: Union[None, dict] = None
        self.collect_device = collect_device
        self.results: List[Any] = []
        self.prefix = prefix or self.default_prefix
        self.collect_dir = collect_dir

        if self.prefix is None:
            print_log(
                'The prefix is not set in metric class '
                f'{self.__class__.__name__}.',
                logger='current',
                level=logging.WARNING)

    @property
    def dataset_meta(self) -> Optional[dict]:
        """Optional[dict]: Meta info of the dataset."""
        return self._dataset_meta

    @dataset_meta.setter
    def dataset_meta(self, dataset_meta: dict) -> None:
        """Set the dataset meta info to the metric."""
        self._dataset_meta = dataset_meta

    @abstractmethod
    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Any): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of outputs from
                the model.
        """

    @abstractmethod
    def compute_metrics(self, results: list) -> dict:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """

    def evaluate(self, size: int) -> dict:
        """Evaluate the model performance of the whole dataset after
        processing all batches.

        Args:
            size (int): Length of the entire validation dataset. When batch
                size > 1, the dataloader may pad some data samples to make
                sure all ranks have the same length of dataset slice. The
                ``collect_results`` function will drop the padded data based
                on this size.

        Returns:
            dict: Evaluation metrics dict on the val dataset. The keys are the
            names of the metrics, and the values are corresponding results.
        """
        if len(self.results) == 0:
            print_log(
                f'{self.__class__.__name__} got empty `self.results`. Please '
                'ensure that the processed results are properly added into '
                '`self.results` in `process` method.',
                logger='current',
                level=logging.WARNING)
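
        # Gather the per-rank `self.results` lists into a single list. With
        # `collect_device='cpu'`, an optional shared directory
        # (`collect_dir`) may be used for the intermediate files.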
        if self.collect_device == 'cpu':
            results = collect_results(
                self.results,
                size,
                self.collect_device,
                tmpdir=self.collect_dir)
        else:
            results = collect_results(self.results, size, self.collect_device)

        if is_main_process():
            results = _to_cpu(results)
            _metrics = self.compute_metrics(results)

            if self.prefix:
                _metrics = {
                    '/'.join((self.prefix, k)): v
                    for k, v in _metrics.items()
                }
            metrics = [_metrics]
        else:
            metrics = [None]
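
        # Metrics are computed only on the main process; broadcast them so
        # that every rank returns the same dict.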
        broadcast_object_list(metrics)

        self.results.clear()
        return metrics[0]


@METRICS.register_module()
class DumpResults(BaseMetric):
    """Dump model predictions to a pickle file for offline evaluation.

    Args:
        out_file_path (str): Path of the dumped file. Must end with '.pkl'
            or '.pickle'.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        collect_dir (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured when
            ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
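
    Examples:
        Typically used as a test evaluator in a config (the output path
        below is illustrative only):

        >>> test_evaluator = dict(
        ...     type='DumpResults',
        ...     out_file_path='./work_dirs/predictions.pkl')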
    """

    def __init__(self,
                 out_file_path: str,
                 collect_device: str = 'cpu',
                 collect_dir: Optional[str] = None) -> None:
        super().__init__(
            collect_device=collect_device, collect_dir=collect_dir)
        if not out_file_path.endswith(('.pkl', '.pickle')):
            raise ValueError(
                "The output file must be a '.pkl' or '.pickle' file.")
        self.out_file_path = out_file_path

    def process(self, data_batch: Any, predictions: Sequence[dict]) -> None:
        """Transfer tensors in predictions to CPU."""
        self.results.extend(_to_cpu(predictions))

    def compute_metrics(self, results: list) -> dict:
        """Dump the prediction results to a pickle file."""
        dump(results, self.out_file_path)
        print_log(
            f'Results have been saved to {self.out_file_path}.',
            logger='current')
        return {}


def _to_cpu(data: Any) -> Any:
    """Transfer all tensors and BaseDataElement to cpu."""
    if isinstance(data, (Tensor, BaseDataElement)):
        return data.to('cpu')
    elif isinstance(data, list):
        return [_to_cpu(d) for d in data]
    elif isinstance(data, tuple):
        return tuple(_to_cpu(d) for d in data)
    elif isinstance(data, dict):
        return {k: _to_cpu(v) for k, v in data.items()}
    else:
        return data