customercore / src /streaming /data_loader.py
Saibalaji Namburi
fix(lint): resolve ruff code formatting and unused import warnings
15c00a9
Raw
History Blame Contribute Delete
14.8 kB
"""
src/streaming/data_loader.py
Multi-Dataset CustomerCore Data Loader
Downloads multiple open-source customer support datasets from Hugging Face
and publishes enriched events to Redpanda topics with progress tracking.
== Datasets (all open-source, no scraping, no API keys, legal in EU/Germany) ==
1. bitext/Bitext-customer-support-llm-chatbot-training-dataset
26,872 rows | CDLA-Sharing-1.0 | SaaS customer support Q&A (English)
Categories: billing, account, orders, technical, etc.
2. bitext/Bitext-retail-banking-llm-chatbot-training-dataset
25,545 rows | CDLA-Sharing-1.0 | B2B banking/financial support Q&A (English)
Categories: card, loan, account, transfer, compliance, etc.
3. mteb/amazon_massive_intent [de, fr, es]
11,514 rows x 3 languages = 34,542 rows | Apache 2.0
Real customer voice assistant intent utterances in German, French, Spanish
Intents map to: alarm, calendar, email, audio, transport, shopping, etc.
Source: Amazon MASSIVE (Fitzgerald et al., 2022)
Total: ~87,000 rows across 4 languages (en, de, fr, es)
== Multi-Language Strategy ==
A B2B SaaS platform serving EU customers receives tickets in German, French,
Spanish, Portuguese etc. Single-language support degrades classification accuracy
by 15-40% and is not acceptable for GDPR-compliant EU platforms.
MASSIVE gives us real customer utterances in each language β€” not translations.
== Run ==
python -m src.streaming.data_loader # all sources (default)
python -m src.streaming.data_loader --sources customer # SaaS English only
python -m src.streaming.data_loader --sources banking # Banking English only
python -m src.streaming.data_loader --sources massive # Multilingual only
python -m src.streaming.data_loader --sources all # Everything ~87k rows
python -m src.streaming.data_loader --limit 500 --sources all # Quick test
"""
import argparse
import json
import random
import time
import uuid
from datetime import datetime, timezone
from confluent_kafka import Producer
from datasets import load_dataset
from tqdm import tqdm
# ── Config ────────────────────────────────────────────────────
BROKER = "localhost:9092"
TOPIC = "support-tickets"
DATASET_CONFIGS = {
"customer": {
"id": "bitext/Bitext-customer-support-llm-chatbot-training-dataset",
"license": "CDLA-Sharing-1.0",
"domain": "saas_support",
"text_field": "instruction",
"response_field": "response",
"category_field": "category",
"tags_field": "tags",
"language": "en",
},
"banking": {
"id": "bitext/Bitext-retail-banking-llm-chatbot-training-dataset",
"license": "CDLA-Sharing-1.0",
"domain": "financial_support",
"text_field": "instruction",
"response_field": "response",
"category_field": "category",
"tags_field": "tags",
"language": "en",
},
}
# Multilingual MASSIVE configs (per language β€” Apache 2.0, no scraping)
MASSIVE_LANGUAGES = {
"de": {"name": "German", "mteb_config": "de"},
"fr": {"name": "French", "mteb_config": "fr"},
"es": {"name": "Spanish", "mteb_config": "es"},
}
# ── Enrichment pools (makes data multi-tenant) ────────────────
TENANTS = ["acme-corp", "globex-inc", "initech-ltd", "umbrella-co", "stark-tech"]
TIERS = ["enterprise", "professional", "free"]
CHANNELS = ["email", "web", "api", "chat"]
PRIORITY_MAP = {
# SaaS support categories
"ACCOUNT": ["medium", "high"],
"BILLING": ["high", "high", "critical"],
"CANCEL": ["high", "critical"],
"CONTACT": ["low", "medium"],
"DELIVERY": ["medium", "high"],
"FEEDBACK": ["low", "medium"],
"INVOICE": ["high", "critical"],
"NEWSLETTER": ["low"],
"ORDER": ["medium", "high"],
"PAYMENT": ["high", "critical"],
"REFUND": ["high", "high", "critical"],
"REVIEW": ["low", "medium"],
"SHIPPING": ["medium", "high"],
"SUBSCRIPTION": ["medium", "high"],
"SUPPORT": ["medium", "high"],
# Banking categories
"CARD": ["high", "critical"],
"LOAN": ["medium", "high"],
"TRANSFER": ["high", "critical"],
"COMPLIANCE": ["high", "critical"],
"MORTGAGE": ["medium", "high"],
"SAVINGS": ["low", "medium"],
"FRAUD": ["critical", "critical"],
}
def delivery_report(err, msg):
if err is not None:
print(f"\n [ERROR] Delivery failed: {err}")
def enrich(row: dict, domain: str, license_str: str) -> dict:
"""
Normalize a raw Bitext row into a CustomerCore ticket event.
Both Bitext datasets share the same column schema (instruction / response / category).
Domain tag differentiates SaaS support from financial support in downstream analysis.
"""
category = str(row.get("category", "SUPPORT")).upper()
tier = random.choice(TIERS)
priority_pool = PRIORITY_MAP.get(category[:8], ["low", "medium", "high"])
if tier == "enterprise":
priority_pool = [p for p in priority_pool if p in ("high", "critical")] or ["high"]
tags_raw = row.get("tags", "")
tags = [t.strip() for t in str(tags_raw).split(",") if t.strip()] if tags_raw else []
return {
"event_id": str(uuid.uuid4()),
"event_type": "support_ticket_created",
"timestamp": datetime.now(timezone.utc).isoformat(),
"tenant_id": random.choice(TENANTS),
"ticket_id": f"TKT-{random.randint(10000, 99999)}",
"customer_id": f"CUST-{random.randint(1000, 99999)}",
"customer_tier": tier,
"subject": str(row.get("instruction", ""))[:120],
"body": str(row.get("instruction", "")),
"suggested_response": str(row.get("response", "")),
"category": category.lower(),
"priority": random.choice(priority_pool),
"channel": random.choice(CHANNELS),
"reopen_count": random.randint(0, 2),
"tags": tags,
"source_domain": domain,
"source": f"huggingface:bitext-{domain}",
"license": license_str,
"language": "en",
"is_multilingual": False,
}
def enrich_massive(row: dict, language: str, lang_name: str) -> dict:
"""
Normalize an Amazon MASSIVE intent row into a CustomerCore ticket event.
MASSIVE rows have: id, label, label_text, text, lang
label_text = intent name (e.g. 'alarm_set', 'email_query', 'calendar_remove')
text = the customer utterance in the target language
Maps MASSIVE intents to support categories:
alarm/reminder/calendar -> account
email/messaging -> account
audio/music/podcast -> product
transport/travel -> order
shopping -> order
iot/smart_home -> technical
weather/datetime/news -> general
"""
INTENT_TO_CATEGORY = {
"alarm": "account", "reminder": "account", "calendar": "account",
"email": "account", "messaging": "account", "social": "account",
"audio": "technical", "music": "technical", "podcast": "technical",
"play": "technical", "iot": "technical", "smart": "technical",
"transport": "order", "travel": "order", "lists": "order",
"shopping": "order", "takeaway": "order", "cooking": "order",
"weather": "general", "datetime": "general", "news": "general",
"qa": "general", "recommendation": "general", "general": "general",
}
intent = str(row.get("label_text", "general")).lower()
category = "general"
for key, cat in INTENT_TO_CATEGORY.items():
if key in intent:
category = cat
break
tier = random.choice(TIERS)
priority_pool = ["low", "medium", "medium", "high"]
return {
"event_id": str(uuid.uuid4()),
"event_type": "support_ticket_created",
"timestamp": datetime.now(timezone.utc).isoformat(),
"tenant_id": random.choice(TENANTS),
"ticket_id": f"TKT-{random.randint(10000, 99999)}",
"customer_id": f"CUST-{random.randint(1000, 99999)}",
"customer_tier": tier,
"subject": str(row.get("text", ""))[:120],
"body": str(row.get("text", "")),
"suggested_response": "",
"category": category,
"priority": random.choice(priority_pool),
"channel": random.choice(CHANNELS),
"reopen_count": 0,
"tags": [intent.replace("_", "-")],
"source_domain": "multilingual_intent",
"source": f"huggingface:amazon-massive-{language}",
"license": "Apache-2.0",
"language": language,
"language_name": lang_name,
"is_multilingual": True,
"intent": intent,
}
def load_sources(sources: str) -> tuple[list, dict]:
"""
Download and concatenate the requested dataset sources.
Returns (list_of_(row, enrich_fn) tuples, dict of source->count).
"""
all_rows = []
total_per_source = {}
bitext_sources = []
load_massive = False
if sources == "all":
bitext_sources = list(DATASET_CONFIGS.items())
load_massive = True
elif sources == "massive":
load_massive = True
elif sources in DATASET_CONFIGS:
bitext_sources = [(sources, DATASET_CONFIGS[sources])]
else:
raise ValueError(
f"Unknown source '{sources}'. "
f"Choose from: {list(DATASET_CONFIGS.keys()) + ['massive', 'all']}"
)
# Load Bitext English datasets
for name, cfg in bitext_sources:
print(f"\n Downloading: {cfg['id']}")
print(f" License : {cfg['license']} | Language: English")
t0 = time.time()
ds = load_dataset(cfg["id"], split="train")
elapsed = time.time() - t0
print(f" Downloaded : {len(ds):,} rows in {elapsed:.1f}s")
total_per_source[name] = len(ds)
for row in ds:
all_rows.append((row, cfg["domain"], cfg["license"], "bitext", "en", "English"))
# Load multilingual MASSIVE datasets
if load_massive:
for lang_code, lang_cfg in MASSIVE_LANGUAGES.items():
lang_name = lang_cfg["name"]
mteb_config = lang_cfg["mteb_config"]
print(f"\n Downloading: mteb/amazon_massive_intent [{lang_code} - {lang_name}]")
print(f" License : Apache-2.0 | Language: {lang_name}")
t0 = time.time()
ds = load_dataset("mteb/amazon_massive_intent", mteb_config, split="train")
elapsed = time.time() - t0
print(f" Downloaded : {len(ds):,} rows in {elapsed:.1f}s")
total_per_source[f"massive_{lang_code}"] = len(ds)
for row in ds:
all_rows.append((row, "multilingual_intent", "Apache-2.0", "massive", lang_code, lang_name))
return all_rows, total_per_source
def main(limit: int = None, delay: float = 0.0, sources: str = "all"):
print("=" * 65)
print("CustomerCore Multi-Language Data Loader")
print("Sources: Bitext SaaS + Banking (EN) + Amazon MASSIVE (DE/FR/ES)")
print("License: CDLA-Sharing-1.0 + Apache-2.0 | Legal in EU/Germany")
print("=" * 65)
# ── 1. Download datasets ──────────────────────────────────
print("\n[1/3] Downloading datasets from Hugging Face...")
time.time()
all_rows, source_counts = load_sources(sources)
print("\n Source breakdown:")
for src, count in source_counts.items():
print(f" {src:18s}: {count:,} rows")
total = len(all_rows)
print(f" {'TOTAL':18s}: {total:,} rows")
if limit:
random.shuffle(all_rows)
all_rows = all_rows[:limit]
print(f"\n Limited to {len(all_rows):,} rows for this run")
# ── 2. Connect to Redpanda ────────────────────────────────
print(f"\n[2/3] Connecting to Redpanda at {BROKER}...")
producer = Producer({
"bootstrap.servers": BROKER,
"queue.buffering.max.messages": 10000,
"batch.num.messages": 500,
})
print(" Connected.")
# ── 3. Publish with progress bar ──────────────────────────
print(f"\n[3/3] Publishing {len(all_rows):,} events to '{TOPIC}'...")
print(f" Estimated time: {len(all_rows) * 0.001:.0f}–{len(all_rows) * 0.003:.0f} seconds")
failed = 0
lang_counts: dict[str, int] = {}
t0 = time.time()
with tqdm(
total=len(all_rows),
unit="msg",
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
) as pbar:
# Each tuple: (row, domain, license_str, source_type, lang_code, lang_name)
for row, domain, license_str, source_type, lang_code, lang_name in all_rows:
if source_type == "bitext":
event = enrich(row, domain, license_str)
else: # massive
event = enrich_massive(row, lang_code, lang_name)
lang_counts[lang_code] = lang_counts.get(lang_code, 0) + 1
producer.produce(
topic=TOPIC,
key=event["ticket_id"].encode(),
value=json.dumps(event).encode(),
callback=lambda err, msg: None if not err else None,
)
producer.poll(0)
pbar.update(1)
if delay:
time.sleep(delay)
producer.flush()
elapsed = time.time() - t0
sent = len(all_rows) - failed
print(f"\n{'=' * 65}")
print(f" Published : {sent:,} events")
print(f" Failed : {failed}")
print(f" Duration : {elapsed:.1f}s ({len(all_rows)/elapsed:.0f} msg/s)")
print(f" Topic : {TOPIC}")
print(f" Sources : {sources}")
print(" Language breakdown:")
for lang, count in sorted(lang_counts.items()):
print(f" {lang}: {count:,} rows")
print(f"{'=' * 65}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CustomerCore Multi-Language HuggingFace loader")
parser.add_argument(
"--limit", type=int, default=None,
help="Max rows to load (default: all ~87k combined)"
)
parser.add_argument(
"--delay", type=float, default=0.0,
help="Delay between messages in seconds"
)
parser.add_argument(
"--sources",
choices=["all", "customer", "banking", "massive"],
default="all",
help="Which dataset sources to load (default: all = EN+DE+FR+ES)"
)
args = parser.parse_args()
main(limit=args.limit, delay=args.delay, sources=args.sources)