import os
import sys

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.utils import plot_model

# Make the project root importable so that `config.configs` resolves
# when this module is run from its own directory.
parent_root = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir))
sys.path.append(parent_root)

from config.configs import Params

params = Params()
|
|
|
|
class TransformerEncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, nhead, dim_feedforward, dropout_rate=0.1):
        """
        args:
            - d_model: Embedding dim
            - nhead: Number of heads in MultiHeadAttention
            - dim_feedforward: Hidden dim of the feed-forward sublayer
            - dropout_rate: Dropout rate applied after each sublayer. Default: 0.1
        """
        super().__init__()

        self.attention = layers.MultiHeadAttention(num_heads=nhead, key_dim=d_model // nhead)
        self.feed_forward = tf.keras.Sequential([
            layers.Dense(dim_feedforward, activation='relu'),
            layers.Dense(d_model)
        ])
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(dropout_rate)

    def call(self, inputs, training=None, mask=None):
        # Self-attention sublayer with residual connection and layer norm.
        # `training` defaults to None so Keras can disable dropout at inference.
        attention_output = self.attention(inputs, inputs, inputs, attention_mask=mask)
        attention_output = self.dropout(attention_output, training=training)
        output1 = self.norm1(inputs + attention_output)

        # Position-wise feed-forward sublayer with residual connection and layer norm.
        feed_forward_output = self.feed_forward(output1)
        feed_forward_output = self.dropout(feed_forward_output, training=training)
        output2 = self.norm2(output1 + feed_forward_output)

        return output2
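# Usage sketch (illustrative only; the dims and shapes below are assumptions,
# not values from the project config):
#
#   layer = TransformerEncoderLayer(d_model=128, nhead=4, dim_feedforward=256)
#   x = tf.random.uniform((2, 10, 128))   # (batch, seq_len, d_model)
#   y = layer(x, training=False)          # shape is preserved: (2, 10, 128)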
|
|
|
|
|
|
class TransformerEncoder(tf.keras.layers.Layer):
    """
    Stack of TransformerEncoderLayer
    """

    def __init__(self, num_layers, d_model, nhead, dim_feedforward, dropout_rate=0.1):
        super().__init__()

        self.encoder_layers = [TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout_rate)
                               for _ in range(num_layers)]

    def call(self, inputs, training=None, mask=None):
        # Run the input through each encoder layer in sequence.
        x = inputs
        for layer in self.encoder_layers:
            x = layer(x, training=training, mask=mask)
        return x
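# Usage sketch for the stack (illustrative; shapes are assumptions). An optional
# boolean padding mask broadcastable to (batch, target_len, source_len) can be
# passed through to MultiHeadAttention:
#
#   encoder = TransformerEncoder(num_layers=2, d_model=128, nhead=4, dim_feedforward=256)
#   x = tf.random.uniform((2, 10, 128))
#   y = encoder(x, training=False)        # (2, 10, 128)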
|
|
|
|
|
|
class TransformerModel(object):
    def __init__(self, word_vectorizer=None, char_vectorizer=None, word_embed=None, char_embed=None,
                 num_layers=None, d_model=None, nhead=None, dim_feedforward=None, pretrained_embedding=None,
                 glove_embed=None, bert_process=None, bert_layer=None, dropout_rate=0.1, num_classes=5):
        """
        args:
            - word_vectorizer: Word-level vectorizer
            - char_vectorizer: Char-level vectorizer
            - word_embed: Word-level embedding layer
            - char_embed: Char-level embedding layer
            - pretrained_embedding: "bert", "glove" or None. Default: None
            - glove_embed: GloVe embedding layer
            - bert_process: BERT input preprocessing layer
            - bert_layer: BERT embedding layer
            - num_classes: Number of classes. Default: 5 (do not change)

            - num_layers: Number of TransformerEncoderLayer stacked in the encoder
            - d_model: Embedding dim
            - nhead: Number of heads in MultiHeadAttention
            - dim_feedforward: Hidden dim of the feed-forward sublayer
            - dropout_rate: Dropout rate. Default: 0.1
        """
        super(TransformerModel, self).__init__()

        self.pretrained_embedding = pretrained_embedding
        self.num_classes = num_classes
        self.word_output_dim = params.WORD_OUTPUT_DIM
        self.char_output_dim = params.CHAR_OUTPUT_DIM
        self.concate_dim = self.word_output_dim + 2 * self.char_output_dim

        # Input dims of the one-hot positional features.
        self.line_ids_input_dim = params.LINE_IDS_DEPTH
        self.length_lines_input_dim = params.LENGTH_LINES_DEPTH
        self.total_lines_input_dim = params.TOTAL_LINES_DEPTH

        # Vectorizers (raw text -> integer token ids).
        self.word_vectorizer = word_vectorizer
        self.char_vectorizer = char_vectorizer

        # Embedding layers (token ids -> dense vectors).
        self.word_embed = word_embed
        self.char_embed = char_embed
        self.glove_embed = glove_embed
        self.bert_process = bert_process
        self.bert_layer = bert_layer

        self.encoder = TransformerEncoder(num_layers, d_model, nhead, dim_feedforward, dropout_rate)
|
|
|
|
|
|
    def word_level_branch(self, word_input):
        """
        Pretrained BERT does its own tokenization, so it doesn't need a
        vectorization layer.
        """
        if str(self.pretrained_embedding).lower() == "bert":
            # BERT path: preprocess raw strings, then take the per-token sequence output.
            bert_input = self.bert_process(word_input)
            bert_output = self.bert_layer(bert_input, training=False)
            word_embeddings = bert_output['sequence_output']
        else:
            if self.word_vectorizer is not None:
                if str(self.pretrained_embedding).lower() == "glove":
                    # GloVe path: vectorize, then look up pretrained GloVe vectors.
                    word_vectors = self.word_vectorizer(word_input)
                    word_embeddings = self.glove_embed(word_vectors)
                else:
                    # Default path: vectorize, then use a trainable embedding.
                    word_vectors = self.word_vectorizer(word_input)
                    word_embeddings = self.word_embed(word_vectors)
            else:
                raise ValueError("Please provide a word vectorizer.")

        x = layers.Dense(128, activation="relu")(word_embeddings)
        word_outputs = layers.BatchNormalization()(x)

        return word_outputs
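    # A sketch of how `bert_process` and `bert_layer` could be built from TF Hub.
    # The exact handles are assumptions -- use whatever checkpoints the project
    # config actually points to:
    #
    #   import tensorflow_hub as hub
    #   bert_process = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3")
    #   bert_layer = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4",
    #                               trainable=False)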
|
|
|
|
    def char_level_branch(self, char_input):
        """
        arg:
            - char_input: raw char-level string input
        """
        char_vectors = self.char_vectorizer(char_input)
        char_embeddings = self.char_embed(char_vectors)
        x = layers.Dense(128, activation="relu")(char_embeddings)
        x = layers.BatchNormalization()(x)
        return x
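    # A sketch of a char-level vectorizer/embedding pair that would fit this
    # branch (vocab sizes and sequence lengths are assumptions, not project values;
    # `train_lines` is a hypothetical corpus):
    #
    #   char_vectorizer = layers.TextVectorization(max_tokens=100, split="character",
    #                                              output_sequence_length=290)
    #   char_vectorizer.adapt(train_lines)
    #   char_embed = layers.Embedding(input_dim=100, output_dim=25, mask_zero=True)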
|
|
|
|
|
|
    def line_ids_branch(self, line_id_in):
        """
        arg:
            - line_id_in: line_ids one-hot embedding
        """
        x = layers.Dense(64, activation="relu")(line_id_in)
        x = layers.BatchNormalization()(x)
        return x

    def length_lines_branch(self, length_line_in):
        """
        arg:
            - length_line_in: length_lines one-hot embedding
        """
        x = layers.Dense(64, activation="relu")(length_line_in)
        x = layers.BatchNormalization()(x)
        return x

    def total_lines_branch(self, total_line_in):
        """
        arg:
            - total_line_in: total_lines one-hot embedding
        """
        x = layers.Dense(64, activation="relu")(total_line_in)
        x = layers.BatchNormalization()(x)
        return x
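    # The three positional features are expected as one-hot vectors; a sketch of
    # how they could be produced (`line_number`, `line_length`, and `total_lines`
    # are hypothetical names for the raw integer features):
    #
    #   line_ids_feat = tf.one_hot(line_number, depth=params.LINE_IDS_DEPTH)
    #   length_feat = tf.one_hot(line_length, depth=params.LENGTH_LINES_DEPTH)
    #   total_feat = tf.one_hot(total_lines, depth=params.TOTAL_LINES_DEPTH)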
|
|
|
|
    def _get_model(self):
        # Raw text inputs.
        word_inputs = layers.Input(shape=[], dtype=tf.string, name="token_input")
        char_inputs = layers.Input(shape=(1,), dtype=tf.string, name="char_input")
        # One-hot positional inputs.
        line_ids_inputs = layers.Input(shape=(self.line_ids_input_dim,), name="line_ids_input")
        length_lines_inputs = layers.Input(shape=(self.length_lines_input_dim,), name="length_lines_input")
        total_lines_inputs = layers.Input(shape=(self.total_lines_input_dim,), name="total_lines_input")

        # Per-branch feature extraction.
        word_level_output = self.word_level_branch(word_inputs)
        char_level_output = self.char_level_branch(char_inputs)
        line_ids_output = self.line_ids_branch(line_ids_inputs)
        length_lines_output = self.length_lines_branch(length_lines_inputs)
        total_lines_output = self.total_lines_branch(total_lines_inputs)

        # Concatenate word- and char-level token sequences along the sequence axis.
        word_char_concat = tf.concat([word_level_output, char_level_output], axis=1)

        # Fuse the three positional features into a single 128-dim vector...
        position_embed = tf.concat([length_lines_output, line_ids_output, total_lines_output], axis=1)
        position_embed = layers.Dense(128, activation="relu")(position_embed)
        # ...and append it to the token sequence as one extra "position token".
        position_embed = tf.expand_dims(position_embed, axis=1)
        total_embed = tf.concat([word_char_concat, position_embed], axis=1)

        # Transformer encoder over the combined sequence. The training flag is
        # left to Keras so dropout is disabled automatically at inference time.
        x = self.encoder(total_embed)

        # Summarize the encoded sequence with a bidirectional LSTM.
        x = layers.Bidirectional(layers.LSTM(64))(x)

        # Classification head.
        x = layers.Dense(64, activation="relu")(x)
        x = layers.BatchNormalization()(x)
        x = layers.Dropout(0.5)(x)
        output = layers.Dense(self.num_classes, activation="softmax")(x)
        model = tf.keras.Model(inputs=[word_inputs, char_inputs, line_ids_inputs, length_lines_inputs, total_lines_inputs],
                               outputs=output,
                               name="transformer_encoder_based_model")

        model.compile(optimizer=params.OPTIMIZER, loss=params.LOSS, metrics=params.METRICS)

        return model
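    # Usage sketch: when feeding a dict through tf.data, the keys must match the
    # Input names above ("token_input", "char_input", "line_ids_input",
    # "length_lines_input", "total_lines_input"); `train_ds` is hypothetical:
    #
    #   model = transformer_model._get_model()
    #   model.summary()
    #   model.fit(train_ds, epochs=5)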
|
|
    def _define_checkpoint(self):
        """
        Define checkpoint for model
        """
        # Pick the checkpoint directory that matches the embedding variant.
        if str(self.pretrained_embedding).lower() == "glove":
            checkpoint_dir = params.TF_BASED_GLOVE_MODEL_DIR
        elif str(self.pretrained_embedding).lower() == "bert":
            checkpoint_dir = params.TF_BASED_BERT_MODEL_DIR
        else:
            checkpoint_dir = params.TF_BASED_NOR_MODEL_DIR
        os.makedirs(checkpoint_dir, exist_ok=True)

        checkpoint = tf.keras.callbacks.ModelCheckpoint(
            filepath=os.path.join(checkpoint_dir, 'best_model.ckpt'),
            monitor="val_categorical_accuracy",
            save_best_only=True,
            save_weights_only=True,
            verbose=1
        )
        return checkpoint
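    # Usage sketch (`train_ds` and `val_ds` are hypothetical tf.data datasets):
    #
    #   checkpoint = transformer_model._define_checkpoint()
    #   model.fit(train_ds, validation_data=val_ds, epochs=5, callbacks=[checkpoint])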
|
|
|
|
    def _plot_model(self, model):
        plot_model(model)
        return
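
# Minimal smoke test: builds the model with tiny stand-in vectorizers and
# embeddings. Everything below (corpus, vocab sizes, sequence lengths, encoder
# dims) is an assumption for illustration, not the project's real configuration.
if __name__ == "__main__":
    sample_lines = ["background of the study .", "we conclude that the method works ."]

    word_vectorizer = layers.TextVectorization(max_tokens=1000, output_sequence_length=16)
    word_vectorizer.adapt(sample_lines)
    word_embed = layers.Embedding(input_dim=1000, output_dim=64)

    char_vectorizer = layers.TextVectorization(max_tokens=100, split="character",
                                               output_sequence_length=64, standardize=None)
    char_vectorizer.adapt(sample_lines)
    char_embed = layers.Embedding(input_dim=100, output_dim=25)

    # d_model=128 matches the 128-dim Dense outputs of the word/char/position branches.
    transformer_model = TransformerModel(word_vectorizer=word_vectorizer,
                                         char_vectorizer=char_vectorizer,
                                         word_embed=word_embed,
                                         char_embed=char_embed,
                                         num_layers=2, d_model=128, nhead=4,
                                         dim_feedforward=256)
    model = transformer_model._get_model()
    model.summary()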
|
|