| """Compute morphological fidelity metrics (ue and uc) for 80K and 110K tokenizers.""" |
|
|
| import json |
| import sys |
| import gc |
| from pathlib import Path |
| from tokenizers import Tokenizer as HFTokenizer |
| import numpy as np |
| from tqdm import tqdm |
|
|
| |
| |
| |
# Root directory holding every artefact this script reads or writes.
RESULTS = Path("/root/oiq_cc_tokenizer/results")
TOKENIZER_DIR = RESULTS / "tokenizers"  # tokenizer JSON files loaded below
MORPH_CACHE = RESULTS / "morphology" / "farasa_segmentations.json"  # cached Farasa segmentations
CORPUS_DIR = RESULTS / "corpora"  # test corpora, read one text per line
OUTPUT_CSV = RESULTS / "morph_large_vocab_results.csv"  # metrics table written at the end

# Tokens removed from tokenizer output before any morphological comparison.
# Every entry uses the single-angle-bracket ``<name>`` form; a doubled
# bracket ("<<pad>") would never match a real padding token.
SPECIAL_TOKENS = ("<pad>", "<unk>", "<s>", "</s>", "<mask>")

# Parameters forwarded to compute_morph_consistency_f1.
MORPH_K_CLUSTERS = 30  # k_clusters
MORPH_C_PAIRS = 20  # c_pairs
MORPH_BOOTSTRAP_N = 5  # bootstrap_n
|
|
| |
| |
| |
# Read the Arabic test corpus: one text per line, surrounding whitespace
# stripped, blank lines dropped.
print("Loading Arabic test corpus...")
test_ar_texts = []
with open(CORPUS_DIR / "test_ar.txt", encoding="utf-8") as corpus_file:
    for raw_line in corpus_file:
        stripped_line = raw_line.strip()
        if stripped_line:
            test_ar_texts.append(stripped_line)
print(f" {len(test_ar_texts)} Arabic test texts")
|
|
| |
| |
| |
# Load the full segmentation cache, keep only the entries for the test texts
# that actually have (non-empty) morphology data, then drop the full cache.
print("Loading Farasa segmentations...")
with open(MORPH_CACHE, encoding="utf-8") as cache_file:
    morph_segmentations = json.load(cache_file)
print(f" {len(morph_segmentations)} cached segmentations")

looked_up = ((text, morph_segmentations.get(text, [])) for text in test_ar_texts)
morph_db_light = {text: segments for text, segments in looked_up if segments}
print(f" {len(morph_db_light)} test texts have morph data")

# Only the filtered view is needed from here on; release the full cache.
del morph_segmentations
gc.collect()
|
|
| |
| |
| |
| import regex |
|
|
# Matches a single character in U+0600-U+06FF or U+0750-U+077F.
ARABIC_RANGE = regex.compile(r"[\u0600-\u06FF\u0750-\u077F]")


def detect_script(text):
    """Classify *text* as "ar" or "az" by its share of Arabic-range characters.

    Returns "ar" when strictly more than 30% of the characters in *text*
    match ``ARABIC_RANGE``; otherwise returns "az" (this includes the empty
    string).
    """
    arabic_count = sum(1 for _ in ARABIC_RANGE.finditer(text))
    threshold = len(text) * 0.3
    if arabic_count > threshold:
        return "ar"
    return "az"
|
|
|
|
def tokenize_and_decode(tok_info, text):
    """Encode *text* and decode it back with the tokenizer described by *tok_info*.

    Args:
        tok_info: Dict with a "type" key and a "tokenizer" key. When the type
            is "concatenated", "tokenizer" is itself a dict holding
            "tokenizer_ar" and "tokenizer_az", and the one matching
            ``detect_script(text)`` is used. For any other type, "tokenizer"
            is used directly.
        text: The string to encode.

    Returns:
        A ``(tokens, ids, decoded)`` tuple, where ``decoded`` is the result of
        decoding ``ids`` with ``skip_special_tokens=True``.
    """
    if tok_info["type"] == "concatenated":
        per_script = tok_info["tokenizer"]
        key = "tokenizer_ar" if detect_script(text) == "ar" else "tokenizer_az"
        backend = per_script[key]
    else:
        backend = tok_info["tokenizer"]

    encoding = backend.encode(text)
    round_trip = backend.decode(encoding.ids, skip_special_tokens=True)
    return encoding.tokens, encoding.ids, round_trip
|
|
|
|
def filter_content(tokens):
    """Return *tokens* as a new list, in order, without any SPECIAL_TOKENS entries."""
    kept = []
    for token in tokens:
        if token in SPECIAL_TOKENS:
            continue
        kept.append(token)
    return kept
|
|
|
|
| |
| |
| |
def morph_edit_distance(tokens, morphemes):
    """Return the Levenshtein distance between two sequences of strings.

    Each element (a token or a morpheme) is one symbol; insertions, deletions
    and substitutions all cost 1.

    Args:
        tokens: Sequence of tokenizer output pieces for one word.
        morphemes: Sequence of gold morphemes for the same word.

    Returns:
        The edit distance as a float. If either sequence is empty the result
        is 0.0 rather than the length of the other sequence.
        NOTE(review): presumably this treats missing data as "no penalty";
        confirm that is the intended convention.
    """
    if not tokens or not morphemes:
        return 0.0

    # Rolling-row DP: previous[j] is the distance between the first i-1
    # tokens and the first j morphemes.
    previous = list(range(len(morphemes) + 1))
    for i, token in enumerate(tokens, start=1):
        current = [i]
        for j, morpheme in enumerate(morphemes, start=1):
            deletion = previous[j] + 1
            insertion = current[j - 1] + 1
            # Substitution must come from the diagonal cell (i-1, j-1);
            # reading the left cell instead makes identical sequences
            # score a non-zero distance.
            substitution = previous[j - 1] + (0 if token == morpheme else 1)
            current.append(min(deletion, insertion, substitution))
        previous = current
    return float(previous[-1])
|
|
|
|
def compute_morph_edit_distance_score(tok_info, texts, morph_db):
    """Return the mean token-vs-morpheme edit distance over all annotated words.

    For every ``(word, morphs)`` entry that *morph_db* holds for the given
    texts, the word is tokenized on its own and compared with its gold
    morphemes via ``morph_edit_distance``.

    Each word is tokenized in isolation (the same approach
    ``compute_morph_consistency_f1`` uses) instead of slicing the tokens of
    the whole text. Slicing by "as many tokens as the word has characters"
    compares a token count with a character count, so every word would
    absorb tokens belonging to the words after it.

    Args:
        tok_info: Tokenizer description accepted by ``tokenize_and_decode``.
            For "concatenated" tokenizers the script is therefore detected
            per word, not per text.
        texts: Iterable of texts used as keys into *morph_db*.
        morph_db: Mapping from text to a list of ``(word, morphs)`` pairs.

    Returns:
        Mean distance as a float; 0.0 when no word could be scored. Words
        that are empty, have no gold morphemes, or tokenize to nothing
        after special-token filtering are skipped and do not count.
    """
    # A word's tokenization does not depend on the text it occurs in, so
    # encode every distinct word only once.
    tokens_by_word = {}
    distances = []
    for text in texts:
        for word, morphs in morph_db.get(text) or []:
            # Without a gold segmentation there is nothing to compare with;
            # scoring such a word as 0.0 would pull the mean down.
            if not word or not morphs:
                continue
            if word not in tokens_by_word:
                raw_tokens, _, _ = tokenize_and_decode(tok_info, word)
                tokens_by_word[word] = filter_content(raw_tokens)
            word_toks = tokens_by_word[word]
            if word_toks:
                distances.append(morph_edit_distance(word_toks, morphs))
    return float(np.mean(distances)) if distances else 0.0
|
|
|
|
def compute_morph_consistency_f1(tok_info, texts, morph_db, k_clusters, c_pairs, bootstrap_n):
    """Estimate how consistently shared morphemes map to shared tokens.

    Unique words are clustered by the TF-IDF of their morpheme sets. In each
    bootstrap round, up to *c_pairs* words are sampled per cluster and every
    pair of sampled words is checked:

    * precision: among pairs that share a token, the fraction that also
      share a morpheme;
    * recall: among pairs that share a morpheme, the fraction that also
      share a token.

    Per-cluster values are averaged within a round and round values are
    averaged over all rounds. F1 is computed per round from that round's
    mean precision and recall.

    Args:
        tok_info: Tokenizer description accepted by ``tokenize_and_decode``.
        texts: Iterable of texts used as keys into *morph_db*.
        morph_db: Mapping from text to a list of ``(word, morphs)`` pairs.
        k_clusters: Requested number of KMeans clusters (clamped to the
            number of features and samples available).
        c_pairs: Maximum number of words sampled per cluster per round.
        bootstrap_n: Number of sampling rounds.

    Returns:
        ``(precision, recall, f1)`` as floats; ``(0.0, 0.0, 0.0)`` when fewer
        than ``2 * c_pairs`` unique annotated words are available.
    """
    from collections import defaultdict
    from itertools import combinations

    # Unique words that have a non-empty gold segmentation, in first-seen order.
    word_data = []
    seen_words = set()
    for text in texts:
        for word, morphs in morph_db.get(text) or []:
            if word and morphs and word not in seen_words:
                seen_words.add(word)
                word_data.append((word, set(morphs)))

    if len(word_data) < c_pairs * 2:
        return 0.0, 0.0, 0.0

    # Imported only once clustering is actually needed.
    from sklearn.cluster import KMeans
    from sklearn.feature_extraction.text import TfidfVectorizer

    # One "document" per word: its morphemes, passed through unchanged by the
    # analyzer so that each morpheme is one feature. (Joining them into a
    # string and indexing it would yield single characters as features.)
    morph_docs = [sorted(morphs) for _, morphs in word_data]
    try:
        vectorizer = TfidfVectorizer(analyzer=lambda morphemes: morphemes)
        tfidf_matrix = vectorizer.fit_transform(morph_docs)
        n_samples, n_features = tfidf_matrix.shape
        # KMeans needs n_clusters <= n_samples; also keep the feature clamp.
        n_clusters = max(1, min(k_clusters, n_features, n_samples))
        km = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        labels = km.fit_predict(tfidf_matrix)
    except ValueError as err:
        # Best-effort fallback: treat all words as one cluster, but say so
        # instead of hiding the failure.
        print(
            f"Morpheme clustering failed ({err}); using a single cluster",
            file=sys.stderr,
        )
        labels = np.zeros(len(word_data), dtype=int)

    clusters = defaultdict(list)
    for entry, label in zip(word_data, labels):
        clusters[int(label)].append(entry)
    # A pair needs two words, so singleton clusters contribute nothing.
    valid_clusters = [members for members in clusters.values() if len(members) >= 2]

    # Tokenize each word at most once instead of once per pair it occurs in.
    token_sets = {}

    def tokens_of(word):
        if word not in token_sets:
            raw_tokens, _, _ = tokenize_and_decode(tok_info, word)
            token_sets[word] = set(filter_content(raw_tokens))
        return token_sets[word]

    rng = np.random.RandomState(42)
    all_prec, all_rec, all_f1 = [], [], []

    for _ in range(bootstrap_n):
        prec_list, rec_list = [], []
        for members in valid_clusters:
            picked = rng.choice(
                len(members), size=min(c_pairs, len(members)), replace=False
            )
            sample = [members[i] for i in picked]
            prec_cluster, rec_cluster = [], []
            for (w1, morphs1), (w2, morphs2) in combinations(sample, 2):
                shared_morph = not morphs1.isdisjoint(morphs2)
                shared_tok = not tokens_of(w1).isdisjoint(tokens_of(w2))
                if shared_tok:
                    prec_cluster.append(1.0 if shared_morph else 0.0)
                if shared_morph:
                    rec_cluster.append(1.0 if shared_tok else 0.0)
            if prec_cluster:
                prec_list.append(np.mean(prec_cluster))
            if rec_cluster:
                rec_list.append(np.mean(rec_cluster))
        if prec_list:
            all_prec.append(np.mean(prec_list))
        if rec_list:
            all_rec.append(np.mean(rec_list))
        if prec_list and rec_list:
            p, r = np.mean(prec_list), np.mean(rec_list)
            # Guard the denominator for the p == r == 0 case.
            all_f1.append(2 * p * r / max(p + r, 1e-10))

    return (
        float(np.mean(all_prec)) if all_prec else 0.0,
        float(np.mean(all_rec)) if all_rec else 0.0,
        float(np.mean(all_f1)) if all_f1 else 0.0,
    )
|
|
|
|
| |
| |
| |
# Evaluation grid: every vocabulary size x algorithm x architecture.
VOCAB_SIZES = [80000, 110000]
ALGOS = ["BPE", "Unigram", "WordPiece", "BBPE"]
ARCHES = ["shared", "concatenated"]

# One tok_info dict per tokenizer whose files were found on disk.
tokenizers_to_eval = []

for vsz in VOCAB_SIZES:
    for algo in ALGOS:
        algo_key = algo.lower()
        for arch in ARCHES:
            is_shared = arch == "shared"
            prefix = "shared" if is_shared else "concat"
            name = f"{prefix}_{algo_key}_{vsz}"

            if is_shared:
                # A single tokenizer file covers the whole vocabulary.
                path = TOKENIZER_DIR / f"shared_{algo_key}_{vsz}.json"
                if not path.exists():
                    print(f" SKIP {name}: {path} not found")
                    continue
                tok = HFTokenizer.from_file(str(path))
                tok_info = {
                    "tokenizer": tok,
                    "type": "shared",
                    "algorithm": algo,
                    "vocab_size": vsz,
                    "name": name,
                }
            else:
                # Two per-language tokenizers, each with half of the vocabulary.
                half = vsz // 2
                ar_path = TOKENIZER_DIR / f"concat_ar_{algo_key}_{half}.json"
                az_path = TOKENIZER_DIR / f"concat_az_{algo_key}_{half}.json"
                if not (ar_path.exists() and az_path.exists()):
                    print(f" SKIP {name}: concat files not found")
                    continue
                tok_ar = HFTokenizer.from_file(str(ar_path))
                tok_az = HFTokenizer.from_file(str(az_path))
                concat_bundle = {
                    "tokenizer_ar": tok_ar,
                    "tokenizer_az": tok_az,
                    "vocab_size_ar": half,
                    "vocab_size_az": half,
                    "shift": half,
                    "algorithm": algo,
                    "total_vocab_size": vsz,
                }
                tok_info = {
                    "tokenizer": concat_bundle,
                    "type": "concatenated",
                    "algorithm": algo,
                    "vocab_size": vsz,
                    "name": name,
                }

            tokenizers_to_eval.append(tok_info)

print(f"\nLoaded {len(tokenizers_to_eval)} tokenizers to evaluate")
for loaded in tokenizers_to_eval:
    print(f" - {loaded['name']}")
|
|
| |
| |
| |
| import csv |
|
|
# Compute both morphological metrics for every loaded tokenizer and collect
# one result row (rounded to four decimals) per tokenizer.
results = []
for tok_info in tqdm(tokenizers_to_eval, desc="Morphological evaluation"):
    name = tok_info["name"]
    print(f"\nEvaluating: {name}")

    ue = compute_morph_edit_distance_score(tok_info, test_ar_texts, morph_db_light)
    precision, recall, f1 = compute_morph_consistency_f1(
        tok_info,
        test_ar_texts,
        morph_db_light,
        k_clusters=MORPH_K_CLUSTERS,
        c_pairs=MORPH_C_PAIRS,
        bootstrap_n=MORPH_BOOTSTRAP_N,
    )
    print(f" ue={ue:.4f} P={precision:.4f} R={recall:.4f} F1={f1:.4f}")

    row = {
        "name": name,
        "type": tok_info["type"],
        "algorithm": tok_info["algorithm"],
        "vocab_size": tok_info["vocab_size"],
        "morph_edit_distance_ar": round(ue, 4),
        "morph_consistency_precision": round(precision, 4),
        "morph_consistency_recall": round(recall, 4),
        "morph_consistency_f1": round(f1, 4),
    }
    results.append(row)
|
|
| |
| |
| |
# Write one CSV row per evaluated tokenizer and print a short summary.
# When every tokenizer was skipped there is no first row to take the column
# names from, so leave any existing output file untouched instead of
# truncating it and then failing on results[0].
if results:
    fieldnames = list(results[0])
    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)

    print(f"\nResults saved to {OUTPUT_CSV}")
    print("\nSummary:")
    for row in results:
        print(f" {row['name']:40s} ue={row['morph_edit_distance_ar']:.4f} F1={row['morph_consistency_f1']:.4f}")
else:
    print(f"\nNo tokenizers were evaluated; {OUTPUT_CSV} was not written.")
|
|