| """Tests for transformer-based bert encoder network."""
|
|
|
|
|
| from absl.testing import parameterized
|
| import numpy as np
|
| import tensorflow as tf, tf_keras
|
|
|
| from official.nlp.modeling.networks import bert_encoder
|
| from official.projects.token_dropping import encoder
|
|
|
|
|
| class TokenDropBertEncoderTest(tf.test.TestCase, parameterized.TestCase):
|
|
|
| def tearDown(self):
|
| super(TokenDropBertEncoderTest, self).tearDown()
|
| tf_keras.mixed_precision.set_global_policy("float32")
|
|
|
| def test_dict_outputs_network_creation(self):
|
| hidden_size = 32
|
| sequence_length = 21
|
|
|
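    # Create a small TokenDropBertEncoder for testing.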
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=100,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
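    # Create the inputs (note that the first dimension is implicit).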
    word_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    mask = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    type_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]

    self.assertIsInstance(test_network.transformer_layers, list)
    self.assertLen(test_network.transformer_layers, 3)
    self.assertIsInstance(test_network.pooler_layer, tf_keras.layers.Dense)

    expected_data_shape = [None, sequence_length, hidden_size]
    expected_pooled_shape = [None, hidden_size]
    self.assertAllEqual(expected_data_shape, data.shape.as_list())
    self.assertAllEqual(expected_pooled_shape, pooled.shape.as_list())

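    # The default output dtype is float32.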
    self.assertAllEqual(tf.float32, data.dtype)
    self.assertAllEqual(tf.float32, pooled.dtype)

  def test_dict_outputs_all_encoder_outputs_network_creation(self):
    hidden_size = 32
    sequence_length = 21
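    # Create a small TokenDropBertEncoder that keeps all tokens for testing.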
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=100,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        dict_outputs=True,
        token_keep_k=sequence_length,
        token_allow_list=(),
        token_deny_list=())
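    # Create the inputs (note that the first dimension is implicit).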
    word_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    mask = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    type_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    all_encoder_outputs = dict_outputs["encoder_outputs"]
    pooled = dict_outputs["pooled_output"]

    expected_data_shape = [None, sequence_length, hidden_size]
    expected_pooled_shape = [None, hidden_size]
    self.assertLen(all_encoder_outputs, 3)
    for data in all_encoder_outputs:
      self.assertAllEqual(expected_data_shape, data.shape.as_list())
    self.assertAllEqual(expected_pooled_shape, pooled.shape.as_list())

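    # The default output dtype is float32.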
    self.assertAllEqual(tf.float32, all_encoder_outputs[-1].dtype)
    self.assertAllEqual(tf.float32, pooled.dtype)

  def test_dict_outputs_network_creation_with_float16_dtype(self):
    hidden_size = 32
    sequence_length = 21
    tf_keras.mixed_precision.set_global_policy("mixed_float16")
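    # Create a small TokenDropBertEncoder for testing.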
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=100,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=4,
        dict_outputs=True,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
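    # Create the inputs (note that the first dimension is implicit).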
    word_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    mask = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    type_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]

    expected_data_shape = [None, sequence_length, hidden_size]
    expected_pooled_shape = [None, hidden_size]
    self.assertAllEqual(expected_data_shape, data.shape.as_list())
    self.assertAllEqual(expected_pooled_shape, pooled.shape.as_list())

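    # With the "mixed_float16" policy, the data output is float32 (it comes
    # from a layer norm), while the pooled output should be float16.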
    self.assertAllEqual(tf.float32, data.dtype)
    self.assertAllEqual(tf.float16, pooled.dtype)

  @parameterized.named_parameters(
      ("all_sequence_encoder", None, 21),
      ("output_range_encoder", 1, 1),
  )
  def test_dict_outputs_network_invocation(
      self, output_range, out_seq_len):
    hidden_size = 32
    sequence_length = 21
    vocab_size = 57
    num_types = 7
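    # Create a small TokenDropBertEncoder for testing.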
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        type_vocab_size=num_types,
        dict_outputs=True,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
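    # Create the inputs (note that the first dimension is implicit).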
    word_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    mask = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    type_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids),
        output_range=output_range)
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]

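    # Create a model based on the network.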
    model = tf_keras.Model([word_ids, mask, type_ids], [data, pooled])

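    # Invoke the model. We can't validate the output data here (the model
    # is too complex), but this will catch structural runtime errors.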
    batch_size = 3
    word_id_data = np.random.randint(
        vocab_size, size=(batch_size, sequence_length))
    mask_data = np.random.randint(2, size=(batch_size, sequence_length))
    type_id_data = np.random.randint(
        num_types, size=(batch_size, sequence_length))
    outputs = model.predict([word_id_data, mask_data, type_id_data])
    self.assertEqual(outputs[0].shape[1], out_seq_len)

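    # Create an encoder with max_sequence_length != sequence_length.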
    max_sequence_length = 128
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        max_sequence_length=max_sequence_length,
        num_attention_heads=2,
        num_layers=3,
        type_vocab_size=num_types,
        dict_outputs=True,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]
    model = tf_keras.Model([word_ids, mask, type_ids], [data, pooled])
    outputs = model.predict([word_id_data, mask_data, type_id_data])
    self.assertEqual(outputs[0].shape[1], sequence_length)

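    # Create an encoder with embedding_width != hidden_size.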
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        max_sequence_length=max_sequence_length,
        num_attention_heads=2,
        num_layers=3,
        type_vocab_size=num_types,
        embedding_width=16,
        dict_outputs=True,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]
    model = tf_keras.Model([word_ids, mask, type_ids], [data, pooled])
    outputs = model.predict([word_id_data, mask_data, type_id_data])
    self.assertEqual(outputs[0].shape[-1], hidden_size)
    self.assertTrue(hasattr(test_network, "_embedding_projection"))

  def test_network_creation(self):
    hidden_size = 32
    sequence_length = 21
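    # Create a small TokenDropBertEncoder for testing.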
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=100,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
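    # Create the inputs (note that the first dimension is implicit).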
    word_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    mask = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    type_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]

    self.assertIsInstance(test_network.transformer_layers, list)
    self.assertLen(test_network.transformer_layers, 3)
    self.assertIsInstance(test_network.pooler_layer, tf_keras.layers.Dense)

    expected_data_shape = [None, sequence_length, hidden_size]
    expected_pooled_shape = [None, hidden_size]
    self.assertAllEqual(expected_data_shape, data.shape.as_list())
    self.assertAllEqual(expected_pooled_shape, pooled.shape.as_list())

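    # The default output dtype is float32.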
    self.assertAllEqual(tf.float32, data.dtype)
    self.assertAllEqual(tf.float32, pooled.dtype)

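    # Build a fresh encoder and check that it also accepts a dict of inputs.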
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=100,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())

    inputs = dict(
        input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids)
    _ = test_network(inputs)

  def test_all_encoder_outputs_network_creation(self):
    hidden_size = 32
    sequence_length = 21
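    # Create a small TokenDropBertEncoder that keeps all tokens for testing.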
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=100,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        return_all_encoder_outputs=True,
        token_keep_k=sequence_length,
        token_allow_list=(),
        token_deny_list=())
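    # Create the inputs (note that the first dimension is implicit).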
    word_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    mask = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    type_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    all_encoder_outputs = dict_outputs["encoder_outputs"]
    pooled = dict_outputs["pooled_output"]

    expected_data_shape = [None, sequence_length, hidden_size]
    expected_pooled_shape = [None, hidden_size]
    self.assertLen(all_encoder_outputs, 3)
    for data in all_encoder_outputs:
      self.assertAllEqual(expected_data_shape, data.shape.as_list())
    self.assertAllEqual(expected_pooled_shape, pooled.shape.as_list())

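    # The default output dtype is float32.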
    self.assertAllEqual(tf.float32, all_encoder_outputs[-1].dtype)
    self.assertAllEqual(tf.float32, pooled.dtype)

  def test_network_creation_with_float16_dtype(self):
    hidden_size = 32
    sequence_length = 21
    tf_keras.mixed_precision.set_global_policy("mixed_float16")
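    # Create a small TokenDropBertEncoder for testing.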
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=100,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=4,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
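    # Create the inputs (note that the first dimension is implicit).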
    word_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    mask = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    type_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]

    expected_data_shape = [None, sequence_length, hidden_size]
    expected_pooled_shape = [None, hidden_size]
    self.assertAllEqual(expected_data_shape, data.shape.as_list())
    self.assertAllEqual(expected_pooled_shape, pooled.shape.as_list())

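    # With the "mixed_float16" policy, the data output is float32 (it comes
    # from a layer norm), while the pooled output should be float16.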
    self.assertAllEqual(tf.float32, data.dtype)
    self.assertAllEqual(tf.float16, pooled.dtype)

  @parameterized.named_parameters(
      ("all_sequence", None, 21),
      ("output_range", 1, 1),
  )
  def test_network_invocation(self, output_range, out_seq_len):
    hidden_size = 32
    sequence_length = 21
    vocab_size = 57
    num_types = 7
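    # Create a small TokenDropBertEncoder for testing.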
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        type_vocab_size=num_types,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
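    # Create the inputs (note that the first dimension is implicit).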
    word_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    mask = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    type_ids = tf_keras.Input(shape=(sequence_length,), dtype=tf.int32)
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids),
        output_range=output_range)
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]

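    # Create a model based on the network.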
    model = tf_keras.Model([word_ids, mask, type_ids], [data, pooled])

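    # Invoke the model. We can't validate the output data here (the model
    # is too complex), but this will catch structural runtime errors.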
    batch_size = 3
    word_id_data = np.random.randint(
        vocab_size, size=(batch_size, sequence_length))
    mask_data = np.random.randint(2, size=(batch_size, sequence_length))
    type_id_data = np.random.randint(
        num_types, size=(batch_size, sequence_length))
    outputs = model.predict([word_id_data, mask_data, type_id_data])
    self.assertEqual(outputs[0].shape[1], out_seq_len)

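    # Create an encoder with max_sequence_length != sequence_length.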
    max_sequence_length = 128
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        max_sequence_length=max_sequence_length,
        num_attention_heads=2,
        num_layers=3,
        type_vocab_size=num_types,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]
    model = tf_keras.Model([word_ids, mask, type_ids], [data, pooled])
    outputs = model.predict([word_id_data, mask_data, type_id_data])
    self.assertEqual(outputs[0].shape[1], sequence_length)

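    # Create an encoder with embedding_width != hidden_size.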
    test_network = encoder.TokenDropBertEncoder(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        max_sequence_length=max_sequence_length,
        num_attention_heads=2,
        num_layers=3,
        type_vocab_size=num_types,
        embedding_width=16,
        token_keep_k=2,
        token_allow_list=(),
        token_deny_list=())
    dict_outputs = test_network(
        dict(input_word_ids=word_ids, input_mask=mask, input_type_ids=type_ids))
    data = dict_outputs["sequence_output"]
    pooled = dict_outputs["pooled_output"]
    model = tf_keras.Model([word_ids, mask, type_ids], [data, pooled])
    outputs = model.predict([word_id_data, mask_data, type_id_data])
    self.assertEqual(outputs[0].shape[-1], hidden_size)
    self.assertTrue(hasattr(test_network, "_embedding_projection"))


class TokenDropCompatibilityTest(tf.test.TestCase):

  def tearDown(self):
    super().tearDown()
    tf_keras.mixed_precision.set_global_policy("float32")

  def test_checkpoint_forward_compatible(self):
    batch_size = 3

    hidden_size = 32
    sequence_length = 21
    vocab_size = 57
    num_types = 7

    kwargs = dict(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        type_vocab_size=num_types,
        output_range=None)

    word_id_data = np.random.randint(
        vocab_size, size=(batch_size, sequence_length))
    mask_data = np.random.randint(2, size=(batch_size, sequence_length))
    type_id_data = np.random.randint(
        num_types, size=(batch_size, sequence_length))
    data = dict(
        input_word_ids=word_id_data,
        input_mask=mask_data,
        input_type_ids=type_id_data)

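    # A checkpoint saved from a standard BertEncoderV2 should restore into a
    # TokenDropBertEncoder that keeps every token, and both encoders should
    # produce identical outputs.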
    old_net = bert_encoder.BertEncoderV2(**kwargs)
    old_net_outputs = old_net(data)
    ckpt = tf.train.Checkpoint(net=old_net)
    path = ckpt.save(self.get_temp_dir())
    new_net = encoder.TokenDropBertEncoder(
        token_keep_k=sequence_length,
        token_allow_list=(),
        token_deny_list=(),
        **kwargs)
    new_ckpt = tf.train.Checkpoint(net=new_net)
    status = new_ckpt.restore(path)
    status.assert_existing_objects_matched()

    new_net_outputs = new_net(data)

    self.assertAllEqual(old_net_outputs.keys(), new_net_outputs.keys())
    for key in old_net_outputs:
      self.assertAllClose(old_net_outputs[key], new_net_outputs[key])

  def test_keras_model_checkpoint_forward_compatible(self):
    batch_size = 3

    hidden_size = 32
    sequence_length = 21
    vocab_size = 57
    num_types = 7

    kwargs = dict(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        num_attention_heads=2,
        num_layers=3,
        type_vocab_size=num_types,
        output_range=None)

    word_id_data = np.random.randint(
        vocab_size, size=(batch_size, sequence_length))
    mask_data = np.random.randint(2, size=(batch_size, sequence_length))
    type_id_data = np.random.randint(
        num_types, size=(batch_size, sequence_length))
    data = dict(
        input_word_ids=word_id_data,
        input_mask=mask_data,
        input_type_ids=type_id_data)

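    # The same forward compatibility should hold when both encoders are
    # wrapped in Keras models.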
    old_net = bert_encoder.BertEncoderV2(**kwargs)
    inputs = old_net.inputs
    outputs = old_net(inputs)
    old_model = tf_keras.Model(inputs=inputs, outputs=outputs)
    old_model_outputs = old_model(data)
    ckpt = tf.train.Checkpoint(net=old_model)
    path = ckpt.save(self.get_temp_dir())
    new_net = encoder.TokenDropBertEncoder(
        token_keep_k=sequence_length,
        token_allow_list=(),
        token_deny_list=(),
        **kwargs)
    inputs = new_net.inputs
    outputs = new_net(inputs)
    new_model = tf_keras.Model(inputs=inputs, outputs=outputs)
    new_ckpt = tf.train.Checkpoint(net=new_model)
    new_ckpt.restore(path)

    new_model_outputs = new_model(data)

    self.assertAllEqual(old_model_outputs.keys(), new_model_outputs.keys())
    for key in old_model_outputs:
      self.assertAllClose(old_model_outputs[key], new_model_outputs[key])


if __name__ == "__main__":
  tf.test.main()