# Source: multilingual-chatbot / src/build_datasets.py
# Page header residue: momenalhamza's picture
# Commit message: Deploy chatbot: code + RAG + Qwen (3 BERT classifiers loaded from HF Hub)
# Commit: 469ef7f (verified)
"""Build train/val/test splits from raw CSVs and save as HuggingFace Datasets.
Pipeline per task:
1. Load raw CSV from data/raw/.
2. Apply MultilingualPreprocessor.clean_text() to text fields.
(We do NOT apply normalize_arabic here — we want the model to see all
hamza/tashkeel variants and learn from them. The function remains
available on the preprocessor for inference-time use.)
3. Drop empty rows + duplicates.
4. (Classifiers only) Balance: cap each class to min_class * 3 by random
undersampling, so largest:smallest ratio ≤ 3. We do NOT oversample
(avoids leaking duplicates across train/val splits).
5. Stratified 80/10/10 split:
- lang_detection : stratify by language
- intent : stratify by (language, intent)
- ner : stratify by language only
6. Save as DatasetDict to data/processed/<task>/ via Arrow format.
Also write a small labels.json with the label_name -> id map per task.
The knowledge_base CSV has no labels and isn't used for training — it is saved
as a single-split Dataset (no train/val/test) for the RAG step in Phase 5.
Final step: a self-test that prints the preprocessor output for the 5
sentences specified in the project plan.
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
from typing import Any
import pandas as pd
from datasets import Dataset, DatasetDict, Features, Sequence, Value, ClassLabel
from sklearn.model_selection import train_test_split
# Put this file's own directory (src/) at the front of sys.path so that the
# sibling module `preprocessor` can be imported with a plain
# `from preprocessor import ...`, regardless of the current working directory.
SRC_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SRC_DIR))
# The import has to come after the sys.path tweak above, hence the E402 waiver.
from preprocessor import MultilingualPreprocessor # noqa: E402
# Project layout: <root>/src/ (this file), <root>/data/raw/ (input CSVs) and
# <root>/data/processed/ (output datasets).
PROJECT_ROOT = SRC_DIR.parent
RAW = PROJECT_ROOT / "data" / "raw"
PROCESSED = PROJECT_ROOT / "data" / "processed"
# NOTE: this runs at import time, so merely importing the module creates the
# output directory (and any missing parents) if it does not exist yet.
PROCESSED.mkdir(parents=True, exist_ok=True)
# Single seed shared by every sampling / splitting step so runs are reproducible.
SEED = 42


# ============================================================================
# Generic helpers
# ============================================================================
def balance_to_3x(df: pd.DataFrame, label_col: str, max_ratio: int = 3,
                  seed: int = SEED) -> pd.DataFrame:
    """Random-undersample so that largest:smallest class ratio ≤ max_ratio.

    Smaller classes are kept as-is (no oversampling). This is intentional:
    oversampling before train/test split would leak duplicates.

    Args:
        df: Input frame; it is not modified.
        label_col: Name of the column holding the class label.
        max_ratio: Largest allowed ``largest / smallest`` class-size ratio.
            Must be >= 1.
        seed: Random state used both for the per-class sampling and for the
            final shuffle.

    Returns:
        A new, shuffled frame with a fresh 0..n-1 index. Rows whose label is
        missing (NaN) are not part of any class and are therefore dropped.
        If ``df`` has no labelled rows at all, an empty frame with the same
        columns is returned instead of failing.

    Raises:
        ValueError: If ``max_ratio`` is smaller than 1.
    """
    if max_ratio < 1:
        raise ValueError(f"max_ratio must be >= 1, got {max_ratio!r}")

    counts = df[label_col].value_counts()
    if counts.empty:
        # No labelled rows: counts.min() would be NaN and int(NaN) raises.
        return df.iloc[0:0].reset_index(drop=True)

    smallest = int(counts.min())
    # int() keeps the cap usable as a sample size even for a fractional ratio.
    cap = int(smallest * max_ratio)

    parts: list[pd.DataFrame] = []
    # Iterate in value_counts order (most frequent class first) so that the
    # concatenation order -- and therefore the seeded shuffle -- is stable.
    for cls in counts.index:
        sub = df[df[label_col] == cls]
        if len(sub) > cap:
            sub = sub.sample(n=cap, random_state=seed)
        parts.append(sub)

    out = pd.concat(parts, ignore_index=True)
    # Shuffle so the classes are interleaved rather than stored in blocks.
    return out.sample(frac=1, random_state=seed).reset_index(drop=True)
def stratified_3way_split(
    df: pd.DataFrame,
    stratify_cols: list[str],
    val_frac: float = 0.10,
    test_frac: float = 0.10,
    seed: int = SEED,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Stratified train/val/test split (80/10/10 with the defaults).

    Strata are the string concatenation of ``stratify_cols``. Strata with
    fewer than 3 rows are dropped before splitting.

    The split is done in two stages: train vs. hold-out, then hold-out into
    val vs. test. scikit-learn refuses to stratify when a stratum has a single
    member, which can happen in the (small) hold-out even though every stratum
    had >= 3 rows overall. In that case the second stage falls back to a plain
    random split and prints a warning, instead of aborting the whole build.

    Args:
        df: Input frame; it is not modified.
        stratify_cols: Columns whose joint value defines a stratum.
        val_frac: Fraction of rows for the validation split, in (0, 1).
        test_frac: Fraction of rows for the test split, in (0, 1).
        seed: Random state for both split stages.

    Returns:
        ``(train, val, test)`` frames, each with a fresh 0..n-1 index.

    Raises:
        ValueError: If a fraction is outside (0, 1) or the two fractions
            leave no room for a training split.
    """
    # Explicit raises instead of `assert`: asserts are stripped under `-O`.
    if not (0 < val_frac < 1 and 0 < test_frac < 1):
        raise ValueError(
            f"val_frac and test_frac must be in (0, 1), "
            f"got val_frac={val_frac!r}, test_frac={test_frac!r}"
        )
    holdout_frac = val_frac + test_frac
    if holdout_frac >= 1:
        raise ValueError(
            f"val_frac + test_frac must be < 1, got {holdout_frac!r}"
        )

    def _strata_key(frame: pd.DataFrame) -> pd.Series:
        """Join the stratification columns into one string key per row."""
        return frame[stratify_cols].astype(str).agg("__".join, axis=1)

    # Drop strata with <3 rows (can't be spread across 3 splits).
    key_all = _strata_key(df)
    sizes = key_all.value_counts()
    df = df[key_all.isin(sizes[sizes >= 3].index)].reset_index(drop=True)

    train, temp = train_test_split(
        df, test_size=holdout_frac,
        stratify=_strata_key(df), random_state=seed,
    )

    key_temp = _strata_key(temp)
    if key_temp.value_counts().min() < 2:
        # A stratum with a single hold-out row cannot be stratified again.
        print(" [stratified_3way_split] warning: some strata have a single "
              "hold-out row; val/test split is not stratified.")
        key_temp = None

    # Same arithmetic as before so existing splits stay reproducible.
    val_size = val_frac / holdout_frac
    val, test = train_test_split(
        temp, test_size=1 - val_size,
        stratify=key_temp, random_state=seed,
    )
    return (train.reset_index(drop=True),
            val.reset_index(drop=True),
            test.reset_index(drop=True))
def save_dataset_dict(
    train: pd.DataFrame, val: pd.DataFrame, test: pd.DataFrame,
    out_dir: Path, features: Features | None = None,
) -> None:
    """Persist the three split frames as one HuggingFace DatasetDict.

    The splits are stored under the keys ``train``, ``validation`` and
    ``test``. ``out_dir`` is created if needed. When ``features`` is given it
    is used as the explicit schema for every split; otherwise the schema is
    left to ``Dataset.from_pandas``. The pandas index is never stored.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # Only forward `features` when the caller supplied one.
    schema_kwargs = {} if features is None else {"features": features}
    frames = {"train": train, "validation": val, "test": test}
    splits = {
        split: Dataset.from_pandas(frame, preserve_index=False, **schema_kwargs)
        for split, frame in frames.items()
    }
    DatasetDict(splits).save_to_disk(str(out_dir))
def write_labels(out_dir: Path, label_to_id: dict[str, int]) -> None:
    """Write label_to_id and id_to_label to <out_dir>/labels.json.

    Args:
        out_dir: Existing directory that receives ``labels.json``.
        label_to_id: Mapping from label name to integer id.

    The file is always written as UTF-8. Because ``ensure_ascii=False`` keeps
    non-ASCII label names as raw characters, relying on the platform default
    encoding could fail or produce a file that other tools cannot read.
    Note that JSON object keys are strings, so the ids in ``id_to_label``
    come back as ``"0"``, ``"1"``, ... when the file is loaded.
    """
    payload = {
        "label_to_id": label_to_id,
        "id_to_label": {v: k for k, v in label_to_id.items()},
    }
    (out_dir / "labels.json").write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
def print_split_stats(name: str, train: pd.DataFrame, val: pd.DataFrame,
                      test: pd.DataFrame, group_cols: list[str]) -> None:
    """Report the size of each split and, optionally, its group breakdown.

    One summary line with the three row counts is always printed. When
    ``group_cols`` is non-empty, each split additionally gets a small table
    with the number of rows per ``group_cols`` combination.
    """
    print(f"\n [{name}] split sizes: train={len(train)} val={len(val)} test={len(test)}")
    if not group_cols:
        # Nothing to group by: the summary line is all there is to show.
        return

    splits = {"train": train, "val": val, "test": test}
    for label, frame in splits.items():
        sizes = frame.groupby(group_cols).size()
        print(f" {label} dist over {group_cols}:")
        # Re-emit pandas' own rendering line by line with a leading pad.
        for row in sizes.to_string().splitlines():
            print(f" {row}")
# ============================================================================
# Task 1: Language detection
# ============================================================================
def build_lang_detection(pre: MultilingualPreprocessor) -> None:
    """Build the language-detection dataset (4-class: AR/EN/FR/CS).

    Reads ``lang_detection_data.csv`` from the raw directory (columns ``text``
    and ``language``), cleans and de-duplicates the text, undersamples the
    large classes, splits the result and writes the DatasetDict plus
    ``labels.json`` to ``<processed>/lang_detection``.

    Args:
        pre: Preprocessor whose ``clean_text`` is applied to every text.
    """
    print("\n" + "=" * 72)
    print("Task 1: Language detection")
    print("=" * 72)

    df = pd.read_csv(RAW / "lang_detection_data.csv")
    print(f" Loaded raw rows: {len(df)}")

    # Missing text cells are read as NaN; astype(str) would turn them into the
    # literal string "nan", which passes the length filter below and would end
    # up as a bogus training sample. Drop them first. `.copy()` gives us an
    # independent frame so the column assignment below is unambiguous.
    df = df.dropna(subset=["text"]).copy()
    df["text"] = df["text"].astype(str).map(pre.clean_text)
    # Keep only texts with at least two characters after cleaning.
    df = df[df["text"].str.len() > 1]
    df = df.drop_duplicates(subset=["text"]).reset_index(drop=True)
    print(f" After clean+dedup: {len(df)}")
    print(f" Class counts (pre-balance): {df['language'].value_counts().to_dict()}")

    df = balance_to_3x(df, "language")
    print(f" After 3x balance: {len(df)}")
    print(f" Class counts (post): {df['language'].value_counts().to_dict()}")

    # Sorted names give a deterministic label -> id assignment.
    label_names = sorted(df["language"].unique())
    label_to_id = {n: i for i, n in enumerate(label_names)}
    df["label"] = df["language"].map(label_to_id).astype(int)

    train, val, test = stratified_3way_split(df, ["language"])

    out_dir = PROCESSED / "lang_detection"
    features = Features({
        "text": Value("string"),
        "language": Value("string"),
        "label": ClassLabel(names=label_names),
    })
    columns = ["text", "language", "label"]
    save_dataset_dict(
        train[columns], val[columns], test[columns],
        out_dir, features=features,
    )
    write_labels(out_dir, label_to_id)

    print(f" Saved to: {out_dir}")
    print(f" Labels : {label_to_id}")
    print_split_stats("lang_detection", train, val, test, ["language"])
# ============================================================================
# Task 2: Intent
# ============================================================================
def build_intent(pre: MultilingualPreprocessor) -> None:
    """Build the intent-classification dataset (6 intents x 3 languages).

    Reads ``intent_data.csv`` from the raw directory (columns ``text``,
    ``intent`` and ``language``), cleans and de-duplicates it, undersamples
    the large intent classes, splits the result stratified by
    (language, intent) and writes the DatasetDict plus ``labels.json`` to
    ``<processed>/intent``.

    Args:
        pre: Preprocessor whose ``clean_text`` is applied to every text.
    """
    print("\n" + "=" * 72)
    print("Task 2: Intent classification")
    print("=" * 72)

    df = pd.read_csv(RAW / "intent_data.csv")
    print(f" Loaded raw rows: {len(df)}")

    # Missing text cells are read as NaN; astype(str) would turn them into the
    # literal string "nan", which passes the length filter below and would end
    # up as a bogus training sample. Drop them first. `.copy()` gives us an
    # independent frame so the column assignment below is unambiguous.
    df = df.dropna(subset=["text"]).copy()
    df["text"] = df["text"].astype(str).map(pre.clean_text)
    # Keep only texts with at least two characters after cleaning.
    df = df[df["text"].str.len() > 1]
    # The same text may legitimately appear under another intent or language,
    # so duplicates are judged on the full (text, intent, language) triple.
    df = df.drop_duplicates(subset=["text", "intent", "language"]).reset_index(drop=True)
    print(f" After clean+dedup: {len(df)}")
    print(f" Intent counts (pre): {df['intent'].value_counts().to_dict()}")

    df = balance_to_3x(df, "intent")
    print(f" Intent counts (post 3x): {df['intent'].value_counts().to_dict()}")

    # Sorted names give a deterministic label -> id assignment.
    intent_names = sorted(df["intent"].unique())
    intent_to_id = {n: i for i, n in enumerate(intent_names)}
    df["label"] = df["intent"].map(intent_to_id).astype(int)

    train, val, test = stratified_3way_split(df, ["language", "intent"])

    out_dir = PROCESSED / "intent"
    features = Features({
        "text": Value("string"),
        "language": Value("string"),
        "intent": Value("string"),
        "label": ClassLabel(names=intent_names),
    })
    columns = ["text", "language", "intent", "label"]
    save_dataset_dict(
        train[columns], val[columns], test[columns],
        out_dir, features=features,
    )
    write_labels(out_dir, intent_to_id)

    print(f" Saved to: {out_dir}")
    print(f" Labels : {intent_to_id}")
    print_split_stats("intent", train, val, test, ["language", "intent"])
# ============================================================================
# Task 3: NER
# ============================================================================
# Unified BIO tag set across wikiann (PER/LOC/ORG) + synthetic (DATE).
# The position in this list is the integer class id: build_ner derives its
# label -> id map by enumerating it and uses the list as the ClassLabel names.
# Do not reorder. "O" is kept first so that it gets id 0, the id build_ner
# uses as the fallback for tags outside this scheme.
NER_LABEL_NAMES = [
    "O",
    "B-PER", "I-PER",
    "B-LOC", "I-LOC",
    "B-ORG", "I-ORG",
    "B-DATE", "I-DATE",
]
def build_ner(pre: MultilingualPreprocessor) -> None:
    """Build the NER token-classification dataset.

    The raw CSV stores tokens/ner_tags as JSON strings; we decode back to
    Python lists of strings, then map tag strings to integer IDs using the
    canonical NER_LABEL_NAMES order. The result is split stratified by
    language and written, together with ``labels.json``, to
    ``<processed>/ner``.

    Args:
        pre: Preprocessor whose ``clean_text`` is applied to every token.
    """
    print("\n" + "=" * 72)
    print("Task 3: NER (token classification, 9 BIO tags)")
    print("=" * 72)

    df = pd.read_csv(RAW / "ner_data.csv")
    print(f" Loaded raw rows: {len(df)}")

    # Decode JSON-string columns
    df["tokens"] = df["tokens"].map(json.loads)
    df["ner_tags"] = df["ner_tags"].map(json.loads)

    # Drop length-mismatched or empty rows. Lengths are computed once, and
    # `.copy()` yields an independent frame so the column assignments below do
    # not operate on a slice of the original (chained-assignment warning).
    n_tokens = df["tokens"].apply(len)
    n_tags = df["ner_tags"].apply(len)
    df = df[(n_tokens == n_tags) & (n_tokens > 0)].copy()
    print(f" After shape filter: {len(df)}")

    # Light cleaning per-token. If cleaning wipes a token out completely the
    # original token is kept, so tokens and tags stay aligned one-to-one.
    def _clean_tokens(toks: list[str]) -> list[str]:
        return [pre.clean_text(t) or t for t in toks]

    df["tokens"] = df["tokens"].map(_clean_tokens)

    # Validate tags against our scheme. Anything outside NER_LABEL_NAMES is
    # mapped to 'O' (defensive — should not happen with our raw data).
    label_to_id = {n: i for i, n in enumerate(NER_LABEL_NAMES)}
    outside_id = label_to_id["O"]

    def _to_ids(tags: list[str]) -> list[int]:
        return [label_to_id.get(t, outside_id) for t in tags]

    df["ner_tag_ids"] = df["ner_tags"].map(_to_ids)

    # Sanity: report how often each tag appears
    flat_tags = [t for tags in df["ner_tags"] for t in tags]
    tag_counts = pd.Series(flat_tags).value_counts().to_dict()
    print(f" Tag distribution: {tag_counts}")

    # Make the defensive fallback visible instead of silently relabelling:
    # the string column keeps the raw tag while the id column says 'O'.
    unknown = {t: c for t, c in tag_counts.items() if t not in label_to_id}
    if unknown:
        print(f" WARNING: tags outside the scheme mapped to 'O' in ner_tag_ids: {unknown}")

    train, val, test = stratified_3way_split(df, ["language"])

    out_dir = PROCESSED / "ner"
    # Use Sequence(Value('string')) for tokens / tags, Sequence(ClassLabel) for ids
    features = Features({
        "tokens": Sequence(Value("string")),
        "ner_tags": Sequence(Value("string")),
        "ner_tag_ids": Sequence(ClassLabel(names=NER_LABEL_NAMES)),
        "language": Value("string"),
    })
    columns = ["tokens", "ner_tags", "ner_tag_ids", "language"]
    save_dataset_dict(
        train[columns], val[columns], test[columns],
        out_dir, features=features,
    )
    write_labels(out_dir, label_to_id)

    print(f" Saved to: {out_dir}")
    print(f" Labels : {label_to_id}")
    print_split_stats("ner", train, val, test, ["language"])
# ============================================================================
# Task 4: Knowledge base
# ============================================================================
def build_knowledge_base(pre: MultilingualPreprocessor) -> None:
    """Save the FAQ knowledge base as a single-split Dataset for RAG.

    Reads ``knowledge_base.csv`` from the raw directory, cleans the
    ``question`` and ``answer`` columns, drops unusable and duplicate rows and
    writes one Dataset (no train/val/test) to ``<processed>/knowledge_base``.

    Args:
        pre: Preprocessor whose ``clean_text`` is applied to both text columns.
    """
    print("\n" + "=" * 72)
    print("Task 4: Knowledge base (single split, no train/val/test)")
    print("=" * 72)

    df = pd.read_csv(RAW / "knowledge_base.csv")

    # Missing cells are read as NaN; astype(str) would turn them into the
    # literal string "nan" and store it as a real question/answer. Drop such
    # rows first. `.copy()` keeps the assignments below off a slice.
    df = df.dropna(subset=["question", "answer"]).copy()
    df["question"] = df["question"].astype(str).map(pre.clean_text)
    df["answer"] = df["answer"].astype(str).map(pre.clean_text)

    # An entry whose question or answer is empty after cleaning cannot be
    # retrieved or shown, so it is dropped like the empty rows of other tasks.
    usable = (df["question"].str.len() > 0) & (df["answer"].str.len() > 0)
    df = df[usable]
    df = df.drop_duplicates(subset=["question", "answer", "language"]).reset_index(drop=True)
    print(f" Cleaned rows: {len(df)}")

    out_dir = PROCESSED / "knowledge_base"
    out_dir.mkdir(parents=True, exist_ok=True)
    Dataset.from_pandas(df, preserve_index=False).save_to_disk(str(out_dir))

    print(f" Saved to: {out_dir}")
    print(f" Topics : {df['topic'].value_counts().to_dict()}")
# ============================================================================
# Preprocessor self-test
# ============================================================================
def preprocessor_self_test(pre: MultilingualPreprocessor) -> None:
    """Print what the preprocessor produces for the 5 spec-mandated sentences.

    For every sentence the detected language is compared with the expected
    one, and the outputs of the other preprocessor steps are shown for manual
    inspection. A final line reports how many languages were detected as
    expected. Nothing is returned and a mismatch does not raise.
    """
    print("\n" + "=" * 72)
    print("Preprocessor self-test (5 spec-mandated sentences)")
    print("=" * 72)

    # (sentence, expected language code)
    samples = [
        ("ana bde booking بكرا please", "CS"),
        ("j'ai un problème avec mon compte", "FR"),
        ("I want to cancel my order الرجاء", "CS"),
        ("مرحبا hello bonjour كيف حالك", "CS"),
        ("3andi mochkil m3a l'application", "CS"),
    ]

    hits = 0
    for sentence, wanted in samples:
        detected = pre.detect_language(sentence)
        is_arabizi = pre.detect_arabizi(sentence)
        cleaned_text = pre.clean_text(sentence)
        normalized = pre.normalize_arabic(sentence)

        # XLM-R tokenisation (just preview the first 12 ids/strings)
        encoded = pre.tokenize_for_xlmr(sentence, max_length=64)
        preview = pre.tokenizer.convert_ids_to_tokens(encoded["input_ids"])[:12]

        matched = detected == wanted
        if matched:
            hits += 1
            mark = "✓"
        else:
            mark = "✗"

        print(f"\n{mark} {sentence!r}")
        print(f" expected language : {wanted}")
        print(f" detect_language : {detected}")
        print(f" detect_arabizi : {is_arabizi}")
        print(f" clean_text : {cleaned_text!r}")
        print(f" normalize_arabic : {normalized!r}")
        print(f" xlmr toks (first 12): {preview}")

    print(f"\n ==> {hits}/{len(samples)} correct on language detection.")
# ============================================================================
# main
# ============================================================================
def main() -> int:
    """Build every processed dataset, run the self-test and list the outputs.

    Returns:
        Always 0, suitable as a process exit status.
    """
    banner = "=" * 72
    print(banner)
    print("Build processed datasets")
    print(banner)
    print(f"Raw dir : {RAW}")
    print(f"Processed dir : {PROCESSED}")

    # One preprocessor instance is shared by all steps.
    pre = MultilingualPreprocessor()
    steps = (
        build_lang_detection,
        build_intent,
        build_ner,
        build_knowledge_base,
        preprocessor_self_test,
    )
    for step in steps:
        step(pre)

    # Summary list of artefacts
    print("\n" + banner)
    print("ARTEFACTS")
    print(banner)
    for task in ("lang_detection", "intent", "ner", "knowledge_base"):
        task_dir = PROCESSED / task
        if not task_dir.exists():
            continue
        names = sorted(child.name for child in task_dir.iterdir())
        print(f" {task_dir}")
        for entry in names:
            print(f" - {entry}")
    return 0
# Script entry point: run the build and turn Ctrl-C into a clean exit.
if __name__ == "__main__":
    try:
        exit_status = main()
    except KeyboardInterrupt:
        print("\nAborted by user.")
        # 130 = 128 + SIGINT(2), the conventional shell status for Ctrl-C.
        exit_status = 130
    sys.exit(exit_status)