| # coding=utf-8 |
| # Copyright 2018 The OpenAI Team Authors and HuggingFace Inc. team. |
| # Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| #     http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """ TF 2.0 OpenAI GPT-2 model. """ |
|
|
| from dataclasses import dataclass |
| from typing import List, Optional, Tuple |
|
|
| import tensorflow as tf |
|
|
| from ...activations_tf import get_tf_activation |
| from ...file_utils import ( |
| ModelOutput, |
| add_code_sample_docstrings, |
| add_start_docstrings, |
| add_start_docstrings_to_model_forward, |
| replace_return_docstrings, |
| ) |
| from ...modeling_tf_outputs import ( |
| TFBaseModelOutputWithPast, |
| TFCausalLMOutputWithPast, |
| TFSequenceClassifierOutputWithPast, |
| ) |
| from ...modeling_tf_utils import ( |
| TFCausalLanguageModelingLoss, |
| TFConv1D, |
| TFPreTrainedModel, |
| TFSequenceClassificationLoss, |
| TFSequenceSummary, |
| TFSharedEmbeddings, |
| get_initializer, |
| input_processing, |
| keras_serializable, |
| shape_list, |
| ) |
| from ...utils import logging |
| from .configuration_gpt2 import GPT2Config |
|
|
|
|
| logger = logging.get_logger(__name__) |
|
|
| _CHECKPOINT_FOR_DOC = "gpt2" |
| _CONFIG_FOR_DOC = "GPT2Config" |
| _TOKENIZER_FOR_DOC = "GPT2Tokenizer" |
|
|
| TF_GPT2_PRETRAINED_MODEL_ARCHIVE_LIST = [ |
| "gpt2", |
| "gpt2-medium", |
| "gpt2-large", |
| "gpt2-xl", |
| "distilgpt2", |
| # See all GPT-2 models at https://huggingface.co/models?filter=gpt2 |
| ] |
|
|
|
|
| class TFAttention(tf.keras.layers.Layer): |
| def __init__(self, nx, n_ctx, config, scale=False, **kwargs): |
| super().__init__(**kwargs) |
|
|
| n_state = nx |
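| # n_state == nx == n_embd here; the separate name is kept to mirror the original TF implementation. |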
| |
| assert n_state % config.n_head == 0 |
| self.n_ctx = n_ctx |
| self.n_head = config.n_head |
| self.split_size = n_state |
| self.scale = scale |
| self.output_attentions = config.output_attentions |
|
|
| self.c_attn = TFConv1D(n_state * 3, nx, initializer_range=config.initializer_range, name="c_attn") |
| self.c_proj = TFConv1D(n_state, nx, initializer_range=config.initializer_range, name="c_proj") |
| self.attn_dropout = tf.keras.layers.Dropout(config.attn_pdrop) |
| self.resid_dropout = tf.keras.layers.Dropout(config.resid_pdrop) |
| self.pruned_heads = set() |
|
|
| def prune_heads(self, heads): |
| pass |
|
|
| @staticmethod |
| def causal_attention_mask(nd, ns, dtype): |
| """ |
| 1's in the lower triangle, counting from the lower right corner. Same as tf.matrix_band_part(tf.ones([nd, ns]), |
| -1, ns-nd), but doesn't produce garbage on TPUs. |
| """ |
| i = tf.range(nd)[:, None] |
| j = tf.range(ns) |
| m = i >= j - ns + nd |
| return tf.cast(m, dtype) |
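| # For example, causal_attention_mask(2, 4, tf.float32) returns |
| #   [[1., 1., 1., 0.], |
| #    [1., 1., 1., 1.]] |
| # i.e. with 2 query positions and 4 key positions (2 of them cached), each query may attend to |
| # every earlier position and to itself, with the triangle anchored at the lower right corner. |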
|
|
| def _attn(self, q, k, v, attention_mask, head_mask, output_attentions, training=False): |
| |
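| # q, k, v have shape [batch, heads, sequence, features]; w below has shape [batch, heads, dst_seq, src_seq]. |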
| w = tf.matmul(q, k, transpose_b=True) |
| if self.scale: |
| dk = tf.cast(shape_list(k)[-1], dtype=w.dtype) |
| w = w / tf.math.sqrt(dk) |
|
|
| |
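| # Mask out attention to future positions: keep only the causal (lower-triangular) part of w and |
| # push masked scores to a large negative value so they vanish after the softmax. |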
| _, _, nd, ns = shape_list(w) |
| b = self.causal_attention_mask(nd, ns, dtype=w.dtype) |
| b = tf.reshape(b, [1, 1, nd, ns]) |
| w = w * b - 1e4 * (1 - b) |
|
|
| if attention_mask is not None: |
| |
| attention_mask = tf.cast(attention_mask, dtype=w.dtype) |
| w = w + attention_mask |
|
|
| w = tf.nn.softmax(w, axis=-1) |
| w = self.attn_dropout(w, training=training) |
|
|
| |
| if head_mask is not None: |
| w = w * head_mask |
|
|
| outputs = [tf.matmul(w, v)] |
| if output_attentions: |
| outputs.append(w) |
| return outputs |
|
|
| def merge_heads(self, x): |
| x = tf.transpose(x, [0, 2, 1, 3]) |
| x_shape = shape_list(x) |
| new_x_shape = x_shape[:-2] + [x_shape[-2] * x_shape[-1]] |
| return tf.reshape(x, new_x_shape) |
|
|
| def split_heads(self, x): |
| x_shape = shape_list(x) |
| new_x_shape = x_shape[:-1] + [self.n_head, x_shape[-1] // self.n_head] |
| x = tf.reshape(x, new_x_shape) |
| return tf.transpose(x, (0, 2, 1, 3)) |
|
|
| def call(self, x, layer_past, attention_mask, head_mask, use_cache, output_attentions, training=False): |
| x = self.c_attn(x) |
| query, key, value = tf.split(x, 3, axis=2) |
| query = self.split_heads(query) |
| key = self.split_heads(key) |
| value = self.split_heads(value) |
| if layer_past is not None: |
| past_key, past_value = tf.unstack(layer_past, axis=0) |
| key = tf.concat([past_key, key], axis=-2) |
| value = tf.concat([past_value, value], axis=-2) |
|
|
| |
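| # When caching is disabled, a (None,) placeholder is returned instead of a tensor so the layer's |
| # output structure stays the same in both branches. |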
| if use_cache: |
| present = tf.stack([key, value], axis=0) |
| else: |
| present = (None,) |
|
|
| attn_outputs = self._attn(query, key, value, attention_mask, head_mask, output_attentions, training=training) |
| a = attn_outputs[0] |
|
|
| a = self.merge_heads(a) |
| a = self.c_proj(a) |
| a = self.resid_dropout(a, training=training) |
|
|
| outputs = [a, present] + attn_outputs[1:] |
| return outputs |
|
|
|
|
| class TFMLP(tf.keras.layers.Layer): |
| def __init__(self, n_state, config, **kwargs): |
| super().__init__(**kwargs) |
| nx = config.n_embd |
| self.c_fc = TFConv1D(n_state, nx, initializer_range=config.initializer_range, name="c_fc") |
| self.c_proj = TFConv1D(nx, n_state, initializer_range=config.initializer_range, name="c_proj") |
| self.act = get_tf_activation("gelu") |
| self.dropout = tf.keras.layers.Dropout(config.resid_pdrop) |
|
|
| def call(self, x, training=False): |
| h = self.act(self.c_fc(x)) |
| h2 = self.c_proj(h) |
| h2 = self.dropout(h2, training=training) |
| return h2 |
|
|
|
|
| class TFBlock(tf.keras.layers.Layer): |
| def __init__(self, n_ctx, config, scale=False, **kwargs): |
| super().__init__(**kwargs) |
| nx = config.n_embd |
| inner_dim = config.n_inner if config.n_inner is not None else 4 * nx |
| self.ln_1 = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_epsilon, name="ln_1") |
| self.attn = TFAttention(nx, n_ctx, config, scale, name="attn") |
| self.ln_2 = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_epsilon, name="ln_2") |
| self.mlp = TFMLP(inner_dim, config, name="mlp") |
|
|
| def call(self, x, layer_past, attention_mask, head_mask, use_cache, output_attentions, training=False): |
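| # Pre-LayerNorm residual block: x = x + Attn(LN_1(x)), then x = x + MLP(LN_2(x)). |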
| a = self.ln_1(x) |
| output_attn = self.attn( |
| a, layer_past, attention_mask, head_mask, use_cache, output_attentions, training=training |
| ) |
| a = output_attn[0] |
| x = x + a |
|
|
| m = self.ln_2(x) |
| m = self.mlp(m, training=training) |
| x = x + m |
|
|
| outputs = [x] + output_attn[1:] |
| return outputs |
|
|
|
|
| @keras_serializable |
| class TFGPT2MainLayer(tf.keras.layers.Layer): |
| config_class = GPT2Config |
|
|
| def __init__(self, config, *inputs, **kwargs): |
| super().__init__(*inputs, **kwargs) |
|
|
| self.config = config |
| self.output_attentions = config.output_attentions |
| self.output_hidden_states = config.output_hidden_states |
| self.use_cache = config.use_cache |
| self.return_dict = config.use_return_dict |
|
|
| self.num_hidden_layers = config.n_layer |
| self.vocab_size = config.vocab_size |
| self.n_embd = config.n_embd |
| self.n_positions = config.n_positions |
| self.initializer_range = config.initializer_range |
|
|
| self.wte = TFSharedEmbeddings( |
| config.vocab_size, config.hidden_size, initializer_range=config.initializer_range, name="wte" |
| ) |
| self.drop = tf.keras.layers.Dropout(config.embd_pdrop) |
| self.h = [TFBlock(config.n_ctx, config, scale=True, name=f"h_._{i}") for i in range(config.n_layer)] |
| self.ln_f = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_epsilon, name="ln_f") |
|
|
| def build(self, input_shape): |
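| # Position embeddings ("wpe") are created lazily here as a plain weight of shape [n_positions, n_embd]; |
| # token embeddings ("wte") are a TFSharedEmbeddings layer created in __init__. |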
| with tf.name_scope("wpe"): |
| self.wpe = self.add_weight( |
| name="embeddings", |
| shape=[self.n_positions, self.n_embd], |
| initializer=get_initializer(self.initializer_range), |
| ) |
|
|
| super().build(input_shape) |
|
|
| def get_input_embeddings(self): |
| return self.wte |
|
|
| def set_input_embeddings(self, value): |
| self.wte.weight = value |
| self.wte.vocab_size = shape_list(value)[0] |
|
|
| def _prune_heads(self, heads_to_prune): |
| """ |
| Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} |
| """ |
| raise NotImplementedError |
|
|
| def call( |
| self, |
| input_ids=None, |
| past=None, |
| attention_mask=None, |
| token_type_ids=None, |
| position_ids=None, |
| head_mask=None, |
| inputs_embeds=None, |
| use_cache=None, |
| output_attentions=None, |
| output_hidden_states=None, |
| return_dict=None, |
| training=False, |
| **kwargs, |
| ): |
| inputs = input_processing( |
| func=self.call, |
| config=self.config, |
| input_ids=input_ids, |
| past=past, |
| attention_mask=attention_mask, |
| token_type_ids=token_type_ids, |
| position_ids=position_ids, |
| head_mask=head_mask, |
| inputs_embeds=inputs_embeds, |
| use_cache=use_cache, |
| output_attentions=output_attentions, |
| output_hidden_states=output_hidden_states, |
| return_dict=return_dict, |
| training=training, |
| kwargs_call=kwargs, |
| ) |
|
|
| if inputs["input_ids"] is not None and inputs["inputs_embeds"] is not None: |
| raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time") |
| elif inputs["input_ids"] is not None: |
| input_shape = shape_list(inputs["input_ids"]) |
| inputs["input_ids"] = tf.reshape(inputs["input_ids"], [-1, input_shape[-1]]) |
| elif inputs["inputs_embeds"] is not None: |
| input_shape = shape_list(inputs["inputs_embeds"])[:-1] |
| else: |
| raise ValueError("You have to specify either input_ids or inputs_embeds") |
|
|
| if inputs["past"] is None: |
| past_length = 0 |
| inputs["past"] = [None] * len(self.h) |
| else: |
| past_length = shape_list(inputs["past"][0][0])[-2] |
|
|
| if inputs["position_ids"] is None: |
| inputs["position_ids"] = tf.expand_dims(tf.range(past_length, input_shape[-1] + past_length), axis=0) |
|
|
| if inputs["attention_mask"] is not None: |
| |
| |
| |
| |
| |
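| # We turn the 2D padding mask into shape [batch_size, 1, 1, to_seq_length] so it can be broadcast to |
| # [batch_size, num_heads, from_seq_length, to_seq_length]. The causal (triangular) masking is handled |
| # separately inside TFAttention; here we only prepare the padding mask. |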
| attention_mask_shape = shape_list(inputs["attention_mask"]) |
| inputs["attention_mask"] = tf.reshape( |
| inputs["attention_mask"], (attention_mask_shape[0], 1, 1, attention_mask_shape[1]) |
| ) |
|
|
| |
| |
| |
| |
| |
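| # attention_mask is 1.0 for positions to attend to and 0.0 for masked positions; this converts it |
| # into an additive mask that is 0.0 for kept positions and -10000.0 for masked ones. Added to the |
| # raw scores before the softmax, this is effectively the same as removing the masked positions. |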
| one_cst = tf.constant(1.0) |
| inputs["attention_mask"] = tf.cast(inputs["attention_mask"], dtype=one_cst.dtype) |
| inputs["attention_mask"] = tf.multiply( |
| tf.subtract(one_cst, inputs["attention_mask"]), tf.constant(-10000.0) |
| ) |
|
|
| |
| |
| |
| |
| |
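| # Prepare the head mask if needed: 1.0 in head_mask means the head is kept. A per-layer list is |
| # expected downstream, so when no mask is given we use [None] * num_hidden_layers. |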
| if inputs["head_mask"] is not None: |
| raise NotImplementedError |
| else: |
| inputs["head_mask"] = [None] * self.num_hidden_layers |
| |
|
|
| inputs["position_ids"] = tf.reshape(inputs["position_ids"], [-1, shape_list(inputs["position_ids"])[-1]]) |
|
|
| if inputs["inputs_embeds"] is None: |
| inputs["inputs_embeds"] = self.wte(inputs["input_ids"], mode="embedding") |
|
|
| position_embeds = tf.gather(self.wpe, inputs["position_ids"]) |
|
|
| if inputs["token_type_ids"] is not None: |
| inputs["token_type_ids"] = tf.reshape( |
| inputs["token_type_ids"], [-1, shape_list(inputs["token_type_ids"])[-1]] |
| ) |
| token_type_embeds = self.wte(inputs["token_type_ids"], mode="embedding") |
| else: |
| token_type_embeds = tf.constant(0.0) |
|
|
| position_embeds = tf.cast(position_embeds, dtype=inputs["inputs_embeds"].dtype) |
| token_type_embeds = tf.cast(token_type_embeds, dtype=inputs["inputs_embeds"].dtype) |
| hidden_states = inputs["inputs_embeds"] + position_embeds + token_type_embeds |
| hidden_states = self.drop(hidden_states, training=inputs["training"]) |
|
|
| output_shape = input_shape + [shape_list(hidden_states)[-1]] |
|
|
| presents = () if inputs["use_cache"] else None |
| all_attentions = () if inputs["output_attentions"] else None |
| all_hidden_states = () if inputs["output_hidden_states"] else None |
| for i, (block, layer_past) in enumerate(zip(self.h, inputs["past"])): |
| if inputs["output_hidden_states"]: |
| all_hidden_states = all_hidden_states + (tf.reshape(hidden_states, output_shape),) |
|
|
| outputs = block( |
| hidden_states, |
| layer_past, |
| inputs["attention_mask"], |
| inputs["head_mask"][i], |
| inputs["use_cache"], |
| inputs["output_attentions"], |
| training=inputs["training"], |
| ) |
|
|
| hidden_states, present = outputs[:2] |
| if inputs["use_cache"]: |
| presents = presents + (present,) |
|
|
| if inputs["output_attentions"]: |
| all_attentions = all_attentions + (outputs[2],) |
|
|
| hidden_states = self.ln_f(hidden_states) |
|
|
| hidden_states = tf.reshape(hidden_states, output_shape) |
| |
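| # Add the final hidden state (after ln_f) to the collected hidden states. |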
| if inputs["output_hidden_states"]: |
| all_hidden_states = all_hidden_states + (hidden_states,) |
|
|
| if inputs["output_attentions"]: |
| |
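| # Leave the number of heads free (-1) so attentions can still be reshaped after head pruning. |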
| attention_output_shape = input_shape[:-1] + [-1] + shape_list(all_attentions[0])[-2:] |
| all_attentions = tuple(tf.reshape(t, attention_output_shape) for t in all_attentions) |
|
|
| if not inputs["return_dict"]: |
| return tuple(v for v in [hidden_states, presents, all_hidden_states, all_attentions] if v is not None) |
|
|
| return TFBaseModelOutputWithPast( |
| last_hidden_state=hidden_states, |
| past_key_values=presents, |
| hidden_states=all_hidden_states, |
| attentions=all_attentions, |
| ) |
|
|
|
|
| class TFGPT2PreTrainedModel(TFPreTrainedModel): |
| """ |
| An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained |
| models. |
| """ |
|
|
| config_class = GPT2Config |
| base_model_prefix = "transformer" |
| |
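| # Weight name patterns that may be unexpected when loading from a PyTorch checkpoint: the causal |
| # attention bias is a buffer in the PyTorch model and is recomputed on the fly here. |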
| _keys_to_ignore_on_load_unexpected = [r"h.\d+.attn.bias"] |
|
|
| @tf.function( |
| input_signature=[ |
| { |
| "input_ids": tf.TensorSpec((None, None), tf.int32, name="input_ids"), |
| "attention_mask": tf.TensorSpec((None, None), tf.int32, name="attention_mask"), |
| } |
| ] |
| ) |
| def serving(self, inputs): |
| output = self.call(inputs) |
|
|
| return self.serving_output(output) |
|
|
|
|
| @dataclass |
| class TFGPT2DoubleHeadsModelOutput(ModelOutput): |
| """ |
| Base class for outputs of :class:`~transformers.TFGPT2DoubleHeadsModel`: a language modeling head and a multiple-choice classification head on top of the transformer. |
| |
| Args: |
| logits (:obj:`tf.Tensor` of shape :obj:`(batch_size, num_choices, sequence_length, config.vocab_size)`): |
| Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax). |
| mc_logits (:obj:`tf.Tensor` of shape :obj:`(batch_size, num_choices)`): |
| Prediction scores of the multiple choice classification head (scores for each choice before SoftMax). |
| past_key_values (:obj:`List[tf.Tensor]`, `optional`, returned when ``use_cache=True`` is passed or when ``config.use_cache=True``): |
| List of :obj:`tf.Tensor` of length :obj:`config.n_layers`, with each tensor of shape :obj:`(2, batch_size, |
| num_heads, sequence_length, embed_size_per_head)`. |
| |
| Contains pre-computed hidden-states (key and values in the attention blocks) that can be used (see |
| :obj:`past_key_values` input) to speed up sequential decoding. |
| hidden_states (:obj:`tuple(tf.Tensor)`, `optional`, returned when ``output_hidden_states=True`` is passed or when ``config.output_hidden_states=True``): |
| Tuple of :obj:`tf.Tensor` (one for the output of the embeddings + one for the output of each layer) of |
| shape :obj:`(batch_size, sequence_length, hidden_size)`. |
| |
| Hidden-states of the model at the output of each layer plus the initial embedding outputs. |
| attentions (:obj:`tuple(tf.Tensor)`, `optional`, returned when ``output_attentions=True`` is passed or when ``config.output_attentions=True``): |
| Tuple of :obj:`tf.Tensor` (one for each layer) of shape :obj:`(batch_size, num_heads, sequence_length, |
| sequence_length)`. |
| |
| Attentions weights after the attention softmax, used to compute the weighted average in the self-attention |
| heads. |
| """ |
|
|
| logits: tf.Tensor = None |
| mc_logits: tf.Tensor = None |
| past_key_values: Optional[List[tf.Tensor]] = None |
| hidden_states: Optional[Tuple[tf.Tensor]] = None |
| attentions: Optional[Tuple[tf.Tensor]] = None |
|
|
|
|
| GPT2_START_DOCSTRING = r""" |
| |
| This model inherits from :class:`~transformers.TFPreTrainedModel`. Check the superclass documentation for the |
| generic methods the library implements for all its models (such as downloading or saving, resizing the input |
| embeddings, pruning heads etc.) |
| |
| This model is also a `tf.keras.Model <https://www.tensorflow.org/api_docs/python/tf/keras/Model>`__ subclass. Use |
| it as a regular TF 2.0 Keras Model and refer to the TF 2.0 documentation for all matters related to general usage |
| and behavior. |
| |
| .. note:: |
| |
| TF 2.0 models accept two formats as inputs: |
| |
| - having all inputs as keyword arguments (like PyTorch models), or |
| - having all inputs as a list, tuple or dict in the first positional argument. |
| |
| This second option is useful when using the :meth:`tf.keras.Model.fit` method, which currently requires having |
| all the tensors in the first argument of the model call function: :obj:`model(inputs)`. |
| |
| If you choose this second option, there are three ways to gather all the input Tensors in the first positional |
| argument: |
| |
| - a single Tensor with :obj:`input_ids` only and nothing else: :obj:`model(input_ids)` |
| - a list of varying length with one or several input Tensors IN THE ORDER given in the docstring: |
| :obj:`model([input_ids, attention_mask])` or :obj:`model([input_ids, attention_mask, token_type_ids])` |
| - a dictionary with one or several input Tensors associated to the input names given in the docstring: |
| :obj:`model({"input_ids": input_ids, "token_type_ids": token_type_ids})` |
| |
| Parameters: |
| config (:class:`~transformers.GPT2Config`): Model configuration class with all the parameters of the model. |
| Initializing with a config file does not load the weights associated with the model, only the |
| configuration. Check out the :meth:`~transformers.PreTrainedModel.from_pretrained` method to load the model |
| weights. |
| """ |
|
|
| GPT2_INPUTS_DOCSTRING = r""" |
| Args: |
| input_ids (:obj:`Numpy array` or :obj:`tf.Tensor` of shape :obj:`(batch_size, input_ids_length)`): |
| :obj:`input_ids_length` = ``sequence_length`` if ``past`` is ``None`` else ``past[0].shape[-2]`` |
| (``sequence_length`` of input past key value states). Indices of input sequence tokens in the vocabulary. |
| |
| If :obj:`past` is used, only input IDs that do not have their past calculated should be passed as |
| ``input_ids``. |
| |
| Indices can be obtained using :class:`~transformers.GPT2Tokenizer`. See |
| :func:`transformers.PreTrainedTokenizer.__call__` and :func:`transformers.PreTrainedTokenizer.encode` for |
| details. |
| |
| `What are input IDs? <../glossary.html#input-ids>`__ |
| past (:obj:`List[tf.Tensor]` of length :obj:`config.n_layers`): |
| Contains pre-computed hidden-states (key and values in the attention blocks) as computed by the model (see |
| :obj:`past` output below). Can be used to speed up sequential decoding. The token ids which have their past |
| given to this model should not be passed as input ids as they have already been computed. |
| attention_mask (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, sequence_length)`, `optional`): |
| Mask to avoid performing attention on padding token indices. Mask values selected in ``[0, 1]``: |
| |
| - 1 for tokens that are **not masked**, |
| - 0 for tokens that are **masked**. |
| |
| `What are attention masks? <../glossary.html#attention-mask>`__ |
| token_type_ids (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, sequence_length)`, `optional`): |
| Segment token indices to indicate first and second portions of the inputs. Indices are selected in ``[0, |
| 1]``: |
| |
| - 0 corresponds to a `sentence A` token, |
| - 1 corresponds to a `sentence B` token. |
| |
| `What are token type IDs? <../glossary.html#token-type-ids>`__ |
| position_ids (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, sequence_length)`, `optional`): |
| Indices of positions of each input sequence tokens in the position embeddings. Selected in the range ``[0, |
| config.max_position_embeddings - 1]``. |
| |
| `What are position IDs? <../glossary.html#position-ids>`__ |
| head_mask (:obj:`Numpy array` or :obj:`tf.Tensor` of shape :obj:`(num_heads,)` or :obj:`(num_layers, num_heads)`, `optional`): |
| Mask to nullify selected heads of the self-attention modules. Mask values selected in ``[0, 1]``: |
| |
| - 1 indicates the head is **not masked**, |
| - 0 indicates the head is **masked**. |
| |
| inputs_embeds (:obj:`tf.Tensor` of shape :obj:`(batch_size, sequence_length, hidden_size)`, `optional`): |
| Optionally, instead of passing :obj:`input_ids` you can choose to directly pass an embedded representation. |
| This is useful if you want more control over how to convert :obj:`input_ids` indices into associated |
| vectors than the model's internal embedding lookup matrix. |
| output_attentions (:obj:`bool`, `optional`): |
| Whether or not to return the attentions tensors of all attention layers. See ``attentions`` under returned |
| tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the |
| config will be used instead. |
| output_hidden_states (:obj:`bool`, `optional`): |
| Whether or not to return the hidden states of all layers. See ``hidden_states`` under returned tensors for |
| more detail. This argument can be used only in eager mode, in graph mode the value in the config will be |
| used instead. |
| return_dict (:obj:`bool`, `optional`): |
| Whether or not to return a :class:`~transformers.file_utils.ModelOutput` instead of a plain tuple. This |
| argument can be used in eager mode, in graph mode the value will always be set to True. |
| training (:obj:`bool`, `optional`, defaults to :obj:`False`): |
| Whether or not to use the model in training mode (some modules like dropout modules have different |
| behaviors between training and evaluation). |
| """ |
|
|
|
|
| @add_start_docstrings( |
| "The bare GPT2 Model transformer outputting raw hidden-states without any specific head on top.", |
| GPT2_START_DOCSTRING, |
| ) |
| class TFGPT2Model(TFGPT2PreTrainedModel): |
| def __init__(self, config, *inputs, **kwargs): |
| super().__init__(config, *inputs, **kwargs) |
| self.transformer = TFGPT2MainLayer(config, name="transformer") |
|
|
| @add_start_docstrings_to_model_forward(GPT2_INPUTS_DOCSTRING) |
| @add_code_sample_docstrings( |
| tokenizer_class=_TOKENIZER_FOR_DOC, |
| checkpoint=_CHECKPOINT_FOR_DOC, |
| output_type=TFBaseModelOutputWithPast, |
| config_class=_CONFIG_FOR_DOC, |
| ) |
| def call( |
| self, |
| input_ids=None, |
| past=None, |
| attention_mask=None, |
| token_type_ids=None, |
| position_ids=None, |
| head_mask=None, |
| inputs_embeds=None, |
| use_cache=None, |
| output_attentions=None, |
| output_hidden_states=None, |
| return_dict=None, |
| training=False, |
| **kwargs, |
| ): |
| inputs = input_processing( |
| func=self.call, |
| config=self.config, |
| input_ids=input_ids, |
| past=past, |
| attention_mask=attention_mask, |
| token_type_ids=token_type_ids, |
| position_ids=position_ids, |
| head_mask=head_mask, |
| inputs_embeds=inputs_embeds, |
| use_cache=use_cache, |
| output_attentions=output_attentions, |
| output_hidden_states=output_hidden_states, |
| return_dict=return_dict, |
| training=training, |
| kwargs_call=kwargs, |
| ) |
| outputs = self.transformer( |
| input_ids=inputs["input_ids"], |
| past=inputs["past"], |
| attention_mask=inputs["attention_mask"], |
| token_type_ids=inputs["token_type_ids"], |
| position_ids=inputs["position_ids"], |
| head_mask=inputs["head_mask"], |
| inputs_embeds=inputs["inputs_embeds"], |
| use_cache=inputs["use_cache"], |
| output_attentions=inputs["output_attentions"], |
| output_hidden_states=inputs["output_hidden_states"], |
| return_dict=inputs["return_dict"], |
| training=inputs["training"], |
| ) |
|
|
| return outputs |
|
|
| def serving_output(self, output): |
| pkv = tf.convert_to_tensor(output.past_key_values) if self.config.use_cache else None |
| hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None |
| attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None |
|
|
| return TFBaseModelOutputWithPast( |
| last_hidden_state=output.last_hidden_state, past_key_values=pkv, hidden_states=hs, attentions=attns |
| ) |
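| # Minimal usage sketch for TFGPT2Model (illustrative only; assumes the public "gpt2" checkpoint): |
| # |
| #   from transformers import GPT2Tokenizer, TFGPT2Model |
| #   tokenizer = GPT2Tokenizer.from_pretrained("gpt2") |
| #   model = TFGPT2Model.from_pretrained("gpt2") |
| #   inputs = tokenizer("Hello, my dog is cute", return_tensors="tf") |
| #   outputs = model(inputs) |
| #   last_hidden_state = outputs.last_hidden_state  # (batch_size, sequence_length, hidden_size) |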
|
|
|
|
| @add_start_docstrings( |
| """ |
| The GPT2 Model transformer with a language modeling head on top (linear layer with weights tied to the input |
| embeddings). |
| """, |
| GPT2_START_DOCSTRING, |
| ) |
| class TFGPT2LMHeadModel(TFGPT2PreTrainedModel, TFCausalLanguageModelingLoss): |
| def __init__(self, config, *inputs, **kwargs): |
| super().__init__(config, *inputs, **kwargs) |
| self.transformer = TFGPT2MainLayer(config, name="transformer") |
|
|
| def get_output_embeddings(self): |
| return self.get_input_embeddings() |
|
|
| def set_output_embeddings(self, value): |
| self.set_input_embeddings(value) |
|
|
| def prepare_inputs_for_generation(self, inputs, past, **kwargs): |
| |
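| # With a cache (`past`), only the last generated token needs to be fed; earlier positions are |
| # already encoded in the cached key/value states. |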
| if past: |
| inputs = tf.expand_dims(inputs[:, -1], -1) |
|
|
| return {"input_ids": inputs, "past": past, "use_cache": kwargs["use_cache"]} |
|
|
| @add_start_docstrings_to_model_forward(GPT2_INPUTS_DOCSTRING) |
| @add_code_sample_docstrings( |
| tokenizer_class=_TOKENIZER_FOR_DOC, |
| checkpoint=_CHECKPOINT_FOR_DOC, |
| output_type=TFCausalLMOutputWithPast, |
| config_class=_CONFIG_FOR_DOC, |
| ) |
| def call( |
| self, |
| input_ids=None, |
| past=None, |
| attention_mask=None, |
| token_type_ids=None, |
| position_ids=None, |
| head_mask=None, |
| inputs_embeds=None, |
| use_cache=None, |
| output_attentions=None, |
| output_hidden_states=None, |
| return_dict=None, |
| labels=None, |
| training=False, |
| **kwargs, |
| ): |
| r""" |
| labels (:obj:`tf.Tensor` of shape :obj:`(batch_size, sequence_length)`, `optional`): |
| Labels for computing the language modeling (cross-entropy) loss. Indices should be in ``[0, ..., |
| config.vocab_size - 1]``. The labels are shifted inside the model, so you can pass ``labels = input_ids``. |
| """ |
| inputs = input_processing( |
| func=self.call, |
| config=self.config, |
| input_ids=input_ids, |
| past=past, |
| attention_mask=attention_mask, |
| token_type_ids=token_type_ids, |
| position_ids=position_ids, |
| head_mask=head_mask, |
| inputs_embeds=inputs_embeds, |
| use_cache=use_cache, |
| output_attentions=output_attentions, |
| output_hidden_states=output_hidden_states, |
| return_dict=return_dict, |
| labels=labels, |
| training=training, |
| kwargs_call=kwargs, |
| ) |
| transformer_outputs = self.transformer( |
| input_ids=inputs["input_ids"], |
| past=inputs["past"], |
| attention_mask=inputs["attention_mask"], |
| token_type_ids=inputs["token_type_ids"], |
| position_ids=inputs["position_ids"], |
| head_mask=inputs["head_mask"], |
| inputs_embeds=inputs["inputs_embeds"], |
| use_cache=inputs["use_cache"], |
| output_attentions=inputs["output_attentions"], |
| output_hidden_states=inputs["output_hidden_states"], |
| return_dict=inputs["return_dict"], |
| training=inputs["training"], |
| ) |
| hidden_states = transformer_outputs[0] |
| logits = self.transformer.wte(hidden_states, mode="linear") |
|
|
| loss = None |
| if inputs["labels"] is not None: |
| |
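| # Shift so that tokens < n predict token n: drop the last logit and the first label. |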
| logits = logits[:, :-1] |
| labels = inputs["labels"][:, 1:] |
| loss = self.compute_loss(labels, logits) |
|
|
| if not inputs["return_dict"]: |
| output = (logits,) + transformer_outputs[1:] |
| return ((loss,) + output) if loss is not None else output |
|
|
| return TFCausalLMOutputWithPast( |
| loss=loss, |
| logits=logits, |
| past_key_values=transformer_outputs.past_key_values, |
| hidden_states=transformer_outputs.hidden_states, |
| attentions=transformer_outputs.attentions, |
| ) |
|
|
| def serving_output(self, output): |
| pkv = tf.convert_to_tensor(output.past_key_values) if self.config.use_cache else None |
| hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None |
| attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None |
|
|
| return TFCausalLMOutputWithPast(logits=output.logits, past_key_values=pkv, hidden_states=hs, attentions=attns) |
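| # Minimal generation sketch for TFGPT2LMHeadModel (illustrative only; assumes the public "gpt2" checkpoint): |
| # |
| #   from transformers import GPT2Tokenizer, TFGPT2LMHeadModel |
| #   tokenizer = GPT2Tokenizer.from_pretrained("gpt2") |
| #   model = TFGPT2LMHeadModel.from_pretrained("gpt2") |
| #   input_ids = tokenizer("The GPT-2 paper was written by", return_tensors="tf").input_ids |
| #   generated = model.generate(input_ids, max_length=20) |
| #   print(tokenizer.decode(generated[0])) |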
|
|
|
|
| @add_start_docstrings( |
| """ |
| The GPT2 Model transformer with a language modeling and a multiple-choice classification head on top e.g. for |
| RocStories/SWAG tasks. The two heads are two linear layers. The language modeling head has its weights tied to the |
| input embeddings, the classification head takes as input the hidden state of the classification token at a |
| specified index in the input sequence. |
| """, |
| GPT2_START_DOCSTRING, |
| ) |
| class TFGPT2DoubleHeadsModel(TFGPT2PreTrainedModel): |
| def __init__(self, config, *inputs, **kwargs): |
| super().__init__(config, *inputs, **kwargs) |
| config.num_labels = 1 |
| self.transformer = TFGPT2MainLayer(config, name="transformer") |
| self.multiple_choice_head = TFSequenceSummary( |
| config, initializer_range=config.initializer_range, name="multiple_choice_head" |
| ) |
|
|
| @add_start_docstrings_to_model_forward(GPT2_INPUTS_DOCSTRING) |
| @replace_return_docstrings(output_type=TFGPT2DoubleHeadsModelOutput, config_class=_CONFIG_FOR_DOC) |
| def call( |
| self, |
| input_ids=None, |
| past=None, |
| attention_mask=None, |
| token_type_ids=None, |
| position_ids=None, |
| head_mask=None, |
| inputs_embeds=None, |
| mc_token_ids=None, |
| use_cache=None, |
| output_attentions=None, |
| output_hidden_states=None, |
| return_dict=None, |
| training=False, |
| **kwargs, |
| ): |
| r""" |
| mc_token_ids (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, num_choices)`, `optional`, defaults to the index of the last token of the input): |
| Index of the classification token in each input sequence. Selected in the range ``[0, input_ids.shape[-1] - |
| 1]``. |
| |
| Return: |
| |
| Examples:: |
| |
| >>> import tensorflow as tf |
| >>> from transformers import GPT2Tokenizer, TFGPT2DoubleHeadsModel |
| |
| >>> tokenizer = GPT2Tokenizer.from_pretrained('gpt2') |
| >>> model = TFGPT2DoubleHeadsModel.from_pretrained('gpt2') |
| |
| >>> # Add a [CLS] to the vocabulary (we should train it also!) |
| >>> num_added_tokens = tokenizer.add_special_tokens({'cls_token': '[CLS]'}) |
| |
| >>> embedding_layer = model.resize_token_embeddings(len(tokenizer)) # Update the model embeddings with the new vocabulary size |
| |
| >>> choices = ["Hello, my dog is cute [CLS]", "Hello, my cat is cute [CLS]"] |
| >>> encoded_choices = [tokenizer.encode(s) for s in choices] |
| >>> cls_token_location = [tokens.index(tokenizer.cls_token_id) for tokens in encoded_choices] |
| |
| >>> input_ids = tf.constant(encoded_choices)[None, :] # Batch size: 1, number of choices: 2 |
| >>> mc_token_ids = tf.constant([cls_token_location]) # Batch size: 1 |
| |
| >>> outputs = model(input_ids, mc_token_ids=mc_token_ids) |
| >>> lm_prediction_scores, mc_prediction_scores = outputs[:2] |
| |
| """ |
| inputs = input_processing( |
| func=self.call, |
| config=self.config, |
| input_ids=input_ids, |
| past=past, |
| attention_mask=attention_mask, |
| token_type_ids=token_type_ids, |
| position_ids=position_ids, |
| head_mask=head_mask, |
| inputs_embeds=inputs_embeds, |
| mc_token_ids=mc_token_ids, |
| use_cache=use_cache, |
| output_attentions=output_attentions, |
| output_hidden_states=output_hidden_states, |
| return_dict=return_dict, |
| training=training, |
| kwargs_call=kwargs, |
| ) |
|
|
| if inputs["input_ids"] is not None: |
| input_shapes = shape_list(inputs["input_ids"]) |
| else: |
| input_shapes = shape_list(inputs["inputs_embeds"])[:-1] |
|
|
| seq_length = input_shapes[-1] |
| flat_input_ids = tf.reshape(inputs["input_ids"], (-1, seq_length)) if inputs["input_ids"] is not None else None |
| flat_attention_mask = ( |
| tf.reshape(inputs["attention_mask"], (-1, seq_length)) if inputs["attention_mask"] is not None else None |
| ) |
| flat_token_type_ids = ( |
| tf.reshape(inputs["token_type_ids"], (-1, seq_length)) if inputs["token_type_ids"] is not None else None |
| ) |
| flat_position_ids = ( |
| tf.reshape(inputs["position_ids"], (-1, seq_length)) if inputs["position_ids"] is not None else None |
| ) |
| transformer_outputs = self.transformer( |
| flat_input_ids, |
| inputs["past"], |
| flat_attention_mask, |
| flat_token_type_ids, |
| flat_position_ids, |
| inputs["head_mask"], |
| inputs["inputs_embeds"], |
| inputs["use_cache"], |
| inputs["output_attentions"], |
| inputs["output_hidden_states"], |
| return_dict=inputs["return_dict"], |
| training=inputs["training"], |
| ) |
| hidden_states = transformer_outputs[0] |
| hidden_states = tf.reshape(hidden_states, input_shapes + shape_list(hidden_states)[-1:]) |
| lm_logits = self.transformer.wte(hidden_states, mode="linear") |
| mc_logits = self.multiple_choice_head(hidden_states, inputs["mc_token_ids"], training=inputs["training"]) |
| mc_logits = tf.squeeze(mc_logits, axis=-1) |
|
|
| if not inputs["return_dict"]: |
| return (lm_logits, mc_logits) + transformer_outputs[1:] |
|
|
| return TFGPT2DoubleHeadsModelOutput( |
| logits=lm_logits, |
| mc_logits=mc_logits, |
| past_key_values=transformer_outputs.past_key_values, |
| hidden_states=transformer_outputs.hidden_states, |
| attentions=transformer_outputs.attentions, |
| ) |
|
|
| @tf.function( |
| input_signature=[ |
| { |
| "input_ids": tf.TensorSpec((None, None, None), tf.int32, name="input_ids"), |
| "attention_mask": tf.TensorSpec((None, None, None), tf.int32, name="attention_mask"), |
| "mc_token_ids": tf.TensorSpec((None, None), tf.int32, name="mc_token_ids"), |
| } |
| ] |
| ) |
| def serving(self, inputs): |
| output = self.call(inputs) |
|
|
| return self.serving_output(output) |
|
|
| def serving_output(self, output): |
| pkv = tf.convert_to_tensor(output.past_key_values) if self.config.use_cache else None |
| hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None |
| attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None |
|
|
| return TFGPT2DoubleHeadsModelOutput( |
| logits=output.logits, |
| mc_logits=output.mc_logits, |
| past_key_values=pkv, |
| hidden_states=hs, |
| attentions=attns, |
| ) |
|
|
|
|
| @add_start_docstrings( |
| """ |
| The GPT2 Model transformer with a sequence classification head on top (linear layer). |
| |
| :class:`~transformers.TFGPT2ForSequenceClassification` uses the last token in order to do the classification, as |
| other causal models (e.g. GPT-1) do. |
| |
| Since it does classification on the last token, it needs to know the position of the last token. If a |
| :obj:`pad_token_id` is defined in the configuration, it finds the last token that is not a padding token in each |
| row. If no :obj:`pad_token_id` is defined, it simply takes the last value in each row of the batch. Since it cannot |
| guess the padding tokens when :obj:`inputs_embeds` are passed instead of :obj:`input_ids`, it does the same (take |
| the last value in each row of the batch). |
| """, |
| GPT2_START_DOCSTRING, |
| ) |
| class TFGPT2ForSequenceClassification(TFGPT2PreTrainedModel, TFSequenceClassificationLoss): |
| def __init__(self, config, *inputs, **kwargs): |
| super().__init__(config, *inputs, **kwargs) |
| self.num_labels = config.num_labels |
| self.score = tf.keras.layers.Dense( |
| config.num_labels, |
| kernel_initializer=get_initializer(config.initializer_range), |
| name="score", |
| use_bias=False, |
| ) |
| self.transformer = TFGPT2MainLayer(config, name="transformer") |
|
|
| @add_start_docstrings_to_model_forward(GPT2_INPUTS_DOCSTRING) |
| @add_code_sample_docstrings( |
| tokenizer_class=_TOKENIZER_FOR_DOC, |
| checkpoint="microsoft/DialogRPT-updown", |
| output_type=TFSequenceClassifierOutputWithPast, |
| config_class=_CONFIG_FOR_DOC, |
| ) |
| def call( |
| self, |
| input_ids=None, |
| past=None, |
| attention_mask=None, |
| token_type_ids=None, |
| position_ids=None, |
| head_mask=None, |
| inputs_embeds=None, |
| use_cache=None, |
| output_attentions=None, |
| output_hidden_states=None, |
| return_dict=None, |
| labels=None, |
| training=False, |
| **kwargs, |
| ): |
| r""" |
| labels (:obj:`tf.Tensor` of shape :obj:`(batch_size,)`, `optional`): |
| Labels for computing the sequence classification/regression loss. Indices should be in ``[0, ..., |
| config.num_labels - 1]``. If ``config.num_labels == 1`` a regression loss is computed (Mean-Square loss), if |
| ``config.num_labels > 1`` a classification loss is computed (Cross-Entropy). |
| """ |
| inputs = input_processing( |
| func=self.call, |
| config=self.config, |
| input_ids=input_ids, |
| past=past, |
| attention_mask=attention_mask, |
| token_type_ids=token_type_ids, |
| position_ids=position_ids, |
| head_mask=head_mask, |
| inputs_embeds=inputs_embeds, |
| use_cache=use_cache, |
| output_attentions=output_attentions, |
| output_hidden_states=output_hidden_states, |
| return_dict=return_dict, |
| labels=labels, |
| training=training, |
| kwargs_call=kwargs, |
| ) |
|
|
| transformer_outputs = self.transformer( |
| input_ids=inputs["input_ids"], |
| past=inputs["past"], |
| attention_mask=inputs["attention_mask"], |
| token_type_ids=inputs["token_type_ids"], |
| position_ids=inputs["position_ids"], |
| head_mask=inputs["head_mask"], |
| inputs_embeds=inputs["inputs_embeds"], |
| use_cache=inputs["use_cache"], |
| output_attentions=inputs["output_attentions"], |
| output_hidden_states=inputs["output_hidden_states"], |
| return_dict=inputs["return_dict"], |
| training=inputs["training"], |
| ) |
|
|
| hidden_states = transformer_outputs[0] |
| logits = self.score(hidden_states) |
| logits_shape = shape_list(logits) |
| in_logits = None |
| if self.config.pad_token_id is None: |
| sequence_lengths = -1 |
| else: |
| if inputs["input_ids"] is not None: |
| sequence_lengths = ( |
| tf.reduce_sum( |
| tf.cast( |
| tf.math.not_equal(inputs["input_ids"], self.config.pad_token_id), |
| dtype=inputs["input_ids"].dtype, |
| ), |
| -1, |
| keepdims=False, |
| ) |
| - 1 |
| ) |
| in_logits = tf.gather(logits, sequence_lengths, batch_dims=1, axis=1) |
| else: |
| sequence_lengths = -1 |
| logger.warning( |
| f"{self.__class__.__name__} will not detect padding tokens in `inputs_embeds`. Results may be " |
| f"unexpected if using padding tokens in conjunction with `inputs_embeds.`" |
| ) |
| loss = None |
|
|
| if inputs["labels"] is not None: |
| assert ( |
| self.config.pad_token_id is not None or logits_shape[0] == 1 |
| ), "Cannot handle batch sizes > 1 if no padding token is defined." |
|
|
| if not tf.is_tensor(sequence_lengths): |
| in_logits = logits[0 : logits_shape[0], sequence_lengths] |
|
|
| loss = self.compute_loss(tf.reshape(inputs["labels"], [-1]), tf.reshape(in_logits, [-1, self.num_labels])) |
| pooled_logits = in_logits if in_logits is not None else logits |
|
|
| if not inputs["return_dict"]: |
| output = (pooled_logits,) + transformer_outputs[1:] |
| return ((loss,) + output) if loss is not None else output |
|
|
| return TFSequenceClassifierOutputWithPast( |
| loss=loss, |
| logits=pooled_logits, |
| past_key_values=transformer_outputs.past_key_values, |
| hidden_states=transformer_outputs.hidden_states, |
| attentions=transformer_outputs.attentions, |
| ) |
|
|
| def serving_output(self, output): |
| pkv = tf.convert_to_tensor(output.past_key_values) if self.config.use_cache else None |
| hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None |
| attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None |
|
|
| return TFSequenceClassifierOutputWithPast( |
| logits=output.logits, past_key_values=pkv, hidden_states=hs, attentions=attns |
| ) |
|
|