import logging
from typing import Any, Callable, List, Optional

import numpy as np
import torch
import torchmetrics
from torchmetrics import AUROC, PrecisionRecallCurve
from torchmetrics.functional import auroc
from torchmetrics.functional.classification.auroc import _auroc_compute
from torchmetrics.metric import Metric
from torchmetrics.utilities.data import dim_zero_cat
|
|
|
|
class PR_AUC(Metric):
    """Batch-averaged area under the precision-recall curve.

    For every ``update`` call the per-class PR-AUC values are computed, NaN
    values are dropped, and the mean of the remaining values is stored.
    ``compute`` returns the mean over all stored per-batch values.

    Args:
        num_classes: Forwarded to ``PrecisionRecallCurve``.
        compute_on_step: Forwarded to ``Metric.__init__``.
        dist_sync_on_step: Forwarded to ``Metric.__init__``.
    """

    def __init__(self, num_classes, compute_on_step=False, dist_sync_on_step=False):
        super().__init__(compute_on_step=compute_on_step, dist_sync_on_step=dist_sync_on_step)
        # One entry per batch that produced at least one non-NaN class value.
        self.add_state("prauc", default=[], dist_reduce_fx='cat')
        self.pr_curve = PrecisionRecallCurve(num_classes=num_classes).to(self.device)
        self.auc = torchmetrics.AUC().to(self.device)

    def update(self, prediction: torch.Tensor, target: torch.Tensor):
        """Store the mean per-class PR-AUC of this batch.

        Batches for which every per-class value is NaN are skipped so that
        they do not turn the final mean into NaN.
        """
        precision, recall, _ = self.pr_curve(prediction, target)
        # Calling a metric (forward) also accumulates the inputs in the
        # helper's own state, which nothing here ever reads or clears.
        # Reset it so memory does not grow with every batch.
        self.pr_curve.reset()

        auc_values = []
        for r, p in zip(recall, precision):
            auc_values.append(self.auc(r, p))
            self.auc.reset()  # same reasoning as for pr_curve above

        valid = [v for v in auc_values if not torch.isnan(v)]
        if not valid:
            return

        pr_auc = torch.stack(valid).mean().to(self.device)
        # Stored as a 1-element tensor: the state is reduced with 'cat', and
        # torch.cat cannot concatenate zero-dimensional tensors.
        self.prauc.append(pr_auc.detach().reshape(1))

    def compute(self):
        """Return the mean of all stored per-batch values.

        The state is a Python list unless it has been replaced by an already
        concatenated tensor (presumably after a distributed sync with the
        'cat' reduction), so both forms are accepted. Returns NaN when
        nothing has been stored.
        """
        values = self.prauc
        if isinstance(values, (list, tuple)):
            if not values:
                return torch.tensor(float("nan"), device=self.device)
            # reshape(-1) accepts both 0-dim and 1-dim entries.
            values = torch.cat([v.reshape(-1) for v in values])
        return torch.mean(values.detach())
|
|
|
|
class PR_AUCPerBucket(PR_AUC):
    """PR-AUC restricted to the class columns listed in ``bucket``.

    Args:
        num_classes: Total number of class columns in the inputs passed to
            ``update``.
        bucket: Iterable of class indices to evaluate. Duplicates and indices
            outside ``range(num_classes)`` are ignored.
        compute_on_step: Forwarded to the parent constructor.
        dist_sync_on_step: Forwarded to the parent constructor.
    """

    def __init__(self, num_classes, bucket, compute_on_step=False, dist_sync_on_step=False):
        bucket_ids = set(bucket)
        # The column mask never changes, so build it once instead of on
        # every update call.
        column_mask = [c in bucket_ids for c in range(num_classes)]
        # Size the curve metric by the number of columns actually selected;
        # len(bucket) would over-count duplicate or out-of-range indices.
        super().__init__(
            num_classes=sum(column_mask),
            compute_on_step=compute_on_step,
            dist_sync_on_step=dist_sync_on_step,
        )
        self.bucket = bucket_ids
        self.num_classes = num_classes
        self._column_mask = column_mask

    def update(self, prediction: torch.Tensor, target: torch.Tensor):
        """Select the bucket columns and update with them.

        Inputs are indexed as ``[:, mask]``, i.e. the classes are on the
        second axis. Batches without any positive target in the selected
        columns are skipped.
        """
        mask = torch.tensor(self._column_mask, dtype=torch.bool, device=target.device)
        filtered_target = target[:, mask]
        filtered_preds = prediction[:, mask.to(prediction.device)]

        if bool((filtered_target > 0).any()):
            # Same per-batch computation as the parent, on the selected columns.
            super().update(filtered_preds, filtered_target)
|
|
|
|
def calculate_pr_auc(prediction: torch.Tensor, target: torch.Tensor, num_classes, device):
    """Return the mean per-class PR-AUC for a single set of predictions.

    A fresh ``PrecisionRecallCurve`` and ``AUC`` metric are created on
    ``device`` for each call. Per-class values that are NaN are left out of
    the mean.

    Args:
        prediction: Model outputs passed to ``PrecisionRecallCurve``.
        target: Ground truth passed to ``PrecisionRecallCurve``.
        num_classes: Forwarded to ``PrecisionRecallCurve``.
        device: Device for the metric objects and the returned tensor.

    Returns:
        A detached tensor holding the mean of the non-NaN per-class values.
    """
    curve_metric = PrecisionRecallCurve(num_classes=num_classes).to(device)
    area_metric = torchmetrics.AUC().to(device)

    precision, recall, _ = curve_metric(prediction, target)

    finite_areas = []
    for rec, prec in zip(recall, precision):
        area = area_metric(rec, prec)
        if not area.isnan():
            finite_areas.append(area)

    mean_area = torch.tensor(finite_areas).mean()
    return mean_area.to(device).detach()
|
|
|
|
class FilteredAUROC(AUROC):
    """AUROC that leaves out class columns without any non-zero target."""

    def compute(self) -> torch.Tensor:
        """Compute AUROC over the columns that have a non-zero target.

        The accumulated targets are indexed as ``[:, mask]``, i.e. classes
        are on the second axis. The number of dropped columns is logged at
        INFO level.

        Returns:
            The value returned by ``_auroc_compute`` for the kept columns.
        """
        preds = dim_zero_cat(self.preds)
        target = dim_zero_cat(self.target)

        # A single reduction over the sample axis replaces a Python loop
        # with one torch.max call (and one host sync) per class. The mask
        # stays a torch tensor on the target's device.
        keep = target.max(dim=0).values != 0
        filtered_target = target[:, keep]
        filtered_preds = preds[:, keep.to(preds.device)]

        # Counts come from the data itself, so this also works when
        # self.num_classes was left as None.
        num_kept = int(keep.sum())
        num_filtered_cols = keep.numel() - num_kept
        logging.info(f"{num_filtered_cols} columns not considered for ROC AUC calculation!")

        return _auroc_compute(
            filtered_preds,
            filtered_target,
            self.mode,
            num_kept,
            self.pos_label,
            self.average,
            self.max_fpr,
        )
|
|
|
|
class FilteredAUROCPerBucket(AUROC):
    """AUROC over the class columns in ``bucket`` that have a positive target.

    Args:
        bucket: Class indices to evaluate; stored as a set.
        num_classes: Forwarded to ``AUROC``.
        pos_label: Forwarded to ``AUROC``.
        average: Forwarded to ``AUROC``.
        max_fpr: Forwarded to ``AUROC``.
        compute_on_step: Forwarded to ``AUROC``.
        dist_sync_on_step: Forwarded to ``AUROC``.
        process_group: Forwarded to ``AUROC``.
        dist_sync_fn: Forwarded to ``AUROC``.
    """

    def __init__(
        self,
        bucket: List[int],
        num_classes: Optional[int] = None,
        pos_label: Optional[int] = None,
        average: Optional[str] = "macro",
        max_fpr: Optional[float] = None,
        compute_on_step: bool = True,
        dist_sync_on_step: bool = False,
        process_group: Optional[Any] = None,
        dist_sync_fn: Optional[Callable] = None
    ):
        super().__init__(
            num_classes,
            pos_label,
            average,
            max_fpr,
            compute_on_step,
            dist_sync_on_step,
            process_group,
            dist_sync_fn,
        )
        self.bucket = set(bucket)

    def compute(self) -> torch.Tensor:
        """Compute AUROC over bucket columns that have a positive target.

        The accumulated targets are indexed as ``[:, mask]``, i.e. classes
        are on the second axis. A column is kept only when its index is in
        ``self.bucket`` and its largest target value is greater than zero.
        The number of dropped columns is logged at INFO level.

        Returns:
            The value returned by ``_auroc_compute`` for the kept columns.
        """
        preds = dim_zero_cat(self.preds)
        target = dim_zero_cat(self.target)

        # One reduction and one host transfer instead of a torch.max call
        # per class inside a Python loop.
        has_positive = (target.max(dim=0).values > 0).tolist()
        keep_flags = [flag and col in self.bucket for col, flag in enumerate(has_positive)]
        keep = torch.tensor(keep_flags, dtype=torch.bool, device=target.device)
        filtered_target = target[:, keep]
        filtered_preds = preds[:, keep.to(preds.device)]

        # Counts come from the data itself, so this also works when
        # self.num_classes was left as None.
        num_kept = sum(keep_flags)
        num_filtered_cols = len(keep_flags) - num_kept
        logging.info(f"{num_filtered_cols} columns not considered for ROC AUC calculation!")

        return _auroc_compute(
            filtered_preds,
            filtered_target,
            self.mode,
            num_kept,
            self.pos_label,
            self.average,
            self.max_fpr,
        )
|
|