| | from typing import List, Optional, Tuple, Union |
| |
|
| | import torch |
| | import torch.nn as nn |
| | import math |
| | import pdb |
| | from typing import Dict, Any |
| | from PIL import Image |
| |
|
| | from transformers import AutoConfig, AutoModelForCausalLM, PretrainedConfig, PreTrainedModel |
| | |
| |
|
| | from transformers.modeling_outputs import CausalLMOutputWithPast |
| |
|
| | from .llava_arch import LlavaMetaModel, LlavaMetaForCausalLM |
| |
|
| | from transformers.cache_utils import Cache, DynamicCache |
| |
|
| | from transformers.generation.utils import GenerationConfig |
| |
|
| | import sys |
| | from .modeling_phi import PhiForCausalLM, PhiModel, PhiConfig |
| | from .generation_utils import build_allava_input |
| |
|
| |
|
| |
|
| |
|
| | |
| |
|
| | class LlavaPhiConfig(PhiConfig): |
| | model_type = "llava_phi" |
| |
|
| | class LlavaPhiModel(LlavaMetaModel, PhiModel): |
| | config_class = LlavaPhiConfig |
| |
|
| | def __init__(self, config: PhiConfig): |
| | super(LlavaPhiModel, self).__init__(config) |
| |
|
| |
|
| |
|
| | class LlavaPhiForCausalLM(PhiForCausalLM, LlavaMetaForCausalLM): |
| | config_class = LlavaPhiConfig |
| |
|
| | def __init__(self, config, init_vision_encoder_from_ckpt=True): |
| | |
| | config._attn_implementation = "flash_attention_2" |
| |
|
| | super(PhiForCausalLM, self).__init__(config) |
| | |
| | self.model = LlavaPhiModel(config) |
| | if hasattr(self.model, '_use_flash_attention_2'): |
| | assert self.model._use_flash_attention_2, 'flash attn is not enabled. check it out!' |
| | self.vocab_size = config.vocab_size |
| | self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) |
| | |
| | if init_vision_encoder_from_ckpt: |
| | vision_tower = self.get_vision_tower() |
| | print(f'loading from CLIP first. This should only be used at inference!!!') |
| | vision_tower.load_model() |
| | |
| | |
| | self.post_init() |
| |
|
| | def get_model(self): |
| | return self.model |
| | |
| | def get_tokenizer(self): |
| | return self.tokenizer |
| | |
| | def get_processor(self): |
| | return self.model.vision_tower.image_processor |
| | |
| |
|
| | def forward( |
| | self, |
| | input_ids: torch.LongTensor = None, |
| | attention_mask: Optional[torch.Tensor] = None, |
| | position_ids: Optional[torch.LongTensor] = None, |
| | past_key_values: Optional[List[torch.FloatTensor]] = None, |
| | inputs_embeds: Optional[torch.FloatTensor] = None, |
| | labels: Optional[torch.LongTensor] = None, |
| | use_cache: Optional[bool] = None, |
| | output_attentions: Optional[bool] = None, |
| | output_hidden_states: Optional[bool] = None, |
| | images: Optional[torch.FloatTensor] = None, |
| | return_dict: Optional[bool] = None, |
| | ) -> Union[Tuple, CausalLMOutputWithPast]: |
| | |
| |
|
| | if inputs_embeds is None: |
| | ( |
| | input_ids, |
| | position_ids, |
| | attention_mask, |
| | past_key_values, |
| | inputs_embeds, |
| | labels |
| | |
| | ) = self.prepare_inputs_labels_for_multimodal_new( |
| | input_ids, |
| | position_ids, |
| | attention_mask, |
| | past_key_values, |
| | labels, |
| | images |
| | ) |
| |
|
| | |
| | return super().forward( |
| | input_ids=input_ids, |
| | attention_mask=attention_mask, |
| | position_ids=position_ids, |
| | past_key_values=past_key_values, |
| | inputs_embeds=inputs_embeds, |
| | labels=labels, |
| | use_cache=use_cache, |
| | output_attentions=output_attentions, |
| | output_hidden_states=output_hidden_states, |
| | return_dict=return_dict |
| | ) |
| |
|
| | def prepare_inputs_for_generation(self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs): |
| | ''' |
| | This function is called for each token at inference |
| | ''' |
| | |
| | images = kwargs.pop("images", None) |
| |
|
| | |
| | |
| | |
| |
|
| | if past_key_values is not None: |
| | if isinstance(past_key_values, Cache): |
| | cache_length = past_key_values.get_seq_length() |
| | past_length = past_key_values.seen_tokens |
| | max_cache_length = past_key_values.get_max_length() |
| | else: |
| | cache_length = past_length = past_key_values[0][0].shape[2] |
| | max_cache_length = None |
| |
|
| | |
| | |
| | |
| | |
| | if attention_mask is not None and attention_mask.shape[1] > input_ids.shape[1]: |
| | input_ids = input_ids[:, -(attention_mask.shape[1] - past_length) :] |
| | |
| | |
| | elif past_length < input_ids.shape[1]: |
| | input_ids = input_ids[:, past_length:] |
| | |
| | elif past_length >= input_ids.shape[1]: |
| | input_ids = input_ids[:, [-1]] |
| |
|
| | |
| | if ( |
| | max_cache_length is not None |
| | and attention_mask is not None |
| | and cache_length + input_ids.shape[1] > max_cache_length |
| | ): |
| | attention_mask = attention_mask[:, -max_cache_length:] |
| |
|
| | position_ids = kwargs.get("position_ids", None) |
| | if attention_mask is not None and position_ids is None: |
| | |
| | position_ids = attention_mask.long().cumsum(-1) - 1 |
| | position_ids.masked_fill_(attention_mask == 0, 1) |
| | if past_key_values: |
| | position_ids = position_ids[:, -input_ids.shape[1] :] |
| |
|
| | |
| | if inputs_embeds is not None and past_key_values is None: |
| | model_inputs = {"inputs_embeds": inputs_embeds} |
| | else: |
| | model_inputs = {"input_ids": input_ids} |
| | |
| | model_inputs.update( |
| | { |
| | "position_ids": position_ids, |
| | "past_key_values": past_key_values, |
| | "use_cache": kwargs.get("use_cache"), |
| | "attention_mask": attention_mask, |
| | } |
| | ) |
| | |
| | |
| | |
| |
|
| |
|
| | if images is not None: |
| | model_inputs['images'] = images |
| | return model_inputs |
| |
|
| |
|
| |
|
| |
|
| | def chat( |
| | self, |
| | texts: Optional[str | list[list[str, str]]], |
| | images: Optional[str | list[str]] = None, |
| | history: Optional[list[str]] = None, |
| | stream = False, |
| | return_history = False, |
| | **kwargs |
| | ): |
| | ''' |
| | texts: if `str`, then generate for a single round; if list[dict], |
| | images: str (optional), local path to an image. |
| | ''' |
| | use_cache = kwargs.pop('use_cache', True) |
| |
|
| |
|
| | |
| | |
| | |
| | input_ids, image_tensors, history = build_allava_input( |
| | tokenizer = self.get_tokenizer(), |
| | processor = self.get_processor(), |
| | texts = texts, |
| | images = images, |
| | history=history, |
| | return_history=return_history, |
| | device = self.device |
| | ) |
| |
|
| | |
| | |
| | |
| | |
| | if 'cuda' in str(self.device): |
| | device_type = 'cuda' |
| | else: |
| | device_type = 'cpu' |
| |
|
| | with torch.autocast(device_type=device_type, dtype=self.dtype): |
| | output_ids = self.generate( |
| | inputs=input_ids, |
| | images=image_tensors, |
| | use_cache=use_cache, |
| | **kwargs) |
| |
|
| | answer = self.get_tokenizer().decode(output_ids[0, input_ids.shape[1]:], skip_special_tokens=True).strip() |
| |
|
| | if return_history: |
| | history[-1][-1] = answer |
| | return answer, history |
| | return answer |
| |
|
| |
|
| | AutoConfig.register("llava_phi", LlavaPhiConfig) |
| | AutoModelForCausalLM.register(LlavaPhiConfig, LlavaPhiForCausalLM) |