import zstandard
import sys
import time
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
    os.path.pardir, os.path.pardir, os.path.pardir)))
from megatron.data import indexed_dataset
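# The sys.path tweak above makes the megatron package importable when this
# script is run from its location inside the Megatron-DeepSpeed repo (it adds
# the directory three levels above this script). Adjust it if your checkout
# layout differs.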


def pile_download(download_url, file_path, i):
    start = time.time()
    zstd_file_path = f"{file_path}{i:02}.jsonl.zst"
    download_path = f"{download_url}{i:02}.jsonl.zst"
    if not os.path.exists(zstd_file_path):
        os.system(f"wget -P {file_path} {download_path}")
    print(f"Finished downloading chunk {i} in {time.time() - start} sec")


def pile_decompress(download_url, file_path, i):
    zstd_file_path = f"{file_path}{i:02}.jsonl.zst"
    output_path = f"{file_path}{i:02}.jsonl"
    if not os.path.exists(output_path):
        if not os.path.exists(zstd_file_path):
            pile_download(download_url, file_path, i)
        start = time.time()
        with open(zstd_file_path, 'rb') as compressed:
            decomp = zstandard.ZstdDecompressor()
            with open(output_path, 'wb') as destination:
                decomp.copy_stream(compressed, destination)
        os.remove(zstd_file_path)
        print(f"Finished decompressing chunk {i} in {time.time() - start} sec")


def pile_preprocess(download_url, file_path, vocab_file, num_workers, i):
    json_file_path = f"{file_path}{i:02}.jsonl"
    output_prefix = f"{file_path}pile_bert_train_{i:02}"
    if not os.path.exists(f"{output_prefix}_text_sentence.idx"):
        if not os.path.exists(json_file_path):
            pile_decompress(download_url, file_path, i)
        start = time.time()
        cmd = f"python ../../tools/preprocess_data.py \
            --input {json_file_path} \
            --output-prefix {output_prefix} \
            --vocab {vocab_file} \
            --dataset-impl mmap \
            --tokenizer-type BertWordPieceLowerCase \
            --split-sentences \
            --workers {num_workers} "
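        # For illustration, with the defaults set in __main__ below
        # (file_path="/blob/data/the_pile_bert/", num_workers=20), the command
        # generated for chunk 0 is roughly:
        #   python ../../tools/preprocess_data.py \
        #       --input /blob/data/the_pile_bert/00.jsonl \
        #       --output-prefix /blob/data/the_pile_bert/pile_bert_train_00 \
        #       --vocab bert-large-uncased-vocab.txt --dataset-impl mmap \
        #       --tokenizer-type BertWordPieceLowerCase --split-sentences --workers 20
        # On success it produces pile_bert_train_00_text_sentence.bin/.idx.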
        # It's possible to hit a MemoryError during the above cmd since memory
        # usage is proportional to num_workers. In that case we delete the
        # incomplete output and the user should retry with a smaller num_workers.
        # Our experience shows that chunks 6, 7, 9, 17, 18, 20, 21, 24, 27
        # have particularly large memory usage.
        if os.system(cmd) == 0:  # Success
            os.remove(json_file_path)
        else:
            print(f"Error: chunk {i} preprocessing failed; deleting the "
                  f"incomplete output. If a MemoryError appeared, please retry "
                  f"with num_workers smaller than {num_workers}.")
            if os.path.exists(f"{output_prefix}_text_sentence.idx"):
                os.remove(f"{output_prefix}_text_sentence.idx")
            if os.path.exists(f"{output_prefix}_text_sentence.bin"):
                os.remove(f"{output_prefix}_text_sentence.bin")
        print(f"Finished preprocessing chunk {i} in {time.time() - start} sec")


def pile_merge(file_path):
    start = time.time()
    num_chunks = 30
    vocab_size = 30524
    for i in range(num_chunks):
        output_prefix = f"{file_path}pile_bert_train_{i:02}"
        assert os.path.exists(f"{output_prefix}_text_sentence.idx")
        assert os.path.exists(f"{output_prefix}_text_sentence.bin")
    builder = indexed_dataset.make_builder(
        f"{file_path}pile_bert_train_text_sentence.bin", impl="mmap",
        vocab_size=vocab_size)
    for i in range(num_chunks):
        chunk_file = f"{file_path}pile_bert_train_{i:02}_text_sentence"
        print(f"Merging file {chunk_file}")
        builder.merge_file_(chunk_file)
    print("Finalizing merged file ...")
    builder.finalize(f"{file_path}pile_bert_train_text_sentence.idx")
    print(f"Finished merging in {time.time() - start} sec")
    # After verifying the merged data with real training, you may want to
    # delete the data chunks:
    # for i in range(num_chunks):
    #     output_prefix = f"{file_path}pile_bert_train_{i:02}"
    #     os.remove(f"{output_prefix}_text_sentence.idx")
    #     os.remove(f"{output_prefix}_text_sentence.bin")


if __name__ == '__main__':
    # Path to download and store all the output files during the whole process.
    # Estimated max storage usage is around 1.6 TB (or 780 GB if you skip the
    # final merge). Memory usage is proportional to num_workers below (it can
    # be as high as O(300GB) when num_workers is around 20).
    file_path = "/blob/data/the_pile_bert/"
    # The raw Pile data has 30 compressed .zst chunks. To run all chunks on a
    # single machine, run "python prepare_pile_data.py range 0 30". You can
    # also split the chunks across multiple machines to speed things up, since
    # processing one chunk can take hours. The whole process only uses CPU.
    if sys.argv[1] == "merge":
        # "python prepare_pile_data.py merge" merges all 30 processed data
        # chunks. Run it only after all 30 chunks are preprocessed. The memory
        # usage during merge is about 600 GB. If you don't have enough memory,
        # one solution is to directly use the 30 data chunks as multiple
        # datasets. See '--data-path' in
        # github.com/microsoft/Megatron-DeepSpeed/blob/main/megatron/arguments.py
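        # For illustration (this assumes Megatron's weighted multi-dataset
        # syntax of alternating "weight path-prefix" pairs; verify against your
        # version of arguments.py), that would look roughly like:
        #   --data-path 1 /blob/data/the_pile_bert/pile_bert_train_00_text_sentence \
        #               1 /blob/data/the_pile_bert/pile_bert_train_01_text_sentence \
        #               ...  (one weight/prefix pair per chunk)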
        pile_merge(file_path)
    else:
        if sys.argv[1] == "range":
            # "python prepare_pile_data.py range 0 30" means process chunks 0-29
            selected_chunk = range(int(sys.argv[2]), int(sys.argv[3]))
        else:
            # "python prepare_pile_data.py 2 5 8" means process chunks 2, 5, 8
            selected_chunk = [int(x) for x in sys.argv[1:]]
        print("selected_chunk: ", selected_chunk)
        # Number of worker processes. Adjust based on your CPU/memory.
        num_workers = 20
        # Where the raw Pile data can be downloaded. The URL may change in the
        # future. Contact EleutherAI (https://github.com/EleutherAI/the-pile)
        # if this URL does not work.
        download_url = "https://the-eye.eu/public/AI/pile/train/"
        vocab_file = "bert-large-uncased-vocab.txt"
        vocab_url = "https://s3.amazonaws.com/models.huggingface.co/bert/bert-large-uncased-vocab.txt"
        if not os.path.exists(vocab_file):
            os.system(f"wget {vocab_url}")
        os.makedirs(file_path, exist_ok=True)
        for i in selected_chunk:
            pile_preprocess(download_url, file_path, vocab_file, num_workers, i)