import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras import layers
import transformers
import os

MAX_LENGTH = 512  # maximum number of tokens per input message
BATCH_SIZE = 8  # number of messages processed at a time


class MeanPool(tf.keras.layers.Layer):
    """Mean-pool token embeddings over the sequence axis, skipping masked positions."""

    def call(self, inputs, mask=None):
        broadcast_mask = tf.expand_dims(tf.cast(mask, "float32"), -1)
        embedding_sum = tf.reduce_sum(inputs * broadcast_mask, axis=1)
        mask_sum = tf.reduce_sum(broadcast_mask, axis=1)
        # Guard against division by zero for fully masked rows.
        mask_sum = tf.math.maximum(mask_sum, tf.constant([1e-9]))
        return embedding_sum / mask_sum


class WeightsSumOne(tf.keras.constraints.Constraint):
    """Constrain a weight vector to be positive and sum to one (via softmax)."""

    def __call__(self, w):
        return tf.nn.softmax(w, axis=0)
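
# A quick sanity check for MeanPool (illustrative, not part of the original
# pipeline): padding positions must not contribute to the mean.
#   x = tf.constant([[[1.0], [3.0], [100.0]]])  # (batch=1, seq=3, dim=1)
#   m = tf.constant([[1, 1, 0]])                # last position is padding
#   MeanPool()(x, mask=m)                       # -> [[2.0]]; the 100.0 is ignored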


def deberta_init(
    pretrained_model_name: str = "microsoft/deberta-v3-large", tokenizer_dir: str = "."
):
    """Helper function to quickly initialize the config and tokenizer for a model.

    Args:
        pretrained_model_name (str, optional): The model name. Defaults to "microsoft/deberta-v3-large".
        tokenizer_dir (str, optional): Directory where the tokenizer is saved. Defaults to ".".

    Returns:
        The configuration and tokenizer of the model.
    """
    tokenizer = transformers.AutoTokenizer.from_pretrained(pretrained_model_name)
    tokenizer_path = os.path.join(tokenizer_dir, "tokenizer")
    tokenizer.save_pretrained(tokenizer_path)
    cfg = transformers.AutoConfig.from_pretrained(
        pretrained_model_name, output_hidden_states=True
    )
    # Disable dropout so the hidden states are deterministic at inference time.
    cfg.hidden_dropout_prob = 0
    cfg.attention_probs_dropout_prob = 0
    cfg.save_pretrained(tokenizer_path)
    return cfg, tokenizer
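
# Illustrative usage: fetches the config and tokenizer for
# microsoft/deberta-v3-large from the Hugging Face Hub and saves copies
# under "./tokenizer".
#   cfg, tokenizer = deberta_init()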


def get_model(cfg):
    """Get a DeBERTa model using the specified configuration.

    Args:
        cfg: the configuration of the model (can be generated using deberta_init).

    Returns:
        The model with respect to the given configuration.
    """
    input_ids = tf.keras.layers.Input(
        shape=(MAX_LENGTH,), dtype=tf.int32, name="input_ids"
    )
    attention_masks = tf.keras.layers.Input(
        shape=(MAX_LENGTH,), dtype=tf.int32, name="attention_masks"
    )
    # NOTE: the checkpoint name is hard-coded; it should match the one passed
    # to deberta_init.
    deberta_model = transformers.TFAutoModel.from_pretrained(
        "microsoft/deberta-v3-large", config=cfg
    )
    # Re-initialize the last REINIT_LAYERS encoder blocks: freshly initialized
    # top layers tend to adapt better when fine-tuning on a new task.
    REINIT_LAYERS = 1
    normal_initializer = tf.keras.initializers.GlorotUniform()
    zeros_initializer = tf.keras.initializers.Zeros()
    ones_initializer = tf.keras.initializers.Ones()
    for encoder_block in deberta_model.deberta.encoder.layer[-REINIT_LAYERS:]:
        for layer in encoder_block.submodules:
            if isinstance(layer, tf.keras.layers.Dense):
                layer.kernel.assign(
                    normal_initializer(
                        shape=layer.kernel.shape, dtype=layer.kernel.dtype
                    )
                )
                if layer.bias is not None:
                    layer.bias.assign(
                        zeros_initializer(
                            shape=layer.bias.shape, dtype=layer.bias.dtype
                        )
                    )
            elif isinstance(layer, tf.keras.layers.LayerNormalization):
                layer.beta.assign(
                    zeros_initializer(shape=layer.beta.shape, dtype=layer.beta.dtype)
                )
                layer.gamma.assign(
                    ones_initializer(shape=layer.gamma.shape, dtype=layer.gamma.dtype)
                )
    deberta_output = deberta_model.deberta(input_ids, attention_mask=attention_masks)
    hidden_states = deberta_output.hidden_states
    # WeightedLayerPool + MeanPool of the last 4 hidden states.
    stack_meanpool = tf.stack(
        [MeanPool()(hidden_s, mask=attention_masks) for hidden_s in hidden_states[-4:]],
        axis=2,
    )
    weighted_layer_pool = layers.Dense(
        1, use_bias=False, kernel_constraint=WeightsSumOne()
    )(stack_meanpool)
    weighted_layer_pool = tf.squeeze(weighted_layer_pool, axis=-1)
    # One linear output per label (see DEBERTA_LABEL_MAP below).
    output = layers.Dense(15, activation="linear")(weighted_layer_pool)
    model = tf.keras.Model(inputs=[input_ids, attention_masks], outputs=output)

    # Compile model with Layer-wise Learning Rate Decay (LLRD): layers closer
    # to the output get larger learning rates than the embeddings.
    layer_list = [deberta_model.deberta.embeddings] + list(
        deberta_model.deberta.encoder.layer
    )
    layer_list.reverse()
    INIT_LR = 1e-5
    LLRDR = 0.9
    LR_SCH_DECAY_STEPS = 1600
    lr_schedules = [
        tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=INIT_LR * LLRDR**i,
            decay_steps=LR_SCH_DECAY_STEPS,
            decay_rate=0.3,
        )
        for i in range(len(layer_list))
    ]
    lr_schedule_head = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=1e-4, decay_steps=LR_SCH_DECAY_STEPS, decay_rate=0.3
    )
    optimizers = [
        tf.keras.optimizers.Adam(learning_rate=lr_sch) for lr_sch in lr_schedules
    ]
    # Pair each optimizer with its layer group; the pooling/output head gets
    # its own, larger learning rate.
    optimizers_and_layers = [
        (tf.keras.optimizers.Adam(learning_rate=lr_schedule_head), model.layers[-4:])
    ] + list(zip(optimizers, layer_list))
    optimizer = tfa.optimizers.MultiOptimizer(optimizers_and_layers)
    model.compile(
        optimizer=optimizer,
        loss="mse",
        metrics=[tf.keras.metrics.RootMeanSquaredError()],
    )
    return model
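
# Illustrative usage (assumes network access to download the pretrained backbone):
#   cfg, tokenizer = deberta_init()
#   model = get_model(cfg)
#   model.summary()
# With INIT_LR = 1e-5 and LLRDR = 0.9, the top encoder block starts at 1e-5,
# the next at 0.9e-5, and so on down to the embeddings, while the pooling and
# output head start at 1e-4.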


def deberta_encode(texts: list, tokenizer):
    """Helper function to tokenize a list of texts using the specified tokenizer.

    Returns a pair of int32 arrays, each of shape (len(texts), MAX_LENGTH):
    the token ids and the attention masks.
    """
    input_ids = []
    attention_mask = []
    for text in texts:
        token = tokenizer(
            text,
            add_special_tokens=True,
            max_length=MAX_LENGTH,
            return_attention_mask=True,
            return_tensors="np",
            truncation=True,
            padding="max_length",
        )
        input_ids.append(token["input_ids"][0])
        attention_mask.append(token["attention_mask"][0])
    return np.array(input_ids, dtype="int32"), np.array(attention_mask, dtype="int32")
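
# Illustrative check (hypothetical inputs): both returned arrays are row-aligned,
# one row per message, padded/truncated to MAX_LENGTH.
#   ids, masks = deberta_encode(["hello there", "how are you?"], tokenizer)
#   ids.shape, masks.shape  # -> (2, 512), (2, 512)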


def predict(model, tokenizer, texts):
    """Predict the label for each message in texts.

    Args:
        model: your DeBERTa model.
        tokenizer: a tokenizer (can be generated by deberta_init).
        texts (list): the messages to classify.

    Returns:
        np.ndarray: the predicted integer label for each message
        (see DEBERTA_LABEL_MAP for the label names).
    """
    prediction = model.predict(deberta_encode(texts, tokenizer))
    labels = np.argmax(prediction, axis=1)
    return labels


def load_model(cfg, model_dir: str = "."):
    """Helper function to load a DeBERTa model with pretrained weights.

    Args:
        cfg: configuration for the model (can be generated with deberta_init).
        model_dir (str, optional): the directory of the pretrained weights. Defaults to ".".

    Returns:
        A DeBERTa model with pretrained weights.
    """
    tf.keras.backend.clear_session()
    model = get_model(cfg)
    model_path = os.path.join(model_dir, "best_model_fold2.h5")
    model.load_weights(model_path)
    return model
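
# Note that load_model expects the fine-tuned weights file "best_model_fold2.h5"
# to already exist under model_dir; get_model alone only gives the pretrained
# backbone with a freshly initialized head.
#   model = load_model(cfg, model_dir="./weights")  # hypothetical directory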


# Map the integer labels to their original string representation.
DEBERTA_LABEL_MAP = {
    0: "Greeting",
    1: "Curiosity",
    2: "Interest",
    3: "Obscene",
    4: "Annoyed",
    5: "Openness",
    6: "Anxious",
    7: "Acceptance",
    8: "Uninterested",
    9: "Informative",
    10: "Accusatory",
    11: "Denial",
    12: "Confused",
    13: "Disapproval",
    14: "Remorse",
}


def decode_deberta_label(numeric_label):
    return DEBERTA_LABEL_MAP.get(numeric_label, "Unknown Label")
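
# End-to-end sketch (assumes the Hub is reachable and best_model_fold2.h5 is in
# the working directory; the sample messages are made up for illustration).
if __name__ == "__main__":
    cfg, tokenizer = deberta_init()
    model = load_model(cfg)
    messages = ["hi, how's it going?", "I already told you, stop asking."]
    for message, label in zip(messages, predict(model, tokenizer, messages)):
        print(f"{message!r} -> {decode_deberta_label(int(label))}")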