Spaces:
Running
Running
| """ | |
| src/streaming/data_loader.py | |
| Multi-Dataset CustomerCore Data Loader | |
| Downloads multiple open-source customer support datasets from Hugging Face | |
| and publishes enriched events to Redpanda topics with progress tracking. | |
| == Datasets (all open-source, no scraping, no API keys, legal in EU/Germany) == | |
| 1. bitext/Bitext-customer-support-llm-chatbot-training-dataset | |
| 26,872 rows | CDLA-Sharing-1.0 | SaaS customer support Q&A (English) | |
| Categories: billing, account, orders, technical, etc. | |
| 2. bitext/Bitext-retail-banking-llm-chatbot-training-dataset | |
| 25,545 rows | CDLA-Sharing-1.0 | B2B banking/financial support Q&A (English) | |
| Categories: card, loan, account, transfer, compliance, etc. | |
| 3. mteb/amazon_massive_intent [de, fr, es] | |
| 11,514 rows x 3 languages = 34,542 rows | Apache 2.0 | |
| Real customer voice assistant intent utterances in German, French, Spanish | |
| Intents map to: alarm, calendar, email, audio, transport, shopping, etc. | |
| Source: Amazon MASSIVE (Fitzgerald et al., 2022) | |
| Total: ~87,000 rows across 4 languages (en, de, fr, es) | |
| == Multi-Language Strategy == | |
| A B2B SaaS platform serving EU customers receives tickets in German, French, | |
| Spanish, Portuguese etc. Single-language support degrades classification accuracy | |
| by 15-40% and is not acceptable for GDPR-compliant EU platforms. | |
| MASSIVE gives us real customer utterances in each language — not translations. | |
| == Run == | |
| python -m src.streaming.data_loader # all sources (default) | |
| python -m src.streaming.data_loader --sources customer # SaaS English only | |
| python -m src.streaming.data_loader --sources banking # Banking English only | |
| python -m src.streaming.data_loader --sources massive # Multilingual only | |
| python -m src.streaming.data_loader --sources all # Everything ~87k rows | |
| python -m src.streaming.data_loader --limit 500 --sources all # Quick test | |
| """ | |
| import argparse | |
| import json | |
| import random | |
| import time | |
| import uuid | |
| from datetime import datetime, timezone | |
| from confluent_kafka import Producer | |
| from datasets import load_dataset | |
| from tqdm import tqdm | |
# -- Config ------------------------------------------------------------------
# Broker address and destination topic used when publishing events.
BROKER = "localhost:9092"
TOPIC = "support-tickets"

# Column names and language that both Bitext dataset entries have in common.
_BITEXT_COLUMNS = {
    "text_field": "instruction",
    "response_field": "response",
    "category_field": "category",
    "tags_field": "tags",
    "language": "en",
}

# Hugging Face dataset id, license and domain tag per English source.
DATASET_CONFIGS = {
    "customer": {
        "id": "bitext/Bitext-customer-support-llm-chatbot-training-dataset",
        "license": "CDLA-Sharing-1.0",
        "domain": "saas_support",
        **_BITEXT_COLUMNS,
    },
    "banking": {
        "id": "bitext/Bitext-retail-banking-llm-chatbot-training-dataset",
        "license": "CDLA-Sharing-1.0",
        "domain": "financial_support",
        **_BITEXT_COLUMNS,
    },
}

# Multilingual MASSIVE configs (one per language; Apache 2.0, no scraping).
# The dataset config name is the same string as the language code.
MASSIVE_LANGUAGES = {
    code: {"name": name, "mteb_config": code}
    for code, name in (("de", "German"), ("fr", "French"), ("es", "Spanish"))
}

# -- Enrichment pools (makes data multi-tenant) ------------------------------
TENANTS = ["acme-corp", "globex-inc", "initech-ltd", "umbrella-co", "stark-tech"]
TIERS = ["enterprise", "professional", "free"]
CHANNELS = ["email", "web", "api", "chat"]

# Candidate priorities per upper-case category. A value listed twice is twice
# as likely to be drawn when a priority is picked at random from the pool.
PRIORITY_MAP = dict(
    # SaaS support categories
    ACCOUNT=["medium", "high"],
    BILLING=["high", "high", "critical"],
    CANCEL=["high", "critical"],
    CONTACT=["low", "medium"],
    DELIVERY=["medium", "high"],
    FEEDBACK=["low", "medium"],
    INVOICE=["high", "critical"],
    NEWSLETTER=["low"],
    ORDER=["medium", "high"],
    PAYMENT=["high", "critical"],
    REFUND=["high", "high", "critical"],
    REVIEW=["low", "medium"],
    SHIPPING=["medium", "high"],
    SUBSCRIPTION=["medium", "high"],
    SUPPORT=["medium", "high"],
    # Banking categories
    CARD=["high", "critical"],
    LOAN=["medium", "high"],
    TRANSFER=["high", "critical"],
    COMPLIANCE=["high", "critical"],
    MORTGAGE=["medium", "high"],
    SAVINGS=["low", "medium"],
    FRAUD=["critical", "critical"],
)
def delivery_report(err, msg):
    """Print an error line when a message delivery failed.

    Has the ``(err, msg)`` shape of a producer delivery callback. Successful
    deliveries (``err is None``) produce no output; ``msg`` is not inspected.
    """
    if err is None:
        return
    print(f"\n [ERROR] Delivery failed: {err}")
def _priority_pool_for(category: str) -> list:
    """Return the PRIORITY_MAP pool for an upper-case *category*.

    Tries an exact key match first, then the longest key that *category*
    starts with, and finally a generic low/medium/high pool.
    """
    exact = PRIORITY_MAP.get(category)
    if exact is not None:
        return exact
    # Compound labels (e.g. something like "SHIPPING_ADDRESS" -- presumably the
    # reason the lookup used to truncate to 8 characters) should still resolve
    # to their base key. Truncation broke every key longer than 8 characters
    # (NEWSLETTER, SUBSCRIPTION, COMPLIANCE); prefix matching handles both.
    prefix_keys = [key for key in PRIORITY_MAP if category.startswith(key)]
    if prefix_keys:
        return PRIORITY_MAP[max(prefix_keys, key=len)]
    return ["low", "medium", "high"]


def enrich(row: dict, domain: str, license_str: str) -> dict:
    """Normalize a raw Bitext row into a CustomerCore ticket event.

    Reads the ``instruction``, ``response``, ``category`` and ``tags`` keys of
    *row*; missing or ``None`` values fall back to empty text (``SUPPORT`` for
    the category). Tenant, tier, ids, channel and reopen count are drawn at
    random to make the data look multi-tenant.

    Args:
        row: One dataset record (dict-like).
        domain: Domain tag stored as ``source_domain`` and embedded in
            ``source``; distinguishes SaaS support from financial support.
        license_str: License identifier copied into the event.

    Returns:
        A JSON-serializable event dict with ``language`` fixed to ``"en"``.
    """
    category = str(row.get("category") or "SUPPORT").upper()
    tier = random.choice(TIERS)

    priority_pool = _priority_pool_for(category)
    if tier == "enterprise":
        # Enterprise tickets are never low/medium; fall back to "high" when the
        # category's pool contains no urgent level at all.
        priority_pool = [p for p in priority_pool if p in ("high", "critical")] or ["high"]

    tags_raw = row.get("tags", "")
    tags = [t.strip() for t in str(tags_raw).split(",") if t.strip()] if tags_raw else []

    # `or ""` keeps a None field from turning into the literal text "None".
    instruction = str(row.get("instruction") or "")
    response = str(row.get("response") or "")

    # The order of the random draws below matches the original implementation
    # so that seeded runs stay reproducible.
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": "support_ticket_created",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "tenant_id": random.choice(TENANTS),
        "ticket_id": f"TKT-{random.randint(10000, 99999)}",
        "customer_id": f"CUST-{random.randint(1000, 99999)}",
        "customer_tier": tier,
        "subject": instruction[:120],
        "body": instruction,
        "suggested_response": response,
        "category": category.lower(),
        "priority": random.choice(priority_pool),
        "channel": random.choice(CHANNELS),
        "reopen_count": random.randint(0, 2),
        "tags": tags,
        "source_domain": domain,
        "source": f"huggingface:bitext-{domain}",
        "license": license_str,
        "language": "en",
        "is_multilingual": False,
    }
# Keyword -> support category. Checked in insertion order against the intent
# name; the first keyword contained in the intent wins. Built once at import
# time instead of on every call.
_INTENT_TO_CATEGORY = {
    "alarm": "account", "reminder": "account", "calendar": "account",
    "email": "account", "messaging": "account", "social": "account",
    "audio": "technical", "music": "technical", "podcast": "technical",
    "play": "technical", "iot": "technical", "smart": "technical",
    "transport": "order", "travel": "order", "lists": "order",
    "shopping": "order", "takeaway": "order", "cooking": "order",
    "weather": "general", "datetime": "general", "news": "general",
    "qa": "general", "recommendation": "general", "general": "general",
}


def enrich_massive(row: dict, language: str, lang_name: str) -> dict:
    """Normalize an Amazon MASSIVE intent row into a CustomerCore ticket event.

    Reads ``label_text`` (the intent name, e.g. 'alarm_set', 'email_query')
    and ``text`` (the utterance) from *row*; missing or ``None`` values fall
    back to ``"general"`` and ``""`` respectively.

    Intent keywords map to support categories as follows:
        alarm / reminder / calendar / email / messaging / social -> account
        audio / music / podcast / play / iot / smart             -> technical
        transport / travel / lists / shopping / takeaway / cooking -> order
        weather / datetime / news / qa / recommendation / general -> general
    Intents containing none of the keywords are categorized as ``general``.

    Args:
        row: One dataset record (dict-like).
        language: Language code stored in the event and in ``source``.
        lang_name: Human-readable language name stored as ``language_name``.

    Returns:
        A JSON-serializable event dict with ``is_multilingual`` set to True,
        an empty ``suggested_response`` and ``reopen_count`` of 0.
    """
    # `or` guards keep a None field from becoming the literal "none"/"None".
    intent = str(row.get("label_text") or "general").lower()
    text = str(row.get("text") or "")

    category = next(
        (cat for key, cat in _INTENT_TO_CATEGORY.items() if key in intent),
        "general",
    )

    tier = random.choice(TIERS)
    # "medium" is listed twice so it is drawn twice as often.
    priority_pool = ["low", "medium", "medium", "high"]

    # The order of the random draws below matches the original implementation
    # so that seeded runs stay reproducible.
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": "support_ticket_created",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "tenant_id": random.choice(TENANTS),
        "ticket_id": f"TKT-{random.randint(10000, 99999)}",
        "customer_id": f"CUST-{random.randint(1000, 99999)}",
        "customer_tier": tier,
        "subject": text[:120],
        "body": text,
        "suggested_response": "",
        "category": category,
        "priority": random.choice(priority_pool),
        "channel": random.choice(CHANNELS),
        "reopen_count": 0,
        "tags": [intent.replace("_", "-")],
        "source_domain": "multilingual_intent",
        "source": f"huggingface:amazon-massive-{language}",
        "license": "Apache-2.0",
        "language": language,
        "language_name": lang_name,
        "is_multilingual": True,
        "intent": intent,
    }
def load_sources(sources: str) -> tuple[list, dict]:
    """Download and concatenate the requested dataset sources.

    Args:
        sources: ``"all"``, ``"massive"``, or one of the DATASET_CONFIGS keys.

    Returns:
        A pair ``(rows, counts)`` where

        * ``rows`` is a list of 6-tuples
          ``(row, domain, license_str, source_type, lang_code, lang_name)``;
          ``source_type`` is ``"bitext"`` or ``"massive"``.
        * ``counts`` maps a source label (the config key, or
          ``massive_<lang_code>``) to the number of rows downloaded for it.

    Raises:
        ValueError: If *sources* is not a recognized source name.
    """
    bitext_sources = []
    load_massive = False
    if sources == "all":
        bitext_sources = list(DATASET_CONFIGS.items())
        load_massive = True
    elif sources == "massive":
        load_massive = True
    elif sources in DATASET_CONFIGS:
        bitext_sources = [(sources, DATASET_CONFIGS[sources])]
    else:
        raise ValueError(
            f"Unknown source '{sources}'. "
            f"Choose from: {list(DATASET_CONFIGS.keys()) + ['massive', 'all']}"
        )

    all_rows = []
    total_per_source = {}

    # Load Bitext English datasets
    for name, cfg in bitext_sources:
        print(f"\n Downloading: {cfg['id']}")
        print(f" License : {cfg['license']} | Language: English")
        t0 = time.time()
        ds = load_dataset(cfg["id"], split="train")
        elapsed = time.time() - t0
        print(f" Downloaded : {len(ds):,} rows in {elapsed:.1f}s")
        total_per_source[name] = len(ds)
        all_rows.extend(
            (row, cfg["domain"], cfg["license"], "bitext", "en", "English")
            for row in ds
        )

    # Load multilingual MASSIVE datasets
    if load_massive:
        for lang_code, lang_cfg in MASSIVE_LANGUAGES.items():
            lang_name = lang_cfg["name"]
            mteb_config = lang_cfg["mteb_config"]
            print(f"\n Downloading: mteb/amazon_massive_intent [{lang_code} - {lang_name}]")
            print(f" License : Apache-2.0 | Language: {lang_name}")
            t0 = time.time()
            ds = load_dataset("mteb/amazon_massive_intent", mteb_config, split="train")
            elapsed = time.time() - t0
            print(f" Downloaded : {len(ds):,} rows in {elapsed:.1f}s")
            total_per_source[f"massive_{lang_code}"] = len(ds)
            all_rows.extend(
                (row, "multilingual_intent", "Apache-2.0", "massive", lang_code, lang_name)
                for row in ds
            )

    return all_rows, total_per_source
def main(limit: "int | None" = None, delay: float = 0.0, sources: str = "all"):
    """Download the requested datasets and publish them as ticket events.

    Steps: load rows via ``load_sources``, optionally shuffle and truncate to
    *limit*, enrich each row into an event, and produce it to ``TOPIC`` on the
    broker at ``BROKER`` with a progress bar. A summary (sent / failed counts,
    throughput, per-language breakdown) is printed at the end.

    Args:
        limit: Maximum number of rows to publish; a falsy value (``None`` or
            0) publishes everything. When set, rows are shuffled first.
        delay: Seconds to sleep after each message (0 disables the sleep).
        sources: Source selector passed to ``load_sources``.
    """
    banner = "=" * 65
    print(banner)
    print("CustomerCore Multi-Language Data Loader")
    print("Sources: Bitext SaaS + Banking (EN) + Amazon MASSIVE (DE/FR/ES)")
    print("License: CDLA-Sharing-1.0 + Apache-2.0 | Legal in EU/Germany")
    print(banner)

    # -- 1. Download datasets ------------------------------------------------
    print("\n[1/3] Downloading datasets from Hugging Face...")
    all_rows, source_counts = load_sources(sources)
    print("\n Source breakdown:")
    for src, count in source_counts.items():
        print(f" {src:18s}: {count:,} rows")
    total = len(all_rows)
    print(f" {'TOTAL':18s}: {total:,} rows")
    if limit:
        # Shuffle first so a small limit still samples every source/language.
        random.shuffle(all_rows)
        all_rows = all_rows[:limit]
        print(f"\n Limited to {len(all_rows):,} rows for this run")

    # -- 2. Connect to Redpanda ----------------------------------------------
    print(f"\n[2/3] Connecting to Redpanda at {BROKER}...")
    producer = Producer({
        "bootstrap.servers": BROKER,
        "queue.buffering.max.messages": 10000,
        "batch.num.messages": 500,
    })
    print(" Connected.")

    # -- 3. Publish with progress bar ----------------------------------------
    print(f"\n[3/3] Publishing {len(all_rows):,} events to '{TOPIC}'...")
    print(f" Estimated time: {len(all_rows) * 0.001:.0f}-{len(all_rows) * 0.003:.0f} seconds")
    failed = 0
    lang_counts: dict[str, int] = {}

    def _on_delivery(err, msg):
        # Count and report failed deliveries so the summary below is truthful
        # (the previous no-op callback left `failed` permanently at 0).
        nonlocal failed
        if err is not None:
            failed += 1
            delivery_report(err, msg)

    t0 = time.time()
    with tqdm(
        total=len(all_rows),
        unit="msg",
        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
    ) as pbar:
        # Each tuple: (row, domain, license_str, source_type, lang_code, lang_name)
        for row, domain, license_str, source_type, lang_code, lang_name in all_rows:
            if source_type == "bitext":
                event = enrich(row, domain, license_str)
            else:  # massive
                event = enrich_massive(row, lang_code, lang_name)
            lang_counts[lang_code] = lang_counts.get(lang_code, 0) + 1

            key = event["ticket_id"].encode()
            value = json.dumps(event).encode()
            while True:
                try:
                    producer.produce(
                        topic=TOPIC,
                        key=key,
                        value=value,
                        callback=_on_delivery,
                    )
                    break
                except BufferError:
                    # Local producer queue is full: serve delivery callbacks
                    # until there is room, then retry the same message.
                    producer.poll(0.5)

            producer.poll(0)  # serve any pending delivery callbacks
            pbar.update(1)
            if delay:
                time.sleep(delay)

    # Block until every queued message is delivered or has failed, so all
    # callbacks have run before the counters are read.
    producer.flush()
    elapsed = time.time() - t0
    sent = len(all_rows) - failed
    # Guard against a zero elapsed time (e.g. nothing to publish).
    rate = len(all_rows) / elapsed if elapsed > 0 else 0.0

    print(f"\n{banner}")
    print(f" Published : {sent:,} events")
    print(f" Failed : {failed}")
    print(f" Duration : {elapsed:.1f}s ({rate:.0f} msg/s)")
    print(f" Topic : {TOPIC}")
    print(f" Sources : {sources}")
    print(" Language breakdown:")
    for lang, count in sorted(lang_counts.items()):
        print(f" {lang}: {count:,} rows")
    print(banner)
if __name__ == "__main__":
    # Command-line entry point: declare the three options as data, register
    # them in order, then hand the parsed values to main().
    cli = argparse.ArgumentParser(description="CustomerCore Multi-Language HuggingFace loader")
    option_specs = (
        ("--limit", {
            "type": int,
            "default": None,
            "help": "Max rows to load (default: all ~87k combined)",
        }),
        ("--delay", {
            "type": float,
            "default": 0.0,
            "help": "Delay between messages in seconds",
        }),
        ("--sources", {
            "choices": ["all", "customer", "banking", "massive"],
            "default": "all",
            "help": "Which dataset sources to load (default: all = EN+DE+FR+ES)",
        }),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    opts = cli.parse_args()
    main(limit=opts.limit, delay=opts.delay, sources=opts.sources)