import os, time, threading, hashlib, multiprocessing
import gzip
import orjson
import numpy as np
import zstandard as zstd
from flask import Flask, jsonify
from waitress import serve
from huggingface_hub import HfApi, hf_hub_download
from transformers import AutoTokenizer
from selectolax.parser import HTMLParser
from bloom_filter import BloomFilter

# --- šŸ”± 1. CONFIGURATION & SECRETS ---
REPO_ID = "indro-ai/indro-web-data"
HF_TOKEN = os.environ.get("HF_TOKEN")
assert HF_TOKEN is not None, "HF_TOKEN missing! Add it to your Space Secrets."

NUM_FILES = 16
TEXT_COLUMN = "text"
MAX_TOKENS_PER_DOC = 8192
MIN_CHAR_LENGTH = 100  # Increased for better quality
VAL_SPLIT_RATIO = 0.01
OUTPUT_FOLDER = "tokinzed(data)"
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

BUFFER_CAPACITY = 10_000_000
NUM_CORES = max(1, multiprocessing.cpu_count() - 1)

# English Quality Check (Stopwords)
ENGLISH_STOPWORDS = {"the", "and", "to", "of", "a", "in", "is", "it", "that", "for"}

# --- šŸ”± 2. THREAD-SAFE DASHBOARD ---
app = Flask(__name__)
state_lock = threading.Lock()
forge_state = {
    "status": "Booting Forge V4 (Frontier Edition)",
    "current_file": "None",
    "files_completed": 0,
    "train_tokens": 0,
    "val_tokens": 0,
    "speed_tokens_sec": 0.0,
    "eta_hours": 0.0,
    "docs_dropped_garbage": 0,
    "docs_dropped_duplicates": 0,
    "docs_dropped_quality": 0,  # Track non-English/Spam
}


@app.route('/health')
def health():
    with state_lock:
        return jsonify(forge_state)


def start_dashboard():
    serve(app, host="0.0.0.0", port=7860)


# --- šŸ”± 3. GLOBAL WORKER INITIALIZATION ---
# This fixes the massive V3 bottleneck. Tokenizer loads ONCE per CPU core.
global_tokenizer = None
global_eot = None


def init_worker():
    global global_tokenizer, global_eot
    # Re-seed NumPy from OS entropy so forked workers do not share an identical
    # RNG stream for the train/val routing below.
    np.random.seed()
    global_tokenizer = AutoTokenizer.from_pretrained("gpt2")
    global_eot = global_tokenizer.eos_token_id


# --- šŸ”± 4. RESEARCH LAB DATA CLEANERS ---
def is_high_quality_english(text):
    """Fast heuristic to drop code blobs, SEO spam, and foreign languages."""
    words = text.lower().split()
    if len(words) < 15:
        return False
    # Check first 100 words for English density
    sample = words[:100]
    stopword_count = sum(1 for w in sample if w in ENGLISH_STOPWORDS)
    # If less than 4% of the text is common English connecting words, it's likely garbage/code
    if (stopword_count / len(sample)) < 0.04:
        return False
    return True


def clean_and_tokenize(batch):
    """Runs on parallel background CPU cores."""
    processed_docs = []
    garbage_count = 0
    quality_drop_count = 0
    for text in batch:
        # 1. C-Speed HTML Parsing (Massive upgrade from Regex)
        try:
            tree = HTMLParser(text)
            # Remove script and style tags completely
            for tag in tree.css('script, style'):
                tag.decompose()
            text = tree.text(separator=' ', strip=True)
        except Exception:
            pass  # Fallback if text is heavily malformed

        # 2. Length Filter
        if len(text) < MIN_CHAR_LENGTH:
            garbage_count += 1
            continue

        # 3. Quality & Language Filter
        if not is_high_quality_english(text):
            quality_drop_count += 1
            continue

        # 4. Cryptographic Hashing for Bloom Filter
        doc_hash = hashlib.md5(text.encode('utf-8')).hexdigest()

        # 5. Tokenize using the GLOBAL worker tokenizer
        tokens = global_tokenizer.encode(text, add_special_tokens=False)
        safe_tokens = tokens[:MAX_TOKENS_PER_DOC] + [global_eot]

        # 6. Probabilistic Routing
        is_val = np.random.rand() < VAL_SPLIT_RATIO
        processed_docs.append((safe_tokens, doc_hash, is_val))
    return processed_docs, garbage_count, quality_drop_count
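
# Illustrative only (not executed by the forge): how the cleaners behave on a
# tiny batch when the worker globals are initialized in-process. The sample
# HTML string is made up; downloading the gpt2 tokenizer requires network
# access.
#
#   init_worker()
#   docs, n_garbage, n_quality = clean_and_tokenize([
#       "<html><body><script>var x=1;</script>"
#       "<p>The cat sat on the mat, and it is a tale of a cat that lived in a "
#       "house for many years, told for the joy of it.</p></body></html>",
#   ])
#   # docs == [(token_ids, md5_hex_of_cleaned_text, is_val_flag)]
#   # the <script> body is stripped before the length/quality checks run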

# --- šŸ”± 5. THE FORGE ENGINE ---
def upload_with_retry(api, filepath, repo_path, retries=3):
    for attempt in range(retries):
        try:
            api.upload_file(
                path_or_fileobj=filepath,
                path_in_repo=repo_path,
                repo_id=REPO_ID,
                repo_type="dataset",
                commit_message=f"Added {repo_path} (V4: Bloom Filter, Selectolax, Quality Heuristics)",
            )
            return True
        except Exception as e:
            print(f"āš ļø Upload failed (Attempt {attempt+1}/{retries}): {e}")
            time.sleep(5)
    return False


def flush_buffer(buffer_array, cursor, filename, cctx):
    if cursor == 0:
        return
    with open(filename, 'ab') as out_f:
        with cctx.stream_writer(out_f) as writer:
            writer.write(buffer_array[:cursor].tobytes())


def process_file(file_index, api, start_time, bloom_filter):
    source_filename = f"data/indro_v52_zenith_{file_index}.jsonl.gz"
    train_filename = f"{OUTPUT_FOLDER}/tokens_{file_index}.bin.zst"
    val_filename = f"{OUTPUT_FOLDER}/tokens_val_{file_index}.bin.zst"

    with state_lock:
        forge_state["status"] = f"Downloading {source_filename}"
        forge_state["current_file"] = source_filename
    print(f"\nā¬‡ļø Downloading {source_filename}...")

    try:
        file_path = hf_hub_download(repo_id=REPO_ID, filename=source_filename,
                                    repo_type="dataset", token=HF_TOKEN)
    except Exception as e:
        print(f"āŒ Failed to download {source_filename}: {e}")
        return

    with state_lock:
        forge_state["status"] = f"Parallel Tokenizing {source_filename}"
    print(f"🧠 Utilizing {NUM_CORES} CPU Cores for Tokenization...")

    cctx = zstd.ZstdCompressor(level=3)
    train_buffer = np.zeros(BUFFER_CAPACITY, dtype=np.uint16)
    val_buffer = np.zeros(BUFFER_CAPACITY, dtype=np.uint16)
    train_cursor = 0
    val_cursor = 0
    open(train_filename, 'wb').close()
    open(val_filename, 'wb').close()

    # AUDIT FIX: Initialize the tokenizer once per worker core
    pool = multiprocessing.Pool(processes=NUM_CORES, initializer=init_worker)

    def read_batches():
        batch = []
        with gzip.open(file_path, 'rt', encoding='utf-8') as gz_f:
            for line in gz_f:
                try:
                    data = orjson.loads(line)
                    text = data.get(TEXT_COLUMN, "")
                    if text:
                        batch.append(text)
                    if len(batch) >= 2048:
                        yield batch
                        batch = []
                except orjson.JSONDecodeError:
                    continue
            if batch:
                yield batch

    for results, garbage_count, quality_drop_count in pool.imap_unordered(clean_and_tokenize, read_batches()):
        with state_lock:
            forge_state["docs_dropped_garbage"] += garbage_count
            forge_state["docs_dropped_quality"] += quality_drop_count

        for tokens, doc_hash, is_val in results:
            # AUDIT FIX: High-Scale Bloom Filter Deduplication (O(1) Memory)
            if doc_hash in bloom_filter:
                with state_lock:
                    forge_state["docs_dropped_duplicates"] += 1
                continue
            bloom_filter.add(doc_hash)

            tok_len = len(tokens)
            if is_val:
                if val_cursor + tok_len > BUFFER_CAPACITY:
                    flush_buffer(val_buffer, val_cursor, val_filename, cctx)
                    val_cursor = 0
                val_buffer[val_cursor:val_cursor + tok_len] = tokens
                val_cursor += tok_len
                with state_lock:
                    forge_state["val_tokens"] += tok_len
            else:
                if train_cursor + tok_len > BUFFER_CAPACITY:
                    flush_buffer(train_buffer, train_cursor, train_filename, cctx)
                    train_cursor = 0
                train_buffer[train_cursor:train_cursor + tok_len] = tokens
                train_cursor += tok_len
                with state_lock:
                    forge_state["train_tokens"] += tok_len

        with state_lock:
            total = forge_state["train_tokens"] + forge_state["val_tokens"]
            elapsed = time.time() - start_time
            speed = total / max(1, elapsed)
            forge_state["speed_tokens_sec"] = speed
            remaining = 3_000_000_000 - total  # ETA is measured against a 3B-token target
            forge_state["eta_hours"] = max(0.0, round((remaining / max(1, speed)) / 3600, 2))

    pool.close()
    pool.join()

    flush_buffer(train_buffer, train_cursor, train_filename, cctx)
    flush_buffer(val_buffer, val_cursor, val_filename, cctx)
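
    # Shard layout note: every flush_buffer() call above appended one
    # independent zstd frame to the .bin.zst file, and each frame's payload is
    # raw uint16 token ids in native byte order (little-endian on typical
    # hosts). A reader therefore needs streaming decompression that continues
    # across frame boundaries (see the illustrative loader at the end of this
    # file) rather than a one-shot decompress of the first frame.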

    # --- 6. UPLOAD ---
    with state_lock:
        forge_state["status"] = f"Uploading files for {file_index}"

    if train_cursor > 0 or os.path.getsize(train_filename) > 0:
        upload_with_retry(api, train_filename, f"{OUTPUT_FOLDER}/tokens_{file_index}.bin.zst")
    if val_cursor > 0 or os.path.getsize(val_filename) > 0:
        upload_with_retry(api, val_filename, f"{OUTPUT_FOLDER}/tokens_val_{file_index}.bin.zst")

    os.remove(train_filename)
    os.remove(val_filename)
    os.remove(file_path)

    with state_lock:
        forge_state["files_completed"] += 1


def main():
    api = HfApi(token=HF_TOKEN)
    start_time = time.time()

    # AUDIT FIX: Massive 100-Million Capacity Bloom Filter
    print("šŸ›”ļø Initializing Bloom Filter (Capacity: 100M documents)...")
    bloom_filter = BloomFilter(max_elements=100_000_000, error_rate=0.01)

    for i in range(1, NUM_FILES + 1):
        process_file(i, api, start_time, bloom_filter)

    with state_lock:
        forge_state["status"] = "COMPLETE"
        forge_state["eta_hours"] = 0.0
    print("\nšŸŽ‰ FORGE V4 COMPLETED: All files optimized, deduped, and uploaded!")


if __name__ == "__main__":
    threading.Thread(target=start_dashboard, daemon=True).start()
    main()
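
# --- Illustrative shard reader (appendix; not used by the forge) ---
# A minimal sketch, assuming each shard is a sequence of independent zstd
# frames written by flush_buffer() whose payload is raw uint16 token ids.
# `read_across_frames=True` is passed so python-zstandard keeps reading past
# the first frame boundary instead of stopping at it.
def load_token_shard(path):
    dctx = zstd.ZstdDecompressor()
    chunks = []
    with open(path, 'rb') as f:
        with dctx.stream_reader(f, read_across_frames=True) as reader:
            while True:
                chunk = reader.read(1 << 20)  # 1 MiB at a time
                if not chunk:
                    break
                chunks.append(chunk)
    return np.frombuffer(b"".join(chunks), dtype=np.uint16)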