| |
| """ |
| dataset_stats.py — Load each Darija dataset from HuggingFace and compute: |
| - min / max / mean / median sentence length (characters) |
| - count of Arabic / Arabizi / Mixed sentences |
| """ |
|
|
| import json, csv, os, gc, warnings, statistics, random |
|
|
| import regex |
| warnings.filterwarnings("ignore") |
|
|
# Hugging Face access token taken from the environment ("" when unset).
HF_TOKEN = os.getenv("HF_TOKEN", "")
# Directory that receives the CSV / JSON result files.
OUTPUT_DIR = "/root/oiq_cc_tokenizer/results"

# Single-character matchers used to count characters per script:
#   _AR_PAT  -> code points U+0600-U+06FF and U+0750-U+077F
#   _LAT_PAT -> unaccented ASCII letters
_AR_PAT = regex.compile(r"[\u0600-\u06FF\u0750-\u077F]")
_LAT_PAT = regex.compile(r"[a-zA-Z]")
|
|
# Datasets to analyse. Each entry carries:
#   name     - label used in logs and in the result files
#   repo     - dataset repository id
#   split    - split to load
#   text_col - text column name, or None to auto-detect
#   config   - optional dataset configuration name
#   max_rows - cap on counted texts (0 = no cap)
DATASETS = [
    dict(
        name="DODa",
        repo="atlasia/DODa",
        split="train",
        text_col=None,
        config=None,
        max_rows=0,
    ),
    dict(
        name="Darija-Wiki",
        repo="atlasia/Moroccan-Darija-Wiki-Dataset",
        split="train",
        text_col=None,
        config=None,
        max_rows=0,
    ),
    dict(
        name="Atlaset",
        repo="atlasia/Atlaset",
        split="train",
        text_col=None,
        config=None,
        max_rows=0,
    ),
    dict(
        name="Ours (daa-pairs)",
        repo="OiQ/daa-pairs",
        split="train",
        text_col=None,
        config=None,
        max_rows=0,
    ),
]
|
|
|
|
class IncrementalStats:
    """Running count/min/max/mean plus a sampled median, in bounded memory.

    Count, min, max and sum are tracked exactly. The median is computed from
    a uniform random sample of at most ``reservoir_size`` values (reservoir
    sampling), so it is exact while no more than ``reservoir_size`` values
    have been added and an estimate afterwards. Sampling uses the global
    ``random`` module, so seed it for reproducible medians.

    Args:
        reservoir_size: Maximum number of values kept for the median.
    """

    def __init__(self, reservoir_size: int = 10000):
        self.count = 0
        self._min = float("inf")
        # Start at -inf (not 0) so that all-negative input reports its true max.
        self._max = float("-inf")
        self._sum = 0
        self._samples = []
        self._reservoir_size = reservoir_size
        self._seen = 0

    def add(self, val):
        """Fold one value into the running statistics."""
        self.count += 1
        self._seen += 1
        self._min = min(self._min, val)
        self._max = max(self._max, val)
        self._sum += val
        if len(self._samples) < self._reservoir_size:
            self._samples.append(val)
        else:
            # Replace a random slot with probability reservoir_size / seen,
            # which keeps every value seen so far equally likely to be held.
            j = random.randint(0, self._seen - 1)
            if j < self._reservoir_size:
                self._samples[j] = val

    def finalize(self):
        """Return a dict with ``count``, ``min``, ``max``, ``mean``, ``median``.

        With no values added, every field is zero. ``mean`` is rounded to two
        decimal places.
        """
        if self.count == 0:
            return {"count": 0, "min": 0, "max": 0, "mean": 0.0, "median": 0}
        return {
            "count": self.count,
            "min": self._min,
            "max": self._max,
            "mean": round(self._sum / self.count, 2),
            "median": statistics.median(self._samples) if self._samples else 0,
        }
|
|
|
|
def classify_script(text: str) -> str:
    """Label *text* by the script of its letters.

    Characters matched by ``_AR_PAT`` and ``_LAT_PAT`` are counted; every
    other character is ignored.

    Returns:
        ``"ar"`` when more than 90% of the counted characters are Arabic, or
        when nothing was counted at all; ``"az"`` when more than 90% are
        Latin; ``"mi"`` otherwise.
    """
    n_arabic = sum(1 for _ in _AR_PAT.finditer(text))
    n_latin = sum(1 for _ in _LAT_PAT.finditer(text))
    n_letters = n_arabic + n_latin

    # Text without any counted character falls into the Arabic bucket.
    if not n_letters:
        return "ar"

    arabic_share = n_arabic / n_letters
    latin_share = n_latin / n_letters

    if arabic_share > 0.9 and latin_share < 0.1:
        return "ar"
    if latin_share > 0.9 and arabic_share < 0.1:
        return "az"
    return "mi"
|
|
|
|
| def find_text_column_from_cols(cols, name: str) -> str: |
| candidates = ["text", "darija", "arabic", "sentence", "word", |
| "content", "src", "source", "ar", "az", "mixed", |
| "darija_ar", "darija_az", "darija_mix"] |
| for c in candidates: |
| if c in cols: |
| print(f" [{name}] Using column '{c}' from {cols}") |
| return c |
| print(f" [{name}] Could not find text column! Columns: {cols}") |
| return None |
|
|
|
|
def compute_stats(lengths_by_script: dict) -> dict:
    """Finalize the accumulator of each script into a plain summary dict.

    Args:
        lengths_by_script: Maps ``"ar"``, ``"az"``, ``"mi"`` and ``"ALL"`` to
            objects exposing ``finalize()``; missing keys are allowed.

    Returns:
        A dict with exactly those four keys, in that order. A key missing
        from the input gets an all-zero summary.
    """
    return {
        key: (
            {"count": 0, "min": 0, "max": 0, "mean": 0, "median": 0}
            if (acc := lengths_by_script.get(key)) is None
            else acc.finalize()
        )
        for key in ("ar", "az", "mi", "ALL")
    }
|
|
|
|
def _record_text(text, lengths_by_script: dict) -> bool:
    """Add one candidate text to the per-script and ``"ALL"`` accumulators.

    Returns ``True`` when the value was counted and ``False`` when it was
    skipped (not a string, or shorter than 2 characters once stripped).
    """
    if not isinstance(text, str) or len(text.strip()) < 2:
        return False
    sc = classify_script(text)
    length = len(text)
    # Keep this order (script first, then ALL): both draw from the shared
    # random generator when sampling for the median.
    lengths_by_script[sc].add(length)
    lengths_by_script["ALL"].add(length)
    return True


def process_dataset(name: str, repo: str, split: str, text_col=None,
                    config=None, max_rows: int = 0) -> dict | None:
    """Load one dataset and compute character-length statistics per script.

    Streaming mode is tried first. If ``load_dataset`` raises, the dataset is
    loaded in regular mode instead and read in slices of 50,000 rows.

    Args:
        name: Display label, used in log output and stored as ``"dataset"``.
        repo: Dataset repository id passed to ``load_dataset``.
        split: Split passed to ``load_dataset``.
        text_col: Column holding the text. When ``None`` or absent from the
            dataset, ``find_text_column_from_cols`` chooses one.
        config: Optional configuration, forwarded to ``load_dataset`` as
            ``name=``.
        max_rows: Stop once this many texts have been counted; ``0`` disables
            the limit.

    Returns:
        The ``compute_stats`` summary extended with ``"dataset"``, ``"repo"``,
        ``"text_col"``, ``"total_rows"`` and ``"total_processed"``; or
        ``None`` when the dataset cannot be loaded or no text column is
        found. In streaming mode the row total is unknown, so
        ``"total_rows"`` equals ``"total_processed"``.
    """
    from datasets import load_dataset

    print(f"\n{'='*80}")
    print(f"Loading {name} ({repo})...", flush=True)

    # An empty string is not a usable token; leave the argument out so the
    # library applies its own default credential handling.
    load_kwargs = {}
    if HF_TOKEN:
        load_kwargs["token"] = HF_TOKEN
    if config:
        load_kwargs["name"] = config

    try:
        ds = load_dataset(repo, split=split, streaming=True, **load_kwargs)
        total_rows = None  # unknown while streaming
        print(" Streaming mode enabled")
    except Exception as e:
        print(f" Streaming failed ({e}), trying regular mode...")
        try:
            ds = load_dataset(repo, split=split, **load_kwargs)
            total_rows = len(ds)
        except Exception as e2:
            print(f" Failed to load {repo}: {e2}")
            return None

    # Discover the column names: from the first row when streaming,
    # otherwise from the dataset's own column list.
    if total_rows is None:
        first_row = next(iter(ds), None)
        cols = list(first_row.keys()) if first_row is not None else []
    else:
        cols = ds.column_names

    print(f" Columns: {cols}")
    if text_col is None or text_col not in cols:
        text_col = find_text_column_from_cols(cols, name)
    if text_col is None:
        return None

    lengths_by_script = {
        "ar": IncrementalStats(), "az": IncrementalStats(),
        "mi": IncrementalStats(), "ALL": IncrementalStats(),
    }
    total_processed = 0
    max_label = f" (max {max_rows:,})" if max_rows else ""

    if total_rows is None:
        # Streaming: one row at a time.
        for row in ds:
            if max_rows and total_processed >= max_rows:
                print(f" Reached sample limit of {max_rows:,}", flush=True)
                break
            if not _record_text(row.get(text_col), lengths_by_script):
                continue
            total_processed += 1
            if total_processed % 100000 == 0:
                print(f" Processed {total_processed:,}{max_label}...", flush=True)
    else:
        # Regular mode: slice the dataset so only one chunk of the text
        # column is materialized at a time.
        batch = 50000
        for i in range(0, total_rows, batch):
            chunk = ds[i:i+batch]
            for text in chunk[text_col]:
                if max_rows and total_processed >= max_rows:
                    break
                if _record_text(text, lengths_by_script):
                    total_processed += 1
            if total_processed % 100000 == 0 or i + batch >= total_rows:
                print(f" Processed {total_processed:,}/{total_rows:,}{max_label}", flush=True)
            if max_rows and total_processed >= max_rows:
                print(f" Reached sample limit of {max_rows:,}", flush=True)
                break

    stats = compute_stats(lengths_by_script)
    stats["dataset"] = name
    stats["repo"] = repo
    stats["text_col"] = text_col
    stats["total_rows"] = total_rows if total_rows else total_processed
    stats["total_processed"] = total_processed

    print(f"\n Results for {name}:")
    for script in ("ar", "az", "mi", "ALL"):
        s = stats[script]
        print(f" {script.upper():>4}: n={s['count']:>8,} min={s['min']:>5} "
              f"max={s['max']:>6} mean={s['mean']:>7.1f} median={s['median']:>5.0f}")

    # Drop the dataset before the next one is loaded to limit peak memory.
    del ds
    gc.collect()
    return stats
|
|
|
|
def save_results(all_stats: list):
    """Write the collected statistics to ``OUTPUT_DIR`` as CSV and JSON.

    The CSV (``dataset_stats.csv``) has one row per dataset, with the nested
    per-script summaries flattened into ``<script>_<stat>`` columns. The JSON
    (``dataset_stats.json``) is a dump of *all_stats* as given.

    Args:
        all_stats: Result dicts as returned by ``process_dataset``.
    """
    # Create the target directory up front so a fresh machine does not fail
    # at the first open().
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    meta_cols = ["dataset", "repo", "text_col", "total_rows", "total_processed"]
    scripts = ("ar", "az", "mi", "ALL")
    stat_names = ("count", "min", "max", "mean", "median")
    # Column order: metadata first, then all stats of each script in turn.
    fieldnames = meta_cols + [
        f"{script}_{stat}" for script in scripts for stat in stat_names
    ]

    csv_path = os.path.join(OUTPUT_DIR, "dataset_stats.csv")
    # Explicit UTF-8 so the output does not depend on the platform default.
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        w.writeheader()
        for s in all_stats:
            row = {col: s[col] for col in meta_cols}
            for script in scripts:
                for stat in stat_names:
                    row[f"{script}_{stat}"] = s[script][stat]
            w.writerow(row)
    print(f"Saved CSV: {csv_path}")

    json_path = os.path.join(OUTPUT_DIR, "dataset_stats.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(all_stats, f, indent=2, default=str)
    print(f"Saved JSON: {json_path}")
|
|
|
|
def main():
    """Process every configured dataset, save the results, print a summary.

    Datasets for which ``process_dataset`` returns nothing are left out of
    both the saved files and the printed table.
    """
    # Fixed seed: the medians come from random sampling.
    random.seed(42)

    all_stats = []
    for entry in DATASETS:
        outcome = process_dataset(
            entry["name"],
            entry["repo"],
            entry["split"],
            text_col=entry.get("text_col"),
            config=entry.get("config"),
            max_rows=entry.get("max_rows", 0),
        )
        if not outcome:
            continue
        all_stats.append(outcome)

    save_results(all_stats)

    # Summary table: one group of four rows per dataset.
    thin_rule = "-" * 120
    print(f"\n{'='*120}")
    print(f"{'Dataset':<22} {'Script':>6} {'Sentences':>10} {'Min':>6} {'Max':>7} "
          f"{'Mean':>8} {'Median':>7}")
    print(thin_rule)
    for s in all_stats:
        for script in ("ar", "az", "mi", "ALL"):
            d = s[script]
            # The dataset name is shown on the per-script rows, not on ALL.
            label = s["dataset"] if script != "ALL" else ""
            print(f"{label:<22} {script:>6} {d['count']:>10,} {d['min']:>6} "
                  f"{d['max']:>7} {d['mean']:>8.1f} {d['median']:>7.0f}")
        print(thin_rule)
    print("=" * 120)


if __name__ == "__main__":
    main()
|
|