CogNet-1B / hf_scripts /prepare_data.py
thefinalboss's picture
Upload hf_scripts/prepare_data.py with huggingface_hub
388fa4d verified
Raw
History Blame Contribute Delete
15.2 kB
"""
CogNet Data Preparation Script
===============================
Prepares and tokenizes multiple datasets for training:
- Wikipedia (English and French dumps)
- Code (The Stack, all configured languages or Python only)
- Books (BookCorpus)
- Web text (C4, OpenWebText, RedPajama)
- Instruction data (Alpaca)
- Custom local files (via --local-data)
Outputs pre-tokenized .pt files for maximum training throughput.
Usage:
python prepare_data.py --output-dir ./data_cache --vocab-size 32000
python prepare_data.py --output-dir ./data_cache --datasets wiki code books
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional
# ─── Dataset Configs ─────────────────────────────────────────────────────────
# Registry of the remote datasets this script knows how to prepare, keyed by
# the short name accepted on the command line (--datasets).
#
# Keys read by process_dataset():
#   path / subset / split  -> forwarded to datasets.load_dataset()
#   text_field             -> example key holding the raw text
#   max_docs / max_chars   -> stop limits (max_docs=None disables that limit)
#   languages (optional)   -> keep only examples whose language is listed
#   format_fn (optional)   -> name of a formatter used instead of text_field
#   description            -> human-readable label for log output
DATASET_CONFIGS = {
    # ── Wikipedia ────────────────────────────────────────────────────────
    'wiki': {
        'path': 'wikimedia/wikipedia',
        'subset': '20231101.en',
        'split': 'train',
        'text_field': 'text',
        'max_docs': None,
        'max_chars': 5 * 10**9,  # 5B chars
        'description': 'Wikipedia English',
    },
    'wiki_fr': {
        'path': 'wikimedia/wikipedia',
        'subset': '20231101.fr',
        'split': 'train',
        'text_field': 'text',
        'max_docs': None,
        'max_chars': 2 * 10**9,  # 2B chars
        'description': 'Wikipedia French',
    },

    # ── Source code ──────────────────────────────────────────────────────
    'code': {
        'path': 'bigcode/the-stack',
        'subset': 'data',
        'split': 'train',
        'text_field': 'content',
        'max_docs': None,
        'max_chars': 5 * 10**9,  # 5B chars
        'description': 'The Stack (multi-language code)',
        'languages': [
            'python', 'javascript', 'java', 'cpp',
            'c', 'rust', 'go', 'typescript',
        ],
    },
    'code_python': {
        'path': 'bigcode/the-stack',
        'subset': 'data',
        'split': 'train',
        'text_field': 'content',
        'max_docs': None,
        'max_chars': 3 * 10**9,  # 3B chars
        'description': 'Python code from The Stack',
        'languages': ['python'],
    },

    # ── Books ────────────────────────────────────────────────────────────
    'books': {
        'path': 'bookcorpus/bookcorpus',
        'subset': None,
        'split': 'train',
        'text_field': 'text',
        'max_docs': None,
        'max_chars': 3 * 10**9,  # 3B chars
        'description': 'BookCorpus',
    },

    # ── Web text ─────────────────────────────────────────────────────────
    'c4': {
        'path': 'allenai/c4',
        'subset': 'en',
        'split': 'train',
        'text_field': 'text',
        'max_docs': None,
        'max_chars': 10 * 10**9,  # 10B chars
        'description': 'C4 (Colossal Clean Crawled Corpus)',
    },
    'openwebtext': {
        'path': 'openwebtext',
        'subset': None,
        'split': 'train',
        'text_field': 'text',
        'max_docs': None,
        'max_chars': 5 * 10**9,  # 5B chars
        'description': 'OpenWebText',
    },

    # ── Instruction data ─────────────────────────────────────────────────
    'alpaca': {
        'path': 'tatsu-lab/alpaca',
        'subset': None,
        'split': 'train',
        'text_field': 'text',
        'max_docs': None,
        'max_chars': 5 * 10**8,  # 0.5B chars
        'description': 'Alpaca instruction data',
        # Text is produced by the formatter named here, not by text_field.
        'format_fn': 'alpaca_format',
    },

    # ── Mixed corpus ─────────────────────────────────────────────────────
    'redpajama': {
        'path': 'togethercomputer/RedPajama-Data-1T',
        'subset': None,
        'split': 'train',
        'text_field': 'text',
        'max_docs': None,
        'max_chars': 10 * 10**9,  # 10B chars
        'description': 'RedPajama 1T',
    },
}
def alpaca_format(example: Dict) -> str:
    """Render one Alpaca record as a single prompt/response training string.

    Args:
        example: Mapping that may contain ``instruction``, ``input`` and
            ``output`` entries. Entries that are missing, ``None`` or empty
            are treated as empty strings.

    Returns:
        The instruction and response under ``### Instruction:`` and
        ``### Response:`` headers. An ``### Input:`` section is inserted
        between them only when the record has a non-empty input.
    """
    # `or ''` (instead of a .get() default) also covers keys that are present
    # but hold None, which an f-string would otherwise render as "None".
    instruction = example.get('instruction') or ''
    input_text = example.get('input') or ''
    output = example.get('output') or ''

    sections = [f"### Instruction:\n{instruction}"]
    if input_text:
        sections.append(f"### Input:\n{input_text}")
    sections.append(f"### Response:\n{output}")
    return "\n\n".join(sections)
# ─── Tokenizer Training ──────────────────────────────────────────────────────
def train_bpe_tokenizer(output_dir: str, vocab_size: int = 32000,
                        sample_files: Optional[List[str]] = None) -> str:
    """Train a byte-level BPE tokenizer and save it as JSON.

    The tokenizer is written to ``<output_dir>/bpe_tokenizer_<vocab_size>.json``.
    If that file already exists it is reused and no training happens; this
    check runs before the ``tokenizers`` import, so a cached tokenizer can be
    located even when the library is not installed.

    Args:
        output_dir: Directory that holds (or will hold) the tokenizer file.
        vocab_size: Target vocabulary size passed to the BPE trainer.
        sample_files: Optional list of text file paths to train on. When
            empty or ``None`` a small built-in corpus is used instead.

    Returns:
        Path of the saved (or pre-existing) tokenizer JSON file.

    Raises:
        SystemExit: If training is required but ``tokenizers`` is missing.
    """
    tokenizer_path = os.path.join(output_dir, f"bpe_tokenizer_{vocab_size}.json")
    if os.path.exists(tokenizer_path):
        print(f"Tokenizer already exists at {tokenizer_path}")
        return tokenizer_path

    try:
        from tokenizers import Tokenizer
        from tokenizers.models import BPE
        from tokenizers.trainers import BpeTrainer
        from tokenizers.pre_tokenizers import ByteLevel
        from tokenizers.decoders import ByteLevel as ByteLevelDecoder
    except ImportError:
        print("ERROR: 'tokenizers' library not installed.")
        print("Install with: pip install tokenizers")
        sys.exit(1)

    print(f"\nTraining BPE tokenizer (vocab_size={vocab_size})...")
    tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
    tokenizer.pre_tokenizer = ByteLevel(add_prefix_space=False)
    tokenizer.decoder = ByteLevelDecoder()

    trainer = BpeTrainer(
        vocab_size=vocab_size,
        # Order matters: the rest of the script assumes [EOS] ends up as id 3.
        special_tokens=[
            "[PAD]",  # 0
            "[UNK]",  # 1
            "[BOS]",  # 2
            "[EOS]",  # 3
        ],
        show_progress=True,
        initial_alphabet=ByteLevel.alphabet(),
    )

    if sample_files:
        print(f"Training on {len(sample_files)} files...")
        tokenizer.train(sample_files, trainer)
    else:
        # NOTE(review): the built-in corpus is only nine repeated snippets, so
        # the learned vocabulary will likely be far smaller than vocab_size;
        # compare with the "Vocabulary size" line printed below.
        print("No sample files provided. Training on built-in data...")
        tokenizer.train_from_iterator(_builtin_tokenizer_corpus(), trainer)

    os.makedirs(output_dir, exist_ok=True)
    # Save to a temporary name and rename, so an interrupted run cannot leave
    # a truncated file that the existence check above would accept later.
    tmp_path = tokenizer_path + ".tmp"
    tokenizer.save(tmp_path)
    os.replace(tmp_path, tokenizer_path)
    print(f"Saved tokenizer to {tokenizer_path}")
    print(f"Vocabulary size: {tokenizer.get_vocab_size()}")
    return tokenizer_path


def _builtin_tokenizer_corpus() -> List[str]:
    """Return the fallback training texts (English, French, and code)."""
    english = [
        "The quick brown fox jumps over the lazy dog. " * 500,
        "Science and technology have transformed our understanding of the universe. " * 500,
        "In the field of artificial intelligence, neural networks learn from data. " * 500,
    ]
    french = [
        "Le renard brun rapide saute par-dessus le chien paresseux. " * 500,
        "La science et la technologie ont transforme notre comprehension de l'univers. " * 500,
    ]
    code = [
        "def hello_world():\n print('Hello, World!')\n return True\n" * 500,
        "class NeuralNetwork:\n def __init__(self, layers):\n self.layers = layers\n" * 500,
        "import torch\nimport torch.nn as nn\nmodel = nn.Sequential(nn.Linear(768, 768))\n" * 500,
        "function fibonacci(n) {\n if (n <= 1) return n;\n return fibonacci(n-1) + fibonacci(n-2);\n}\n" * 500,
    ]
    return english + french + code
# ─── Data Processing ─────────────────────────────────────────────────────────
def process_dataset(name: str, config: Dict, tokenizer, output_dir: str,
                    seq_len: int = 4096) -> Optional[str]:
    """Stream one dataset, tokenize it, and save the ids as a ``.pt`` file.

    Documents are tokenized one by one and concatenated into a flat 1-D
    int64 tensor, with an end-of-sequence id appended after every document.
    The result is saved to ``<output_dir>/<name>_packed_seq<seq_len>.pt``.
    ``seq_len`` is only used in that file name; the token stream is not
    reshaped into fixed-length sequences here.

    Args:
        name: Short dataset name, used for logging and the output file name.
        config: Dataset settings. ``path`` and ``split`` are required;
            ``subset``, ``text_field``, ``max_chars``, ``max_docs``,
            ``languages``, ``format_fn`` and ``description`` are optional.
            A ``max_chars`` of ``None`` disables the character limit.
        tokenizer: Object with an ``encode(text)`` method returning either a
            list of ids, an object with an ``.ids`` attribute, or an iterable
            of ids. If it has ``token_to_id``, that is used to find "[EOS]".
        output_dir: Directory for the output file (created if needed).
        seq_len: Sequence length recorded in the output file name.

    Returns:
        The output path, or ``None`` if the ``datasets`` library is missing,
        the dataset fails to load, or no tokens were collected. If the output
        file already exists its path is returned without reprocessing.
    """
    print(f"\n{'='*60}")
    print(f"Processing: {name} — {config.get('description', '')}")
    print(f"{'='*60}")

    output_path = os.path.join(output_dir, f"{name}_packed_seq{seq_len}.pt")
    if os.path.exists(output_path):
        print(f"Already exists: {output_path}")
        return output_path

    try:
        from datasets import load_dataset
    except ImportError:
        print("ERROR: 'datasets' library not installed.")
        print("Install with: pip install datasets")
        return None

    # Load dataset (streaming, so nothing is downloaded up front).
    print(f"Loading {config['path']}...")
    load_args = [config['path']]
    if config.get('subset'):
        load_args.append(config['subset'])
    try:
        # SECURITY NOTE: trust_remote_code=True lets the dataset repository
        # run its own loading script on this machine. Kept for backward
        # compatibility; only use it with repositories you trust.
        ds = load_dataset(
            *load_args,
            split=config['split'],
            streaming=True,
            trust_remote_code=True,
        )
    except Exception as e:
        print(f"Failed to load dataset: {e}")
        return None

    # Filter by language if specified (for code datasets).
    if config.get('languages'):
        languages = {str(lang).lower() for lang in config['languages']}

        def lang_filter(example):
            # NOTE(review): The Stack's dataset card appears to name this
            # column `lang` rather than `language`; both are accepted and
            # compared case-insensitively. Confirm the exact values
            # (e.g. "C++" vs "cpp") against the dataset.
            value = example.get('language') or example.get('lang') or ''
            return str(value).lower() in languages

        ds = ds.filter(lang_filter)

    # Tokenize. A typed 64-bit array keeps memory at 8 bytes per token; a
    # Python list of int objects costs several times that at this scale.
    from array import array
    all_ids = array('q')
    eos_id = _resolve_eos_id(tokenizer)
    doc_count = 0
    total_chars = 0
    max_chars = config.get('max_chars', 5_000_000_000)
    max_docs = config.get('max_docs')
    text_field = config.get('text_field', 'text')
    use_alpaca = config.get('format_fn') == 'alpaca_format'
    t0 = time.time()

    for example in ds:
        text = alpaca_format(example) if use_alpaca else example.get(text_field, '')
        # Skip empty or near-empty documents.
        if not text or len(text.strip()) < 20:
            continue

        all_ids.extend(_encode_to_ids(tokenizer, text))
        all_ids.append(eos_id)  # document separator
        doc_count += 1
        total_chars += len(text)

        if doc_count % 10000 == 0:
            elapsed = time.time() - t0
            print(f" {doc_count:,} docs | {len(all_ids):,} tokens | "
                  f"{total_chars/1e9:.2f}B chars | {elapsed:.0f}s")
        if max_chars is not None and total_chars >= max_chars:
            print(f" Reached char limit ({max_chars/1e9:.1f}B)")
            break
        if max_docs and doc_count >= max_docs:
            print(f" Reached doc limit ({max_docs:,})")
            break

    if len(all_ids) == 0:
        print(" No tokens collected!")
        return None

    elapsed = time.time() - t0
    print(f"\n Final: {doc_count:,} docs, {len(all_ids):,} tokens, {total_chars/1e9:.2f}B chars")
    print(f" Time: {elapsed:.0f}s ({doc_count/max(elapsed,1):,.0f} docs/s)")

    # Save the flat token stream.
    import torch
    os.makedirs(output_dir, exist_ok=True)
    # frombuffer views the array's memory directly, avoiding a second copy.
    tensor_data = torch.frombuffer(all_ids, dtype=torch.int64)
    # Write to a temporary name and rename, so an interrupted save cannot
    # leave a partial file that the "Already exists" check would accept.
    tmp_path = output_path + ".tmp"
    torch.save(tensor_data, tmp_path)
    os.replace(tmp_path, output_path)
    size_gb = os.path.getsize(output_path) / 1e9
    print(f" Saved to {output_path} ({size_gb:.2f} GB)")
    return output_path


def _encode_to_ids(tokenizer, text: str):
    """Encode *text* and return its token ids as a sequence of ints."""
    encoded = tokenizer.encode(text)
    if isinstance(encoded, list):
        return encoded
    if hasattr(encoded, 'ids'):
        return encoded.ids
    return list(encoded)


def _resolve_eos_id(tokenizer, fallback: int = 3) -> int:
    """Return the tokenizer's id for "[EOS]", or *fallback* if unavailable."""
    lookup = getattr(tokenizer, 'token_to_id', None)
    if callable(lookup):
        found = lookup("[EOS]")
        if found is not None:
            return found
    return fallback
# ─── Merge Datasets ──────────────────────────────────────────────────────────
def merge_datasets(paths: List[str], output_path: str):
    """Concatenate several saved token tensors into one file.

    Each existing path is loaded onto the CPU with ``torch.load``; the
    tensors are concatenated along dimension 0 in the order given and the
    result is written to ``output_path``. Paths that do not exist are
    skipped with a message. If nothing could be loaded, no file is written.

    Args:
        paths: Files previously written with ``torch.save``.
        output_path: Destination file for the merged tensor.

    Returns:
        None.
    """
    # Imported here because this module has no top-level torch import;
    # without it every torch call below raises NameError.
    import torch

    print(f"\nMerging {len(paths)} datasets...")
    all_data = []
    for path in paths:
        if not os.path.exists(path):
            print(f" Skipping (not found): {path}")
            continue
        # weights_only=True restricts unpickling to tensor data.
        data = torch.load(path, map_location='cpu', weights_only=True)
        all_data.append(data)
        print(f" {path}: {len(data):,} tokens")

    if not all_data:
        print(" No data to merge!")
        return

    merged = torch.cat(all_data, dim=0)
    print(f" Total: {len(merged):,} tokens")
    torch.save(merged, output_path)
    size_gb = os.path.getsize(output_path) / 1e9
    print(f" Saved to {output_path} ({size_gb:.2f} GB)")
# ─── Main ────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: build the tokenizer and tokenize datasets.

    Steps, in order:
      1. Parse arguments and create the output directory.
      2. Train (or reuse) the BPE tokenizer and load it from disk.
      3. Tokenize each requested dataset via ``process_dataset``.
      4. Optionally tokenize text/code files found under ``--local-data``
         into ``local_packed_seq<seq_len>.pt``.
      5. With ``--merge``, concatenate all outputs into
         ``train_packed_seq<seq_len>.pt`` when there are at least two.
    """
    parser = argparse.ArgumentParser(description='CogNet Data Preparation')
    parser.add_argument('--output-dir', type=str, default='./data_cache',
                        help='Output directory for processed data')
    parser.add_argument('--vocab-size', type=int, default=32000,
                        help='BPE vocabulary size')
    parser.add_argument('--seq-len', type=int, default=4096,
                        help='Sequence length for packing')
    parser.add_argument('--datasets', nargs='+',
                        default=['wiki', 'code'],
                        choices=list(DATASET_CONFIGS.keys()) + ['all'],
                        help='Datasets to process')
    parser.add_argument('--merge', action='store_true',
                        help='Merge all datasets into one file')
    parser.add_argument('--local-data', type=str, default=None,
                        help='Path to local data directory with .txt/.py files')
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)

    # Train tokenizer (no sample files are passed, so a missing tokenizer is
    # trained on the built-in fallback corpus).
    tokenizer_path = train_bpe_tokenizer(args.output_dir, args.vocab_size)

    # Load tokenizer
    from tokenizers import Tokenizer
    tokenizer = Tokenizer.from_file(tokenizer_path)
    print(f"\nTokenizer loaded: {tokenizer.get_vocab_size()} vocab")

    # Process datasets. Repeated names are dropped (order preserved) so the
    # same file is not listed twice and duplicated by --merge.
    if 'all' in args.datasets:
        datasets_to_process = list(DATASET_CONFIGS.keys())
    else:
        datasets_to_process = list(dict.fromkeys(args.datasets))

    output_paths = []
    for name in datasets_to_process:
        path = process_dataset(name, DATASET_CONFIGS[name], tokenizer,
                               args.output_dir, args.seq_len)
        if path:
            output_paths.append(path)

    # Process local data
    if args.local_data and not os.path.exists(args.local_data):
        print(f"\nLocal data path not found, skipping: {args.local_data}")
    elif args.local_data:
        print(f"\nProcessing local data from {args.local_data}...")
        # Look the separator id up instead of assuming it; fall back to 3,
        # the position of [EOS] in the trainer's special-token list.
        eos_id = tokenizer.token_to_id("[EOS]")
        if eos_id is None:
            eos_id = 3

        local_suffixes = {'.txt', '.md', '.py', '.js', '.java', '.c', '.cpp', '.rs', '.go'}
        local_ids = []
        # One sorted walk of the tree gives a deterministic token order.
        for fpath in sorted(Path(args.local_data).rglob('*')):
            if fpath.suffix not in local_suffixes or not fpath.is_file():
                continue
            try:
                with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read()
                encoded = tokenizer.encode(text)
                if isinstance(encoded, list):
                    ids = encoded
                elif hasattr(encoded, 'ids'):
                    ids = encoded.ids
                else:
                    ids = list(encoded)
                local_ids.extend(ids)
                local_ids.append(eos_id)  # EOS
            except Exception as e:
                # Best effort: one unreadable file must not abort the run.
                print(f" Skipping {fpath}: {e}")

        if local_ids:
            # Imported here because the module has no top-level torch import.
            import torch
            local_path = os.path.join(args.output_dir,
                                      f"local_packed_seq{args.seq_len}.pt")
            torch.save(torch.tensor(local_ids, dtype=torch.long), local_path)
            output_paths.append(local_path)
            print(f" Local data: {len(local_ids):,} tokens")

    # Merge
    if args.merge:
        if len(output_paths) > 1:
            merge_path = os.path.join(args.output_dir,
                                      f"train_packed_seq{args.seq_len}.pt")
            merge_datasets(output_paths, merge_path)
        else:
            print(f"\nSkipping merge: need at least 2 outputs, have {len(output_paths)}")

    print("\n" + "=" * 60)
    print("Data preparation complete!")
    print(f"Output directory: {args.output_dir}")
    print("=" * 60)


if __name__ == '__main__':
    main()