"""Build train/val/test splits from raw CSVs and save as HuggingFace Datasets. Pipeline per task: 1. Load raw CSV from data/raw/. 2. Apply MultilingualPreprocessor.clean_text() to text fields. (We do NOT apply normalize_arabic here — we want the model to see all hamza/tashkeel variants and learn from them. The function remains available on the preprocessor for inference-time use.) 3. Drop empty rows + duplicates. 4. (Classifiers only) Balance: cap each class to min_class * 3 by random undersampling, so largest:smallest ratio ≤ 3. We do NOT oversample (avoids leaking duplicates across train/val splits). 5. Stratified 80/10/10 split: - lang_detection : stratify by language - intent : stratify by (language, intent) - ner : stratify by language only 6. Save as DatasetDict to data/processed// via Arrow format. Also write a small labels.json with the label_name -> id map per task. The knowledge_base CSV has no labels and isn't used for training — it is saved as a single-split Dataset (no train/val/test) for the RAG step in Phase 5. Final step: a self-test that prints the preprocessor output for the 5 sentences specified in the project plan. 
""" from __future__ import annotations import json import sys from pathlib import Path from typing import Any import pandas as pd from datasets import Dataset, DatasetDict, Features, Sequence, Value, ClassLabel from sklearn.model_selection import train_test_split # Make src/ importable as a package for `from preprocessor import ...` SRC_DIR = Path(__file__).resolve().parent sys.path.insert(0, str(SRC_DIR)) from preprocessor import MultilingualPreprocessor # noqa: E402 PROJECT_ROOT = SRC_DIR.parent RAW = PROJECT_ROOT / "data" / "raw" PROCESSED = PROJECT_ROOT / "data" / "processed" PROCESSED.mkdir(parents=True, exist_ok=True) SEED = 42 # ============================================================================ # Generic helpers # ============================================================================ def balance_to_3x(df: pd.DataFrame, label_col: str, max_ratio: int = 3, seed: int = SEED) -> pd.DataFrame: """Random-undersample so that largest:smallest class ratio ≤ max_ratio. Smaller classes are kept as-is (no oversampling). This is intentional: oversampling before train/test split would leak duplicates. """ counts = df[label_col].value_counts() smallest = int(counts.min()) cap = smallest * max_ratio parts: list[pd.DataFrame] = [] for cls, n in counts.items(): sub = df[df[label_col] == cls] if len(sub) > cap: sub = sub.sample(n=cap, random_state=seed) parts.append(sub) out = pd.concat(parts, ignore_index=True) out = out.sample(frac=1, random_state=seed).reset_index(drop=True) return out def stratified_3way_split( df: pd.DataFrame, stratify_cols: list[str], val_frac: float = 0.10, test_frac: float = 0.10, seed: int = SEED, ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: """Stratified 80/10/10 split. Strata = concat of `stratify_cols`. Drops rare strata that cannot support a 3-way split (need ≥ 3 examples). 
""" assert 0 < val_frac < 1 and 0 < test_frac < 1 key_train = df[stratify_cols].astype(str).agg("__".join, axis=1) # Drop strata with <3 rows (can't be stratified across 3 splits) counts = key_train.value_counts() keep = counts[counts >= 3].index df = df[key_train.isin(keep)].reset_index(drop=True) key_train = df[stratify_cols].astype(str).agg("__".join, axis=1) train, temp = train_test_split( df, test_size=val_frac + test_frac, stratify=key_train, random_state=seed, ) key_temp = temp[stratify_cols].astype(str).agg("__".join, axis=1) val_size = val_frac / (val_frac + test_frac) val, test = train_test_split( temp, test_size=1 - val_size, stratify=key_temp, random_state=seed, ) return (train.reset_index(drop=True), val.reset_index(drop=True), test.reset_index(drop=True)) def save_dataset_dict( train: pd.DataFrame, val: pd.DataFrame, test: pd.DataFrame, out_dir: Path, features: Features | None = None, ) -> None: """Save train/val/test DataFrames as a HuggingFace DatasetDict on disk.""" out_dir.mkdir(parents=True, exist_ok=True) if features is not None: ds_train = Dataset.from_pandas(train, features=features, preserve_index=False) ds_val = Dataset.from_pandas(val, features=features, preserve_index=False) ds_test = Dataset.from_pandas(test, features=features, preserve_index=False) else: ds_train = Dataset.from_pandas(train, preserve_index=False) ds_val = Dataset.from_pandas(val, preserve_index=False) ds_test = Dataset.from_pandas(test, preserve_index=False) DatasetDict({ "train": ds_train, "validation": ds_val, "test": ds_test, }).save_to_disk(str(out_dir)) def write_labels(out_dir: Path, label_to_id: dict[str, int]) -> None: """Write label_to_id and id_to_label to /labels.json.""" payload = { "label_to_id": label_to_id, "id_to_label": {v: k for k, v in label_to_id.items()}, } (out_dir / "labels.json").write_text(json.dumps(payload, indent=2, ensure_ascii=False)) def print_split_stats(name: str, train: pd.DataFrame, val: pd.DataFrame, test: pd.DataFrame, group_cols: 
list[str]) -> None: """Print per-split row counts and label distribution.""" print(f"\n [{name}] split sizes: train={len(train)} val={len(val)} test={len(test)}") for split_name, dfx in [("train", train), ("val", val), ("test", test)]: if not group_cols: continue head = dfx.groupby(group_cols).size() # Pretty-print as a small table print(f" {split_name} dist over {group_cols}:") for line in head.to_string().splitlines(): print(f" {line}") # ============================================================================ # Task 1: Language detection # ============================================================================ def build_lang_detection(pre: MultilingualPreprocessor) -> None: """Build the language-detection dataset (4-class: AR/EN/FR/CS).""" print("\n" + "=" * 72) print("Task 1: Language detection") print("=" * 72) df = pd.read_csv(RAW / "lang_detection_data.csv") print(f" Loaded raw rows: {len(df)}") df["text"] = df["text"].astype(str).map(pre.clean_text) df = df[df["text"].str.len() > 1] df = df.drop_duplicates(subset=["text"]).reset_index(drop=True) print(f" After clean+dedup: {len(df)}") print(f" Class counts (pre-balance): {df['language'].value_counts().to_dict()}") df = balance_to_3x(df, "language") print(f" After 3x balance: {len(df)}") print(f" Class counts (post): {df['language'].value_counts().to_dict()}") label_names = sorted(df["language"].unique()) label_to_id = {n: i for i, n in enumerate(label_names)} df["label"] = df["language"].map(label_to_id).astype(int) train, val, test = stratified_3way_split(df, ["language"]) out_dir = PROCESSED / "lang_detection" features = Features({ "text": Value("string"), "language": Value("string"), "label": ClassLabel(names=label_names), }) save_dataset_dict( train[["text", "language", "label"]], val[["text", "language", "label"]], test[["text", "language", "label"]], out_dir, features=features, ) write_labels(out_dir, label_to_id) print(f" Saved to: {out_dir}") print(f" Labels : {label_to_id}") 
print_split_stats("lang_detection", train, val, test, ["language"]) # ============================================================================ # Task 2: Intent # ============================================================================ def build_intent(pre: MultilingualPreprocessor) -> None: """Build the intent-classification dataset (6 intents x 3 languages).""" print("\n" + "=" * 72) print("Task 2: Intent classification") print("=" * 72) df = pd.read_csv(RAW / "intent_data.csv") print(f" Loaded raw rows: {len(df)}") df["text"] = df["text"].astype(str).map(pre.clean_text) df = df[df["text"].str.len() > 1] df = df.drop_duplicates(subset=["text", "intent", "language"]).reset_index(drop=True) print(f" After clean+dedup: {len(df)}") print(f" Intent counts (pre): {df['intent'].value_counts().to_dict()}") df = balance_to_3x(df, "intent") print(f" Intent counts (post 3x): {df['intent'].value_counts().to_dict()}") intent_names = sorted(df["intent"].unique()) intent_to_id = {n: i for i, n in enumerate(intent_names)} df["label"] = df["intent"].map(intent_to_id).astype(int) train, val, test = stratified_3way_split(df, ["language", "intent"]) out_dir = PROCESSED / "intent" features = Features({ "text": Value("string"), "language": Value("string"), "intent": Value("string"), "label": ClassLabel(names=intent_names), }) save_dataset_dict( train[["text", "language", "intent", "label"]], val[["text", "language", "intent", "label"]], test[["text", "language", "intent", "label"]], out_dir, features=features, ) write_labels(out_dir, intent_to_id) print(f" Saved to: {out_dir}") print(f" Labels : {intent_to_id}") print_split_stats("intent", train, val, test, ["language", "intent"]) # ============================================================================ # Task 3: NER # ============================================================================ # Unified BIO tag set across wikiann (PER/LOC/ORG) + synthetic (DATE). 
NER_LABEL_NAMES = [
    "O",
    "B-PER", "I-PER",
    "B-LOC", "I-LOC",
    "B-ORG", "I-ORG",
    "B-DATE", "I-DATE",
]


def build_ner(pre: MultilingualPreprocessor) -> None:
    """Build the NER token-classification dataset.

    The raw CSV stores tokens/ner_tags as JSON strings; we decode back to
    Python lists of strings, then map tag strings to integer IDs using the
    canonical NER_LABEL_NAMES order. The result is split stratified by
    language and written to data/processed/ner together with labels.json.
    """
    print("\n" + "=" * 72)
    print("Task 3: NER (token classification, 9 BIO tags)")
    print("=" * 72)

    df = pd.read_csv(RAW / "ner_data.csv")
    print(f" Loaded raw rows: {len(df)}")

    # A missing cell is read as NaN, which json.loads rejects with TypeError;
    # drop such rows before decoding.
    df = df.dropna(subset=["tokens", "ner_tags"]).copy()

    # Decode JSON-string columns
    df["tokens"] = df["tokens"].map(json.loads)
    df["ner_tags"] = df["ner_tags"].map(json.loads)

    # Drop length-mismatched or empty
    n_tokens = df["tokens"].map(len)
    n_tags = df["ner_tags"].map(len)
    df = df[(n_tokens == n_tags) & (n_tokens > 0)].copy()
    print(f" After shape filter: {len(df)}")

    # Light cleaning per-token. Fall back to the raw token when cleaning
    # yields an empty result, so the tokens/tags alignment is preserved.
    def _clean_tokens(toks: list[str]) -> list[str]:
        return [pre.clean_text(t) or t for t in toks]

    df["tokens"] = df["tokens"].map(_clean_tokens)

    # Validate tags against our scheme. Anything outside NER_LABEL_NAMES is
    # mapped to 'O' (defensive — should not happen with our raw data). The
    # string column is normalised first and the ids are derived from it, so
    # the saved `ner_tags` and `ner_tag_ids` can never disagree.
    label_to_id = {n: i for i, n in enumerate(NER_LABEL_NAMES)}

    def _normalise_tags(tags: list[str]) -> list[str]:
        return [t if t in label_to_id else "O" for t in tags]

    def _to_ids(tags: list[str]) -> list[int]:
        return [label_to_id[t] for t in tags]

    n_unknown = sum(t not in label_to_id for tags in df["ner_tags"] for t in tags)
    if n_unknown:
        print(f" [warn] {n_unknown} tag(s) outside the scheme were mapped to 'O'")
    df["ner_tags"] = df["ner_tags"].map(_normalise_tags)
    df["ner_tag_ids"] = df["ner_tags"].map(_to_ids)

    # Sanity: report how often each tag appears
    flat_tags = [t for tags in df["ner_tags"] for t in tags]
    tag_counts = pd.Series(flat_tags).value_counts().to_dict()
    print(f" Tag distribution: {tag_counts}")

    train, val, test = stratified_3way_split(df, ["language"])

    out_dir = PROCESSED / "ner"
    columns = ["tokens", "ner_tags", "ner_tag_ids", "language"]
    # Use Sequence(Value('string')) for tokens / tags, Sequence(ClassLabel) for ids
    features = Features({
        "tokens": Sequence(Value("string")),
        "ner_tags": Sequence(Value("string")),
        "ner_tag_ids": Sequence(ClassLabel(names=NER_LABEL_NAMES)),
        "language": Value("string"),
    })
    save_dataset_dict(
        train[columns],
        val[columns],
        test[columns],
        out_dir,
        features=features,
    )
    write_labels(out_dir, label_to_id)
    print(f" Saved to: {out_dir}")
    print(f" Labels : {label_to_id}")
    print_split_stats("ner", train, val, test, ["language"])


# ============================================================================
# Task 4: Knowledge base
# ============================================================================
def build_knowledge_base(pre: MultilingualPreprocessor) -> None:
    """Save the FAQ knowledge base as a single-split Dataset for RAG.

    Reads data/raw/knowledge_base.csv, cleans the `question` and `answer`
    columns, drops exact duplicates and writes one Dataset (no splits) to
    data/processed/knowledge_base.
    """
    print("\n" + "=" * 72)
    print("Task 4: Knowledge base (single split, no train/val/test)")
    print("=" * 72)

    df = pd.read_csv(RAW / "knowledge_base.csv")
    # Drop incomplete entries first: astype(str) would otherwise turn a
    # missing question/answer into the literal text "nan".
    df = df.dropna(subset=["question", "answer"]).copy()
    for col in ("question", "answer"):
        df[col] = df[col].astype(str).map(pre.clean_text)
    df = df.drop_duplicates(subset=["question", "answer", "language"]).reset_index(drop=True)
    print(f" Cleaned rows: {len(df)}")

    out_dir = PROCESSED / "knowledge_base"
    out_dir.mkdir(parents=True, exist_ok=True)
    Dataset.from_pandas(df, preserve_index=False).save_to_disk(str(out_dir))
    print(f" Saved to: {out_dir}")
    print(f" Topics : {df['topic'].value_counts().to_dict()}")


# ============================================================================
# Preprocessor self-test
# ============================================================================
def preprocessor_self_test(pre: MultilingualPreprocessor) -> None:
    """Run the 5 spec-mandated test sentences through the preprocessor.

    For each sentence, prints the detected language (compared against the
    expected one), the arabizi flag, the cleaned and Arabic-normalised text
    and a preview of the XLM-R tokens, then a summary score. Nothing is
    asserted; a mismatch is only reported.
    """
    print("\n" + "=" * 72)
    print("Preprocessor self-test (5 spec-mandated sentences)")
    print("=" * 72)

    cases = [
        "ana bde booking بكرا please",
        "j'ai un problème avec mon compte",
        "I want to cancel my order الرجاء",
        "مرحبا hello bonjour كيف حالك",
        "3andi mochkil m3a l'application",
    ]
    expected = ["CS", "FR", "CS", "CS", "CS"]

    n_correct = 0
    for sent, exp in zip(cases, expected):
        lang = pre.detect_language(sent)
        arabizi = pre.detect_arabizi(sent)
        cleaned = pre.clean_text(sent)
        norm_ar = pre.normalize_arabic(sent)
        # XLM-R tokenisation (just preview the first 12 ids/strings)
        ids = pre.tokenize_for_xlmr(sent, max_length=64)["input_ids"]
        toks = pre.tokenizer.convert_ids_to_tokens(ids)[:12]

        ok = "✓" if lang == exp else "✗"
        n_correct += int(lang == exp)
        print(f"\n{ok} {sent!r}")
        print(f" expected language : {exp}")
        print(f" detect_language : {lang}")
        print(f" detect_arabizi : {arabizi}")
        print(f" clean_text : {cleaned!r}")
        print(f" normalize_arabic : {norm_ar!r}")
        print(f" xlmr toks (first 12): {toks}")

    print(f"\n ==> {n_correct}/{len(cases)} correct on language detection.")


# ============================================================================
# main
# ============================================================================
def main() -> int:
    """Run all four dataset-build tasks plus the preprocessor self-test.

    Returns:
        0 on completion, for use as the process exit code.
    """
    print("=" * 72)
    print("Build processed datasets")
    print("=" * 72)
    print(f"Raw dir : {RAW}")
    print(f"Processed dir : {PROCESSED}")

    pre = MultilingualPreprocessor()

    build_lang_detection(pre)
    build_intent(pre)
    build_ner(pre)
    build_knowledge_base(pre)
    preprocessor_self_test(pre)

    # Summary list of artefacts
    print("\n" + "=" * 72)
    print("ARTEFACTS")
    print("=" * 72)
    for sub in ("lang_detection", "intent", "ner", "knowledge_base"):
        d = PROCESSED / sub
        if d.exists():
            entries = sorted(p.name for p in d.iterdir())
            print(f" {d}")
            for e in entries:
                print(f" - {e}")
    return 0


if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print("\nAborted by user.")
        sys.exit(130)