import torch
import torch.nn as nn
from torch.nn import CrossEntropyLoss
from typing import List, Optional, Tuple, Union

from transformers import OPTConfig
from transformers.utils import replace_return_docstrings
from transformers.modeling_outputs import CausalLMOutputWithPast
from transformers.models.opt.modeling_opt import OPTModel, OPTPreTrainedModel
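

# Prompt-tuned OPT for sentiment analysis: the OPT decoder and LM head are laid out as
# in OPTForCausalLM, while a small nn.Embedding holds 8 learned soft-prompt vectors that
# are prepended to the input embeddings at generation time. `load_prompts` restores
# trained prompt weights from `config.prompt_dict_path`.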
class OPT_PromptTuned_For_SentimentAnalysis(OPTPreTrainedModel):
    _tied_weights_keys = ["lm_head.weight"]
    _CONFIG_FOR_DOC = "OPTConfig"
    config_class = OPTConfig

    def __init__(self, config):
        super().__init__(config)
        self.model = OPTModel(config)
        self.lm_head = nn.Linear(config.word_embed_proj_dim, config.vocab_size, bias=False)
        # 8 trainable soft-prompt vectors, prepended to the input embeddings in
        # `prepare_inputs_for_generation`.
        self.embedding = nn.Embedding(8, config.word_embed_proj_dim)
        # Initialize weights and apply final processing.
        self.post_init()

    def get_input_embeddings(self):
        return self.model.decoder.embed_tokens

    def set_input_embeddings(self, value):
        self.model.decoder.embed_tokens = value

    def get_output_embeddings(self):
        return self.lm_head

    def set_output_embeddings(self, new_embeddings):
        self.lm_head = new_embeddings

    def set_decoder(self, decoder):
        self.model.decoder = decoder

    def get_decoder(self):
        return self.model.decoder

    def load_prompts(self):
        # Load the trained soft-prompt weights from the path stored in the config.
        self.embedding.load_state_dict(torch.load(self.config.prompt_dict_path))
        return self

    @replace_return_docstrings(output_type=CausalLMOutputWithPast, config_class=_CONFIG_FOR_DOC)
    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, CausalLMOutputWithPast]:
        r"""
        Args:
            input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
                Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you
                provide it.

                Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
                [`PreTrainedTokenizer.__call__`] for details.

                [What are input IDs?](../glossary#input-ids)
            attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
                Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:

                - 1 for tokens that are **not masked**,
                - 0 for tokens that are **masked**.

                [What are attention masks?](../glossary#attention-mask)
            head_mask (`torch.Tensor` of shape `(num_hidden_layers, num_attention_heads)`, *optional*):
                Mask to nullify selected heads of the attention modules. Mask values selected in `[0, 1]`:

                - 1 indicates the head is **not masked**,
                - 0 indicates the head is **masked**.
            past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
                Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of
                shape `(batch_size, num_heads, sequence_length, embed_size_per_head)` and 2 additional tensors of
                shape `(batch_size, num_heads, encoder_sequence_length, embed_size_per_head)`. The two additional
                tensors are only required when the model is used as a decoder in a Sequence to Sequence model.

                Contains pre-computed hidden-states (key and values in the self-attention blocks and in the
                cross-attention blocks) that can be used (see `past_key_values` input) to speed up sequential decoding.

                If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those
                that don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of
                all `decoder_input_ids` of shape `(batch_size, sequence_length)`.
            inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
                Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation.
                This is useful if you want more control over how to convert `input_ids` indices into associated vectors
                than the model's internal embedding lookup matrix.
            labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
                Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
                config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
                (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
            use_cache (`bool`, *optional*):
                If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding
                (see `past_key_values`).
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
            output_hidden_states (`bool`, *optional*):
                Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors
                for more detail.
            return_dict (`bool`, *optional*):
                Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.

        Returns:

        Example:

        ```python
        >>> from transformers import AutoTokenizer, OPTForCausalLM

        >>> model = OPTForCausalLM.from_pretrained("facebook/opt-350m")
        >>> tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m")

        >>> prompt = "Hey, are you conscious? Can you talk to me?"
        >>> inputs = tokenizer(prompt, return_tensors="pt")

        >>> # Generate
        >>> generate_ids = model.generate(inputs.input_ids, max_length=30)
        >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
        "Hey, are you conscious? Can you talk to me?\nI'm not conscious. I'm just a little bit of a weirdo."
        ```"""

        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Decoder outputs consist of (dec_features, past_key_values, dec_hidden, dec_attn).
        outputs = self.model.decoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            head_mask=head_mask,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        logits = self.lm_head(outputs[0]).contiguous()

        loss = None
        if labels is not None:
            # Move labels to the same device as the logits (supports model parallelism).
            labels = labels.to(logits.device)
            # Shift so that tokens < n predict token n.
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            # Flatten the tokens and compute the cross-entropy loss.
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(shift_logits.view(-1, self.config.vocab_size), shift_labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[1:]
            return (loss,) + output if loss is not None else output

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def prepare_inputs_for_generation(
        self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs
    ):
        if past_key_values:
            # With a cache, only the last token has to be forwarded.
            input_ids = input_ids[:, -1:]

        if inputs_embeds is not None and past_key_values is None:
            # First generation step: prepend the 8 learned soft-prompt embeddings
            # (expanded across the batch) and widen the attention mask to match.
            prompt_ids = torch.arange(8, device=inputs_embeds.device)
            prompt_embeds = self.embedding(prompt_ids).unsqueeze(0).expand(inputs_embeds.shape[0], -1, -1)
            inputs_embeds = torch.cat([prompt_embeds, inputs_embeds], dim=1)
            attention_mask = torch.cat(
                [
                    torch.ones((attention_mask.shape[0], 8), dtype=attention_mask.dtype, device=attention_mask.device),
                    attention_mask,
                ],
                dim=1,
            )
            model_inputs = {"inputs_embeds": inputs_embeds}
        else:
            model_inputs = {"input_ids": input_ids}
            # The mask tracked by `generate` does not include the 8 prompt positions,
            # so they are prepended again on every cached step.
            attention_mask = torch.cat(
                [
                    torch.ones((attention_mask.shape[0], 8), dtype=attention_mask.dtype, device=attention_mask.device),
                    attention_mask,
                ],
                dim=1,
            )

        model_inputs.update(
            {
                "past_key_values": past_key_values,
                "use_cache": kwargs.get("use_cache"),
                "attention_mask": attention_mask,
            }
        )
        return model_inputs

    def generate(self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs):
        # Cap generation at 3 new tokens, enough for a short sentiment label.
        max_new_tokens = 3
        # Embed the prompt text here; the learned soft prompts are prepended later in
        # `prepare_inputs_for_generation`.
        input_embeddings = self.get_input_embeddings()(input_ids).to(input_ids.device)
        return super().generate(
            input_ids=input_ids,
            inputs_embeds=input_embeddings,
            max_new_tokens=max_new_tokens,
            attention_mask=attention_mask,
            **kwargs,
        )

    @staticmethod
    def _reorder_cache(past_key_values, beam_idx):
        # Reorder each layer's cached key/value states to follow the selected beams.
        reordered_past = ()
        for layer_past in past_key_values:
            reordered_past += (tuple(past_state.index_select(0, beam_idx) for past_state in layer_past),)
        return reordered_past
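

# A minimal usage sketch, not part of the original module: it assumes the standard
# "facebook/opt-350m" checkpoint and a hypothetical `prompt_dict_path` pointing at a
# trained soft-prompt state dict; the path and the example sentence are illustrative.
if __name__ == "__main__":
    from transformers import AutoTokenizer

    config = OPTConfig.from_pretrained("facebook/opt-350m")
    config.prompt_dict_path = "prompts/sentiment_prompts.pt"  # hypothetical path

    tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m")
    model = OPT_PromptTuned_For_SentimentAnalysis.from_pretrained(
        "facebook/opt-350m", config=config
    ).load_prompts()

    inputs = tokenizer("The movie was a delight from start to finish.", return_tensors="pt")
    generated = model.generate(inputs.input_ids, attention_mask=inputs.attention_mask)
    print(tokenizer.batch_decode(generated, skip_special_tokens=True)[0])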