#!/usr/bin/env python3
# NOTE: no "-u" flag in the shebang. On Linux everything after the interpreter
# path is passed to `env` as ONE argument ("python3 -u"), which fails to
# resolve. Every print() below flushes explicitly, so unbuffered mode is not
# needed.
"""bootstrap_test_set.py — Bootstrap 95% CIs on test set, for Table 5 consistency.

Loads the test corpora, evaluates every shared / concatenated tokenizer found
under ``TOK_DIR``, bootstraps 95% confidence intervals for mean per-text
fertility (content tokens per word) and CPT (graphemes per content token),
compares the point estimates against ``test_set_results.csv`` and writes the
intervals to ``bootstrap_ci_test_set.csv``.
"""
import csv
import gc
import json
import os
import sys
import warnings
from collections import Counter
from dataclasses import asdict, dataclass
from typing import List, Tuple

import numpy as np
import regex

warnings.filterwarnings("ignore")

BASE = "/root/oiq_cc_tokenizer/results"
CORPORA = os.path.join(BASE, "corpora")
TOK_DIR = os.path.join(BASE, "tokenizers")

# A "word" is a maximal run of letters, combining marks and digits.
_WORD_PAT = regex.compile(r"[\p{L}\p{M}\p{N}]+", regex.UNICODE)
# Code points in the Arabic and Arabic Supplement blocks.
_AR_PAT = regex.compile(r"[\u0600-\u06FF\u0750-\u077F]")
# One match per extended grapheme cluster; compiled once instead of per call.
_GRAPHEME_PAT = regex.compile(r"\X")
# NOTE(review): the repeated empty strings look like angle-bracket special
# tokens that were stripped from this file at some point — confirm against the
# tokenizer configs and restore them. The literal is left untouched here so
# that behaviour does not change on a guess.
_SPECIAL = {"", "", "", "[CLS]", "[SEP]", "[PAD]", "[UNK]", "", ""}

# Column order of the six values returned by bootstrap_ci().
_RESULT_FIELDS = ["fert", "fert_lo", "fert_hi", "cpt", "cpt_lo", "cpt_hi"]


def segment_words(t):
    """Return the list of word-like runs (letters/marks/digits) found in *t*."""
    return _WORD_PAT.findall(t)


def count_graphemes(t):
    """Return the number of extended grapheme clusters in *t*."""
    return len(_GRAPHEME_PAT.findall(t))


def detect_script(t):
    """Return ``"ar"`` if more than 30% of the characters of *t* are Arabic-block.

    Anything else (including the empty string) is labelled ``"az"``.
    """
    return "ar" if len(_AR_PAT.findall(t)) > len(t) * 0.3 else "az"


def filter_sp(tokens):
    """Return *tokens* without the entries listed in ``_SPECIAL``."""
    return [t for t in tokens if t not in _SPECIAL]


class RawConcat:
    """Pair of per-script tokenizers; each text is routed by ``detect_script``."""

    def __init__(self, ar_j, az_j):
        # Imported lazily so the module can be imported without `tokenizers`.
        from tokenizers import Tokenizer

        self.ar = Tokenizer.from_file(ar_j)
        self.az = Tokenizer.from_file(az_j)

    def encode(self, text):
        """Encode *text* with the tokenizer of its detected script.

        Returns:
            ``(tokens, ids, script)`` where *script* is ``"ar"`` or ``"az"``.
        """
        s = detect_script(text)
        t = self.ar if s == "ar" else self.az
        enc = t.encode(text)
        return enc.tokens, enc.ids, s


class RawShared:
    """Single tokenizer applied to every text regardless of script."""

    def __init__(self, j):
        # Imported lazily so the module can be imported without `tokenizers`.
        from tokenizers import Tokenizer

        self.tok = Tokenizer.from_file(j)

    def encode(self, text):
        """Encode *text*; returns ``(tokens, ids, script)``."""
        enc = self.tok.encode(text)
        return enc.tokens, enc.ids, detect_script(text)


def precompute_metrics(texts):
    """Compute the tokenizer-independent inputs of the per-text metrics.

    Args:
        texts: Sequence of text strings.

    Returns:
        ``(words_per_text, graphemes_per_text)``: for each text, its list of
        words and its grapheme-cluster count, in the order of *texts*.
    """
    words_per_text = [segment_words(t) for t in texts]
    graphemes_per_text = [count_graphemes(t) for t in texts]
    return words_per_text, graphemes_per_text


def bootstrap_ci(tok, texts, words_per_text, graphemes_per_text,
                 n_bootstrap=500, seed=42):
    """Bootstrap 95% CIs for mean per-text fertility and CPT.

    Per-text metrics are computed once; only the resampling is repeated.
    Texts without any word, and texts whose encoding raises, are excluded.

    Args:
        tok: Object whose ``encode(text)`` returns ``(tokens, ids, script)``.
        texts: Sequence of text strings.
        words_per_text: Word list for each text (see ``precompute_metrics``).
        graphemes_per_text: Grapheme count for each text.
        n_bootstrap: Number of bootstrap resamples.
        seed: Seed for the resampling RNG; the default reproduces the
            previously hard-coded value.

    Returns:
        ``(fert, fert_lo, fert_hi, cpt, cpt_lo, cpt_hi)`` as floats: the mean
        over valid texts followed by the 2.5th and 97.5th percentiles of the
        bootstrap means. All six are NaN when no text is valid.

    Raises:
        ValueError: If the three per-text sequences differ in length.
    """
    if not (len(texts) == len(words_per_text) == len(graphemes_per_text)):
        raise ValueError(
            "texts, words_per_text and graphemes_per_text must have equal length"
        )

    fert_vals = []
    cpt_vals = []
    n_failed = 0
    first_error = None
    for text, words, n_graphemes in zip(texts, words_per_text, graphemes_per_text):
        if not words:
            # Fertility is undefined without words.
            continue
        try:
            tokens, _ids, _script = tok.encode(text)
        except Exception as exc:  # best effort: one bad text must not abort the run
            n_failed += 1
            if first_error is None:
                first_error = exc
            continue
        content = filter_sp(tokens)
        fert_vals.append(len(content) / len(words))
        # max(..., 1) avoids dividing by zero when only special tokens remain.
        cpt_vals.append(n_graphemes / max(len(content), 1))

    if n_failed:
        print(
            f"  warning: {n_failed} text(s) failed to encode and were skipped "
            f"(first error: {first_error!r})",
            file=sys.stderr,
            flush=True,
        )

    n_valid = len(fert_vals)
    if n_valid == 0:
        nan = float("nan")
        return nan, nan, nan, nan, nan, nan

    fert_arr = np.array(fert_vals)
    cpt_arr = np.array(cpt_vals)

    # The draws are kept one resample at a time (not vectorised) so that the
    # random stream, and hence the reported intervals, stay reproducible.
    rng = np.random.RandomState(seed)
    fert_samples = []
    cpt_samples = []
    for _ in range(n_bootstrap):
        idx = rng.choice(n_valid, size=n_valid, replace=True)
        fert_samples.append(np.mean(fert_arr[idx]))
        cpt_samples.append(np.mean(cpt_arr[idx]))

    point_fert = float(np.mean(fert_arr))
    point_cpt = float(np.mean(cpt_arr))
    fert_lo = float(np.percentile(fert_samples, 2.5))
    fert_hi = float(np.percentile(fert_samples, 97.5))
    cpt_lo = float(np.percentile(cpt_samples, 2.5))
    cpt_hi = float(np.percentile(cpt_samples, 97.5))
    return point_fert, fert_lo, fert_hi, point_cpt, cpt_lo, cpt_hi


def _load_test_texts():
    """Read the non-blank, stripped lines of every test corpus that exists."""
    texts = []
    for s in ("test_ar", "test_az", "test_mi"):
        p = os.path.join(CORPORA, f"{s}.txt")
        if os.path.exists(p):
            # Explicit UTF-8: the locale default is not reliable for this text.
            with open(p, encoding="utf-8") as f:
                texts.extend(l.strip() for l in f if l.strip())
    return texts


def _evaluate(name, tok, texts, words_per_text, graphemes_per_text):
    """Bootstrap one tokenizer, print its summary line, return its result row."""
    print(f"\n{name}", flush=True)
    r = bootstrap_ci(tok, texts, words_per_text, graphemes_per_text)
    print(f" F={r[0]:.4f} [{r[1]:.4f}, {r[2]:.4f}] CPT={r[3]:.3f} [{r[4]:.3f}, {r[5]:.3f}]", flush=True)
    return {"name": name, **dict(zip(_RESULT_FIELDS, r))}


def _print_consistency_check(results, csv_path):
    """Compare point estimates with the rows of *csv_path* that share a name."""
    print("\n--- Consistency check ---", flush=True)
    if not os.path.exists(csv_path):
        # Skip instead of crashing so the bootstrap results still get saved.
        print(f"(skipped: {csv_path} not found)", flush=True)
        return
    with open(csv_path, newline="", encoding="utf-8") as f:
        test_results = {row["name"]: row for row in csv.DictReader(f)}
    print(f"{'Name':<25} {'Table5_F':>8} {'TestCSV_F':>8} {'Match':>5} {'Table5_CPT':>8} {'TestCSV_CPT':>8} {'Match':>5}", flush=True)
    for r in results:
        csv_r = test_results.get(r["name"])
        if csv_r:
            f_match = abs(float(r["fert"]) - float(csv_r["fertility_overall"])) < 0.001
            c_match = abs(float(r["cpt"]) - float(csv_r["cpt_overall"])) < 0.01
            print(f"{r['name']:<25} {r['fert']:>8.4f} {float(csv_r['fertility_overall']):>8.4f} {'OK' if f_match else 'MISMATCH':>5} {r['cpt']:>8.3f} {float(csv_r['cpt_overall']):>8.3f} {'OK' if c_match else 'MISMATCH':>5}", flush=True)


def _save_results(results, out):
    """Write the result rows to *out* as CSV with a header line."""
    with open(out, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["name"] + _RESULT_FIELDS)
        w.writeheader()
        w.writerows(results)


def main():
    """Run the bootstrap for every available tokenizer, check and save results."""
    texts = _load_test_texts()
    print(f"{len(texts)} test texts", flush=True)
    words_per_text, graphemes_per_text = precompute_metrics(texts)

    results = []
    for vsz in (8000, 16000, 32000):
        for algo in ("bpe", "unigram", "wordpiece", "bbpe"):
            jpath = os.path.join(TOK_DIR, f"shared_{algo}_{vsz}.json")
            if os.path.exists(jpath):
                tok = RawShared(jpath)
                results.append(_evaluate(
                    f"shared_{algo}_{vsz}", tok, texts,
                    words_per_text, graphemes_per_text,
                ))
                # Release the tokenizer before the next one is loaded.
                del tok
                gc.collect()

            # The concatenated variant uses two tokenizers of half the size.
            ar_j = os.path.join(TOK_DIR, f"concat_ar_{algo}_{vsz//2}.json")
            az_j = os.path.join(TOK_DIR, f"concat_az_{algo}_{vsz//2}.json")
            if os.path.exists(ar_j) and os.path.exists(az_j):
                tok = RawConcat(ar_j, az_j)
                results.append(_evaluate(
                    f"concat_{algo}_{vsz}", tok, texts,
                    words_per_text, graphemes_per_text,
                ))
                del tok
                gc.collect()

    # Verify consistency with test_set_results.csv
    _print_consistency_check(results, os.path.join(BASE, "test_set_results.csv"))

    # Save
    out = os.path.join(BASE, "bootstrap_ci_test_set.csv")
    _save_results(results, out)
    print(f"\nSaved: {out}", flush=True)
    print("DONE!", flush=True)


if __name__ == "__main__":
    main()