"""Repair data/raw/intent_data.csv after MASSIVE loader-script failure.

Background:
    In the previous run, AmazonScience/massive failed to load because
    `datasets` 4.x no longer runs loader scripts (massive.py). We ended up
    with only 170 synthetic intent rows — too few to train a 6-class
    classifier reliably.

Recovery strategy (real-data sources are tried in order; the second one is
only used when the first yields too few rows):
    1. mteb/amazon_massive_intent — community parquet fork on HF Hub, no
       loader script. Fast if it works.
    2. Direct download of the official MASSIVE 1.1 tar.gz from Amazon S3.
       Reliable and authoritative — official Amazon distribution.
       ~100 MB cached at data/raw/.massive-1.1.tar.gz.
    3. Expanded synthetic templates. Always runs as a supplement and always
       succeeds; produces less natural data but enough to train. The number
       of rows is bounded by the number of unique template combinations.

We MERGE whatever we get with the existing 170 rows from collect_data.py
(those came from synthetic_intent_data() and are still useful), then
deduplicate and write data/raw/intent_data.csv.

Other CSVs (lang_detection, ner, knowledge_base) are not touched.
"""

from __future__ import annotations

import io
import json
import random
import sys
import tarfile
from pathlib import Path
from typing import Iterable

import pandas as pd

try:
    from tqdm import tqdm
except ImportError:  # the progress bar is cosmetic; never block recovery on it

    class tqdm:  # noqa: N801 - keeps the name the rest of the file already uses
        """No-op stand-in exposing the small tqdm surface this script uses."""

        def __init__(self, iterable=None, *_args, **_kwargs) -> None:
            self._iterable = iterable

        def __iter__(self):
            return iter(() if self._iterable is None else self._iterable)

        def __enter__(self):
            return self

        def __exit__(self, *_exc_info) -> bool:
            return False

        def update(self, _n: int = 1) -> None:
            """Accept a progress increment and discard it."""


PROJECT_ROOT = Path(__file__).resolve().parent.parent
RAW = PROJECT_ROOT / "data" / "raw"
INTENT_CSV = RAW / "intent_data.csv"
MASSIVE_CACHE = RAW / ".massive-1.1.tar.gz"

# Seed the global RNG so the synthetic sampling is reproducible across runs.
random.seed(42)

# Column layout shared by every frame this section produces.
_INTENT_COLUMNS = ("text", "intent", "language")

# ============================================================================
# Same MASSIVE -> our 6-class mapping as in collect_data.py
# ============================================================================
# Only three of the six classes have MASSIVE counterparts listed here; every
# other MASSIVE intent falls through to "other" in map_massive_intent().
INTENT_MAPPING: dict[str, set[str]] = {
    "booking": {
        "takeaway_order",
        "transport_taxi",
        "transport_ticket",
        "calendar_set",
        "email_sendemail",
        "alarm_set",
        "lists_createoradd",
        "iot_coffee",
    },
    "inquiry": {
        "alarm_query",
        "calendar_query",
        "cooking_query",
        "cooking_recipe",
        "datetime_query",
        "datetime_convert",
        "email_query",
        "email_querycontact",
        "lists_query",
        "music_query",
        "news_query",
        "qa_currency",
        "qa_definition",
        "qa_factoid",
        "qa_maths",
        "qa_stock",
        "recommendation_events",
        "recommendation_locations",
        "recommendation_movies",
        "social_query",
        "takeaway_query",
        "transport_query",
        "transport_traffic",
        "weather_query",
        "audio_volume_other",
    },
    "greeting": {"general_greet"},
}


def map_massive_intent(name: str) -> str:
    """Return our 6-class label given a raw MASSIVE intent name.

    Names that appear in none of the INTENT_MAPPING sets map to "other".
    """
    for cls, names in INTENT_MAPPING.items():
        if name in names:
            return cls
    return "other"


LANG_MAP_FULL = {"ar-SA": "AR", "en-US": "EN", "fr-FR": "FR"}


# ============================================================================
# APPROACH 1: mteb/amazon_massive_intent (parquet on HF Hub)
# ============================================================================
def _mteb_rows(ds, lang_code: str) -> list[dict[str, str]]:
    """Convert one loaded mteb split into ``{"text", "intent", "language"}`` rows.

    Returns an empty list when the split has no columns, no recognised label
    column, or no usable examples, so the caller can probe the next config.
    """
    cols = list(ds.column_names)
    if not cols:
        print(f" [WARN] no recognised label column in {cols}")
        return []

    # mteb usually has 'text' + 'label_text'/'label'; fall back to first column.
    text_col = next((c for c in ("text", "utt") if c in cols), cols[0])
    label_col = next(
        (c for c in ("label_text", "intent", "label") if c in cols), None
    )
    if label_col is None:
        print(f" [WARN] no recognised label column in {cols}")
        return []

    # If the label column holds integers, the feature's `names` list (when
    # present) translates them back to MASSIVE intent names.
    try:
        label_names = getattr(ds.features[label_col], "names", None)
    except Exception:  # noqa: BLE001 - feature metadata is best-effort
        label_names = None

    rows: list[dict[str, str]] = []
    unnamed = 0
    for ex in ds:
        raw_text = ex.get(text_col)
        text = raw_text.strip() if isinstance(raw_text, str) else ""
        if not text:
            continue
        lab = ex.get(label_col)
        if isinstance(lab, int):
            if label_names and 0 <= lab < len(label_names):
                lab = label_names[lab]
            else:
                # A bare class index cannot be mapped; stringifying it would
                # silently label the row "other", so drop it instead.
                unnamed += 1
                continue
        rows.append({
            "text": text,
            "intent": map_massive_intent(str(lab)),
            "language": lang_code,
        })

    if unnamed:
        print(f" [WARN] skipped {unnamed} rows whose numeric label has no name")
    return rows


def try_mteb_massive() -> pd.DataFrame:
    """Try the parquet-only mteb fork.

    Configs vary across mirrors, so we probe a few candidate config names per
    language and keep the first one that yields usable rows.

    Returns:
        A frame with columns text / intent / language. It is empty when the
        `datasets` package is unavailable or no config could be loaded.
    """
    try:
        from datasets import load_dataset
    except Exception as exc:  # noqa: BLE001
        print(f" [WARN] datasets import failed: {exc}")
        return pd.DataFrame(columns=list(_INTENT_COLUMNS))

    print("\n[1] Trying mteb/amazon_massive_intent ...")
    candidates: dict[str, list[str]] = {
        "AR": ["ar-SA", "ar_SA", "ar"],
        "EN": ["en-US", "en_US", "en"],
        "FR": ["fr-FR", "fr_FR", "fr"],
    }

    parts: list[pd.DataFrame] = []
    for lang_code, cfgs in candidates.items():
        loaded = False
        last_error: Exception | None = None
        for cfg in cfgs:
            try:
                ds = load_dataset("mteb/amazon_massive_intent", cfg, split="train")
            except Exception as exc:  # noqa: BLE001
                # Expected while probing; remember it for the final report.
                last_error = exc
                continue
            print(f" {lang_code}: loaded mteb config '{cfg}' ({len(ds)} rows)")

            rows = _mteb_rows(ds, lang_code)
            if not rows:
                continue
            parts.append(pd.DataFrame(rows, columns=list(_INTENT_COLUMNS)))
            loaded = True
            break

        if not loaded:
            detail = f" (last error: {last_error})" if last_error is not None else ""
            print(f" {lang_code}: no mteb config matched -> trying next approach{detail}")

    if parts:
        df = pd.concat(parts, ignore_index=True)
        print(f" ✓ mteb total: {len(df)} rows")
        return df
    return pd.DataFrame(columns=list(_INTENT_COLUMNS))
# ============================================================================
# APPROACH 2: Direct download from Amazon's official MASSIVE S3 bucket
# ============================================================================
MASSIVE_URL = (
    "https://amazon-massive-nlu-dataset.s3.amazonaws.com/"
    "amazon-massive-dataset-1.1.tar.gz"
)


def _download_massive_tar() -> Path | None:
    """Download the MASSIVE 1.1 tar.gz to data/raw/ if not already cached.

    Returns:
        The cache path on success, or None when `requests` is unavailable or
        the download fails at any stage (connect, HTTP status, streaming,
        writing). Failures never raise, so the caller can fall back.
    """
    # A cached file under 50 MB is treated as truncated and re-downloaded.
    min_cache_bytes = 50 * 1024 * 1024
    if MASSIVE_CACHE.exists() and MASSIVE_CACHE.stat().st_size > min_cache_bytes:
        print(f" Cache hit: {MASSIVE_CACHE} ({MASSIVE_CACHE.stat().st_size / 1024 / 1024:.1f} MB)")
        return MASSIVE_CACHE

    try:
        import requests
    except Exception as exc:  # noqa: BLE001
        print(f" [WARN] requests import failed: {exc}")
        return None

    print(f" Downloading MASSIVE 1.1 from official S3 ({MASSIVE_URL}) ...")
    # Stream into a sibling ".partial" file and rename only once complete, so
    # an interrupted transfer never masquerades as a valid cache.
    partial = MASSIVE_CACHE.with_name(MASSIVE_CACHE.name + ".partial")
    try:
        MASSIVE_CACHE.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(MASSIVE_URL, stream=True, timeout=60) as resp:
            resp.raise_for_status()
            total = int(resp.headers.get("content-length") or 0)
            with open(partial, "wb") as fh, tqdm(
                total=total or None,  # None = unknown size
                unit="B",
                unit_scale=True,
                desc="MASSIVE.tar.gz",
            ) as pbar:
                for chunk in resp.iter_content(chunk_size=64 * 1024):
                    if not chunk:
                        continue
                    fh.write(chunk)
                    pbar.update(len(chunk))
        partial.replace(MASSIVE_CACHE)
    except Exception as exc:  # noqa: BLE001 - any failure means "use next approach"
        print(f" [WARN] download failed: {exc}")
        try:
            partial.unlink(missing_ok=True)
        except OSError:
            pass  # leftover partial file is harmless; it is overwritten next run
        return None

    print(f" ✓ saved: {MASSIVE_CACHE} ({MASSIVE_CACHE.stat().st_size / 1024 / 1024:.1f} MB)")
    return MASSIVE_CACHE


def try_amazon_s3() -> pd.DataFrame:
    """Download MASSIVE 1.1 tar.gz from Amazon S3 and parse AR/EN/FR JSONL files.

    Only records whose "partition" is "train" and whose "utt" is non-empty are
    kept. Returns a frame with columns text / intent / language; it is empty
    when the archive could not be obtained or read.
    """
    columns = ["text", "intent", "language"]
    print("\n[2] Trying direct download from Amazon S3 (MASSIVE 1.1) ...")
    cache = _download_massive_tar()
    if cache is None:
        return pd.DataFrame(columns=columns)

    # One filename suffix per locale we care about, e.g. "ar-SA.jsonl".
    wanted_suffixes = tuple(f"{loc}.jsonl" for loc in LANG_MAP_FULL)
    rows: list[dict[str, str]] = []
    try:
        with tarfile.open(cache, "r:gz") as tar:
            members = tar.getmembers()
            print(f" Archive contains {len(members)} members.")
            for member in members:
                if not member.name.endswith(wanted_suffixes):
                    continue
                locale = Path(member.name).stem  # ar-SA / en-US / fr-FR
                lang_code = LANG_MAP_FULL.get(locale)
                if lang_code is None:
                    continue
                print(f" Parsing {member.name} -> {lang_code}")
                fh = tar.extractfile(member)
                if fh is None:
                    continue
                added = 0
                for line in fh:
                    try:
                        ex = json.loads(line)
                    except ValueError:  # malformed JSON or undecodable bytes
                        continue
                    # A non-object line must not abort the whole archive.
                    if not isinstance(ex, dict):
                        continue
                    if ex.get("partition") != "train":
                        continue
                    text = (ex.get("utt") or "").strip()
                    if not text:
                        continue
                    raw_intent = ex.get("intent") or ""
                    rows.append({
                        "text": text,
                        "intent": map_massive_intent(raw_intent),
                        "language": lang_code,
                    })
                    added += 1
                print(f" +{added} train rows for {lang_code}")
    except Exception as exc:  # noqa: BLE001
        print(f" [WARN] tarfile parse failed: {exc}")
        return pd.DataFrame(columns=columns)

    df = pd.DataFrame(rows, columns=columns)
    print(f" ✓ Amazon S3 total: {len(df)} rows")
    return df


# ============================================================================
# APPROACH 3: Heavy synthetic generation (slot-filling templates)
# ============================================================================
# Slot vocabularies in 3 languages
_SLOTS = {
    "EN": {
        "verb_book": ["book", "reserve", "schedule", "arrange", "set up",
                      "make a reservation for"],
        "verb_order": ["order", "get", "buy", "purchase"],
        "object": ["a flight", "a hotel room", "a taxi", "a train ticket",
                   "a table for two", "a meeting room", "a rental car",
                   "a doctor's appointment", "a meeting with the team"],
        "food": ["a pizza", "a burger", "two coffees", "lunch", "a sandwich"],
        "time": ["today", "tomorrow", "this evening", "next Monday",
                 "next weekend", "in two hours", "for Friday"],
        "place": ["to Paris", "to Cairo", "to London", "to the airport",
                  "to the conference center", "downtown"],
        "issue": ["my account", "my order", "the website", "the mobile app",
                  "the payment system", "my subscription"],
        "complaint_intro": ["I have a problem with", "There is an issue with",
                            "I am very unhappy with", "I cannot use",
                            "Something is wrong with"],
        "inquiry_q": ["What are your opening hours?",
                      "How much does the premium plan cost?",
                      "Can I pay with PayPal?",
                      "How long does international shipping take?",
                      "Do you have a refund policy?",
                      "Where is your main office?",
                      "What payment methods do you accept?",
                      "Is there a free trial?",
                      "How do I cancel my subscription?",
                      "Can I change my plan later?"],
        "greeting": ["hello", "hi there", "good morning", "good afternoon",
                     "good evening", "hey", "greetings", "howdy"],
        "farewell": ["goodbye", "bye", "see you later", "see you soon",
                     "take care", "have a nice day", "talk to you later",
                     "farewell", "thanks, bye"],
        "other": ["I love classical music.",
                  "The weather is beautiful today.",
                  "Yesterday I went to the cinema with friends.",
                  "Football is the most popular sport in the world.",
                  "Reading books is a great hobby.",
                  "I really enjoy Italian cuisine.",
                  "My cat sleeps a lot during the day.",
                  "Jazz music is relaxing.",
                  "I like to walk in the park on weekends.",
                  "She speaks three languages fluently."],
    },
    "FR": {
        "verb_book": ["réserver", "planifier", "organiser", "prendre"],
        "verb_order": ["commander", "acheter"],
        "object": ["un vol", "une chambre d'hôtel", "un taxi",
                   "un billet de train", "une table pour deux",
                   "une salle de réunion", "une voiture de location",
                   "un rendez-vous chez le médecin",
                   "une réunion avec l'équipe"],
        "food": ["une pizza", "un burger", "deux cafés", "le déjeuner",
                 "un sandwich"],
        "time": ["aujourd'hui", "demain", "ce soir", "lundi prochain",
                 "le week-end prochain", "dans deux heures", "pour vendredi"],
        "place": ["pour Paris", "pour le Caire", "pour Londres",
                  "pour l'aéroport", "pour le centre des congrès",
                  "au centre-ville"],
        "issue": ["mon compte", "ma commande", "le site web",
                  "l'application mobile", "le système de paiement",
                  "mon abonnement"],
        "complaint_intro": ["J'ai un problème avec", "Il y a un souci avec",
                            "Je ne suis pas content de",
                            "Je ne peux pas utiliser",
                            "Quelque chose ne va pas avec"],
        "inquiry_q": ["Quels sont vos horaires d'ouverture ?",
                      "Combien coûte la formule premium ?",
                      "Puis-je payer avec PayPal ?",
                      "Combien de temps prend la livraison internationale ?",
                      "Avez-vous une politique de remboursement ?",
                      "Où se trouve votre siège ?",
                      "Quels modes de paiement acceptez-vous ?",
                      "Y a-t-il une période d'essai gratuite ?",
                      "Comment annuler mon abonnement ?",
                      "Puis-je changer mon forfait plus tard ?"],
        "greeting": ["bonjour", "salut", "bonsoir", "coucou", "salutations",
                     "bonne journée", "ravi de vous parler"],
        "farewell": ["au revoir", "à bientôt", "à plus tard", "adieu",
                     "à demain", "bonne soirée", "à la prochaine"],
        "other": ["J'aime la musique classique.",
                  "Il fait très beau aujourd'hui.",
                  "Hier je suis allé au cinéma avec des amis.",
                  "Le football est le sport le plus populaire au monde.",
                  "Lire des livres est un excellent passe-temps.",
                  "J'apprécie vraiment la cuisine italienne.",
                  "Mon chat dort beaucoup pendant la journée.",
                  "La musique jazz est relaxante.",
                  "J'aime me promener au parc le week-end.",
                  "Elle parle trois langues couramment."],
    },
    "AR": {
        "verb_book": ["أريد حجز", "احجز لي", "أحتاج إلى حجز", "من فضلك احجز"],
        "verb_order": ["اطلب لي", "أريد طلب", "أحتاج إلى"],
        "object": ["رحلة طيران", "غرفة فندق", "تاكسي", "تذكرة قطار",
                   "طاولة لشخصين", "قاعة اجتماعات", "سيارة للإيجار",
                   "موعدا مع الطبيب", "اجتماعا مع الفريق"],
        "food": ["بيتزا", "برجر", "قهوتين", "وجبة الغداء", "ساندويتش"],
        "time": ["اليوم", "غدا", "هذا المساء", "الإثنين القادم",
                 "نهاية الأسبوع القادم", "بعد ساعتين", "ليوم الجمعة"],
        "place": ["إلى باريس", "إلى القاهرة", "إلى لندن", "إلى المطار",
                  "إلى مركز المؤتمرات", "إلى وسط المدينة"],
        "issue": ["حسابي", "طلبي", "الموقع الإلكتروني", "تطبيق الجوال",
                  "نظام الدفع", "اشتراكي"],
        "complaint_intro": ["لدي مشكلة في", "هناك خطأ في", "أنا غير راض عن",
                            "لا أستطيع استخدام", "شيء ما خطأ في"],
        "inquiry_q": ["ما هي ساعات العمل؟",
                      "كم تكلفة الباقة المتميزة؟",
                      "هل يمكنني الدفع عبر باي بال؟",
                      "كم يستغرق الشحن الدولي؟",
                      "هل لديكم سياسة استرداد؟",
                      "أين يقع مقركم الرئيسي؟",
                      "ما هي طرق الدفع المقبولة؟",
                      "هل توجد فترة تجربة مجانية؟",
                      "كيف أقوم بإلغاء اشتراكي؟",
                      "هل يمكنني تغيير الباقة لاحقا؟"],
        "greeting": ["مرحبا", "أهلا", "السلام عليكم", "صباح الخير",
                     "مساء الخير", "أهلا وسهلا", "صباح النور", "مساء النور"],
        "farewell": ["مع السلامة", "وداعا", "إلى اللقاء", "أراك لاحقا",
                     "تصبح على خير", "في أمان الله", "نهارك سعيد"],
        "other": ["أحب الموسيقى الكلاسيكية.",
                  "الجو جميل جدا اليوم.",
                  "أمس ذهبت إلى السينما مع الأصدقاء.",
                  "كرة القدم هي أكثر الرياضات شعبية في العالم.",
                  "قراءة الكتب هواية رائعة.",
                  "أستمتع حقا بالمطبخ الإيطالي.",
                  "قطتي تنام كثيرا خلال النهار.",
                  "موسيقى الجاز مريحة.",
                  "أحب التنزه في الحديقة في عطلات نهاية الأسبوع.",
                  "هي تتحدث ثلاث لغات بطلاقة."],
    },
}


def _join_ar(parts: Iterable[str]) -> str:
    """Arabic-friendly joining (no extra space normalization needed here).

    Joins the non-empty items of *parts* with single spaces.
    """
    return " ".join(p for p in parts if p)


def expanded_synthetic_intent(target_per_bucket: int = 140) -> pd.DataFrame:
    """Generate up to target_per_bucket examples per (intent, language) bucket.

    Uses simple slot-filling templates so that we get varied surface forms
    even though the underlying patterns are limited. Each bucket's template
    pool is de-duplicated, shuffled with the global `random` state, and
    truncated to ``target_per_bucket``; a bucket whose pool is smaller than
    the target contributes its whole pool, so the output is usually well below
    ``target_per_bucket * 6 intents * 3 langs``.

    Args:
        target_per_bucket: Maximum number of rows per (intent, language).
            Values <= 0 produce an empty frame.

    Returns:
        A frame with columns text / intent / language and no duplicate rows.
    """
    print(f"\n[3] Expanded synthetic intent generation ({target_per_bucket}/bucket) ...")

    def emit_booking(lang: str) -> list[str]:
        s = _SLOTS[lang]
        out: list[str] = []
        for verb in s["verb_book"]:
            for obj in s["object"]:
                for tm in s["time"]:
                    if lang == "EN":
                        out.append(f"I want to {verb} {obj} {tm}.")
                        out.append(f"Please {verb} {obj} {tm}.")
                    elif lang == "FR":
                        out.append(f"Je veux {verb} {obj} {tm}.")
                        out.append(f"Pouvez-vous {verb} {obj} {tm} ?")
                    else:  # AR
                        out.append(f"{verb} {obj} {tm}")
        # Add food / takeaway
        for verb in s["verb_order"]:
            for food in s["food"]:
                for tm in s["time"]:
                    if lang == "EN":
                        out.append(f"Can you {verb} {food} {tm}?")
                    elif lang == "FR":
                        out.append(f"Pouvez-vous {verb} {food} {tm} ?")
                    else:
                        out.append(f"{verb} {food} {tm}")
        return out

    def emit_complaint(lang: str) -> list[str]:
        s = _SLOTS[lang]
        # The "<intro> <issue>." shape is the same in all three languages.
        out = [
            f"{intro} {issue}."
            for intro in s["complaint_intro"]
            for issue in s["issue"]
        ]
        # Stock complaints
        if lang == "EN":
            out += [
                "This is completely unacceptable.",
                "I want a refund right now.",
                "Customer support has not responded for two days.",
                "I waited an hour on the phone.",
                "The product I received is damaged.",
                "I was charged twice on my credit card.",
                "Your service is the worst I have ever used.",
            ]
        elif lang == "FR":
            out += [
                "C'est totalement inacceptable.",
                "Je veux un remboursement immédiatement.",
                "Le service client n'a pas répondu depuis deux jours.",
                "J'ai attendu une heure au téléphone.",
                "Le produit que j'ai reçu est endommagé.",
                "J'ai été facturé deux fois sur ma carte de crédit.",
                "Votre service est le pire que j'aie jamais utilisé.",
            ]
        else:
            out += [
                "هذا غير مقبول إطلاقا.",
                "أريد استرداد أموالي الآن.",
                "خدمة العملاء لم ترد منذ يومين.",
                "انتظرت ساعة على الهاتف.",
                "المنتج الذي استلمته تالف.",
                "تم خصم المبلغ مرتين من بطاقتي الائتمانية.",
                "خدمتكم هي الأسوأ التي استخدمتها على الإطلاق.",
            ]
        return out

    def emit_inquiry(lang: str) -> list[str]:
        s = _SLOTS[lang]
        # Inquiries are mostly direct questions; add small variations
        out = list(s["inquiry_q"])
        # Add prefix variants
        for q in s["inquiry_q"]:
            if lang == "EN":
                out.append(f"Could you tell me {q.lower().rstrip('?')}?")
                out.append(f"I would like to know {q.lower().rstrip('?')}.")
            elif lang == "FR":
                out.append(f"Pouvez-vous me dire {q.lower().rstrip(' ?')} ?")
                out.append(f"J'aimerais savoir {q.lower().rstrip(' ?')}.")
            else:
                out.append(f"هل يمكنك إخباري {q.rstrip('؟')}؟")
                out.append(f"أود أن أعرف {q.rstrip('؟')}.")
        return out

    def emit_greeting(lang: str) -> list[str]:
        base = list(_SLOTS[lang]["greeting"])
        out = list(base)
        for g in base:
            if lang == "EN":
                out.append(f"{g.capitalize()}, how are you?")
                out.append(f"{g.capitalize()}, I hope you're well.")
            elif lang == "FR":
                out.append(f"{g.capitalize()}, comment ça va ?")
                out.append(f"{g.capitalize()}, j'espère que vous allez bien.")
            else:
                out.append(f"{g} كيف حالك؟")
                out.append(f"{g} أتمنى أنك بخير.")
        return out

    def emit_farewell(lang: str) -> list[str]:
        base = list(_SLOTS[lang]["farewell"])
        out = list(base)
        for g in base:
            if lang == "EN":
                out.append(f"{g.capitalize()}, have a nice day.")
                out.append(f"Thanks, {g}.")
            elif lang == "FR":
                out.append(f"{g.capitalize()}, bonne journée.")
                out.append(f"Merci, {g}.")
            else:
                out.append(f"{g}، نهارك سعيد.")
                out.append(f"شكرا، {g}.")
        return out

    def emit_other(lang: str) -> list[str]:
        return list(_SLOTS[lang]["other"])

    emitters = {
        "booking": emit_booking,
        "complaint": emit_complaint,
        "inquiry": emit_inquiry,
        "greeting": emit_greeting,
        "farewell": emit_farewell,
        "other": emit_other,
    }

    limit = max(target_per_bucket, 0)
    rows: list[dict[str, str]] = []
    for lang in ("EN", "FR", "AR"):
        for intent, fn in emitters.items():
            pool = list(dict.fromkeys(fn(lang)))  # dedup, preserve order
            random.shuffle(pool)
            # Repeating pool items to pad a short bucket would only create
            # duplicates, so take at most `limit` unique items instead.
            rows.extend(
                {"text": text, "intent": intent, "language": lang}
                for text in pool[:limit]
            )

    # Explicit columns keep the frame well-formed even when `rows` is empty.
    df = pd.DataFrame(rows, columns=["text", "intent", "language"])
    print(f" ✓ generated {len(df)} synthetic rows")
    return df


# ============================================================================
# Merge + write
# ============================================================================
def load_existing() -> pd.DataFrame:
    """Read the (broken) intent_data.csv if it exists.

    Returns an empty text / intent / language frame when the file is missing,
    zero-length, or lacks any of the three expected columns.
    """
    columns = ["text", "intent", "language"]
    if not INTENT_CSV.exists():
        return pd.DataFrame(columns=columns)
    try:
        df = pd.read_csv(INTENT_CSV)
    except pd.errors.EmptyDataError:
        # A zero-byte file (e.g. left by an interrupted earlier run) has
        # nothing to merge and must not abort the repair.
        print(" [WARN] existing intent_data.csv is empty -> ignoring it")
        return pd.DataFrame(columns=columns)
    missing = [c for c in columns if c not in df.columns]
    if missing:
        print(f" [WARN] existing intent_data.csv lacks columns {missing} -> ignoring it")
        return pd.DataFrame(columns=columns)
    print(f" Existing rows: {len(df)} (will be merged + dedup'd)")
    return df


def main() -> int:
    """Run the recovery pipeline and rewrite intent_data.csv.

    Returns:
        0 on completion (used as the process exit status).
    """
    print("=" * 72)
    print("Repairing data/raw/intent_data.csv")
    print("=" * 72)

    # A real-data source counts as sufficient once it yields this many rows.
    min_real_rows = 2000
    key_cols = ["text", "intent", "language"]
    parts: list[pd.DataFrame] = []

    # Approach 1: mteb fork
    df1 = try_mteb_massive()
    parts.append(df1)
    have_real_data = len(df1) >= min_real_rows

    # Approach 2: Amazon S3 (if mteb didn't give us enough)
    if not have_real_data:
        df2 = try_amazon_s3()
        parts.append(df2)
        have_real_data = len(df2) >= min_real_rows

    # Approach 3: synthetic (always run as a robust supplement)
    df3 = expanded_synthetic_intent(target_per_bucket=140 if not have_real_data else 60)
    parts.append(df3)

    # Merge with existing 170 rows
    parts.append(load_existing())

    # Skip empty frames: they add nothing to the merge.
    non_empty = [p for p in parts if not p.empty]
    if non_empty:
        df = pd.concat(non_empty, ignore_index=True)
    else:
        df = pd.DataFrame(columns=key_cols)

    # Drop incomplete rows BEFORE the str cast; otherwise a missing text
    # becomes the literal string "nan" and survives the length filter.
    df = df.dropna(subset=key_cols)
    df["text"] = df["text"].astype(str).str.strip()
    df = df[df["text"].str.len().between(2, 300)]
    df = df.drop_duplicates(subset=key_cols).reset_index(drop=True)

    # Write to a sibling temp file and rename, so a failure mid-write never
    # leaves a truncated intent_data.csv behind.
    INTENT_CSV.parent.mkdir(parents=True, exist_ok=True)
    tmp_csv = INTENT_CSV.with_name(INTENT_CSV.name + ".tmp")
    df.to_csv(tmp_csv, index=False)
    tmp_csv.replace(INTENT_CSV)

    # Report
    print("\n" + "=" * 72)
    print("FINAL")
    print("=" * 72)
    print(f"Total rows written: {len(df)}")
    print(f"Path: {INTENT_CSV}")
    print(f"Language distribution: {df['language'].value_counts().to_dict()}")
    print(f"Intent distribution : {df['intent'].value_counts().to_dict()}")
    cross = df.groupby(["language", "intent"]).size().unstack(fill_value=0)
    print("Intent x language:")
    for line in cross.to_string().splitlines():
        print(f" {line}")
    return 0


if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print("\nAborted by user.")
        sys.exit(130)