# Prepare The Pile dataset for BERT pretraining: download, decompress,
# and preprocess the 30 raw .jsonl.zst chunks, then optionally merge them.
import os
import sys
import time

import zstandard

# Make the repository root importable so `megatron` resolves when this
# script is run from its own directory.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
    os.path.pardir, os.path.pardir, os.path.pardir)))
from megatron.data import indexed_dataset
def pile_download(download_url, file_path, i):
    """Download compressed Pile chunk ``i`` into ``file_path``.

    Args:
        download_url: Base URL of the raw Pile train chunks (with
            trailing slash); chunk files are named ``{i:02}.jsonl.zst``.
        file_path: Local directory prefix (with trailing slash) where the
            chunk is stored.
        i: Chunk index (0-29).

    The download is skipped when the compressed chunk already exists.
    Fix: the timing message is now printed only when a download actually
    happened; previously "Finished downloading" was printed even when the
    step was skipped.
    """
    zstd_file_path = f"{file_path}{i:02}.jsonl.zst"
    download_path = f"{download_url}{i:02}.jsonl.zst"
    if not os.path.exists(zstd_file_path):
        start = time.time()
        # wget keeps the remote file name; -P places it under file_path.
        os.system(f"wget -P {file_path} {download_path}")
        print(f"Finished downloading chunk {i} in {time.time() - start} sec")
def pile_decompress(download_url, file_path, i):
    """Decompress chunk ``i``'s .zst file into a plain .jsonl file.

    Downloads the compressed chunk first if it is missing, and deletes the
    .zst once the .jsonl has been written. A no-op when the decompressed
    file already exists.
    """
    compressed_path = f"{file_path}{i:02}.jsonl.zst"
    jsonl_path = f"{file_path}{i:02}.jsonl"
    if os.path.exists(jsonl_path):
        return
    if not os.path.exists(compressed_path):
        pile_download(download_url, file_path, i)
    start = time.time()
    with open(compressed_path, 'rb') as src:
        decompressor = zstandard.ZstdDecompressor()
        with open(jsonl_path, 'wb') as dst:
            decompressor.copy_stream(src, dst)
    # The compressed copy is no longer needed once the .jsonl exists.
    os.remove(compressed_path)
    print(f"Finished decompressing chunk {i} in {time.time() - start} sec")
def pile_preprocess(download_url, file_path, vocab_file, num_workers, i):
    """Tokenize chunk ``i`` into Megatron's indexed-dataset format.

    Decompresses (and if needed downloads) the chunk first, then runs
    ``tools/preprocess_data.py`` on it. Skipped when the chunk's output
    .idx file already exists. On success the input .jsonl is removed; on
    failure any partial .idx/.bin output is deleted so a retry starts
    clean.

    Args:
        download_url: Base URL of the raw Pile train chunks.
        file_path: Local directory prefix for inputs and outputs.
        vocab_file: BERT WordPiece vocab file passed to the preprocessor.
        num_workers: Worker count for the preprocessor; memory usage is
            proportional to this.
        i: Chunk index (0-29).
    """
    json_file_path = f"{file_path}{i:02}.jsonl"
    output_prefix = f"{file_path}pile_bert_train_{i:02}"
    if not os.path.exists(f"{output_prefix}_text_sentence.idx"):
        if not os.path.exists(json_file_path):
            pile_decompress(download_url, file_path, i)
        start = time.time()
        # Build the command from a list so no stray source-indentation
        # whitespace (from backslash continuations) leaks into the shell
        # invocation.
        cmd = " ".join([
            "python ../../tools/preprocess_data.py",
            f"--input {json_file_path}",
            f"--output-prefix {output_prefix}",
            f"--vocab {vocab_file}",
            "--dataset-impl mmap",
            "--tokenizer-type BertWordPieceLowerCase",
            "--split-sentences",
            f"--workers {num_workers}",
        ])
        # It's possible to hit MemoryError during the command above since
        # memory usage is proportional to num_workers. In that case we
        # delete the incomplete output and the user shall retry with a
        # smaller num_workers. Experience shows chunks 6, 7, 9, 17, 18,
        # 20, 21, 24, 27 have particularly large memory usage.
        if os.system(cmd) == 0:  # Success
            os.remove(json_file_path)
        else:
            print(f"Error: chunk {i} preprocessing got error, deleting "
                  "incomplete output. If MemoryError appeared, please "
                  f"retry with num_workers smaller than {num_workers}.")
            for ext in ("idx", "bin"):
                stale = f"{output_prefix}_text_sentence.{ext}"
                if os.path.exists(stale):
                    os.remove(stale)
        print(f"Finished preprocessing chunk {i} in {time.time() - start} sec")
def pile_merge(file_path):
    """Merge the 30 preprocessed chunk datasets into a single dataset.

    Requires every per-chunk .idx/.bin pair to exist under ``file_path``.
    Writes ``pile_bert_train_text_sentence.{bin,idx}``.
    """
    start = time.time()
    num_chunks = 30
    vocab_size = 30524
    # Fail fast if any chunk is missing before starting the long merge.
    for chunk_idx in range(num_chunks):
        prefix = f"{file_path}pile_bert_train_{chunk_idx:02}"
        assert os.path.exists(f"{prefix}_text_sentence.idx")
        assert os.path.exists(f"{prefix}_text_sentence.bin")
    merged_bin = f"{file_path}pile_bert_train_text_sentence.bin"
    builder = indexed_dataset.make_builder(merged_bin, impl="mmap",
                                           vocab_size=vocab_size)
    for chunk_idx in range(num_chunks):
        chunk_file = f"{file_path}pile_bert_train_{chunk_idx:02}_text_sentence"
        print(f"Merging file {chunk_file}")
        builder.merge_file_(chunk_file)
    print("Finalizing merged file ...")
    builder.finalize(f"{file_path}pile_bert_train_text_sentence.idx")
    print(f"Finished merging in {time.time() - start} sec")
    # After verifying the merged data with real training, you may want to
    # delete the per-chunk data:
    # for chunk_idx in range(num_chunks):
    #     prefix = f"{file_path}pile_bert_train_{chunk_idx:02}"
    #     os.remove(f"{prefix}_text_sentence.idx")
    #     os.remove(f"{prefix}_text_sentence.bin")
if __name__ == '__main__':
    # Path to download and store all output files during the whole
    # process. Estimated max storage usage is around 1.6 TB (or 780GB if
    # the final merge is skipped). Memory usage is proportional to the
    # num_workers below (can be as high as O(300GB) when num_workers is
    # around 20).
    file_path = "/blob/data/the_pile_bert/"
    # The raw Pile data ships as 30 compressed .zst chunks. To run all
    # chunks on a single machine: "python prepare_pile_data.py range 0 30".
    # You can also split the range across machines to speed things up,
    # since one chunk can take hours. The whole process only uses CPU.
    mode = sys.argv[1]
    if mode == "merge":
        # "python prepare_pile_data.py merge" merges all 30 processed data
        # chunks; run it only after every chunk is preprocessed. Merging
        # uses about 600GB of memory. If that is unavailable, use the 30
        # chunks directly as multiple datasets — see '--data-path' in
        # github.com/microsoft/Megatron-DeepSpeed/blob/main/megatron/arguments.py
        pile_merge(file_path)
    else:
        if mode == "range":
            # "python prepare_pile_data.py range 0 30" → chunks 0-29
            selected_chunk = range(int(sys.argv[2]), int(sys.argv[3]))
        else:
            # "python prepare_pile_data.py 2 5 8" → chunks 2, 5, 8
            selected_chunk = [int(x) for x in sys.argv[1:]]
        print("selected_chunk: ", selected_chunk)
        # Number of worker processes. Adjust based on your CPU/Memory.
        num_workers = 20
        # Where the raw Pile data can be downloaded. The url may change in
        # the future; contact EleutherAI
        # (https://github.com/EleutherAI/the-pile) if it stops working.
        download_url = "https://the-eye.eu/public/AI/pile/train/"
        vocab_file = "bert-large-uncased-vocab.txt"
        vocab_url = ("https://s3.amazonaws.com/models.huggingface.co/bert/"
                     "bert-large-uncased-vocab.txt")
        if not os.path.exists(vocab_file):
            os.system(f"wget {vocab_url}")
        os.makedirs(file_path, exist_ok=True)
        for chunk in selected_chunk:
            pile_preprocess(download_url, file_path, vocab_file,
                            num_workers, chunk)