Spaces:
Sleeping
Sleeping
| # Copyright 2024 The TensorFlow Authors. All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """Tests for official.nlp.data.pretrain_dataloader.""" | |
| import itertools | |
| import os | |
| from absl.testing import parameterized | |
| import numpy as np | |
| import tensorflow as tf, tf_keras | |
| from official.nlp.data import pretrain_dataloader | |
| def create_int_feature(values): | |
| f = tf.train.Feature(int64_list=tf.train.Int64List(value=list(values))) | |
| return f | |
| def _create_fake_bert_dataset( | |
| output_path, | |
| seq_length, | |
| max_predictions_per_seq, | |
| use_position_id, | |
| use_next_sentence_label, | |
| use_v2_feature_names=False): | |
| """Creates a fake dataset.""" | |
| writer = tf.io.TFRecordWriter(output_path) | |
| def create_float_feature(values): | |
| f = tf.train.Feature(float_list=tf.train.FloatList(value=list(values))) | |
| return f | |
| for _ in range(100): | |
| features = {} | |
| input_ids = np.random.randint(100, size=(seq_length)) | |
| features["input_mask"] = create_int_feature(np.ones_like(input_ids)) | |
| if use_v2_feature_names: | |
| features["input_word_ids"] = create_int_feature(input_ids) | |
| features["input_type_ids"] = create_int_feature(np.ones_like(input_ids)) | |
| else: | |
| features["input_ids"] = create_int_feature(input_ids) | |
| features["segment_ids"] = create_int_feature(np.ones_like(input_ids)) | |
| features["masked_lm_positions"] = create_int_feature( | |
| np.random.randint(100, size=(max_predictions_per_seq))) | |
| features["masked_lm_ids"] = create_int_feature( | |
| np.random.randint(100, size=(max_predictions_per_seq))) | |
| features["masked_lm_weights"] = create_float_feature( | |
| [1.0] * max_predictions_per_seq) | |
| if use_next_sentence_label: | |
| features["next_sentence_labels"] = create_int_feature([1]) | |
| if use_position_id: | |
| features["position_ids"] = create_int_feature(range(0, seq_length)) | |
| tf_example = tf.train.Example(features=tf.train.Features(feature=features)) | |
| writer.write(tf_example.SerializeToString()) | |
| writer.close() | |
| def _create_fake_xlnet_dataset( | |
| output_path, seq_length, max_predictions_per_seq): | |
| """Creates a fake dataset.""" | |
| writer = tf.io.TFRecordWriter(output_path) | |
| for _ in range(100): | |
| features = {} | |
| input_ids = np.random.randint(100, size=(seq_length)) | |
| num_boundary_indices = np.random.randint(1, seq_length) | |
| if max_predictions_per_seq is not None: | |
| input_mask = np.zeros_like(input_ids) | |
| input_mask[:max_predictions_per_seq] = 1 | |
| np.random.shuffle(input_mask) | |
| else: | |
| input_mask = np.ones_like(input_ids) | |
| features["input_mask"] = create_int_feature(input_mask) | |
| features["input_word_ids"] = create_int_feature(input_ids) | |
| features["input_type_ids"] = create_int_feature(np.ones_like(input_ids)) | |
| features["boundary_indices"] = create_int_feature( | |
| sorted(np.random.randint(seq_length, size=(num_boundary_indices)))) | |
| features["target"] = create_int_feature(input_ids + 1) | |
| features["label"] = create_int_feature([1]) | |
| tf_example = tf.train.Example(features=tf.train.Features(feature=features)) | |
| writer.write(tf_example.SerializeToString()) | |
| writer.close() | |
| class BertPretrainDataTest(tf.test.TestCase, parameterized.TestCase): | |
| def test_load_data(self, use_next_sentence_label, use_position_id): | |
| train_data_path = os.path.join(self.get_temp_dir(), "train.tf_record") | |
| seq_length = 128 | |
| max_predictions_per_seq = 20 | |
| _create_fake_bert_dataset( | |
| train_data_path, | |
| seq_length, | |
| max_predictions_per_seq, | |
| use_next_sentence_label=use_next_sentence_label, | |
| use_position_id=use_position_id) | |
| data_config = pretrain_dataloader.BertPretrainDataConfig( | |
| input_path=train_data_path, | |
| max_predictions_per_seq=max_predictions_per_seq, | |
| seq_length=seq_length, | |
| global_batch_size=10, | |
| is_training=True, | |
| use_next_sentence_label=use_next_sentence_label, | |
| use_position_id=use_position_id) | |
| dataset = pretrain_dataloader.BertPretrainDataLoader(data_config).load() | |
| features = next(iter(dataset)) | |
| self.assertLen(features, | |
| 6 + int(use_next_sentence_label) + int(use_position_id)) | |
| self.assertIn("input_word_ids", features) | |
| self.assertIn("input_mask", features) | |
| self.assertIn("input_type_ids", features) | |
| self.assertIn("masked_lm_positions", features) | |
| self.assertIn("masked_lm_ids", features) | |
| self.assertIn("masked_lm_weights", features) | |
| self.assertEqual("next_sentence_labels" in features, | |
| use_next_sentence_label) | |
| self.assertEqual("position_ids" in features, use_position_id) | |
| def test_v2_feature_names(self): | |
| train_data_path = os.path.join(self.get_temp_dir(), "train.tf_record") | |
| seq_length = 128 | |
| max_predictions_per_seq = 20 | |
| _create_fake_bert_dataset( | |
| train_data_path, | |
| seq_length, | |
| max_predictions_per_seq, | |
| use_next_sentence_label=True, | |
| use_position_id=False, | |
| use_v2_feature_names=True) | |
| data_config = pretrain_dataloader.BertPretrainDataConfig( | |
| input_path=train_data_path, | |
| max_predictions_per_seq=max_predictions_per_seq, | |
| seq_length=seq_length, | |
| global_batch_size=10, | |
| is_training=True, | |
| use_next_sentence_label=True, | |
| use_position_id=False, | |
| use_v2_feature_names=True) | |
| dataset = pretrain_dataloader.BertPretrainDataLoader(data_config).load() | |
| features = next(iter(dataset)) | |
| self.assertIn("input_word_ids", features) | |
| self.assertIn("input_mask", features) | |
| self.assertIn("input_type_ids", features) | |
| self.assertIn("masked_lm_positions", features) | |
| self.assertIn("masked_lm_ids", features) | |
| self.assertIn("masked_lm_weights", features) | |
| class XLNetPretrainDataTest(parameterized.TestCase, tf.test.TestCase): | |
| def test_load_data( | |
| self, sample_strategy, reuse_length, max_predictions_per_seq): | |
| train_data_path = os.path.join(self.get_temp_dir(), "train.tf_record") | |
| seq_length = 128 | |
| batch_size = 5 | |
| _create_fake_xlnet_dataset( | |
| train_data_path, seq_length, max_predictions_per_seq) | |
| data_config = pretrain_dataloader.XLNetPretrainDataConfig( | |
| input_path=train_data_path, | |
| max_predictions_per_seq=max_predictions_per_seq, | |
| seq_length=seq_length, | |
| global_batch_size=batch_size, | |
| is_training=True, | |
| reuse_length=reuse_length, | |
| sample_strategy=sample_strategy, | |
| min_num_tokens=1, | |
| max_num_tokens=2, | |
| permutation_size=seq_length // 2, | |
| leak_ratio=0.1) | |
| if max_predictions_per_seq is None: | |
| with self.assertRaises(ValueError): | |
| dataset = pretrain_dataloader.XLNetPretrainDataLoader( | |
| data_config).load() | |
| features = next(iter(dataset)) | |
| else: | |
| dataset = pretrain_dataloader.XLNetPretrainDataLoader(data_config).load() | |
| features = next(iter(dataset)) | |
| self.assertIn("input_word_ids", features) | |
| self.assertIn("input_type_ids", features) | |
| self.assertIn("permutation_mask", features) | |
| self.assertIn("masked_tokens", features) | |
| self.assertIn("target", features) | |
| self.assertIn("target_mask", features) | |
| self.assertAllClose(features["input_word_ids"].shape, | |
| (batch_size, seq_length)) | |
| self.assertAllClose(features["input_type_ids"].shape, | |
| (batch_size, seq_length)) | |
| self.assertAllClose(features["permutation_mask"].shape, | |
| (batch_size, seq_length, seq_length)) | |
| self.assertAllClose(features["masked_tokens"].shape, | |
| (batch_size, seq_length,)) | |
| if max_predictions_per_seq is not None: | |
| self.assertIn("target_mapping", features) | |
| self.assertAllClose(features["target_mapping"].shape, | |
| (batch_size, max_predictions_per_seq, seq_length)) | |
| self.assertAllClose(features["target_mask"].shape, | |
| (batch_size, max_predictions_per_seq)) | |
| self.assertAllClose(features["target"].shape, | |
| (batch_size, max_predictions_per_seq)) | |
| else: | |
| self.assertAllClose(features["target_mask"].shape, | |
| (batch_size, seq_length)) | |
| self.assertAllClose(features["target"].shape, | |
| (batch_size, seq_length)) | |
| if __name__ == "__main__": | |
| tf.test.main() | |