| """ |
| CogNet Data Preparation Script |
| =============================== |
| Prepares and tokenizes multiple datasets for training: |
| - Wikipedia (multilingual) |
| - Code datasets (The Stack, CodeParrot) |
| - Books (BookCorpus) |
| - Common Crawl subsets |
| - Custom local files |
| |
| Outputs pre-tokenized .pt files for maximum training throughput. |
| |
| Usage: |
| python prepare_data.py --output-dir ./data_cache --vocab-size 32000 |
| python prepare_data.py --output-dir ./data_cache --datasets wiki code books |
| """ |
|
|
| import argparse |
| import json |
| import os |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Dict, List, Optional |
|
|
| |
|
|
| DATASET_CONFIGS = { |
| 'wiki': { |
| 'path': 'wikimedia/wikipedia', |
| 'subset': '20231101.en', |
| 'split': 'train', |
| 'text_field': 'text', |
| 'max_docs': None, |
| 'max_chars': 5_000_000_000, |
| 'description': 'Wikipedia English', |
| }, |
| 'wiki_fr': { |
| 'path': 'wikimedia/wikipedia', |
| 'subset': '20231101.fr', |
| 'split': 'train', |
| 'text_field': 'text', |
| 'max_docs': None, |
| 'max_chars': 2_000_000_000, |
| 'description': 'Wikipedia French', |
| }, |
| 'code': { |
| 'path': 'bigcode/the-stack', |
| 'subset': 'data', |
| 'split': 'train', |
| 'text_field': 'content', |
| 'max_docs': None, |
| 'max_chars': 5_000_000_000, |
| 'description': 'The Stack (multi-language code)', |
| 'languages': ['python', 'javascript', 'java', 'cpp', 'c', 'rust', 'go', 'typescript'], |
| }, |
| 'code_python': { |
| 'path': 'bigcode/the-stack', |
| 'subset': 'data', |
| 'split': 'train', |
| 'text_field': 'content', |
| 'max_docs': None, |
| 'max_chars': 3_000_000_000, |
| 'description': 'Python code from The Stack', |
| 'languages': ['python'], |
| }, |
| 'books': { |
| 'path': 'bookcorpus/bookcorpus', |
| 'subset': None, |
| 'split': 'train', |
| 'text_field': 'text', |
| 'max_docs': None, |
| 'max_chars': 3_000_000_000, |
| 'description': 'BookCorpus', |
| }, |
| 'c4': { |
| 'path': 'allenai/c4', |
| 'subset': 'en', |
| 'split': 'train', |
| 'text_field': 'text', |
| 'max_docs': None, |
| 'max_chars': 10_000_000_000, |
| 'description': 'C4 (Colossal Clean Crawled Corpus)', |
| }, |
| 'openwebtext': { |
| 'path': 'openwebtext', |
| 'subset': None, |
| 'split': 'train', |
| 'text_field': 'text', |
| 'max_docs': None, |
| 'max_chars': 5_000_000_000, |
| 'description': 'OpenWebText', |
| }, |
| 'alpaca': { |
| 'path': 'tatsu-lab/alpaca', |
| 'subset': None, |
| 'split': 'train', |
| 'text_field': 'text', |
| 'max_docs': None, |
| 'max_chars': 500_000_000, |
| 'description': 'Alpaca instruction data', |
| 'format_fn': 'alpaca_format', |
| }, |
| 'redpajama': { |
| 'path': 'togethercomputer/RedPajama-Data-1T', |
| 'subset': None, |
| 'split': 'train', |
| 'text_field': 'text', |
| 'max_docs': None, |
| 'max_chars': 10_000_000_000, |
| 'description': 'RedPajama 1T', |
| }, |
| } |
|
|
|
|
| def alpaca_format(example: Dict) -> str: |
| """Format Alpaca data into text.""" |
| instruction = example.get('instruction', '') |
| input_text = example.get('input', '') |
| output = example.get('output', '') |
| if input_text: |
| return f"### Instruction:\n{instruction}\n\n### Input:\n{input_text}\n\n### Response:\n{output}" |
| return f"### Instruction:\n{instruction}\n\n### Response:\n{output}" |
|
|
|
|
| |
|
|
| def train_bpe_tokenizer(output_dir: str, vocab_size: int = 32000, |
| sample_files: Optional[List[str]] = None) -> str: |
| """ |
| Train a BPE tokenizer on sample text data. |
| Returns the path to the saved tokenizer. |
| """ |
| try: |
| from tokenizers import Tokenizer |
| from tokenizers.models import BPE |
| from tokenizers.trainers import BpeTrainer |
| from tokenizers.pre_tokenizers import Metaspace, ByteLevel |
| from tokenizers.decoders import ByteLevel as ByteLevelDecoder |
| except ImportError: |
| print("ERROR: 'tokenizers' library not installed.") |
| print("Install with: pip install tokenizers") |
| sys.exit(1) |
|
|
| tokenizer_path = os.path.join(output_dir, f"bpe_tokenizer_{vocab_size}.json") |
| if os.path.exists(tokenizer_path): |
| print(f"Tokenizer already exists at {tokenizer_path}") |
| return tokenizer_path |
|
|
| print(f"\nTraining BPE tokenizer (vocab_size={vocab_size})...") |
|
|
| tokenizer = Tokenizer(BPE(unk_token="[UNK]")) |
| tokenizer.pre_tokenizer = ByteLevel(add_prefix_space=False) |
| tokenizer.decoder = ByteLevelDecoder() |
|
|
| trainer = BpeTrainer( |
| vocab_size=vocab_size, |
| special_tokens=[ |
| "[PAD]", |
| "[UNK]", |
| "[BOS]", |
| "[EOS]", |
| ], |
| show_progress=True, |
| initial_alphabet=ByteLevel.alphabet(), |
| ) |
|
|
| if sample_files and len(sample_files) > 0: |
| print(f"Training on {len(sample_files)} files...") |
| tokenizer.train(sample_files, trainer) |
| else: |
| print("No sample files provided. Training on built-in data...") |
| |
| sample_texts = [] |
| |
| sample_texts.extend([ |
| "The quick brown fox jumps over the lazy dog. " * 500, |
| "Science and technology have transformed our understanding of the universe. " * 500, |
| "In the field of artificial intelligence, neural networks learn from data. " * 500, |
| ]) |
| |
| sample_texts.extend([ |
| "Le renard brun rapide saute par-dessus le chien paresseux. " * 500, |
| "La science et la technologie ont transforme notre comprehension de l'univers. " * 500, |
| ]) |
| |
| sample_texts.extend([ |
| "def hello_world():\n print('Hello, World!')\n return True\n" * 500, |
| "class NeuralNetwork:\n def __init__(self, layers):\n self.layers = layers\n" * 500, |
| "import torch\nimport torch.nn as nn\nmodel = nn.Sequential(nn.Linear(768, 768))\n" * 500, |
| "function fibonacci(n) {\n if (n <= 1) return n;\n return fibonacci(n-1) + fibonacci(n-2);\n}\n" * 500, |
| ]) |
| tokenizer.train_from_iterator(sample_texts, trainer) |
|
|
| os.makedirs(output_dir, exist_ok=True) |
| tokenizer.save(tokenizer_path) |
| print(f"Saved tokenizer to {tokenizer_path}") |
| print(f"Vocabulary size: {tokenizer.get_vocab_size()}") |
|
|
| return tokenizer_path |
|
|
|
|
| |
|
|
| def process_dataset(name: str, config: Dict, tokenizer, output_dir: str, |
| seq_len: int = 4096) -> Optional[str]: |
| """ |
| Process a single dataset and save as pre-tokenized .pt file. |
| Returns the output path or None if failed. |
| """ |
| print(f"\n{'='*60}") |
| print(f"Processing: {name} β {config.get('description', '')}") |
| print(f"{'='*60}") |
|
|
| output_path = os.path.join(output_dir, f"{name}_packed_seq{seq_len}.pt") |
| if os.path.exists(output_path): |
| print(f"Already exists: {output_path}") |
| return output_path |
|
|
| try: |
| from datasets import load_dataset |
| except ImportError: |
| print("ERROR: 'datasets' library not installed.") |
| print("Install with: pip install datasets") |
| return None |
|
|
| |
| print(f"Loading {config['path']}...") |
| try: |
| if config.get('subset'): |
| ds = load_dataset( |
| config['path'], |
| config['subset'], |
| split=config['split'], |
| streaming=True, |
| trust_remote_code=True, |
| ) |
| else: |
| ds = load_dataset( |
| config['path'], |
| split=config['split'], |
| streaming=True, |
| trust_remote_code=True, |
| ) |
| except Exception as e: |
| print(f"Failed to load dataset: {e}") |
| return None |
|
|
| |
| if config.get('languages'): |
| languages = set(config['languages']) |
| def lang_filter(example): |
| return example.get('language', '') in languages |
| ds = ds.filter(lang_filter) |
|
|
| |
| all_ids = [] |
| doc_count = 0 |
| total_chars = 0 |
| max_chars = config.get('max_chars', 5_000_000_000) |
| text_field = config.get('text_field', 'text') |
| format_fn_name = config.get('format_fn') |
|
|
| t0 = time.time() |
|
|
| for example in ds: |
| |
| if format_fn_name == 'alpaca_format': |
| text = alpaca_format(example) |
| else: |
| text = example.get(text_field, '') |
|
|
| if not text or len(text.strip()) < 20: |
| continue |
|
|
| |
| ids = tokenizer.encode(text) |
| if isinstance(ids, list): |
| all_ids.extend(ids) |
| elif hasattr(ids, 'ids'): |
| all_ids.extend(ids.ids) |
| else: |
| all_ids.extend(list(ids)) |
|
|
| |
| all_ids.append(3) |
|
|
| doc_count += 1 |
| total_chars += len(text) |
|
|
| if doc_count % 10000 == 0: |
| elapsed = time.time() - t0 |
| print(f" {doc_count:,} docs | {len(all_ids):,} tokens | " |
| f"{total_chars/1e9:.2f}B chars | {elapsed:.0f}s") |
|
|
| if total_chars >= max_chars: |
| print(f" Reached char limit ({max_chars/1e9:.1f}B)") |
| break |
|
|
| if config.get('max_docs') and doc_count >= config['max_docs']: |
| print(f" Reached doc limit ({config['max_docs']:,})") |
| break |
|
|
| if len(all_ids) == 0: |
| print(" No tokens collected!") |
| return None |
|
|
| |
| elapsed = time.time() - t0 |
| print(f"\n Final: {doc_count:,} docs, {len(all_ids):,} tokens, {total_chars/1e9:.2f}B chars") |
| print(f" Time: {elapsed:.0f}s ({doc_count/max(elapsed,1):,.0f} docs/s)") |
|
|
| |
| import torch |
| tensor_data = torch.tensor(all_ids, dtype=torch.long) |
| torch.save(tensor_data, output_path) |
| size_gb = os.path.getsize(output_path) / 1e9 |
| print(f" Saved to {output_path} ({size_gb:.2f} GB)") |
|
|
| return output_path |
|
|
|
|
| |
|
|
| def merge_datasets(paths: List[str], output_path: str): |
| """Merge multiple pre-tokenized datasets into one.""" |
| print(f"\nMerging {len(paths)} datasets...") |
| all_data = [] |
|
|
| for path in paths: |
| if not os.path.exists(path): |
| print(f" Skipping (not found): {path}") |
| continue |
| data = torch.load(path, map_location='cpu', weights_only=True) |
| all_data.append(data) |
| print(f" {path}: {len(data):,} tokens") |
|
|
| if not all_data: |
| print(" No data to merge!") |
| return |
|
|
| merged = torch.cat(all_data, dim=0) |
| print(f" Total: {len(merged):,} tokens") |
|
|
| torch.save(merged, output_path) |
| size_gb = os.path.getsize(output_path) / 1e9 |
| print(f" Saved to {output_path} ({size_gb:.2f} GB)") |
|
|
|
|
| |
|
|
| def main(): |
| parser = argparse.ArgumentParser(description='CogNet Data Preparation') |
| parser.add_argument('--output-dir', type=str, default='./data_cache', |
| help='Output directory for processed data') |
| parser.add_argument('--vocab-size', type=int, default=32000, |
| help='BPE vocabulary size') |
| parser.add_argument('--seq-len', type=int, default=4096, |
| help='Sequence length for packing') |
| parser.add_argument('--datasets', nargs='+', |
| default=['wiki', 'code'], |
| choices=list(DATASET_CONFIGS.keys()) + ['all'], |
| help='Datasets to process') |
| parser.add_argument('--merge', action='store_true', |
| help='Merge all datasets into one file') |
| parser.add_argument('--local-data', type=str, default=None, |
| help='Path to local data directory with .txt/.py files') |
| args = parser.parse_args() |
|
|
| os.makedirs(args.output_dir, exist_ok=True) |
|
|
| |
| tokenizer_path = train_bpe_tokenizer(args.output_dir, args.vocab_size) |
|
|
| |
| from tokenizers import Tokenizer |
| tokenizer = Tokenizer.from_file(tokenizer_path) |
| print(f"\nTokenizer loaded: {tokenizer.get_vocab_size()} vocab") |
|
|
| |
| if 'all' in args.datasets: |
| datasets_to_process = list(DATASET_CONFIGS.keys()) |
| else: |
| datasets_to_process = args.datasets |
|
|
| output_paths = [] |
| for name in datasets_to_process: |
| config = DATASET_CONFIGS[name] |
| path = process_dataset(name, config, tokenizer, args.output_dir, args.seq_len) |
| if path: |
| output_paths.append(path) |
|
|
| |
| if args.local_data and os.path.exists(args.local_data): |
| print(f"\nProcessing local data from {args.local_data}...") |
| local_ids = [] |
| for ext in ['*.txt', '*.md', '*.py', '*.js', '*.java', '*.c', '*.cpp', '*.rs', '*.go']: |
| for fpath in Path(args.local_data).rglob(ext): |
| try: |
| with open(fpath, 'r', encoding='utf-8', errors='ignore') as f: |
| text = f.read() |
| ids = tokenizer.encode(text) |
| if isinstance(ids, list): |
| local_ids.extend(ids) |
| elif hasattr(ids, 'ids'): |
| local_ids.extend(ids.ids) |
| local_ids.append(3) |
| except Exception as e: |
| print(f" Skipping {fpath}: {e}") |
|
|
| if local_ids: |
| local_path = os.path.join(args.output_dir, "local_packed_seq{args.seq_len}.pt") |
| torch.save(torch.tensor(local_ids, dtype=torch.long), local_path) |
| output_paths.append(local_path) |
| print(f" Local data: {len(local_ids):,} tokens") |
|
|
| |
| if args.merge and len(output_paths) > 1: |
| merge_path = os.path.join(args.output_dir, f"train_packed_seq{args.seq_len}.pt") |
| merge_datasets(output_paths, merge_path) |
|
|
| print("\n" + "=" * 60) |
| print("Data preparation complete!") |
| print(f"Output directory: {args.output_dir}") |
| print("=" * 60) |
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|