| | |
| | import copy |
| | import torch |
| | import torch.nn.functional as F |
| | from transformers import AutoImageProcessor, Dinov2Config, Dinov2Model, SiglipImageProcessor, SiglipVisionConfig, SiglipVisionModel |
| | from abc import ABC, abstractmethod |
| | import torch.nn as nn |
| |
|
| |
|
class ProcessorWrapper:
    """Adapt a plain image transform to an image-processor-like interface.

    The wrapper exposes ``crop_size``, ``image_mean`` and ``preprocess``.

    Args:
        transform: Callable applied to a single image by ``preprocess``.
        height: Value stored under ``crop_size["height"]``.
        width: Value stored under ``crop_size["width"]``.
        image_mean: Per-channel mean exposed as ``image_mean``. When omitted
            (``None``), each instance receives its own fresh list holding the
            default three-channel mean.
    """

    # Stored as an immutable tuple so the default can never be mutated
    # through one instance and leak into every other instance.
    _DEFAULT_IMAGE_MEAN = (0.48145466, 0.4578275, 0.40821073)

    def __init__(self, transform, height=378, width=378, image_mean=None):
        self._crop_size = {
            "height": height,
            "width": width,
        }
        self._transforms = transform

        if image_mean is None:
            # Fresh list per instance (the attribute stays a list, as before).
            image_mean = list(self._DEFAULT_IMAGE_MEAN)
        self.image_mean = image_mean

    @property
    def crop_size(self):
        """Return the ``{"height": ..., "width": ...}`` dict set at construction."""
        return self._crop_size

    def preprocess(self, image, return_tensors="pt"):
        """Apply the wrapped transform to one image.

        Args:
            image: Passed unchanged to the transform given at construction.
            return_tensors: Accepted for call compatibility; not used here.

        Returns:
            A dict whose ``"pixel_values"`` entry is a one-element list
            containing the transform's result.
        """
        return {"pixel_values": [self._transforms(image)]}
| |
|
| |
|
class BaseVisionTower(nn.Module):
    """Shared scaffolding for vision encoders wrapped as ``nn.Module`` objects.

    Subclasses provide ``load_model`` and ``_forward`` and are expected to set
    ``self.vision_tower`` (and ``self.cfg_only`` when loading is deferred).
    The size-related properties below first consult ``self.config`` and then
    fall back to the optional private attributes ``_hidden_size``,
    ``_image_size``, ``_patch_size``, ``_num_patches_per_side`` and
    ``_num_patches``. ``_interp_size``, when set to a non-``None`` value,
    overrides the patch-count properties.

    NOTE: this class does not use ``ABCMeta``, so ``@abstractmethod`` is not
    enforced at instantiation time; the placeholder bodies raise
    ``NotImplementedError`` only when actually called.

    Args:
        vision_tower_name: Identifier stored as ``self.vision_tower_name``.
        args: Object providing ``mm_vision_select_layer`` and, optionally,
            ``mm_vision_select_feature`` (default ``"patch"``) and
            ``unfreeze_mm_vision_tower`` (default ``False``).
        delay_load: Stored as ``self.delay_load`` for subclasses to act on.
    """

    def __init__(self, vision_tower_name, args, delay_load=False):
        super().__init__()

        self.is_loaded = False
        self.args = args

        self.vision_tower_name = vision_tower_name
        self.select_layer = args.mm_vision_select_layer
        self.select_feature = getattr(args, "mm_vision_select_feature", "patch")
        self.unfreeze_mm_vision_tower = getattr(args, "unfreeze_mm_vision_tower", False)
        self.delay_load = delay_load

    @abstractmethod
    def load_model(self, device_map=None):
        """Load the underlying model; must be overridden by subclasses."""
        raise NotImplementedError("Subclasses must implement load_model")

    @abstractmethod
    def _forward(self, images):
        """Encode one batch tensor; must be overridden by subclasses."""
        raise NotImplementedError("Subclasses must implement forward")

    def forward(self, images):
        """Encode ``images`` with ``_forward``.

        Args:
            images: Either a tensor, which is passed to ``_forward`` as is, or
                a list of tensors, each of which gets a leading dimension
                added (``unsqueeze(0)``) and is encoded separately.

        Returns:
            The ``_forward`` result, or a list of results for list input.
        """
        if isinstance(images, list):
            return [self._forward(image.unsqueeze(0)) for image in images]
        return self._forward(images)

    @property
    def dummy_feature(self):
        """Return a ``(1, hidden_size)`` zero tensor on this tower's device/dtype."""
        return torch.zeros(1, self.hidden_size, device=self.device, dtype=self.dtype)

    @property
    def dtype(self):
        """Return the tower's dtype.

        Uses ``vision_tower.dtype`` when that attribute exists, otherwise the
        dtype of the first parameter, otherwise ``torch.float32``.
        """
        if hasattr(self.vision_tower, "dtype"):
            return self.vision_tower.dtype
        first_param = next(iter(self.vision_tower.parameters()), None)
        return first_param.dtype if first_param is not None else torch.float32

    @property
    def device(self):
        """Return the tower's device.

        Uses ``vision_tower.device`` when that attribute exists, otherwise the
        device of the first parameter, otherwise CPU.
        """
        if hasattr(self.vision_tower, "device"):
            return self.vision_tower.device
        first_param = next(iter(self.vision_tower.parameters()), None)
        return first_param.device if first_param is not None else torch.device("cpu")

    @property
    def config(self):
        """Return the loaded model's config, or ``cfg_only`` before loading."""
        if self.is_loaded:
            return self.vision_tower.config
        return self.cfg_only

    @property
    def hidden_size(self):
        """Return ``config.hidden_size``, falling back to ``_hidden_size``."""
        try:
            return self.config.hidden_size
        except AttributeError:
            # No config available yet (or it lacks the field).
            return self._hidden_size

    @property
    def image_size(self):
        """Return ``config.image_size``, falling back to ``_image_size``."""
        try:
            return self.config.image_size
        except AttributeError:
            return self._image_size

    @property
    def patch_size(self):
        """Return ``config.patch_size``, falling back to ``_patch_size``."""
        try:
            return self.config.patch_size
        except AttributeError:
            return self._patch_size

    @property
    def num_patches_per_side(self):
        """Return the side length of the (square) token grid.

        When ``_interp_size`` is set, this is ``int(_interp_size ** 0.5)``;
        otherwise ``image_size // patch_size``, falling back to
        ``_num_patches_per_side``.
        """
        # ``_interp_size`` is only defined by some subclasses; treat a missing
        # attribute the same as ``None`` instead of raising.
        interp_size = getattr(self, "_interp_size", None)
        if interp_size is not None:
            return int(interp_size**0.5)
        try:
            return self.image_size // self.patch_size
        except (AttributeError, TypeError):
            # Sizes unavailable, or present but not usable for division.
            return self._num_patches_per_side

    @property
    def num_patches(self):
        """Return the number of output tokens.

        When ``_interp_size`` is set it is returned directly; otherwise
        ``num_patches_per_side ** 2``, falling back to ``_num_patches``.
        """
        interp_size = getattr(self, "_interp_size", None)
        if interp_size is not None:
            return interp_size
        try:
            return self.num_patches_per_side**2
        except (AttributeError, TypeError):
            return self._num_patches
| |
|
| |
|
class DinoVisionTower(BaseVisionTower):
    """DINOv2 vision tower pinned to the ``facebook/dinov2-giant`` checkpoint.

    The name passed to the constructor is only recorded in
    ``_vision_tower_name``; the checkpoint, the processor resolution (378)
    and the interpolation target (576 tokens) are fixed in ``__init__``.

    NOTE(review): any ``-interp<N>`` suffix in the requested name is not
    parsed here; the token count is always 576. Confirm this is intended.

    Args:
        vision_tower: Requested tower name (recorded, not used for loading).
        args: See ``BaseVisionTower``.
        delay_load: When true, only the config is fetched (``cfg_only``) and
            the weights are left for a later ``load_model`` call.
    """

    def __init__(self, vision_tower, args, delay_load=False):
        super().__init__(vision_tower, args, delay_load)

        self._vision_tower_name = vision_tower
        self.vision_tower_name = "facebook/dinov2-giant"
        self._image_size = 378
        self._interp_size = 576
        self._patch_size = 14

        if not self.delay_load:
            self.load_model()
        else:
            self.cfg_only = Dinov2Config.from_pretrained(self.vision_tower_name)

    def load_model(self, device_map=None):
        """Load the DINOv2 weights and the matching image processor.

        Args:
            device_map: Accepted for interface compatibility; not used here.
        """
        self.vision_tower = Dinov2Model.from_pretrained(self.vision_tower_name)
        # Workaround recorded by the original author for:
        #   ValueError: Dinov2Model does not support `device_map='auto'`. To
        #   implement support, the model class needs to implement the
        #   `_no_split_modules` attribute.
        self.vision_tower._no_split_modules = ["Dinov2SwiGLUFFN"]

        # Only fall back to the checkpoint's resolution when none was pinned.
        if self._image_size is None:
            self._image_size = self.vision_tower.config.image_size

        # Resize the shortest edge and crop to the same square resolution.
        self.image_processor = AutoImageProcessor.from_pretrained(
            self.vision_tower_name,
            crop_size=dict(height=self._image_size, width=self._image_size),
            size=dict(shortest_edge=self._image_size),
        )

        # Read the real embedding width and patch stride off the patch
        # projection layer of the loaded model.
        projection = self.vision_tower.embeddings.patch_embeddings.projection
        self._hidden_size = projection.out_channels
        self._patch_size = projection.stride[0]

        self.vision_tower.requires_grad_(self.unfreeze_mm_vision_tower)
        self.is_loaded = True

    @property
    def image_size(self):
        """Return the pinned input resolution (not the checkpoint config's)."""
        return self._image_size

    def feature_select(self, outputs):
        """Pick tokens from the model output according to ``select_feature``.

        Args:
            outputs: Model output indexable by ``"last_hidden_state"``.

        Returns:
            All tokens for ``"cls_patch"``, tokens from index 1 onward for
            ``"patch"``, or the token at index 0 for ``"cls"``.

        Raises:
            ValueError: If ``select_feature`` is none of the above.
        """
        sequence_output = outputs["last_hidden_state"]

        if self.select_feature == "cls_patch":
            return sequence_output
        if self.select_feature == "patch":
            return sequence_output[:, 1:]
        if self.select_feature == "cls":
            return sequence_output[:, 0]
        raise ValueError(f"Unexpected select feature: {self.select_feature}")

    def interpolate(self, image_features):
        """Resize the token grid so it holds ``_interp_size`` tokens.

        Args:
            image_features: Rank-3 tensor unpacked as
                ``(batch, num_tokens, dim)``. ``num_tokens`` must be a perfect
                square for the reshape below to succeed.

        Returns:
            The input unchanged when ``_interp_size`` is ``None`` or the token
            count already matches; otherwise a
            ``(batch, target_side ** 2, dim)`` tensor in the input's dtype,
            where ``target_side = int(_interp_size ** 0.5)``.
        """
        if self._interp_size is None:
            return image_features

        b, num_tokens, dim = image_features.shape
        if num_tokens == self.num_patches:
            return image_features

        target_side = int(self._interp_size**0.5)
        side = int(num_tokens**0.5)
        orig_dtype = image_features.dtype

        # (b, tokens, dim) -> (b, dim, side, side) for 2-D interpolation.
        grid = image_features.view(b, side, side, dim)
        grid = grid.permute(0, 3, 1, 2).contiguous()

        # Interpolate in float32, then restore the incoming dtype.
        grid = F.interpolate(
            grid.to(torch.float32),
            size=(target_side, target_side),
            mode="bilinear",
            align_corners=False,
        ).to(orig_dtype)

        # Back to channels-last and flatten the grid into a token axis.
        grid = grid.permute(0, 2, 3, 1).contiguous()
        return grid.flatten(1, 2)

    def _forward(self, images):
        """Encode ``images`` and return the selected, interpolated features.

        Gradients are enabled only when ``unfreeze_mm_vision_tower`` is true.
        Inputs are moved to the tower's device/dtype; selected features are
        cast back to the dtype of ``images``.
        """
        with torch.set_grad_enabled(self.unfreeze_mm_vision_tower):
            image_forward_outs = self.vision_tower.forward(
                images.to(device=self.device, dtype=self.dtype)
            )
            image_features = self.feature_select(image_forward_outs).to(images.dtype)
            return self.interpolate(image_features)

    @property
    def num_patches_per_side(self):
        """Return ``int(num_patches ** 0.5)``."""
        return int(self.num_patches**0.5)

    @property
    def num_patches(self):
        """Return ``_interp_size`` when set, else ``(image_size // patch_size) ** 2``."""
        if self._interp_size is None:
            return (self._image_size // self._patch_size) ** 2
        return self._interp_size
| | |
| | |
| | |
class SiglipVisionTower(BaseVisionTower):
    """SigLIP vision tower pinned to ``google/siglip-so400m-patch14-384``.

    The checkpoint, the nominal resolution (384) and the interpolation target
    (576 tokens) are fixed in ``__init__``; the ``vision_tower_name`` argument
    is only forwarded to the base class before being overwritten.

    NOTE(review): any ``-interp<N>`` suffix in the requested name is not
    parsed here; the token count is always 576. Confirm this is intended.

    Args:
        vision_tower_name: Requested tower name (not used for loading).
        args: See ``BaseVisionTower``.
        delay_load: When true and the tower is frozen, weights are not loaded
            and fallback sizes are set instead. An unfrozen tower
            (``unfreeze_mm_vision_tower``) is always loaded immediately.
    """

    def __init__(self, vision_tower_name, args, delay_load=False):
        super().__init__(vision_tower_name, args, delay_load)

        self.vision_tower_name = "google/siglip-so400m-patch14-384"
        self._image_size = 384
        self._interp_size = 576

        if not self.delay_load or self.unfreeze_mm_vision_tower:
            self.load_model()
        else:
            # Fallbacks consulted by the base-class properties while no
            # config is available; both are overwritten by ``load_model``.
            self._hidden_size = 1152
            # Matches the "patch14" checkpoint; without it the inherited
            # ``patch_size`` property raises before the model is loaded.
            self._patch_size = 14

    def load_model(self, device_map=None):
        """Load the SigLIP vision weights and image processor.

        Args:
            device_map: Accepted for interface compatibility; not used here.
        """
        self.vision_model = "siglip"
        self.vision_tower = SiglipVisionModel.from_pretrained(self.vision_tower_name)

        # Flag set on the loaded model by the original code; its consumer is
        # not visible in this file.
        self.vision_tower.output_tokens = True

        tower_config = self.vision_tower.config
        self._hidden_size = tower_config.hidden_size
        self._image_size = tower_config.image_size
        self._patch_size = tower_config.patch_size
        self.image_processor = SiglipImageProcessor.from_pretrained(
            self.vision_tower_name
        )

        self.vision_tower.requires_grad_(self.unfreeze_mm_vision_tower)
        self.is_loaded = True

    def interpolate(self, image_features):
        """Resize the token grid so it holds ``_interp_size`` tokens.

        Args:
            image_features: Rank-3 tensor unpacked as
                ``(batch, num_tokens, dim)``. ``num_tokens`` must be a perfect
                square for the reshape below to succeed.

        Returns:
            The input unchanged when ``_interp_size`` is ``None`` or the token
            count already matches; otherwise a
            ``(batch, target_side ** 2, dim)`` tensor in the input's dtype,
            where ``target_side = int(_interp_size ** 0.5)``.
        """
        if self._interp_size is None:
            return image_features

        b, num_tokens, dim = image_features.shape
        if num_tokens == self.num_patches:
            return image_features

        target_side = int(self._interp_size**0.5)
        side = int(num_tokens**0.5)
        orig_dtype = image_features.dtype

        # (b, tokens, dim) -> (b, dim, side, side) for 2-D interpolation.
        grid = image_features.view(b, side, side, dim)
        grid = grid.permute(0, 3, 1, 2).contiguous()

        # Interpolate in float32, then restore the incoming dtype.
        grid = F.interpolate(
            grid.to(torch.float32),
            size=(target_side, target_side),
            mode="bilinear",
            align_corners=False,
        ).to(orig_dtype)

        # Back to channels-last and flatten the grid into a token axis.
        grid = grid.permute(0, 2, 3, 1).contiguous()
        return grid.flatten(1, 2)

    def _forward(self, images, interpolate_token=576):
        """Encode ``images`` and return interpolated last-layer features.

        Args:
            images: Input tensor; moved to the tower's device and dtype.
            interpolate_token: Accepted for call compatibility; not used. The
                target token count comes from ``_interp_size``.

        Returns:
            The last entry of the model's ``hidden_states`` after
            ``interpolate``.
        """
        with torch.set_grad_enabled(self.unfreeze_mm_vision_tower):
            image_features = self.vision_tower.forward(
                images.to(device=self.device, dtype=self.dtype),
                output_hidden_states=True,
            ).hidden_states[-1]
            return self.interpolate(image_features)
| |
|
| |
|
def build_vision_tower_aux_list(vision_tower_cfg, **kwargs):
    """Instantiate the auxiliary vision towers named in ``vision_tower_cfg``.

    Tower names are read from ``mm_vision_tower_aux_list`` (falling back to
    ``vision_tower_aux_list``) and token lengths from
    ``mm_vision_tower_aux_token_len_list`` (falling back to
    ``vision_tower_aux_token_len_list``). Each name gets ``-interp<len>``
    appended and is matched case-insensitively: names containing ``"siglip"``
    build a ``SiglipVisionTower``, names containing ``"dinov2"`` build a
    ``DinoVisionTower``. Every tower receives its own deep copy of the config.

    Names and token lengths are paired with ``zip``, so surplus entries in the
    longer list are ignored.

    Args:
        vision_tower_cfg: Config object carrying the attributes above.
        **kwargs: Forwarded to each tower constructor.

    Returns:
        List of constructed towers, in config order.

    Raises:
        ValueError: If either list is missing from the config, or a name
            matches no known tower type.
    """
    aux_names = getattr(
        vision_tower_cfg,
        "mm_vision_tower_aux_list",
        getattr(vision_tower_cfg, "vision_tower_aux_list", None),
    )
    aux_token_lens = getattr(
        vision_tower_cfg,
        "mm_vision_tower_aux_token_len_list",
        getattr(vision_tower_cfg, "vision_tower_aux_token_len_list", None),
    )
    # Fail with an actionable message instead of the opaque
    # "'NoneType' object is not iterable" that zip() would produce.
    if aux_names is None or aux_token_lens is None:
        raise ValueError(
            "vision_tower_cfg must define mm_vision_tower_aux_list "
            "(or vision_tower_aux_list) and mm_vision_tower_aux_token_len_list "
            "(or vision_tower_aux_token_len_list)"
        )

    towers = []
    for aux_name, token_len in zip(aux_names, aux_token_lens):
        config = copy.deepcopy(vision_tower_cfg)
        full_name = f"{aux_name}-interp{token_len}"
        lowered = full_name.lower()
        if "siglip" in lowered:
            towers.append(SiglipVisionTower(full_name, args=config, **kwargs))
        elif "dinov2" in lowered:
            towers.append(DinoVisionTower(full_name, args=config, **kwargs))
        else:
            raise ValueError(f"Unknown vision tower: {full_name}")
    return towers