Spaces:
Sleeping
Sleeping
| """Build train/val/test splits from raw CSVs and save as HuggingFace Datasets. | |
| Pipeline per task: | |
| 1. Load raw CSV from data/raw/. | |
| 2. Apply MultilingualPreprocessor.clean_text() to text fields. | |
| (We do NOT apply normalize_arabic here — we want the model to see all | |
| hamza/tashkeel variants and learn from them. The function remains | |
| available on the preprocessor for inference-time use.) | |
| 3. Drop empty rows + duplicates. | |
| 4. (Classifiers only) Balance: cap each class to min_class * 3 by random | |
| undersampling, so largest:smallest ratio ≤ 3. We do NOT oversample | |
| (avoids leaking duplicates across train/val splits). | |
| 5. Stratified 80/10/10 split: | |
| - lang_detection : stratify by language | |
| - intent : stratify by (language, intent) | |
| - ner : stratify by language only | |
| 6. Save as DatasetDict to data/processed/<task>/ via Arrow format. | |
| Also write a small labels.json with the label_name -> id map per task. | |
| The knowledge_base CSV has no labels and isn't used for training — it is saved | |
| as a single-split Dataset (no train/val/test) for the RAG step in Phase 5. | |
| Final step: a self-test that prints the preprocessor output for the 5 | |
| sentences specified in the project plan. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import sys | |
| from pathlib import Path | |
| from typing import Any | |
| import pandas as pd | |
| from datasets import Dataset, DatasetDict, Features, Sequence, Value, ClassLabel | |
| from sklearn.model_selection import train_test_split | |
| # Make src/ importable as a package for `from preprocessor import ...` | |
| SRC_DIR = Path(__file__).resolve().parent | |
| sys.path.insert(0, str(SRC_DIR)) | |
| from preprocessor import MultilingualPreprocessor # noqa: E402 | |
# Directory layout, resolved relative to the parent of this file's directory.
PROJECT_ROOT = SRC_DIR.parent
RAW = PROJECT_ROOT.joinpath("data", "raw")              # input CSVs
PROCESSED = PROJECT_ROOT.joinpath("data", "processed")  # output datasets
# Created at import time so every build_* function can write into it.
PROCESSED.mkdir(parents=True, exist_ok=True)

# Default random seed for the sampling / splitting helpers in this file.
SEED = 42
| # ============================================================================ | |
| # Generic helpers | |
| # ============================================================================ | |
def balance_to_3x(df: pd.DataFrame, label_col: str, max_ratio: int = 3,
                  seed: int = SEED) -> pd.DataFrame:
    """Random-undersample so that largest:smallest class ratio ≤ max_ratio.

    Every class is capped at ``smallest_class_size * max_ratio`` rows.
    Smaller classes are kept as-is (no oversampling). This is intentional:
    oversampling before the train/test split would leak duplicates.

    Rows whose label is missing (NaN) are not part of any class and are
    therefore absent from the result.

    Args:
        df: Input frame containing ``label_col``.
        label_col: Name of the column holding the class label.
        max_ratio: Maximum allowed largest:smallest class-size ratio.
        seed: Random state used for both the undersampling and the final
            shuffle, so the output is reproducible.

    Returns:
        A new, shuffled frame with a fresh 0..n-1 index. If ``df`` has no
        labelled rows, an empty frame with the same columns is returned.
    """
    counts = df[label_col].value_counts()
    if counts.empty:
        # No labelled rows: counts.min() would be NaN and int(NaN) raises.
        return df.iloc[0:0].reset_index(drop=True)

    cap = int(counts.min()) * max_ratio

    # Iterate in value_counts order (largest class first) so that the
    # concatenation order, and hence the seeded shuffle, stays reproducible.
    parts: list[pd.DataFrame] = []
    for cls in counts.index:
        sub = df[df[label_col] == cls]
        if len(sub) > cap:
            sub = sub.sample(n=cap, random_state=seed)
        parts.append(sub)

    out = pd.concat(parts, ignore_index=True)
    # Shuffle so the classes are not stored in contiguous runs.
    return out.sample(frac=1, random_state=seed).reset_index(drop=True)
def stratified_3way_split(
    df: pd.DataFrame,
    stratify_cols: list[str],
    val_frac: float = 0.10,
    test_frac: float = 0.10,
    seed: int = SEED,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Stratified train/val/test split (80/10/10 with the defaults).

    Strata are the string concatenation of ``stratify_cols``. Strata with
    fewer than 3 rows are dropped before splitting.

    The split is done in two steps: first train vs. a hold-out pool of size
    ``val_frac + test_frac``, then the pool is divided into val and test.
    The second step is stratified too whenever every stratum in the pool has
    at least 2 rows; otherwise scikit-learn cannot stratify it and the pool
    is divided at random instead (a warning is printed).

    Args:
        df: Frame to split.
        stratify_cols: Columns whose combined value defines a stratum.
        val_frac: Fraction of rows for the validation split.
        test_frac: Fraction of rows for the test split.
        seed: Random state passed to both ``train_test_split`` calls.

    Returns:
        ``(train, val, test)`` frames, each with a fresh 0..n-1 index.

    Raises:
        ValueError: If a fraction is outside (0, 1) or the two fractions
            leave nothing for the training split.
    """
    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if not (0 < val_frac < 1 and 0 < test_frac < 1):
        raise ValueError("val_frac and test_frac must both be in (0, 1)")
    holdout_frac = val_frac + test_frac
    if holdout_frac >= 1:
        raise ValueError("val_frac + test_frac must be < 1")

    def _strata(frame: pd.DataFrame) -> pd.Series:
        # One string key per row, e.g. "ar__booking".
        return frame[stratify_cols].astype(str).agg("__".join, axis=1)

    # Drop strata with <3 rows.
    key_all = _strata(df)
    sizes = key_all.value_counts()
    df = df[key_all.isin(sizes[sizes >= 3].index)].reset_index(drop=True)

    train, temp = train_test_split(
        df, test_size=holdout_frac,
        stratify=_strata(df), random_state=seed,
    )

    key_temp = _strata(temp)
    if key_temp.value_counts().min() < 2:
        # A small stratum can contribute a single row to the hold-out pool;
        # scikit-learn raises when asked to stratify on a 1-member class.
        print(" WARNING: hold-out pool has a stratum with a single row; "
              "splitting val/test without stratification.")
        key_temp = None

    val_size = val_frac / holdout_frac
    val, test = train_test_split(
        temp, test_size=1 - val_size,
        stratify=key_temp, random_state=seed,
    )
    return (train.reset_index(drop=True),
            val.reset_index(drop=True),
            test.reset_index(drop=True))
def save_dataset_dict(
    train: pd.DataFrame, val: pd.DataFrame, test: pd.DataFrame,
    out_dir: Path, features: Features | None = None,
) -> None:
    """Persist three DataFrames as one HuggingFace DatasetDict under out_dir.

    The splits are stored under the keys "train", "validation" and "test".
    When ``features`` is given it is applied to every split; when it is None
    the schema is left to ``Dataset.from_pandas``. The pandas index is never
    stored. ``out_dir`` is created if it does not exist.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    frames = {"train": train, "validation": val, "test": test}
    # features=None is from_pandas' own default, so a single call covers
    # both the "explicit schema" and the "inferred schema" case.
    splits = {
        split: Dataset.from_pandas(frame, features=features, preserve_index=False)
        for split, frame in frames.items()
    }
    DatasetDict(splits).save_to_disk(str(out_dir))
def write_labels(out_dir: Path, label_to_id: dict[str, int]) -> None:
    """Write label_to_id and id_to_label to <out_dir>/labels.json.

    The file is always encoded as UTF-8. Label names are written verbatim
    (``ensure_ascii=False``), so relying on the platform's default encoding
    could fail or produce a non-UTF-8 file for non-ASCII names.

    Note that JSON object keys are strings, so the integer ids used as keys
    of ``id_to_label`` come back as strings ("0", "1", ...) when the file is
    loaded.

    Args:
        out_dir: Existing directory that receives ``labels.json``.
        label_to_id: Mapping from label name to integer id.
    """
    payload = {
        "label_to_id": label_to_id,
        "id_to_label": {idx: name for name, idx in label_to_id.items()},
    }
    text = json.dumps(payload, indent=2, ensure_ascii=False)
    (out_dir / "labels.json").write_text(text, encoding="utf-8")
def print_split_stats(name: str, train: pd.DataFrame, val: pd.DataFrame,
                      test: pd.DataFrame, group_cols: list[str]) -> None:
    """Print the size of each split and, optionally, its group distribution.

    The first line always reports the three row counts. If ``group_cols`` is
    non-empty, each split's row count per ``group_cols`` combination follows.
    """
    print(f"\n [{name}] split sizes: train={len(train)} val={len(val)} test={len(test)}")

    # Nothing to group by: only the size line is printed.
    if not group_cols:
        return

    splits = {"train": train, "val": val, "test": test}
    for split_name, frame in splits.items():
        dist = frame.groupby(group_cols).size()
        print(f" {split_name} dist over {group_cols}:")
        # Emit the pandas-rendered table row by row, prefixed with a space.
        for row in dist.to_string().splitlines():
            print(f" {row}")
| # ============================================================================ | |
| # Task 1: Language detection | |
| # ============================================================================ | |
def build_lang_detection(pre: MultilingualPreprocessor) -> None:
    """Build the language-detection dataset and save it to disk.

    Steps: load ``lang_detection_data.csv`` from RAW, drop rows with a
    missing text or language, clean the text with ``pre.clean_text``, drop
    texts of length <= 1 and duplicate texts, undersample with
    ``balance_to_3x``, split with ``stratified_3way_split`` (stratified by
    language), then save the DatasetDict and ``labels.json`` under
    ``PROCESSED / "lang_detection"``.

    The label set is whatever languages appear in the CSV, sorted
    alphabetically (the original documentation describes it as a 4-class
    AR/EN/FR/CS task).

    Args:
        pre: Preprocessor providing ``clean_text``.
    """
    print("\n" + "=" * 72)
    print("Task 1: Language detection")
    print("=" * 72)

    df = pd.read_csv(RAW / "lang_detection_data.csv")
    print(f" Loaded raw rows: {len(df)}")

    # Drop missing values BEFORE astype(str): the cast would turn NaN into
    # the literal text "nan", which survives the length filter below and
    # would end up as a training example. Dropping unlabelled rows here also
    # keeps them from displacing a labelled duplicate during dedup.
    df = df.dropna(subset=["text", "language"]).reset_index(drop=True)

    df["text"] = df["text"].astype(str).map(pre.clean_text)
    df = df[df["text"].str.len() > 1]
    df = df.drop_duplicates(subset=["text"]).reset_index(drop=True)
    print(f" After clean+dedup: {len(df)}")
    print(f" Class counts (pre-balance): {df['language'].value_counts().to_dict()}")

    df = balance_to_3x(df, "language")
    print(f" After 3x balance: {len(df)}")
    print(f" Class counts (post): {df['language'].value_counts().to_dict()}")

    # Alphabetical order makes the name -> id mapping deterministic.
    label_names = sorted(df["language"].unique())
    label_to_id = {n: i for i, n in enumerate(label_names)}
    df["label"] = df["language"].map(label_to_id).astype(int)

    train, val, test = stratified_3way_split(df, ["language"])

    out_dir = PROCESSED / "lang_detection"
    features = Features({
        "text": Value("string"),
        "language": Value("string"),
        "label": ClassLabel(names=label_names),
    })
    cols = ["text", "language", "label"]
    save_dataset_dict(train[cols], val[cols], test[cols],
                      out_dir, features=features)
    write_labels(out_dir, label_to_id)

    print(f" Saved to: {out_dir}")
    print(f" Labels : {label_to_id}")
    print_split_stats("lang_detection", train, val, test, ["language"])
| # ============================================================================ | |
| # Task 2: Intent | |
| # ============================================================================ | |
def build_intent(pre: MultilingualPreprocessor) -> None:
    """Build the intent-classification dataset and save it to disk.

    Steps: load ``intent_data.csv`` from RAW, drop rows with a missing
    text, intent or language, clean the text with ``pre.clean_text``, drop
    texts of length <= 1 and exact (text, intent, language) duplicates,
    undersample by intent with ``balance_to_3x``, split with
    ``stratified_3way_split`` (stratified by language and intent), then save
    the DatasetDict and ``labels.json`` under ``PROCESSED / "intent"``.

    The intent label set is whatever intents appear in the CSV, sorted
    alphabetically (the original documentation describes the data as
    6 intents x 3 languages).

    Args:
        pre: Preprocessor providing ``clean_text``.
    """
    print("\n" + "=" * 72)
    print("Task 2: Intent classification")
    print("=" * 72)

    df = pd.read_csv(RAW / "intent_data.csv")
    print(f" Loaded raw rows: {len(df)}")

    # Drop missing values BEFORE astype(str): the cast would turn NaN into
    # the literal text "nan", which survives the length filter below. A
    # missing language would likewise become a bogus "nan" stratum.
    df = df.dropna(subset=["text", "intent", "language"]).reset_index(drop=True)

    df["text"] = df["text"].astype(str).map(pre.clean_text)
    df = df[df["text"].str.len() > 1]
    df = df.drop_duplicates(subset=["text", "intent", "language"]).reset_index(drop=True)
    print(f" After clean+dedup: {len(df)}")
    print(f" Intent counts (pre): {df['intent'].value_counts().to_dict()}")

    df = balance_to_3x(df, "intent")
    print(f" Intent counts (post 3x): {df['intent'].value_counts().to_dict()}")

    # Alphabetical order makes the name -> id mapping deterministic.
    intent_names = sorted(df["intent"].unique())
    intent_to_id = {n: i for i, n in enumerate(intent_names)}
    df["label"] = df["intent"].map(intent_to_id).astype(int)

    train, val, test = stratified_3way_split(df, ["language", "intent"])

    out_dir = PROCESSED / "intent"
    features = Features({
        "text": Value("string"),
        "language": Value("string"),
        "intent": Value("string"),
        "label": ClassLabel(names=intent_names),
    })
    cols = ["text", "language", "intent", "label"]
    save_dataset_dict(train[cols], val[cols], test[cols],
                      out_dir, features=features)
    write_labels(out_dir, intent_to_id)

    print(f" Saved to: {out_dir}")
    print(f" Labels : {intent_to_id}")
    print_split_stats("intent", train, val, test, ["language", "intent"])
| # ============================================================================ | |
| # Task 3: NER | |
| # ============================================================================ | |
# Unified BIO tag set across wikiann (PER/LOC/ORG) + synthetic (DATE).
# "O" comes first so it gets id 0; every entity type then contributes its
# B- (begin) tag followed by its I- (inside) tag.
NER_LABEL_NAMES = ["O"] + [
    f"{prefix}-{entity}"
    for entity in ("PER", "LOC", "ORG", "DATE")
    for prefix in ("B", "I")
]
def build_ner(pre: MultilingualPreprocessor) -> None:
    """Build the NER token-classification dataset and save it to disk.

    The raw CSV stores tokens/ner_tags as JSON strings; they are decoded
    back to Python lists, then tag strings are mapped to integer ids using
    the canonical NER_LABEL_NAMES order.

    Rows are dropped when tokens or tags are missing, when the two lists
    differ in length, or when they are empty. Each token is cleaned with
    ``pre.clean_text``; if cleaning yields an empty string the original token
    is kept so that tokens and tags stay aligned. The data is split with
    ``stratified_3way_split`` (by language) and saved, together with
    ``labels.json``, under ``PROCESSED / "ner"``.

    Args:
        pre: Preprocessor providing ``clean_text``.
    """
    print("\n" + "=" * 72)
    print("Task 3: NER (token classification, 9 BIO tags)")
    print("=" * 72)

    df = pd.read_csv(RAW / "ner_data.csv")
    print(f" Loaded raw rows: {len(df)}")

    # json.loads cannot decode a missing (NaN) cell, so drop those rows first.
    df = df.dropna(subset=["tokens", "ner_tags"]).reset_index(drop=True)

    # Decode JSON-string columns
    df["tokens"] = df["tokens"].map(json.loads)
    df["ner_tags"] = df["ner_tags"].map(json.loads)

    # Drop length-mismatched or empty rows. reset_index returns a fresh
    # frame, so the column assignments below do not write into a slice.
    n_tokens = df["tokens"].map(len)
    n_tags = df["ner_tags"].map(len)
    df = df[(n_tokens == n_tags) & (n_tokens > 0)].reset_index(drop=True)
    print(f" After shape filter: {len(df)}")

    # Light cleaning per-token; `or t` keeps the raw token when cleaning
    # returns an empty string, preserving token/tag alignment.
    def _clean_tokens(toks: list[str]) -> list[str]:
        return [pre.clean_text(t) or t for t in toks]

    df["tokens"] = df["tokens"].map(_clean_tokens)

    # Anything outside NER_LABEL_NAMES is mapped to id 0 ('O'). The string
    # column `ner_tags` is left untouched.
    label_to_id = {n: i for i, n in enumerate(NER_LABEL_NAMES)}

    def _to_ids(tags: list[str]) -> list[int]:
        return [label_to_id.get(t, 0) for t in tags]

    df["ner_tag_ids"] = df["ner_tags"].map(_to_ids)

    # Sanity: report how often each tag appears
    flat_tags = [t for tags in df["ner_tags"] for t in tags]
    tag_counts = pd.Series(flat_tags).value_counts().to_dict()
    print(f" Tag distribution: {tag_counts}")

    # Make the defensive fallback visible instead of silent.
    unknown = {t: c for t, c in tag_counts.items() if t not in label_to_id}
    if unknown:
        print(f" WARNING: unknown tags mapped to 'O': {unknown}")

    train, val, test = stratified_3way_split(df, ["language"])

    out_dir = PROCESSED / "ner"
    # Sequence(Value('string')) for tokens / tags, Sequence(ClassLabel) for ids
    features = Features({
        "tokens": Sequence(Value("string")),
        "ner_tags": Sequence(Value("string")),
        "ner_tag_ids": Sequence(ClassLabel(names=NER_LABEL_NAMES)),
        "language": Value("string"),
    })
    cols = ["tokens", "ner_tags", "ner_tag_ids", "language"]
    save_dataset_dict(train[cols], val[cols], test[cols],
                      out_dir, features=features)
    write_labels(out_dir, label_to_id)

    print(f" Saved to: {out_dir}")
    print(f" Labels : {label_to_id}")
    print_split_stats("ner", train, val, test, ["language"])
| # ============================================================================ | |
| # Task 4: Knowledge base | |
| # ============================================================================ | |
def build_knowledge_base(pre: MultilingualPreprocessor) -> None:
    """Save the FAQ knowledge base as a single-split Dataset for RAG.

    Loads ``knowledge_base.csv`` from RAW, drops entries with a missing
    question or answer, cleans both fields with ``pre.clean_text``, removes
    exact (question, answer, language) duplicates and writes one Dataset
    (no train/val/test) to ``PROCESSED / "knowledge_base"``. The CSV is
    expected to also contain ``language`` and ``topic`` columns.

    Args:
        pre: Preprocessor providing ``clean_text``.
    """
    print("\n" + "=" * 72)
    print("Task 4: Knowledge base (single split, no train/val/test)")
    print("=" * 72)

    df = pd.read_csv(RAW / "knowledge_base.csv")

    # Drop incomplete entries BEFORE astype(str): the cast would turn a
    # missing question/answer into the literal text "nan", which would then
    # be stored in the retrieval corpus.
    df = df.dropna(subset=["question", "answer"]).reset_index(drop=True)

    for col in ("question", "answer"):
        df[col] = df[col].astype(str).map(pre.clean_text)
    df = df.drop_duplicates(subset=["question", "answer", "language"]).reset_index(drop=True)
    print(f" Cleaned rows: {len(df)}")

    out_dir = PROCESSED / "knowledge_base"
    out_dir.mkdir(parents=True, exist_ok=True)
    Dataset.from_pandas(df, preserve_index=False).save_to_disk(str(out_dir))

    print(f" Saved to: {out_dir}")
    print(f" Topics : {df['topic'].value_counts().to_dict()}")
| # ============================================================================ | |
| # Preprocessor self-test | |
| # ============================================================================ | |
def preprocessor_self_test(pre: MultilingualPreprocessor) -> None:
    """Print the preprocessor's output for five fixed sentences.

    For each sentence the detected language is compared with the expected
    one, and the results of ``detect_arabizi``, ``clean_text``,
    ``normalize_arabic`` and the first 12 XLM-R tokens are printed. A final
    line reports how many language predictions matched. Nothing is returned
    and a mismatch does not raise.
    """
    banner = "=" * 72
    print("\n" + banner)
    print("Preprocessor self-test (5 spec-mandated sentences)")
    print(banner)

    # (sentence, expected language) pairs.
    cases = [
        ("ana bde booking بكرا please", "CS"),
        ("j'ai un problème avec mon compte", "FR"),
        ("I want to cancel my order الرجاء", "CS"),
        ("مرحبا hello bonjour كيف حالك", "CS"),
        ("3andi mochkil m3a l'application", "CS"),
    ]

    hits = 0
    for sentence, want in cases:
        got = pre.detect_language(sentence)
        arabizi = pre.detect_arabizi(sentence)
        cleaned = pre.clean_text(sentence)
        norm_ar = pre.normalize_arabic(sentence)

        # XLM-R tokenisation (just preview the first 12 token strings)
        ids = pre.tokenize_for_xlmr(sentence, max_length=64)["input_ids"]
        toks = pre.tokenizer.convert_ids_to_tokens(ids)[:12]

        matched = got == want
        mark = "✓" if matched else "✗"
        hits += int(matched)

        print(f"\n{mark} {sentence!r}")
        print(f" expected language : {want}")
        print(f" detect_language : {got}")
        print(f" detect_arabizi : {arabizi}")
        print(f" clean_text : {cleaned!r}")
        print(f" normalize_arabic : {norm_ar!r}")
        print(f" xlmr toks (first 12): {toks}")

    print(f"\n ==> {hits}/{len(cases)} correct on language detection.")
| # ============================================================================ | |
| # main | |
| # ============================================================================ | |
def main() -> int:
    """Build every dataset, run the preprocessor self-test, list the outputs.

    Returns:
        Always 0; failures surface as exceptions from the individual steps.
    """
    banner = "=" * 72
    print(banner)
    print("Build processed datasets")
    print(banner)
    print(f"Raw dir : {RAW}")
    print(f"Processed dir : {PROCESSED}")

    pre = MultilingualPreprocessor()
    # All steps share the one preprocessor instance and run in this order.
    steps = (
        build_lang_detection,
        build_intent,
        build_ner,
        build_knowledge_base,
        preprocessor_self_test,
    )
    for step in steps:
        step(pre)

    # Summary list of artefacts
    print("\n" + banner)
    print("ARTEFACTS")
    print(banner)
    for sub in ("lang_detection", "intent", "ner", "knowledge_base"):
        task_dir = PROCESSED / sub
        if not task_dir.exists():
            continue
        names = sorted(p.name for p in task_dir.iterdir())
        print(f" {task_dir}")
        for entry in names:
            print(f" - {entry}")
    return 0
if __name__ == "__main__":
    # Script entry point: exit with main()'s return code, or with 130 when
    # the user interrupts the run with Ctrl-C.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        print("\nAborted by user.")
        exit_code = 130
    sys.exit(exit_code)