peacock-data-public-evaluation / Megatron-DeepSpeed /examples_deepspeed /data_efficiency /analyze_data.py
| # coding=utf-8 | |
| # Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| ''' | |
| Copyright 2022 The Microsoft DeepSpeed Team | |
| ''' | |
| import os | |
| import time | |
| import sys | |
| import math | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), | |
| os.path.pardir,os.path.pardir))) | |
| from datetime import datetime | |
| import numpy as np | |
| import torch | |
| from deepspeed.runtime.data_pipeline.data_sampling.data_analyzer \ | |
| import DataAnalyzer | |
| from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset \ | |
| import MMapIndexedDataset | |
| from megatron import get_args | |
| from megatron import print_rank_0 | |
| from megatron.initialize import initialize_megatron | |
def get_tasks_args(parser):
    """Provide extra arguments required for data analyzing.

    Args:
        parser: an ``argparse.ArgumentParser`` (or compatible object exposing
            ``add_argument_group``) to extend.

    Returns:
        The same ``parser``, with a ``data_analyzing`` argument group added.
    """
    group = parser.add_argument_group(title='data_analyzing')
    group.add_argument('--analyzing-task', type=str, required=True,
                       default=None,
                       choices=['map',
                                'reduce'],
                       help='What type of analyzing task to perform.')
    group.add_argument('--analyzing-data-type', type=str, required=True,
                       default=None,
                       choices=['BERT',
                                'GPT'],
                       help='What type of data.')
    group.add_argument('--analyzing-metric', type=str, nargs='+', default=[],
                       help='What kinds of metrics to analyze.')
    group.add_argument('--analyzing-num-workers', type=int, default=1,
                       help='Number of workers. Each worker could be a single CPU node.')
    group.add_argument('--analyzing-worker-id', type=int, default=0,
                       help='Worker id of current node.')
    group.add_argument('--analyzing-num-threads', type=int, default=1,
                       help='Number of threads for each worker.')
    # Only consumed by the reduce task (forwarded as num_threads_reduce), so
    # the help text must not simply repeat the --analyzing-num-threads one.
    group.add_argument('--analyzing-num-threads-reduce', type=int, default=1,
                       help='Number of threads for each worker during the reduce task.')
    group.add_argument('--analyzing-specific-threads', type=int, nargs='+', default=[],
                       help='Which specific threads to run. Helpful when there are specific thread failed in previous run.')
    return parser
def train_valid_test_datasets_provider_gpt():
    """Return the ``(train, valid, test)`` GPT datasets described by the CLI args.

    Delegates to Megatron's ``gpt_dataset.build_train_valid_test_datasets``
    using ``args.data_path``, ``args.data_impl``, ``args.split``,
    ``args.seq_length`` and ``args.seed``; mmap warmup is skipped unless
    ``args.mmap_warmup`` is set.
    """
    args = get_args()
    print_rank_0('> building train, validation, and test datasets '
                 'for GPT ...')
    from megatron.data.gpt_dataset import build_train_valid_test_datasets
    builder_kwargs = dict(
        data_prefix=args.data_path,
        data_impl=args.data_impl,
        splits_string=args.split,
        # Dummy sample counts: args.train_data_exact_num_epochs is assumed
        # to override them.
        train_valid_test_num_samples=[1, 1, 1],
        seq_length=args.seq_length,
        seed=args.seed,
        skip_warmup=not args.mmap_warmup,
    )
    train_split, valid_split, test_split = build_train_valid_test_datasets(**builder_kwargs)
    print_rank_0("> finished creating GPT datasets ...")
    return train_split, valid_split, test_split
def train_valid_test_datasets_provider_bert():
    """Return the ``(train, valid, test)`` BERT datasets described by the CLI args.

    Delegates to Megatron's ``dataset_utils.build_train_valid_test_datasets``
    with ``args.seq_length`` as the max sequence length, plus
    ``args.mask_prob``, ``args.short_seq_prob`` and ``args.bert_binary_head``.
    Mmap warmup is skipped unless ``args.mmap_warmup`` is set.
    """
    args = get_args()
    print_rank_0('> building train, validation, and test datasets '
                 'for BERT ...')
    from megatron.data.dataset_utils import build_train_valid_test_datasets
    builder_kwargs = dict(
        data_prefix=args.data_path,
        data_impl=args.data_impl,
        splits_string=args.split,
        # Dummy sample counts: args.train_data_exact_num_epochs is assumed
        # to override them.
        train_valid_test_num_samples=[1, 1, 1],
        max_seq_length=args.seq_length,
        masked_lm_prob=args.mask_prob,
        short_seq_prob=args.short_seq_prob,
        seed=args.seed,
        skip_warmup=not args.mmap_warmup,
        binary_head=args.bert_binary_head,
    )
    train_split, valid_split, test_split = build_train_valid_test_datasets(**builder_kwargs)
    print_rank_0("> finished creating BERT datasets ...")
    return train_split, valid_split, test_split
def metric_seqlen(data):
    """Count the non-zero entries of ``data['padding_mask']`` along dim 1.

    This yields one sequence-length value per row (sample) of the batch.
    """
    return torch.count_nonzero(data['padding_mask'], dim=1)
def metric_total_vocab_freq(data):
    """Histogram of token ids over the whole batch.

    All of ``data['text']`` is flattened and passed to ``torch.bincount`` with
    ``minlength=args.padded_vocab_size + 1``. For BERT data each token is
    weighted by the matching ``data['padding_mask']`` entry, so zero-mask
    positions contribute nothing. For GPT data every token is counted.

    Returns:
        A 1-D tensor of per-token-id counts.

    Raises:
        ValueError: if ``args.analyzing_data_type`` is neither 'BERT' nor 'GPT'.
    """
    args = get_args()
    if args.analyzing_data_type == 'BERT':
        weights = data['padding_mask'].view(-1)
    elif args.analyzing_data_type == 'GPT':
        weights = None
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise ValueError(
            f"Unsupported analyzing_data_type: {args.analyzing_data_type!r}")
    return torch.bincount(data['text'].view(-1),
                          minlength=args.padded_vocab_size + 1,
                          weights=weights)
def metric_vocab_rarity(data):
    """Per-sample vocabulary rarity: the sum of per-token rarity scores.

    Each token id in ``data['text']`` indexes ``args.total_vocab_freq``, the
    per-token score table that ``run_map`` builds as negative log
    frequencies. Handling depends on the data type:

    - BERT: scores are weighted by ``data['padding_mask']``.
    - GPT: every token counts, and rows are processed one at a time to limit
      peak memory.

    Per-row sums are truncated to ``torch.long``.

    Args:
        data: batch dict with a ``'text'`` tensor of token ids and, for BERT,
            a ``'padding_mask'`` tensor. Presumably shaped (batch, seq);
            TODO confirm.

    Returns:
        A ``torch.long`` tensor with one rarity value per row of ``data['text']``.

    Raises:
        ValueError: if ``args.analyzing_data_type`` is neither 'BERT' nor 'GPT'.
    """
    args = get_args()
    if args.analyzing_data_type == 'BERT':
        rarity = torch.sum(data['padding_mask'] *
                           args.total_vocab_freq[data['text']], dim=1).to(torch.long)
    elif args.analyzing_data_type == 'GPT':
        # Do one by one to avoid too high memory consumption
        rarity = torch.tensor(
            [int(torch.sum(args.total_vocab_freq[row]).item()) for row in data['text']],
            dtype=torch.long)
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise ValueError(
            f"Unsupported analyzing_data_type: {args.analyzing_data_type!r}")
    # Use tensor reductions rather than Python's min/max/sum, which iterate
    # the tensor element by element. Skip the summary for an empty batch,
    # where min() and the average would raise.
    if len(rarity) > 0:
        print(f"rarity min {rarity.min().item()}, max {rarity.max().item()}, "
              f"len {len(rarity)}, avg {rarity.double().mean().item()}")
    return rarity
def metric_seqlen_vocab_rarity(data):
    """Combined per-sample metric ``seqlen * args.seqlen_coeff + vocab_rarity``.

    The two terms are:

    - ``seqlen``: the number of non-zero ``data['padding_mask']`` entries per
      row.
    - ``vocab_rarity``: the mask-weighted sum of ``args.total_vocab_freq``
      looked up by ``data['text']``, truncated to ``torch.long``.

    ``run_map`` chooses ``args.seqlen_coeff`` as a power of ten above the
    largest possible rarity, so that seqlen dominates the rarity term.

    Returns:
        A ``torch.long`` tensor with one value per row.
    """
    args = get_args()
    padding_mask = data['padding_mask']
    seqlen = torch.count_nonzero(padding_mask, dim=1).to(torch.long)
    rarity = torch.sum(padding_mask * args.total_vocab_freq[data['text']],
                       dim=1).to(torch.long)
    metric = seqlen * args.seqlen_coeff + rarity
    # Use tensor reductions rather than Python's min/max/sum, which iterate
    # the tensor element by element. Skip the summary for an empty batch,
    # where min() and the average would raise.
    if len(metric) > 0:
        print(f"metric min {metric.min().item()}, max {metric.max().item()}, "
              f"len {len(metric)}, avg {metric.double().mean().item()}")
    return metric
def get_metric_function(metric_name):
    """Return the callable that computes ``metric_name`` for a batch.

    Supported names: 'seqlen', 'total_vocab_freq', 'vocab_rarity',
    'seqlen_vocab_rarity'.

    Raises:
        ValueError: if ``metric_name`` is not supported. Raising here, rather
            than returning ``None``, surfaces a typo in ``--analyzing-metric``
            immediately instead of as a ``None`` metric function later.
    """
    if metric_name == 'seqlen':
        return metric_seqlen
    if metric_name == 'total_vocab_freq':
        return metric_total_vocab_freq
    if metric_name == 'vocab_rarity':
        return metric_vocab_rarity
    if metric_name == 'seqlen_vocab_rarity':
        return metric_seqlen_vocab_rarity
    raise ValueError(
        f"Unknown analyzing metric {metric_name!r}; supported metrics are "
        "'seqlen', 'total_vocab_freq', 'vocab_rarity', 'seqlen_vocab_rarity'.")
def get_metric_type(metric_name):
    """Return the DataAnalyzer metric type string for ``metric_name``.

    There are two kinds of metric:

    - ``'single_value_per_sample'``: metrics whose function returns one value
      per row (seqlen, vocab_rarity, seqlen_vocab_rarity).
    - ``'accumulate_value_over_samples'``: ``total_vocab_freq``, whose function
      returns a token-id histogram for the batch.

    How DataAnalyzer merges each type is defined in DeepSpeed, not here.

    Raises:
        ValueError: if ``metric_name`` is not supported. Raising here, rather
            than returning ``None``, surfaces a typo in ``--analyzing-metric``
            immediately.
    """
    metric_types = {
        'seqlen': 'single_value_per_sample',
        'total_vocab_freq': 'accumulate_value_over_samples',
        'vocab_rarity': 'single_value_per_sample',
        'seqlen_vocab_rarity': 'single_value_per_sample',
    }
    try:
        return metric_types[metric_name]
    except KeyError:
        raise ValueError(
            f"Unknown analyzing metric {metric_name!r}; supported metrics are "
            f"{sorted(metric_types)}.") from None
def _negative_log_frequency(counts):
    """Return ``-log(count / total)`` for every entry of ``counts`` as float64.

    Zero counts are treated as one to avoid ``log(0)``. The input is first
    copied into a float64 array, for two reasons:

    - The input is never modified. It may be a read-only view, e.g. into a
      memory-mapped file.
    - The total is accumulated in float64. Python's builtin ``sum`` would
      instead iterate element by element and, under NumPy 2 (NEP 50)
      promotion rules, accumulate in the array's own integer dtype. That can
      silently wrap around for narrow dtypes.
    """
    freq = np.array(counts, dtype=np.float64)  # writable float64 copy
    freq[freq == 0] = 1  # Avoid log(0) error
    return np.log(freq / freq.sum()) * -1


def _prepare_rarity_tables(args):
    """Store per-token rarity scores (and ``seqlen_coeff`` if needed) on ``args``.

    Reads the reduced ``total_vocab_freq`` result saved under ``args.save``.
    Sets ``args.total_vocab_freq`` to a float64 tensor of negative log
    frequencies, which the vocab-rarity metrics index by token id. When
    'seqlen_vocab_rarity' is requested, also sets ``args.seqlen_coeff``.
    """
    total_vocab_freq_fname = f"{args.save}/total_vocab_freq/total_vocab_freq_metric_value"
    assert os.path.isfile(f"{total_vocab_freq_fname}.bin") and os.path.isfile(f"{total_vocab_freq_fname}.idx"), "To analyze vocab rarity, first need to analyze the total vocab freq."
    # Keep the dataset object referenced while its first entry is copied.
    # The entry is presumably a view into the dataset's memory map.
    freq_dataset = MMapIndexedDataset(total_vocab_freq_fname, skip_warmup=True)
    args.total_vocab_freq = torch.tensor(_negative_log_frequency(freq_dataset[0]),
                                         dtype=torch.double)
    if 'seqlen_vocab_rarity' in args.analyzing_metric:
        # Use a coefficient large enough that seqlen dominates vocab_rarity.
        max_possible_rarity = args.seq_length * torch.max(args.total_vocab_freq).item()
        args.seqlen_coeff = 10 ** (math.ceil(math.log(max_possible_rarity, 10)) + 1)
        print(f"Metric seqlen_vocab_rarity: using {args.seqlen_coeff} as coefficient for seqlen.")


def run_map():
    """Run the map stage of the data analysis on this worker.

    Steps:

    1. Build the training split for ``args.analyzing_data_type``.
    2. Prepare the rarity lookup tables if a rarity metric is requested.
    3. Run ``DataAnalyzer.run_map`` over ``args.analyzing_metric`` with
       ``save_path=args.save``.

    Prints the elapsed wall-clock time in hours.
    """
    args = get_args()
    if args.analyzing_data_type == 'BERT':
        args.mask_prob = 0  # When analyzing data, we don't want any mask.
        train_ds, _, _ = train_valid_test_datasets_provider_bert()
    elif args.analyzing_data_type == 'GPT':
        train_ds, _, _ = train_valid_test_datasets_provider_gpt()
        assert 'seqlen' not in args.analyzing_metric, 'GPT data has fixed seqlen, thus unnecessary to analyze seqlen metric.'
        assert 'seqlen_vocab_rarity' not in args.analyzing_metric, 'GPT data has fixed seqlen, thus unnecessary to analyze seqlen metric.'
    if 'vocab_rarity' in args.analyzing_metric or 'seqlen_vocab_rarity' in args.analyzing_metric:
        _prepare_rarity_tables(args)
    metric_functions = [get_metric_function(x) for x in args.analyzing_metric]
    metric_types = [get_metric_type(x) for x in args.analyzing_metric]
    # For metric_dtypes we use int64 by default since it could be hard to
    # estimate the appropriate dtype before the mapping analysis. During
    # reduce, where we merge the analysis results, the DataAnalyzer will
    # automatically choose the dtype of the merged result file as the
    # smallest one that meets the range requirement.
    metric_dtypes = [np.int64 for _ in args.analyzing_metric]
    start = time.time()
    data_analyzer = DataAnalyzer(train_ds,
        num_workers=args.analyzing_num_workers,
        worker_id=args.analyzing_worker_id,
        num_threads=args.analyzing_num_threads,
        specific_threads=args.analyzing_specific_threads,
        batch_size=args.global_batch_size, metric_names=args.analyzing_metric,
        metric_functions=metric_functions, metric_types=metric_types,
        metric_dtypes=metric_dtypes, save_path=args.save)
    data_analyzer.run_map()
    duration = (time.time() - start) / 3600.0
    print(f"map job finished in {duration} hr.")
def run_reduce():
    """Run the reduce stage: merge the map-stage analysis results.

    Rebuilds the training split for ``args.analyzing_data_type``, with masking
    disabled for BERT. Then calls ``DataAnalyzer.run_reduce`` with
    ``num_threads_reduce=args.analyzing_num_threads_reduce`` and
    ``save_path=args.save``. Prints the elapsed wall-clock time in hours.
    """
    args = get_args()
    data_type = args.analyzing_data_type
    if data_type == 'BERT':
        args.mask_prob = 0  # When analyzing data, we don't want any mask.
        train_ds, _valid_ds, _test_ds = train_valid_test_datasets_provider_bert()
    elif data_type == 'GPT':
        train_ds, _valid_ds, _test_ds = train_valid_test_datasets_provider_gpt()

    metric_names = args.analyzing_metric
    metric_functions = list(map(get_metric_function, metric_names))
    metric_types = list(map(get_metric_type, metric_names))
    metric_dtypes = [np.int64] * len(metric_names)

    start = time.time()
    analyzer = DataAnalyzer(
        train_ds,
        num_workers=args.analyzing_num_workers,
        num_threads=args.analyzing_num_threads,
        num_threads_reduce=args.analyzing_num_threads_reduce,
        batch_size=args.global_batch_size,
        metric_names=metric_names,
        metric_functions=metric_functions,
        metric_types=metric_types,
        metric_dtypes=metric_dtypes,
        save_path=args.save,
    )
    analyzer.run_reduce()
    elapsed_hours = (time.time() - start) / 3600.0
    print(f"reduce job finished in {elapsed_hours} hr.")
if __name__ == "__main__":
    # Parse Megatron's arguments plus the extra data_analyzing group; CUDA is
    # not required (allow_no_cuda=True).
    initialize_megatron(extra_args_provider=get_tasks_args, allow_no_cuda=True)
    args = get_args()
    task_runners = {'map': run_map, 'reduce': run_reduce}
    runner = task_runners.get(args.analyzing_task)
    if runner is None:
        raise NotImplementedError('Task {} is not implemented.'.format(
            args.analyzing_task))
    runner()