| | import torch |
| | import torch.nn as nn |
| | from torch.nn import functional as F |
| | from torch.nn.utils.rnn import pad_sequence |
| |
|
| | try: |
| | import torch.distributed.nn |
| | from torch import distributed as dist |
| |
|
| | has_distributed = True |
| | except ImportError: |
| | has_distributed = False |
| |
|
| | try: |
| | import horovod.torch as hvd |
| | except ImportError: |
| | hvd = None |
| |
|
| |
|
def gather_features(
        image_features,
        text_features,
        local_loss=False,
        gather_with_grad=False,
        rank=0,
        world_size=1,
        use_horovod=False
):
    """Gather image and text features from all workers and concatenate them on dim 0.

    Args:
        image_features: this worker's image feature tensor.
        text_features: this worker's text feature tensor.
        local_loss: when False (and the gather itself is not differentiable),
            the slot at index ``rank`` of the gathered result is replaced by
            the caller's own tensors before concatenation.
        gather_with_grad: use the gather variant that is called outside
            ``torch.no_grad()`` (``hvd.allgather`` /
            ``torch.distributed.nn.all_gather``).
        rank: index of this worker's slot in the gathered result.
        world_size: number of slots to gather / chunk into.
        use_horovod: gather through horovod instead of ``torch.distributed``.

    Returns:
        Tuple ``(all_image_features, all_text_features)``.

    Raises:
        AssertionError: if ``torch.distributed`` failed to import, or if
            ``use_horovod`` is set but horovod is not installed.
    """
    assert has_distributed, 'torch.distributed did not import correctly, please use a PyTorch version with support.'

    if use_horovod:
        assert hvd is not None, 'Please install horovod'

        if gather_with_grad:
            all_image_features = hvd.allgather(image_features)
            all_text_features = hvd.allgather(text_features)
            return all_image_features, all_text_features

        # Gather without recording the collective in the autograd graph.
        with torch.no_grad():
            all_image_features = hvd.allgather(image_features)
            all_text_features = hvd.allgather(text_features)
        if local_loss:
            return all_image_features, all_text_features

        # Swap this worker's slice for the caller's own tensors so it keeps
        # whatever autograd history those tensors carry.
        image_parts = list(all_image_features.chunk(world_size, dim=0))
        text_parts = list(all_text_features.chunk(world_size, dim=0))
        image_parts[rank] = image_features
        text_parts[rank] = text_features
        return torch.cat(image_parts, dim=0), torch.cat(text_parts, dim=0)

    # torch.distributed path.
    if gather_with_grad:
        all_image_features = torch.cat(torch.distributed.nn.all_gather(image_features), dim=0)
        all_text_features = torch.cat(torch.distributed.nn.all_gather(text_features), dim=0)
        return all_image_features, all_text_features

    # dist.all_gather fills pre-allocated buffers, one per worker.
    image_parts = [torch.zeros_like(image_features) for _ in range(world_size)]
    text_parts = [torch.zeros_like(text_features) for _ in range(world_size)]
    dist.all_gather(image_parts, image_features)
    dist.all_gather(text_parts, text_features)
    if not local_loss:
        # Same swap as in the horovod branch: restore the caller's tensors
        # in this worker's slot.
        image_parts[rank] = image_features
        text_parts[rank] = text_features
    return torch.cat(image_parts, dim=0), torch.cat(text_parts, dim=0)
| |
|
| |
|
class ClipLoss(nn.Module):
    """Symmetric cross-entropy loss over image/text similarity logits.

    The loss is the mean of two cross-entropy terms: one over
    ``logits_per_image`` and one over ``logits_per_text``. The target for
    row ``i`` is column ``i`` (shifted by ``num_logits * rank`` when
    ``local_loss`` is used with more than one worker).
    """

    def __init__(
            self,
            local_loss=False,
            gather_with_grad=False,
            cache_labels=False,
            rank=0,
            world_size=1,
            use_horovod=False,
    ):
        """Store the distributed-gather configuration and initialise the label cache.

        Args:
            local_loss: compare only this worker's features against the
                gathered features instead of all-vs-all.
            gather_with_grad: forwarded to ``gather_features``.
            cache_labels: keep the target tensor per device between calls.
            rank: this worker's index; used for the label offset and gather.
            world_size: number of workers; gathering happens only when > 1.
            use_horovod: forwarded to ``gather_features``.
        """
        super().__init__()
        self.local_loss = local_loss
        self.gather_with_grad = gather_with_grad
        self.cache_labels = cache_labels
        self.rank = rank
        self.world_size = world_size
        self.use_horovod = use_horovod

        # Label cache: row count the cache was built for, and labels per device.
        self.prev_num_logits = 0
        self.labels = {}

    def get_ground_truth(self, device, num_logits) -> torch.Tensor:
        """Return the target class indices for ``num_logits`` rows on ``device``.

        The tensor is rebuilt whenever the row count changed or the device
        has no cached entry; it is stored only when ``cache_labels`` is set.
        """
        if self.prev_num_logits != num_logits or device not in self.labels:
            labels = torch.arange(num_logits, device=device, dtype=torch.long)
            if self.world_size > 1 and self.local_loss:
                # Local rows are compared against gathered columns, so the
                # matching column sits at an offset of rank * num_logits.
                labels = labels + num_logits * self.rank
            if self.cache_labels:
                self.labels[device] = labels
                self.prev_num_logits = num_logits
        else:
            labels = self.labels[device]
        return labels

    def get_logits(self, image_features, text_features, logit_scale):
        """Compute scaled similarity logits in both directions.

        Returns:
            Tuple ``(logits_per_image, logits_per_text)``.
        """
        if self.world_size > 1:
            all_image_features, all_text_features = gather_features(
                image_features, text_features,
                self.local_loss, self.gather_with_grad, self.rank, self.world_size, self.use_horovod)

            if self.local_loss:
                logits_per_image = logit_scale * image_features @ all_text_features.T
                logits_per_text = logit_scale * text_features @ all_image_features.T
            else:
                logits_per_image = logit_scale * all_image_features @ all_text_features.T
                logits_per_text = logits_per_image.T
        else:
            logits_per_image = logit_scale * image_features @ text_features.T
            logits_per_text = logit_scale * text_features @ image_features.T

        return logits_per_image, logits_per_text

    def forward(self, image_features, text_features, logit_scale, output_dict=False):
        """Return the contrastive loss for a batch of paired features.

        Args:
            image_features: image feature tensor.
            text_features: text feature tensor.
            logit_scale: multiplier applied to the similarity matrix.
            output_dict: when True, wrap the loss as
                ``{"contrastive_loss": loss}`` (same convention as the other
                loss classes in this file); when False, return the tensor.

        Returns:
            A scalar loss tensor, or a one-entry dict when ``output_dict``.
        """
        device = image_features.device
        logits_per_image, logits_per_text = self.get_logits(image_features, text_features, logit_scale)

        labels = self.get_ground_truth(device, logits_per_image.shape[0])

        total_loss = (
            F.cross_entropy(logits_per_image, labels) +
            F.cross_entropy(logits_per_text, labels)
        ) / 2

        # Previously `output_dict` was accepted but ignored.
        return {"contrastive_loss": total_loss} if output_dict else total_loss
| |
|
class PreferenceLoss(nn.Module):
    """Cross-entropy over per-group logits selected from a logit matrix."""

    def forward(self, logits_per_image, num_images, labels):
        """Compute the cross-entropy loss across variable-sized row groups.

        The rows of ``logits_per_image`` are split into consecutive groups
        whose sizes come from ``num_images``; from the i-th group only
        column ``i`` is kept. Presumably rows are images and column ``i`` is
        the prompt belonging to group ``i`` -- confirm against the caller.

        Args:
            logits_per_image: 2-D logit tensor.
            num_images: 1-D integer tensor of group sizes.
            labels: target index within each group, one per group.

        Returns:
            Scalar cross-entropy loss.
        """
        group_sizes = num_images.tolist()

        per_group_scores = []
        for group_idx, group_rows in enumerate(torch.split(logits_per_image, group_sizes)):
            per_group_scores.append(group_rows[:, group_idx])

        # Shorter groups are padded with a very negative logit so the padded
        # slots get negligible softmax weight.
        padded_scores = pad_sequence(per_group_scores, batch_first=True, padding_value=-999)

        return F.cross_entropy(padded_scores, labels)
| |
|
class HPSLoss(nn.Module):
    """Soft-label two-way cross-entropy over paired candidate logits."""

    def forward(self, text_logits, labels):
        """Compute the summed, label-weighted two-way cross-entropy.

        ``text_logits`` is split in half along its last dim and the diagonal
        of each half is taken, giving two logits per row. Presumably
        ``text_logits`` is (B, 2B), with columns ``i`` and ``B + i`` holding
        the two candidates for row ``i`` -- confirm against the caller.

        ``labels`` is split in half along its last dim into the weights for
        "candidate 0 wins" and "candidate 1 wins". Both a flat (2B,) layout
        and a (B, 2) layout are accepted.

        Args:
            text_logits: 2-D logit tensor whose halves are each indexable
                on the diagonal.
            labels: per-row weights for the two outcomes.

        Returns:
            Scalar tensor: the sum over rows of the weighted losses.
        """
        device = text_logits.device
        text_0_logits, text_1_logits = text_logits.chunk(2, dim=-1)
        label_0, label_1 = labels.chunk(2, dim=-1)

        # A (B, 2) `labels` chunks into (B, 1) weights. Multiplied with the
        # (B,) per-row losses below, those would broadcast to a (B, B) outer
        # product and mix every row's weight with every other row's loss.
        # Drop the trailing singleton dim so weights pair up element-wise.
        label_0, label_1 = (
            w.squeeze(-1) if w.dim() == 2 and w.shape[-1] == 1 else w
            for w in (label_0, label_1)
        )

        # Keep only each row's own candidate logit from each half.
        index = torch.arange(text_0_logits.shape[0], device=device, dtype=torch.long)
        pair_logits = torch.stack(
            [text_0_logits[index, index], text_1_logits[index, index]], dim=-1
        )

        text_0_labels = torch.zeros(pair_logits.shape[0], device=device, dtype=torch.long)
        text_1_labels = text_0_labels + 1

        # Per-row loss for each of the two possible outcomes.
        text_0_loss = F.cross_entropy(pair_logits, text_0_labels, reduction="none")
        text_1_loss = F.cross_entropy(pair_logits, text_1_labels, reduction="none")

        text_loss = label_0 * text_0_loss + label_1 * text_1_loss
        return text_loss.sum()
| |
|
class RankingLoss(nn.Module):
    """Pairwise margin (hinge) loss over per-group logits and labels."""

    def forward(self, logits_per_image, num_images, labels, margin = 1.0):
        """Compute the mean pairwise hinge loss across row groups.

        Rows of ``logits_per_image`` and entries of ``labels`` are split into
        consecutive groups with sizes from ``num_images``; from the i-th
        logit group only column ``i`` is kept. For positions ``(r, c)`` with
        ``r >= c`` in each group the term is
        ``clamp(margin - (s_c - s_r) * (y_r - y_c), min=0)``, so an item
        with a smaller label is pushed towards a larger score.

        NOTE(review): the selected positions include the diagonal, where the
        product is zero, so each diagonal entry adds a constant ``margin``
        to the mean. Padded slots (logit -1, label 10) also form pairs with
        real items when groups differ in size. Confirm both are intended.

        Args:
            logits_per_image: 2-D logit tensor.
            num_images: 1-D integer tensor of group sizes.
            labels: 1-D tensor with one label per row of ``logits_per_image``.
            margin: hinge margin.

        Returns:
            Scalar loss tensor.
        """
        # Convert once; each .tolist() forces a host transfer.
        group_sizes = num_images.tolist()

        paired_logits_list = [
            group[:, i] for i, group in enumerate(logits_per_image.split(group_sizes))
        ]
        label_list = list(labels.split(group_sizes))

        paired_logits = pad_sequence(paired_logits_list, batch_first=True, padding_value=-1)
        padded_labels = pad_sequence(label_list, batch_first=True, padding_value=10)

        # diff[b, r, c] = s_c - s_r ; diff_label[b, r, c] = y_r - y_c
        diff = paired_logits.unsqueeze(1) - paired_logits.unsqueeze(2)
        diff_label = -(padded_labels.unsqueeze(1) - padded_labels.unsqueeze(2))

        # Build the mask on the same device as the data instead of always on
        # CPU, so indexing does not move the mask across devices every call.
        n = diff.shape[1]
        upper = torch.triu(torch.ones(n, n, device=diff.device), diagonal=1).bool()
        keep = ~upper  # lower triangle including the diagonal

        loss = torch.clamp(margin - torch.mul(diff[:, keep], diff_label[:, keep]), min=0).mean()
        return loss
| |
|
class CoCaLoss(ClipLoss):
    """Weighted contrastive loss plus a weighted token-level captioning loss."""

    def __init__(
            self,
            caption_loss_weight,
            clip_loss_weight,
            pad_id=0,
            local_loss=False,
            gather_with_grad=False,
            cache_labels=False,
            rank=0,
            world_size=1,
            use_horovod=False,
    ):
        """Configure the contrastive base loss and the captioning criterion.

        Args:
            caption_loss_weight: multiplier for the captioning loss.
            clip_loss_weight: multiplier for the contrastive loss.
            pad_id: label value ignored by the captioning cross-entropy.
            local_loss, gather_with_grad, cache_labels, rank, world_size,
            use_horovod: forwarded unchanged to ``ClipLoss``.
        """
        super().__init__(
            local_loss=local_loss,
            gather_with_grad=gather_with_grad,
            cache_labels=cache_labels,
            rank=rank,
            world_size=world_size,
            use_horovod=use_horovod
        )

        self.caption_loss = nn.CrossEntropyLoss(ignore_index=pad_id)
        self.caption_loss_weight = caption_loss_weight
        self.clip_loss_weight = clip_loss_weight

    def forward(self, image_features, text_features, logits, labels, logit_scale, output_dict=False):
        """Return the weighted contrastive and captioning losses.

        Args:
            image_features, text_features, logit_scale: inputs to the
                contrastive loss of the base class.
            logits: captioning logits; dims 1 and 2 are swapped before the
                cross-entropy so the class dim comes second.
            labels: captioning targets.
            output_dict: return a dict keyed ``"contrastive_loss"`` /
                ``"caption_loss"`` instead of a tuple.

        Returns:
            ``(clip_loss, caption_loss)`` or the equivalent dict.
        """
        contrastive = super().forward(image_features, text_features, logit_scale)
        clip_loss = self.clip_loss_weight * contrastive

        # nn.CrossEntropyLoss expects the class dimension at index 1.
        class_first_logits = logits.permute(0, 2, 1)
        caption_loss = self.caption_loss(class_first_logits, labels) * self.caption_loss_weight

        if not output_dict:
            return clip_loss, caption_loss
        return {"contrastive_loss": clip_loss, "caption_loss": caption_loss}
| |
|
| |
|
class DistillClipLoss(ClipLoss):
    """Contrastive loss plus a distillation loss against a second set of logits."""

    def dist_loss(self, teacher_logits, student_logits):
        """Return the cross-entropy of student log-probs under teacher probs.

        Both softmaxes run over dim 1; the result is summed over dim 1 and
        averaged over dim 0.
        """
        teacher_probs = teacher_logits.softmax(dim=1)
        student_log_probs = student_logits.log_softmax(dim=1)
        per_row = (teacher_probs * student_log_probs).sum(dim=1)
        return -per_row.mean(dim=0)

    def forward(
            self,
            image_features,
            text_features,
            logit_scale,
            dist_image_features,
            dist_text_features,
            dist_logit_scale,
            output_dict=False,
    ):
        """Compute the contrastive loss and the distillation loss.

        Args:
            image_features, text_features, logit_scale: inputs producing the
                logits that are trained (used as the student in ``dist_loss``).
            dist_image_features, dist_text_features, dist_logit_scale: inputs
                producing the logits used as the teacher in ``dist_loss``.
            output_dict: return a dict keyed ``"contrastive_loss"`` /
                ``"distill_loss"`` instead of a tuple.

        Returns:
            ``(contrastive_loss, distill_loss)`` or the equivalent dict.
        """
        logits_per_image, logits_per_text = self.get_logits(
            image_features, text_features, logit_scale)
        dist_logits_per_image, dist_logits_per_text = self.get_logits(
            dist_image_features, dist_text_features, dist_logit_scale)

        labels = self.get_ground_truth(image_features.device, logits_per_image.shape[0])

        # Symmetric contrastive term: average of both directions.
        image_ce = F.cross_entropy(logits_per_image, labels)
        text_ce = F.cross_entropy(logits_per_text, labels)
        contrastive_loss = (image_ce + text_ce) / 2

        # Symmetric distillation term, teacher logits first.
        image_kd = self.dist_loss(dist_logits_per_image, logits_per_image)
        text_kd = self.dist_loss(dist_logits_per_text, logits_per_text)
        distill_loss = (image_kd + text_kd) / 2

        if not output_dict:
            return contrastive_loss, distill_loss
        return {"contrastive_loss": contrastive_loss, "distill_loss": distill_loss}
| |
|