# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Copyright 2022 The Microsoft DeepSpeed Team
'''
import os
import time
import sys
import math
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
os.path.pardir,os.path.pardir)))
from datetime import datetime
import numpy as np
import torch
from deepspeed.runtime.data_pipeline.data_sampling.data_analyzer \
import DataAnalyzer
from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset \
import MMapIndexedDataset
from megatron import get_args
from megatron import print_rank_0
from megatron.initialize import initialize_megatron
def get_tasks_args(parser):
    """Provide extra arguments required for data analyzing.

    Adds a ``data_analyzing`` argument group to ``parser`` and returns the
    same parser object, so the function can be passed as an
    ``extra_args_provider``.

    Args:
        parser: an ``argparse``-style parser exposing ``add_argument_group``.

    Returns:
        The ``parser`` that was passed in, with the new options registered.
    """
    group = parser.add_argument_group(title='data_analyzing')

    # Which phase to run and on what kind of data; both are mandatory.
    group.add_argument('--analyzing-task', type=str, required=True,
                       default=None,
                       choices=['map',
                                'reduce'],
                       help='What type of analyzing task to perform.')
    group.add_argument('--analyzing-data-type', type=str, required=True,
                       default=None,
                       choices=['BERT',
                                'GPT'],
                       help='What type of data.')
    group.add_argument('--analyzing-metric', type=str, nargs='+', default=[],
                       help='What kinds of metrics to analyze.')

    # Parallelism controls.
    group.add_argument('--analyzing-num-workers', type=int, default=1,
                       help='Number of workers. Each worker could be a single CPU node.')
    group.add_argument('--analyzing-worker-id', type=int, default=0,
                       help='Worker id of current node.')
    group.add_argument('--analyzing-num-threads', type=int, default=1,
                       help='Number of threads for each worker.')
    # This option is only forwarded to the analyzer by the reduce task, so
    # its help text says so instead of repeating the map-side description.
    group.add_argument('--analyzing-num-threads-reduce', type=int, default=1,
                       help='Number of threads to use for the reduce task.')
    group.add_argument('--analyzing-specific-threads', type=int, nargs='+', default=[],
                       help='Which specific threads to run. Helpful when there are specific thread failed in previous run.')

    return parser
def train_valid_test_datasets_provider_gpt():
    """Build the GPT train, valid, and test datasets from the global args.

    Returns:
        A ``(train, valid, test)`` tuple as produced by
        ``megatron.data.gpt_dataset.build_train_valid_test_datasets``.
    """
    args = get_args()

    print_rank_0('> building train, validation, and test datasets for GPT ...')
    from megatron.data.gpt_dataset import build_train_valid_test_datasets

    builder_kwargs = dict(
        data_prefix=args.data_path,
        data_impl=args.data_impl,
        splits_string=args.split,
        # Just dummy numbers since we assume
        # args.train_data_exact_num_epochs will override them.
        train_valid_test_num_samples=[1, 1, 1],
        seq_length=args.seq_length,
        seed=args.seed,
        skip_warmup=(not args.mmap_warmup),
    )
    train, valid, test = build_train_valid_test_datasets(**builder_kwargs)
    print_rank_0("> finished creating GPT datasets ...")

    return train, valid, test
def train_valid_test_datasets_provider_bert():
    """Build the BERT train, valid, and test datasets from the global args.

    Returns:
        A ``(train, valid, test)`` tuple as produced by
        ``megatron.data.dataset_utils.build_train_valid_test_datasets``.
    """
    args = get_args()

    print_rank_0('> building train, validation, and test datasets for BERT ...')
    from megatron.data.dataset_utils import build_train_valid_test_datasets

    builder_kwargs = dict(
        data_prefix=args.data_path,
        data_impl=args.data_impl,
        splits_string=args.split,
        # Just dummy numbers since we assume
        # args.train_data_exact_num_epochs will override them.
        train_valid_test_num_samples=[1, 1, 1],
        max_seq_length=args.seq_length,
        masked_lm_prob=args.mask_prob,
        short_seq_prob=args.short_seq_prob,
        seed=args.seed,
        skip_warmup=(not args.mmap_warmup),
        binary_head=args.bert_binary_head,
    )
    train, valid, test = build_train_valid_test_datasets(**builder_kwargs)
    print_rank_0("> finished creating BERT datasets ...")

    return train, valid, test
def metric_seqlen(data):
    """Return, for each row, the number of non-zero ``padding_mask`` entries.

    Args:
        data: mapping holding a ``'padding_mask'`` tensor; the count is taken
            along dimension 1.
    """
    padding_mask = data['padding_mask']
    return torch.count_nonzero(padding_mask, dim=1)
def metric_total_vocab_freq(data):
    """Count how often each token id occurs in the batch.

    The flattened ``data['text']`` is binned with at least
    ``args.padded_vocab_size + 1`` bins.  For BERT data every occurrence is
    weighted by the flattened ``data['padding_mask']``; for GPT data each
    occurrence counts once.

    Returns:
        The tensor produced by ``torch.bincount``.
    """
    args = get_args()
    data_type = args.analyzing_data_type
    num_bins = args.padded_vocab_size + 1

    if data_type == 'BERT':
        flat_tokens = data['text'].view(-1)
        flat_mask = data['padding_mask'].view(-1)
        frequency = torch.bincount(flat_tokens,
                                   weights=flat_mask,
                                   minlength=num_bins)
    elif data_type == 'GPT':
        flat_tokens = data['text'].view(-1)
        frequency = torch.bincount(flat_tokens, minlength=num_bins)

    return frequency
def metric_vocab_rarity(data):
    """Compute one vocabulary-rarity score per sample.

    Each token id in ``data['text']`` is looked up in
    ``args.total_vocab_freq`` and the looked-up values are summed per row,
    then truncated to ``torch.long``.  For BERT data the values are first
    multiplied by ``data['padding_mask']``.  A summary of the batch
    (min / max / len / avg) is printed.

    Args:
        data: mapping with a ``'text'`` tensor and, for BERT, a
            ``'padding_mask'`` tensor.

    Returns:
        A ``torch.long`` tensor with one entry per row of ``data['text']``.
    """
    args = get_args()
    if args.analyzing_data_type == 'BERT':
        rarity = torch.sum(data['padding_mask'] * \
            args.total_vocab_freq[data['text']], dim=1).to(torch.long)
    elif args.analyzing_data_type == 'GPT':
        # Do one by one to avoid too high memory consumption
        rarity = torch.tensor(
            [int(torch.sum(args.total_vocab_freq[data['text'][row]]).item())
             for row in range(data['text'].size()[0])],
            dtype=torch.long)
    # Use tensor reductions instead of the Python builtins min/max/sum, which
    # walk the tensor one element at a time; the printed text is unchanged.
    print(f"rarity min {rarity.min()}, max {rarity.max()}, len {len(rarity)}, avg {rarity.sum()/len(rarity)}")
    return rarity
def metric_seqlen_vocab_rarity(data):
    """Compute a combined sequence-length / vocabulary-rarity score per sample.

    The score is ``seqlen * args.seqlen_coeff + rarity`` where ``seqlen`` is
    the per-row count of non-zero ``data['padding_mask']`` entries and
    ``rarity`` is the per-row masked sum of ``args.total_vocab_freq`` looked
    up by ``data['text']`` (truncated to ``torch.long``).  A summary of the
    batch (min / max / len / avg) is printed.

    Returns:
        A ``torch.long`` tensor with one entry per row.
    """
    args = get_args()
    metric = torch.count_nonzero(data['padding_mask'], dim=1).to(torch.long) * args.seqlen_coeff
    metric += torch.sum(data['padding_mask'] * \
        args.total_vocab_freq[data['text']], dim=1).to(torch.long)
    # Use tensor reductions instead of the Python builtins min/max/sum, which
    # walk the tensor one element at a time; the printed text is unchanged.
    print(f"metric min {metric.min()}, max {metric.max()}, len {len(metric)}, avg {metric.sum()/len(metric)}")
    return metric
def get_metric_function(metric_name):
    """Map a metric name to the function that computes it.

    Returns ``None`` when ``metric_name`` is not one of the known metrics.
    """
    dispatch = {
        'seqlen': metric_seqlen,
        'total_vocab_freq': metric_total_vocab_freq,
        'vocab_rarity': metric_vocab_rarity,
        'seqlen_vocab_rarity': metric_seqlen_vocab_rarity,
    }
    return dispatch.get(metric_name)
def get_metric_type(metric_name):
    """Map a metric name to its metric-type string.

    Returns ``None`` when ``metric_name`` is not one of the known metrics.
    """
    per_sample_metrics = ('seqlen', 'vocab_rarity', 'seqlen_vocab_rarity')
    if metric_name in per_sample_metrics:
        return 'single_value_per_sample'
    if metric_name == 'total_vocab_freq':
        return 'accumulate_value_over_samples'
    return None
def run_map():
    """Run the map phase of the data analysis on the training dataset.

    Steps:
      1. Build the training dataset for ``args.analyzing_data_type``.
      2. If a rarity-based metric is requested, load the previously computed
         total vocab frequency, turn it into a per-token ``-log(freq/total)``
         table and store it on ``args.total_vocab_freq``.
      3. Construct a ``DataAnalyzer`` with the requested metrics and call its
         ``run_map``; the elapsed time is printed in hours.

    Raises:
        AssertionError: if a seqlen-based metric is requested for GPT data, or
            if a rarity metric is requested before the total vocab frequency
            files exist under ``args.save``.
    """
    args = get_args()
    if args.analyzing_data_type == 'BERT':
        args.mask_prob = 0 # When analyzing data, we don't want any mask.
        train_ds, _, _ = train_valid_test_datasets_provider_bert()
    elif args.analyzing_data_type == 'GPT':
        # Validate the metric selection before building the dataset so that a
        # bad configuration fails immediately.
        assert 'seqlen' not in args.analyzing_metric, 'GPT data has fixed seqlen, thus unnecessary to analyze seqlen metric.'
        assert 'seqlen_vocab_rarity' not in args.analyzing_metric, 'GPT data has fixed seqlen, thus unnecessary to analyze seqlen metric.'
        train_ds, _, _ = train_valid_test_datasets_provider_gpt()
    if 'vocab_rarity' in args.analyzing_metric or 'seqlen_vocab_rarity' in args.analyzing_metric:
        total_vocab_freq_fname = f"{args.save}/total_vocab_freq/total_vocab_freq_metric_value"
        assert os.path.isfile(f"{total_vocab_freq_fname}.bin") and os.path.isfile(f"{total_vocab_freq_fname}.idx"), "To analyze vocab rarity, first need to analyze the total vocab freq."
        total_vocab_freq = MMapIndexedDataset(total_vocab_freq_fname, skip_warmup=True)
        total_vocab_freq = np.copy(total_vocab_freq[0])
        total_vocab_freq[total_vocab_freq == 0] = 1 # Avoid log(0) error
        # np.sum reduces in native code; the builtin sum() adds the elements
        # one by one in Python using the array's own scalar type.
        # NOTE(review): if the stored dtype is narrower than 64 bits, the
        # builtin could wrap under NumPy 2 promotion rules - confirm dtype.
        total_count = np.sum(total_vocab_freq)
        total_vocab_freq = np.log(total_vocab_freq / total_count) * -1
        args.total_vocab_freq = torch.tensor(total_vocab_freq, dtype=torch.double)
        if 'seqlen_vocab_rarity' in args.analyzing_metric:
            # Use large coeff to make seqlen dominates vocab_rarity
            max_possible_rarity = args.seq_length * torch.max(args.total_vocab_freq).item()
            args.seqlen_coeff = 10 ** (math.ceil(math.log(max_possible_rarity, 10)) + 1)
            print(f"Metric seqlen_vocab_rarity: using {args.seqlen_coeff} as coefficient for seqlen.")
    metric_functions = [get_metric_function(x) for x in args.analyzing_metric]
    metric_types = [get_metric_type(x) for x in args.analyzing_metric]
    # For metric_dtypes we int64 by default since it could be hard to estimate
    # the appropriate dtype before the mapping analysis. During reduce where
    # we merge the analysis results, the DataAnalyzer will automatically choose
    # the dtype of merged result file as the smallest one that meet the range
    # requirement.
    metric_dtypes = [np.int64 for x in args.analyzing_metric]
    start = time.time()
    data_analyzer = DataAnalyzer(train_ds,
        num_workers=args.analyzing_num_workers,
        worker_id=args.analyzing_worker_id,
        num_threads=args.analyzing_num_threads,
        specific_threads=args.analyzing_specific_threads,
        batch_size=args.global_batch_size, metric_names=args.analyzing_metric,
        metric_functions=metric_functions, metric_types=metric_types,
        metric_dtypes=metric_dtypes, save_path=args.save)
    data_analyzer.run_map()
    duration = (time.time() - start) / 3600.0
    print(f"map job finished in {duration} hr.")
def run_reduce():
    """Run the reduce phase of the data analysis.

    Builds the training dataset for ``args.analyzing_data_type``, constructs
    a ``DataAnalyzer`` with the requested metrics, calls its ``run_reduce``
    and prints the elapsed time in hours.
    """
    args = get_args()
    data_type = args.analyzing_data_type
    if data_type == 'BERT':
        # When analyzing data, we don't want any mask.
        args.mask_prob = 0
        train_ds = train_valid_test_datasets_provider_bert()[0]
    elif data_type == 'GPT':
        train_ds = train_valid_test_datasets_provider_gpt()[0]

    metric_names = args.analyzing_metric
    metric_functions = [get_metric_function(name) for name in metric_names]
    metric_types = [get_metric_type(name) for name in metric_names]
    # One int64 dtype entry per requested metric.
    metric_dtypes = [np.int64] * len(metric_names)

    start = time.time()
    data_analyzer = DataAnalyzer(
        train_ds,
        num_workers=args.analyzing_num_workers,
        num_threads=args.analyzing_num_threads,
        num_threads_reduce=args.analyzing_num_threads_reduce,
        batch_size=args.global_batch_size,
        metric_names=metric_names,
        metric_functions=metric_functions,
        metric_types=metric_types,
        metric_dtypes=metric_dtypes,
        save_path=args.save,
    )
    data_analyzer.run_reduce()
    elapsed_seconds = time.time() - start
    duration = elapsed_seconds / 3600.0
    print(f"reduce job finished in {duration} hr.")
if __name__ == "__main__":
    # Initialize Megatron with the extra data-analyzing options, then
    # dispatch on the requested task.
    initialize_megatron(extra_args_provider=get_tasks_args, allow_no_cuda=True)
    args = get_args()

    task = args.analyzing_task
    if task not in ('map', 'reduce'):
        raise NotImplementedError('Task {} is not implemented.'.format(task))
    if task == 'map':
        run_map()
    else:
        run_reduce()
|