# Copyright 2022 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
| """Didactic example of an autoregressive Transformer-based language model. | |
| Glossary of shapes: | |
| - B: Batch size. | |
| - T: Sequence length. | |
| - D: Model embedding size. | |
| - H: Number of attention heads. | |
| - V: Vocabulary size. | |
| Forked from: haiku.examples.transformer.model | |
| """ | |

import collections
import dataclasses
from typing import Callable, List, Optional

import chex
import haiku as hk
import jax
import jax.numpy as jnp
import numpy as np

from tracr.transformer import attention

# hk.Modules are not always callable: github.com/deepmind/dm-haiku/issues/52
# Ideally, we'd want a type:
#   CallableHaikuModule = Intersection[Callable[..., jax.Array], hk.Module]
# But Intersection does not exist (yet): github.com/python/typing/issues/213
CallableHaikuModule = Callable[..., jax.Array]


@chex.dataclass
class TransformerOutput:
  layer_outputs: List[jax.Array]  # [B, T, D]
  residuals: List[jax.Array]  # [B, T, D]
  attn_logits: List[jax.Array]  # [B, H, T, T]
  output: jax.Array  # [B, T, D]
  input_embeddings: jax.Array  # [B, T, D]


@chex.dataclass
class TransformerConfig:
  num_heads: int
  num_layers: int
  key_size: int
  mlp_hidden_size: int
  dropout_rate: float
  activation_function: Callable[[jax.Array], jax.Array] = jax.nn.gelu
  layer_norm: bool = True
  causal: bool = False
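

# Example (illustrative values only, not taken from the original file): a
# small GPT-style decoder configuration would set causal=True; the default
# (causal=False) gives bidirectional attention.
#
#   config = TransformerConfig(
#       num_heads=4,
#       num_layers=2,
#       key_size=32,
#       mlp_hidden_size=128,
#       dropout_rate=0.1,
#       causal=True,
#   )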


@dataclasses.dataclass
class Transformer(hk.Module):
  """A transformer stack."""

  config: TransformerConfig
  name: Optional[str] = None

  def __call__(
      self,
      embeddings: jax.Array,  # [B, T, D]
      mask: jax.Array,  # [B, T]
      *,
      use_dropout: bool = True,
  ) -> TransformerOutput:
    """Transforms input embedding sequences to output embedding sequences."""

    def layer_norm(x: jax.Array) -> jax.Array:
      """Applies a unique LayerNorm to x with default settings."""
      if self.config.layer_norm:
        return hk.LayerNorm(axis=-1, create_scale=True, create_offset=True)(x)
      return x

    # Scale the weight initialisation down with depth so the variance of the
    # residual stream stays roughly constant as layers are added.
    initializer = hk.initializers.VarianceScaling(2 / self.config.num_layers)
    dropout_rate = self.config.dropout_rate if use_dropout else 0.
    _, seq_len, model_size = embeddings.shape

    # Compute causal mask for autoregressive sequence modelling.
    mask = mask[:, None, None, :]  # [B, H=1, T'=1, T]
    mask = mask.repeat(seq_len, axis=2)  # [B, H=1, T, T]

    if self.config.causal:
      causal_mask = np.ones((1, 1, seq_len, seq_len))  # [B=1, H=1, T, T]
      causal_mask = np.tril(causal_mask)
      mask = mask * causal_mask  # [B, H=1, T, T]
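      # E.g. for seq_len=3, np.tril gives the lower-triangular factor
      #   [[1, 0, 0],
      #    [1, 1, 0],
      #    [1, 1, 1]]
      # so each position can attend only to itself and earlier positions.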

    # Set up activation collection.
    collected = collections.defaultdict(list)

    def collect(**kwargs):
      for k, v in kwargs.items():
        collected[k].append(v)

    residual = embeddings
    for layer in range(self.config.num_layers):
      with hk.experimental.name_scope(f"layer_{layer}"):
        # First the attention block.
        attn_block = attention.MultiHeadAttention(
            num_heads=self.config.num_heads,
            key_size=self.config.key_size,
            model_size=model_size,
            w_init=initializer,
            name="attn")
        attn_in = layer_norm(residual)
        attn_out = attn_block(attn_in, attn_in, attn_in, mask=mask)
        attn_out, attn_logits = attn_out.out, attn_out.logits
        if dropout_rate > 0:
          attn_out = hk.dropout(hk.next_rng_key(), dropout_rate, attn_out)
        residual = residual + attn_out

        collect(
            residuals=residual, layer_outputs=attn_out, attn_logits=attn_logits)

        # Then the dense block.
        with hk.experimental.name_scope("mlp"):
          dense_block = hk.Sequential([
              hk.Linear(
                  self.config.mlp_hidden_size,
                  w_init=initializer,
                  name="linear_1"),
              self.config.activation_function,
              hk.Linear(model_size, w_init=initializer, name="linear_2"),
          ])
        dense_in = layer_norm(residual)
        dense_out = dense_block(dense_in)
        if dropout_rate > 0:
          dense_out = hk.dropout(hk.next_rng_key(), dropout_rate, dense_out)
        residual = residual + dense_out

        collect(residuals=residual, layer_outputs=dense_out)

    return TransformerOutput(
        residuals=collected["residuals"],
        layer_outputs=collected["layer_outputs"],
        attn_logits=collected["attn_logits"],
        output=layer_norm(residual),
        input_embeddings=embeddings,
    )
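

# Usage sketch (illustrative; all shapes and hyperparameters below are
# arbitrary assumptions). Haiku modules must be built and called inside
# hk.transform:
#
#   def forward(embeddings, mask):
#     config = TransformerConfig(
#         num_heads=4, num_layers=2, key_size=32,
#         mlp_hidden_size=128, dropout_rate=0.1, causal=True)
#     return Transformer(config)(embeddings, mask, use_dropout=False)
#
#   forward_fn = hk.transform(forward)
#   embeddings = jnp.zeros((1, 8, 64))  # [B, T, D]
#   mask = jnp.ones((1, 8))  # [B, T]
#   params = forward_fn.init(jax.random.PRNGKey(0), embeddings, mask)
#   out = forward_fn.apply(params, None, embeddings, mask)  # TransformerOutput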


@chex.dataclass
class CompiledTransformerModelOutput:
  transformer_output: TransformerOutput
  unembedded_output: jax.Array  # [B, T]


@dataclasses.dataclass
class CompiledTransformerModel(hk.Module):
  """A transformer model with one-hot embeddings."""

  transformer: Transformer
  token_embed: CallableHaikuModule
  position_embed: CallableHaikuModule
  unembed: CallableHaikuModule
  use_unembed_argmax: bool
  pad_token: Optional[int] = None

  def embed(self, tokens: jax.Array) -> jax.Array:
    token_embeddings = self.token_embed(tokens)
    # jnp.indices(tokens.shape)[-1] is the position index of each token,
    # shape [B, T], so every sequence gets positions 0, 1, ..., T-1.
    positional_embeddings = self.position_embed(jnp.indices(tokens.shape)[-1])
    return token_embeddings + positional_embeddings  # [B, T, D]

  def __call__(
      self,
      tokens: jax.Array,
      use_dropout: bool = True,
  ) -> CompiledTransformerModelOutput:
    """Embed tokens, pass through model, and unembed output."""
    if self.pad_token is None:
      input_mask = jnp.ones_like(tokens)
    else:
      input_mask = (tokens != self.pad_token)
    input_embeddings = self.embed(tokens)

    transformer_output = self.transformer(
        input_embeddings,
        input_mask,
        use_dropout=use_dropout,
    )
    return CompiledTransformerModelOutput(
        transformer_output=transformer_output,
        unembedded_output=self.unembed(
            transformer_output.output,
            use_unembed_argmax=self.use_unembed_argmax,
        ),
    )
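

# Illustrative smoke test (assumptions: `hk.Embed` stands in for the one-hot
# token/position embeddings, and `toy_unembed` is a hypothetical argmax
# readout; tracr supplies its own embedding/unembedding modules in practice).
if __name__ == "__main__":

  def toy_unembed(x: jax.Array, use_unembed_argmax: bool) -> jax.Array:
    """Toy readout: argmax over the residual dimension."""
    return jnp.argmax(x, axis=-1) if use_unembed_argmax else x

  def forward(tokens: jax.Array) -> CompiledTransformerModelOutput:
    config = TransformerConfig(
        num_heads=2,
        num_layers=2,
        key_size=16,
        mlp_hidden_size=64,
        dropout_rate=0.0,
        causal=True)
    model = CompiledTransformerModel(
        transformer=Transformer(config),
        token_embed=hk.Embed(vocab_size=10, embed_dim=32),
        position_embed=hk.Embed(vocab_size=8, embed_dim=32),
        unembed=toy_unembed,
        use_unembed_argmax=True,
        pad_token=0)
    return model(tokens, use_dropout=False)

  forward_fn = hk.transform(forward)
  tokens = jnp.array([[1, 2, 3, 0]])  # [B=1, T=4]; token 0 is padding.
  params = forward_fn.init(jax.random.PRNGKey(0), tokens)
  output = forward_fn.apply(params, None, tokens)
  print(output.unembedded_output)  # [B, T] argmax indices.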