| import os | |
| import torch | |
| import warnings | |
| from .model_minimind import * | |
| from typing import Optional, Tuple, List | |
| from torch import nn | |
| from transformers import CLIPProcessor, CLIPModel | |
| from typing import List | |
| warnings.filterwarnings('ignore') | |
| class VLMConfig(MiniMindConfig): | |
| model_type = "minimind-v" | |
| def __init__( | |
| self, | |
| image_special_token: str = '@' * 196, | |
| image_ids: List = [34] * 196, | |
| **kwargs, | |
| ): | |
| self.image_special_token = image_special_token | |
| self.image_ids = image_ids | |
| super().__init__(**kwargs) | |
| class VisionProj(nn.Module): | |
| def __init__(self, ve_hidden_size=768, hidden_size=512): | |
| super().__init__() | |
| self.ve_hidden_size = ve_hidden_size | |
| self.hidden_size = hidden_size | |
| self.vision_proj = nn.Sequential( | |
| nn.Linear(self.ve_hidden_size, self.hidden_size) | |
| ) | |
| def forward(self, image_encoders): | |
| vision_proj = self.vision_proj(image_encoders) | |
| return vision_proj | |
| # 继承自语言模型 | |
| class MiniMindVLM(MiniMindForCausalLM): | |
| config_class = VLMConfig | |
| def __init__(self, params: VLMConfig = None, vision_model_path="./model/vision_model/clip-vit-base-patch16"): | |
| super().__init__(params) | |
| if not params: params = VLMConfig() | |
| self.params = params | |
| self.vision_encoder, self.processor = self.__class__.get_vision_model(vision_model_path) | |
| self.vision_proj = VisionProj(hidden_size=params.hidden_size) | |
| def get_vision_model(model_path: str): | |
| from transformers import logging as hf_logging | |
| hf_logging.set_verbosity_error() | |
| if not os.path.exists(model_path): | |
| return None, None | |
| model = CLIPModel.from_pretrained(model_path) | |
| processor = CLIPProcessor.from_pretrained(model_path) | |
| # 冻结 vision_encoder 的所有参数 | |
| for param in model.parameters(): | |
| param.requires_grad = False | |
| return model.eval(), processor | |
| def image2tensor(image, processor): | |
| if image.mode in ['RGBA', 'LA']: image = image.convert('RGB') | |
| inputs = processor(images=image, return_tensors="pt")['pixel_values'] | |
| return inputs | |
| def get_image_embeddings(image_tensors, vision_model): | |
| with torch.no_grad(): | |
| outputs = vision_model.vision_model(pixel_values=image_tensors) | |
| img_embedding = outputs.last_hidden_state[:, 1:, :].squeeze() | |
| return img_embedding | |
| def count_vision_proj(self, tokens, h, vision_tensors=None, seqlen=512): | |
| def find_indices(tokens, image_ids): | |
| image_ids_tensor = torch.tensor(image_ids).to(tokens.device) | |
| len_image_ids = len(image_ids) | |
| if len_image_ids > tokens.size(1): | |
| return None | |
| tokens_view = tokens.unfold(1, len_image_ids, 1) | |
| matches = (tokens_view == image_ids_tensor).all(dim=2) | |
| return { | |
| batch_idx: [(idx.item(), idx.item() + len_image_ids - 1) for idx in | |
| matches[batch_idx].nonzero(as_tuple=True)[0]] | |
| for batch_idx in range(tokens.size(0)) if matches[batch_idx].any() | |
| } or None | |
| image_indices = find_indices(tokens, self.params.image_ids) | |
| if vision_tensors is not None and image_indices: | |
| vision_proj = self.vision_proj(vision_tensors) | |
| if len(vision_proj.shape) == 3: | |
| vision_proj = vision_proj.unsqueeze(0) | |
| new_h = [] | |
| for i in range(h.size(0)): | |
| if i in image_indices: | |
| h_i = h[i] | |
| img_idx = 0 | |
| for start_idx, end_idx in image_indices[i]: | |
| if img_idx < vision_proj.size(1): | |
| h_i = torch.cat((h_i[:start_idx], vision_proj[i][img_idx], h_i[end_idx + 1:]), dim=0)[ | |
| :seqlen] | |
| img_idx += 1 | |
| new_h.append(h_i) | |
| else: | |
| new_h.append(h[i]) | |
| return torch.stack(new_h, dim=0) | |
| return h | |
| def forward(self, | |
| input_ids: Optional[torch.Tensor] = None, | |
| attention_mask: Optional[torch.Tensor] = None, | |
| past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None, | |
| use_cache: bool = False, | |
| logits_to_keep: Union[int, torch.Tensor] = 0, | |
| pixel_values: Optional[torch.FloatTensor] = None, | |
| **args): | |
| batch_size, seq_length = input_ids.shape | |
| if hasattr(past_key_values, 'layers'): past_key_values = None | |
| past_key_values = past_key_values or [None] * len(self.model.layers) | |
| start_pos = past_key_values[0][0].shape[1] if past_key_values[0] is not None else 0 | |
| hidden_states = self.model.dropout(self.model.embed_tokens(input_ids)) | |
| if pixel_values is not None and start_pos == 0: | |
| if len(pixel_values.shape) == 6: | |
| pixel_values = pixel_values.squeeze(2) | |
| bs, num, c, im_h, im_w = pixel_values.shape | |
| stack_dim = 1 if bs > 1 else 0 | |
| vision_tensors = torch.stack([ | |
| MiniMindVLM.get_image_embeddings(pixel_values[:, i, :, :, :], self.vision_encoder) | |
| for i in range(num) | |
| ], dim=stack_dim) | |
| hidden_states = self.count_vision_proj(tokens=input_ids, h=hidden_states, vision_tensors=vision_tensors, | |
| seqlen=input_ids.shape[1]) | |
| position_embeddings = ( | |
| self.model.freqs_cos[start_pos:start_pos + seq_length], | |
| self.model.freqs_sin[start_pos:start_pos + seq_length] | |
| ) | |
| presents = [] | |
| for layer_idx, (layer, past_key_value) in enumerate(zip(self.model.layers, past_key_values)): | |
| hidden_states, present = layer( | |
| hidden_states, | |
| position_embeddings, | |
| past_key_value=past_key_value, | |
| use_cache=use_cache, | |
| attention_mask=attention_mask | |
| ) | |
| presents.append(present) | |
| hidden_states = self.model.norm(hidden_states) | |
| aux_loss = sum( | |
| layer.mlp.aux_loss | |
| for layer in self.model.layers | |
| if isinstance(layer.mlp, MOEFeedForward) | |
| ) | |
| slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep | |
| logits = self.lm_head(hidden_states[:, slice_indices, :]) | |
| self.OUT.__setitem__('last_hidden_state', hidden_states) | |
| self.OUT.__setitem__('logits', logits) | |
| self.OUT.__setitem__('aux_loss', aux_loss) | |
| self.OUT.__setitem__('past_key_values', presents) | |
| return self.OUT | |