craffel/moto_checkpoints / script_1 /code /download_prepare_hf_data.py
craffel's picture
download
raw
15.6 kB
# Copyright (c) Meta Platforms, Inc. and affiliates.
import argparse
import gzip
import math
import os
import subprocess
import time
import boto3
import requests
from botocore.exceptions import ClientError
from huggingface_hub import HfApi, snapshot_download
# Worker-count constant; it is not referenced anywhere else in the visible part of this file.
num_proc = 16
# Module-level S3 client, created at import time and used by download_contents().
s3 = boto3.client("s3")
# Name of the bucket that download_contents() reads "content/<blob_id>" objects from.
bucket_name = "softwareheritage"
# Sets TMPDIR for this process (and for subprocesses that inherit its environment).
# NOTE(review): the path is hard-coded for one machine — confirm it exists where this runs.
os.environ["TMPDIR"] = "/scratch/gsa/tmp"
def download_contents(blob_id):
    """Fetch the gzip-compressed object ``content/<blob_id>`` from S3 and decode it.

    Returns a dict with two keys: ``text`` (the decompressed payload decoded as
    UTF-8, undecodable bytes dropped) and ``download_success``. When S3 reports
    ``NoSuchKey`` the text is empty and ``download_success`` is ``False``; any
    other ``ClientError`` is re-raised unchanged.
    """
    object_key = f"content/{blob_id}"
    try:
        response = s3.get_object(Bucket=bucket_name, Key=object_key)
        with gzip.GzipFile(fileobj=response["Body"]) as stream:
            payload = stream.read()
        decoded = payload.decode("utf-8", errors="ignore")
        return {"text": decoded, "download_success": True}
    except ClientError as err:
        # Only a missing object is tolerated; everything else propagates.
        if err.response["Error"]["Code"] != "NoSuchKey":
            raise
        print(f"File not found: {object_key}")
        return {"text": "", "download_success": False}
def run_command(command):
    """Echo a shell command line, then execute it through the shell.

    The command is a single string interpreted by the shell (pipelines and
    redirections are used by callers). A non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    print(f"Running: {command}")
    subprocess.check_call(command, shell=True)
def download_dataset(repo_id, local_dir, allow_patterns):
    """Download a dataset snapshot from the Hugging Face Hub into ``local_dir``.

    ``allow_patterns`` is forwarded to ``snapshot_download`` to restrict which
    files are fetched. A ``requests`` read timeout is retried after a fixed
    pause, up to five attempts in total; the timeout from the final attempt is
    re-raised. Any other exception propagates immediately.
    """
    print(f"Downloading dataset from {repo_id}...")
    max_retries = 5
    retry_delay = 10  # seconds

    failures = 0
    while True:
        try:
            snapshot_download(
                repo_id,
                repo_type="dataset",
                local_dir=local_dir,
                allow_patterns=allow_patterns,
                resume_download=True,
                max_workers=16,  # raise this to shorten the download time
            )
        except requests.exceptions.ReadTimeout:
            failures += 1
            if failures >= max_retries:
                raise
            print(f"Timeout occurred. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            break
    print(f"Dataset downloaded to {local_dir}")
def extract_fineweb_2_hq_sample(self, data: dict, path: str, id_in_file: int | str):
return {
"text": data["text"],
"id": data["id"],
"metadata": {
"date": data.get("date", None),
"dump": data.get("dump", None),
"file_path": data.get("file_path", None),
"url": data.get("url", None),
"language": data.get("language", None),
"language_score": data.get("language_score", None),
"top_langs": data.get("top_langs", None),
"quality_score": data.get("quality_score", None),
}
}
def parquet_to_jsonl(
    dataset, work_dir, src_dir, tgt_dir, ntasks=64, preserve_subsets=False,
):
    """Convert the parquet files under ``src_dir`` into uncompressed jsonl in ``tgt_dir``.

    Without ``preserve_subsets`` a single datatrove pipeline reads every
    ``**/*.parquet`` file and writes ``<dataset>.chunk.<rank>.jsonl``. With
    ``preserve_subsets`` each data sub-directory of ``src_dir`` gets its own
    pipeline run, writing ``<dataset>.<subset>.chunk.<rank>.jsonl`` and logging
    under ``<work_dir>/datatrove/<subset>``. ``ntasks`` is the number of
    executor tasks.
    """
    from datatrove.executor import LocalPipelineExecutor
    from datatrove.pipeline.readers import ParquetReader
    from datatrove.pipeline.writers import JsonlWriter

    print(f"Converting parquet files in {src_dir} to jsonl files in {tgt_dir}...")

    if not preserve_subsets:
        reader = ParquetReader(
            src_dir,
            file_progress=True,
            doc_progress=True,
            glob_pattern="**/*.parquet",
        )
        writer = JsonlWriter(
            tgt_dir,
            output_filename=dataset + ".chunk.${rank}.jsonl",
            compression=None,
        )
        executor = LocalPipelineExecutor(
            pipeline=[reader, writer],
            tasks=ntasks,
            logging_dir=os.path.join(work_dir, "datatrove"),
        )
        executor.run()
        return

    # Directories that live next to the data but hold no parquet subsets.
    non_data_dirs = [".cache", ".git", ".github", "datatrove", "terashuf"]
    for subset in os.listdir(f"{src_dir}"):
        subset_path = f"{src_dir}/{subset}"
        if not os.path.isdir(subset_path) or subset in non_data_dirs:
            continue
        print(f"Processing subset: {subset}")
        reader = ParquetReader(
            f"{subset_path}/",
            file_progress=True,
            doc_progress=True,
            glob_pattern="**/*.parquet",
            # Only fineweb_2_hq records go through the custom adapter.
            adapter=extract_fineweb_2_hq_sample if dataset == "fineweb_2_hq" else None,
        )
        writer = JsonlWriter(
            tgt_dir,
            output_filename=f"{dataset}.{subset}" + ".chunk.${rank}.jsonl",
            compression=None,
        )
        executor = LocalPipelineExecutor(
            pipeline=[reader, writer],
            tasks=ntasks,
            logging_dir=os.path.join(work_dir, "datatrove", subset),
        )
        executor.run()
def setup_terashuf(work_dir):
    """Make sure the terashuf binary exists under ``<work_dir>/terashuf``.

    Returns the terashuf directory. When the executable is missing it is built
    with ``make``; a failing build raises via ``run_command``.
    """
    terashuf_dir = os.path.join(work_dir, "terashuf")
    binary_path = os.path.join(terashuf_dir, "terashuf")
    if not os.path.exists(binary_path):
        print("Setting up terashuf...")
        # The `git clone https://github.com/alexandres/terashuf` step is disabled
        # in this version, so the sources must already be in terashuf_dir.
        run_command(f"make -C {terashuf_dir}")
    else:
        print("terashuf executable already exists. Skipping setup.")
    return terashuf_dir
def upload_dataset_to_hf(dataset, out_dir, hf_path, preserve_subsets):
    """Upload the ``*.jsonl`` files under ``out_dir`` to the dataset repo ``hf_path``.

    Without ``preserve_subsets`` the whole of ``out_dir`` is uploaded in one
    call. With it, each data sub-directory is uploaded separately into a
    repo folder of the same name. ``dataset`` is accepted but not used.
    """
    api = HfApi()
    if not preserve_subsets:
        api.upload_folder(
            repo_id=hf_path,
            folder_path=f"{out_dir}",
            repo_type="dataset",
            allow_patterns=["*.jsonl"],
        )
        return

    # Directories that may sit beside the subsets but are not data.
    non_data_dirs = [".cache", ".git", ".github", "datatrove", "terashuf"]
    for entry in os.listdir(f"{out_dir}"):
        entry_path = f"{out_dir}/{entry}"
        if not os.path.isdir(entry_path) or entry in non_data_dirs:
            continue
        api.upload_folder(
            repo_id=hf_path,
            folder_path=entry_path,
            repo_type="dataset",
            path_in_repo=f"{entry}",
            allow_patterns=["*.jsonl"],
        )
def _parse_size(size):
    """Convert a size such as ``1000``, ``45G``, ``45GB`` or ``45GiB`` to bytes.

    Follows the suffix convention of GNU ``truncate``: a bare letter or an
    ``iB`` suffix is a power of 1024, a ``B`` suffix is a power of 1000.
    Raises ``ValueError`` for anything else.
    """
    import re  # local import: `re` is not imported at file level

    match = re.fullmatch(r"([0-9]+)(?:([KkMGTPE])(i?B)?)?", str(size).strip())
    if match is None:
        raise ValueError(
            f"Invalid max_file_size {size!r}: expected an integer optionally "
            "followed by K, M, G, T, P or E (optionally with B or iB)"
        )
    number, unit, tail = match.groups()
    if unit is None:
        return int(number)
    base = 1000 if tail == "B" else 1024
    exponent = "KMGTPE".index(unit.upper()) + 1
    return int(number) * base**exponent


def _shuffle_and_split(src_dir, name_pattern, cat_command, terashuf_executable, nchunks, suffix, out_prefix):
    """Concatenate matching files, shuffle them with terashuf and split into chunks.

    ``ulimit`` is part of the same shell command line because every
    ``run_command`` call starts a fresh shell; a limit raised in a separate call
    would not apply to this pipeline.
    """
    # NOTE(review): the trap is installed after the pipeline has finished and,
    # being the last command, decides the shell's exit status — confirm intended.
    run_command(
        f"ulimit -n 100000 && "
        f"find {src_dir} -type f -name '{name_pattern}' -print0 | xargs -0 -I {{}} sh -c '{cat_command}' | {terashuf_executable} | "
        f" split -n l/{nchunks} -d --suffix-length 2 --additional-suffix {suffix} - {out_prefix}"
        "; trap 'echo \"Caught signal 13, exiting with code 1\"; exit 1' PIPE;"
    )


def _finalize_chunks(chunk_dir, prefix, suffix, nchunks, k_validation, validation_file, max_bytes):
    """Move the first ``k_validation`` lines of each chunk to the validation file.

    When ``max_bytes`` is not ``None``, chunks larger than that are truncated to
    ``max_bytes`` and their (possibly cut) last line is dropped.
    """
    for i in range(nchunks):
        chunk_file = f"{chunk_dir}/{prefix}{i:02d}{suffix}"
        run_command(f"head -n {k_validation} {chunk_file} >> {validation_file}")
        run_command(f"sed -i '1,{k_validation}d' {chunk_file}")
        if max_bytes is None:
            continue
        current_size = os.path.getsize(chunk_file)
        if current_size > max_bytes:
            print(f"Truncating {chunk_file} from {current_size} to {max_bytes}")
            run_command(f"truncate -s {max_bytes} {chunk_file}")
            # remove the last line, which the byte-level cut may have split
            run_command(f"perl -i -ne 'print unless eof' {chunk_file}")
        else:
            print(f"Skipping truncate: {chunk_file} is already within limits ({current_size})")


def main(dataset, memory, data_dir, seed=42, nchunks=32, preserve_subsets=False, upload_to_hf=False, hf_path=None, max_file_size=None, skip_download=False):
    """Download a dataset, shuffle it, split it into chunks and carve out a validation set.

    Args:
        dataset: Key selecting the Hub repo, file extension, cat command and
            download patterns; an unknown key raises ``KeyError``.
        memory: Exported as the ``MEMORY`` environment variable for terashuf.
        data_dir: Root directory; data goes to ``<data_dir>/<dataset>`` and the
            output to ``<data_dir>/<dataset>_shuffled``.
        seed: Exported as the ``SEED`` environment variable for terashuf.
        nchunks: Number of output chunks (per subset when ``preserve_subsets``).
        preserve_subsets: Process every data sub-directory of the source
            directory separately and write it to its own output folder.
        upload_to_hf: Upload the result to ``hf_path`` when done.
        hf_path: Target dataset repo for the upload.
        max_file_size: Optional per-chunk size cap, either a byte count or a
            value with a truncate-style suffix such as ``45G``.
        skip_download: Skip the Hub download and expect the files to be present.

    Raises:
        ValueError: If ``max_file_size`` cannot be parsed.
        KeyError: If ``dataset`` is not a known key.
    """
    # Validate the size cap first so a typo fails before the download and shuffle.
    max_bytes = _parse_size(max_file_size) if max_file_size else None

    # Configuration
    repo_id = {
        "fineweb_edu": "HuggingFaceFW/fineweb-edu",
        "stack_edu": "common-pile/stackv2_edu_filtered",
        "fineweb_2": "HuggingFaceFW/fineweb-2",
        "fineweb_2_hq": "epfml/FineWeb2-HQ",
        "fineweb_edu_10bt": "HuggingFaceFW/fineweb-edu",
        "fineweb_edu_100bt": "HuggingFaceFW/fineweb-edu",
        "dclm_baseline_1.0": "mlfoundations/dclm-baseline-1.0",
        "dclm_baseline_1.0_10prct": "mlfoundations/dclm-baseline-1.0",
    }[dataset]
    src_dir = f"{data_dir}/{dataset}"
    out_dir = f"{src_dir}_shuffled"
    os.makedirs(out_dir, exist_ok=True)
    # The dataset directory doubles as working directory: terashuf and the
    # datatrove logs are placed under it.
    work_dir = src_dir
    if dataset not in ["fineweb_2_hq", "stack_edu"]:
        src_dir = f"{src_dir}/data"
    prefix = f"{dataset}.chunk."
    orig_extension = {
        "fineweb_edu": ".jsonl",
        "stack_edu": ".json.gz",
        "fineweb_2": ".jsonl",
        "fineweb_2_hq": ".jsonl",
        "fineweb_edu_10bt": ".jsonl",
        "fineweb_edu_100bt": ".jsonl",
        "dclm_baseline_1.0": ".jsonl.zst",
        "dclm_baseline_1.0_10prct": ".jsonl.zst",
    }[dataset]
    # "{}" is the xargs placeholder for the file name; the trailing "echo" on
    # compressed inputs appends a newline after each decompressed file.
    cat_command = {
        "fineweb_edu": "cat {}",
        "fineweb_2": "cat {}",
        "fineweb_2_hq": "cat {}",
        "fineweb_edu_10bt": "cat {}",
        "fineweb_edu_100bt": "cat {}",
        "stack_edu": "zcat {} && echo",
        "dclm_baseline_1.0": "zstdcat {} && echo",
        "dclm_baseline_1.0_10prct": "zstdcat {} && echo",
    }[dataset]
    allow_patterns = {
        "fineweb_edu": None,
        "fineweb_edu_10bt": "sample/10BT/*",
        "fineweb_edu_100bt": "sample/100BT/*",
        "fineweb_2": [
            "data/arb_Arab/*/000_0000[01].parquet",
            "data/ben_Beng/*/000_00000.parquet",
            "data/eng_Latn/*/000_0000[01].parquet",
            "data/deu_Latn/*/000_0000[01].parquet",
            "data/fra_Latn/*/000_0000[01].parquet",
            "data/ita_Latn/*/000_0000[01].parquet",
            "data/hin_Deva/*/000_00000.parquet",
            "data/jpn_Jpan/*/000_0000[01].parquet",
            "data/kor_Hang/*/000_0000[01].parquet",
            "data/rus_Cyrl/*/000_0000[012].parquet",
            "data/spa_Latn/*/000_0000[01].parquet",
            "data/swh_Latn/*/000_00000.parquet",
            "data/tel_Telu/*/000_00000.parquet",
            "data/tha_Thai/*/000_0000[01].parquet",
            "data/tur_Latn/*/000_0000[01].parquet",
            "data/cmn_Hani/*/000_0000[012].parquet",
        ],
        "fineweb_2_hq": [
            "ita_Latn/*",
            "tur_Latn/*",
            "fas_Arab/*",
            "cmn_Hani/*",
            "rus_Cyrl/*",
            "deu_Latn/*",
            "spa_Latn/*",
            "jpn_Jpan/*",
            "fra_Latn/*",
            "por_Latn/*",
            "pol_Latn/*",
            "nld_Latn/*",
            "ind_Latn/*",
            "ces_Latn/*",
            "arb_Arab/*",
            "hun_Latn/*",
            "swe_Latn/*",
            "ell_Grek/*",
            "dan_Latn/*",
            "vie_Latn/*",
        ],
        "stack_edu": "*.json.gz",
        "dclm_baseline_1.0": "*.jsonl.zst",
        "dclm_baseline_1.0_10prct": "global-shard_01_of_10/*.jsonl.zst",
    }[dataset]
    suffix = ".jsonl"
    k_validation = 10000  # Number of lines to take from each chunk for validation

    # Setup terashuf
    terashuf_dir = setup_terashuf(work_dir)
    print(f"Terashuf directory: {terashuf_dir}")

    # Download dataset
    if not skip_download:
        download_dataset(repo_id, src_dir, allow_patterns)
    else:
        # Pre-existing inputs are looked up as plain jsonl files.
        orig_extension = ".jsonl"
        print("Skipping download of dataset, make sure the dataset or jsonl files are present in the data directory")

    if "fineweb" in dataset:
        parquet_to_jsonl(
            dataset, work_dir, src_dir, src_dir, preserve_subsets=preserve_subsets
        )

    # Set up environment variables
    os.environ["MEMORY"] = f"{memory}"
    os.environ["SEED"] = f"{seed}"

    # Run the original shuffling and splitting command
    terashuf_executable = os.path.join(terashuf_dir, "terashuf")
    print(orig_extension, src_dir, cat_command, terashuf_executable)

    if preserve_subsets:
        non_data_dirs = (".cache", ".git", ".github", "datatrove", "terashuf")
        for subset in os.listdir(f"{src_dir}"):
            # skip the non data directories
            if not os.path.isdir(f"{src_dir}/{subset}") or subset in non_data_dirs:
                continue
            prefix = f"{dataset}.{subset}.chunk."
            subset_out_dir = f"{out_dir}/{subset}"
            validation_file = f"{subset_out_dir}/{dataset}.{subset}.val{suffix}"
            os.makedirs(subset_out_dir, exist_ok=True)
            _shuffle_and_split(
                src_dir,
                f"*{subset}*{orig_extension}",
                cat_command,
                terashuf_executable,
                nchunks,
                suffix,
                f"{subset_out_dir}/{prefix}",
            )
            # Create validation set and remove lines from chunks
            _finalize_chunks(
                subset_out_dir, prefix, suffix, nchunks, k_validation, validation_file, max_bytes
            )
    else:
        _shuffle_and_split(
            src_dir,
            f"*{orig_extension}",
            cat_command,
            terashuf_executable,
            nchunks,
            suffix,
            f"{out_dir}/{prefix}",
        )
        # Create validation set and remove lines from chunks
        validation_file = f"{out_dir}/{dataset}.val{suffix}"
        _finalize_chunks(
            out_dir, prefix, suffix, nchunks, k_validation, validation_file, max_bytes
        )

    print("All tasks completed successfully!")

    if upload_to_hf:
        print("Uploading to Hugging Face...")
        upload_dataset_to_hf(dataset, out_dir, hf_path, preserve_subsets)
if __name__ == "__main__":
    # Command-line entry point: parse arguments, make sure the upload target
    # exists when an upload was requested, then run the pipeline.
    parser = argparse.ArgumentParser()
    parser.add_argument("dataset", type=str)
    # nargs="?" makes this positional optional; without it argparse treats the
    # argument as required and the default is never used.
    parser.add_argument("memory", type=float, nargs="?", default=8)
    parser.add_argument("--data_dir", type=str, default="data")
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--nchunks", type=int, default=32)
    parser.add_argument("--max_file_size", type=str, default=None, help="If specified, the dataset will be truncated to match this file size, e.g. 45G")
    parser.add_argument(
        "--preserve_subsets",
        action="store_true",
        default=False,
        help="If true, the subsets will be preserved in the output directory",
    )
    parser.add_argument("--upload_to_hf", action="store_true", default=False)
    parser.add_argument("--hf_path", type=str, default=None)
    parser.add_argument("--skip_download", action="store_true", default=False)
    args = parser.parse_args()

    if args.upload_to_hf:
        if args.hf_path is None:
            raise ValueError("hf_path is required when upload_to_hf is true")
        try:
            api = HfApi()
            # The target is a dataset repo, so the existence check has to look in
            # the dataset namespace as well (the default repo type is "model").
            if not api.repo_exists(args.hf_path, repo_type="dataset"):
                api.create_repo(args.hf_path, repo_type="dataset", exist_ok=True)
            else:
                print(f"Repository {args.hf_path} already exists, skipping creation")
        except Exception as e:
            print(f"Error creating repository: {e}")
            print("Please ensure you have the correct permissions to create the repository.")
            raise

    main(
        args.dataset,
        args.memory,
        args.data_dir,
        seed=args.seed,
        nchunks=args.nchunks,
        preserve_subsets=args.preserve_subsets,
        upload_to_hf=args.upload_to_hf,
        hf_path=args.hf_path,
        max_file_size=args.max_file_size,
        skip_download=args.skip_download,
    )

Xet Storage Details

Size:
15.6 kB
·
Xet hash:
46e6519de374d73758169ce656609f3891904c1793a5e9541ff4b8207377c5ce

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.