"""Build 20-40 case bundles from raw datasets.

Each case bundle = one customer/incident/problem chain.

Field provenance (real vs synthetic):

REAL from Tobi-Bueck/customer-support-tickets:
- ticket_text (from body)
- email_thread (from answer)
- priority (from priority field)
- language (from language field)
- source_dataset tags: tag_1..tag_8, queue, type

REAL from Bitext dataset:
- conversation_snippet (from instruction + response)
- ticket_text (constructed from category + instruction)

SYNTHETIC (always):
- vip_tier — no real VIP labels available
- handle_time_minutes — no real handle times available
- churned_within_30d — no real churn labels available

Synthetic logic is deterministic (seed=42) and explicitly documented.
"""

import hashlib
import json
import random
from pathlib import Path

from pipeline.loaders import save_case_bundle
from pipeline.normalize import detect_language, normalize_case
from pipeline.schemas import CaseBundle

RAW_DIR = Path("data/raw")
CASES_DIR = Path("data/cases")

# Deterministic seed for reproducibility of all synthetic fields.
random.seed(42)

# ---------------------------------------------------------------------------
# Synthetic augmentation (only for fields that have no real source)
# ---------------------------------------------------------------------------

# Weighted pools: repeated entries bias random.choice toward common values.
VIP_TIERS = ["standard", "standard", "standard", "vip", "unknown"]
PRIORITIES = ["low", "medium", "medium", "high", "critical"]


def _synthetic_vip_tier() -> str:
    """SYNTHETIC: No real VIP labels in source data."""
    return random.choice(VIP_TIERS)


def _synthetic_priority() -> str:
    """SYNTHETIC: Used only when real priority is missing."""
    return random.choice(PRIORITIES)


def _synthetic_handle_time() -> float:
    """SYNTHETIC: No real handle times in source data.

    Returns a handle time in minutes, uniform in [3.0, 90.0], rounded to 0.1.
    """
    return round(random.uniform(3.0, 90.0), 1)


def _synthetic_churn(priority: str, vip_tier: str) -> bool:
    """SYNTHETIC: No real churn labels in source data.

    Churn probability increases with priority and VIP tier:
    base 10%, +20% for high/critical priority, +15% for VIP customers.
    """
    base = 0.1
    if priority in ("high", "critical"):
        base += 0.2
    if vip_tier == "vip":
        base += 0.15
    return random.random() < base


def _make_case_id(source: str, index: int) -> str:
    """Deterministic case ID from source and index.

    md5 is used only as a short stable fingerprint, not for security.
    """
    raw = f"{source}:{index}"
    return f"case-{hashlib.md5(raw.encode()).hexdigest()[:8]}"


# ---------------------------------------------------------------------------
# Build from support tickets (Dataset 1)
# ---------------------------------------------------------------------------

def build_from_tickets(max_cases: int = 25) -> list[CaseBundle]:
    """Build case bundles from support ticket JSONL.

    Real fields used: body, answer, priority, language, queue, type, tag_1..tag_8
    Synthetic fields: vip_tier, handle_time_minutes, churned_within_30d

    Returns an empty list (with a warning) if the raw file is missing.
    """
    tickets_path = RAW_DIR / "support_tickets.jsonl"
    if not tickets_path.exists():
        print(f"Warning: {tickets_path} not found. Run scripts/ingest_data.py first.")
        return []

    cases = []
    with open(tickets_path, encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i >= max_cases:
                break
            row = json.loads(line)
            is_synthetic = row.get("_synthetic", False)

            # --- REAL fields ---
            ticket_text = row.get("body") or row.get("subject") or ""
            if not ticket_text.strip():
                continue

            # Use real priority if valid, otherwise synthesize.
            # `or ""` guards against an explicit null in the JSON row.
            priority = (row.get("priority") or "").lower().strip()
            if priority not in ("low", "medium", "high", "critical"):
                priority = _synthetic_priority()

            # Use real language from dataset, falling back to detection.
            language = (row.get("language") or "").lower().strip()
            if not language:
                language = detect_language(ticket_text)

            # Use agent answer as conversation context (real).
            answer = row.get("answer") or ""

            # Prepend subject line for richer ticket text (skip if it is
            # already part of the body).
            subject = row.get("subject") or ""
            if subject and subject not in ticket_text:
                ticket_text = f"[{subject}]\n{ticket_text}"

            # --- SYNTHETIC fields (explicitly marked) ---
            vip_tier = _synthetic_vip_tier()
            handle_time = _synthetic_handle_time()
            churned = _synthetic_churn(priority, vip_tier)

            case = CaseBundle(
                case_id=_make_case_id("ticket", i),
                ticket_text=ticket_text,
                conversation_snippet=answer,
                email_thread=[],
                vip_tier=vip_tier,
                priority=priority,
                handle_time_minutes=handle_time,
                churned_within_30d=churned,
                source_dataset="support_tickets" + (" (synthetic)" if is_synthetic else " (real)"),
                language=language,
            )
            cases.append(normalize_case(case))

    real_count = sum(1 for c in cases if "(real)" in c.source_dataset)
    synth_count = sum(1 for c in cases if "(synthetic)" in c.source_dataset)
    print(f"Built {len(cases)} cases from support tickets ({real_count} real, {synth_count} synthetic)")
    return cases


# ---------------------------------------------------------------------------
# Build from Bitext dialogues (Dataset 2)
# ---------------------------------------------------------------------------

def build_from_bitext(max_cases: int = 15) -> list[CaseBundle]:
    """Build case bundles from Bitext dialogue JSONL.

    Real fields used: instruction, response, category, intent
    Synthetic fields: vip_tier, handle_time_minutes, churned_within_30d, priority

    Falls back to the legacy SAMSum file if the Bitext file is missing.
    """
    bitext_path = RAW_DIR / "bitext_dialogues.jsonl"
    if not bitext_path.exists():
        print(f"Info: {bitext_path} not found. Trying legacy samsum_conversations.jsonl...")
        return _build_from_samsum_legacy(max_cases)

    cases = []
    with open(bitext_path, encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i >= max_cases:
                break
            row = json.loads(line)
            is_synthetic = row.get("_synthetic", False)

            # --- REAL fields ---
            # `or ""` guards against explicit nulls in the JSON rows.
            instruction = row.get("instruction") or ""
            response = row.get("response") or ""
            category = (row.get("category") or "").lower()
            intent = row.get("intent") or ""
            if not instruction.strip():
                continue

            # Build ticket text from real category + instruction.
            ticket_text = f"[{category.upper()}] {instruction}"

            # Build conversation from instruction/response pair.
            conversation = f"Customer: {instruction}\nAgent: {response}"

            # Heuristic: escalate priority for complaint/refund/cancel
            # intents; otherwise the priority is fully synthetic.
            priority = _synthetic_priority()
            if any(kw in intent.lower() for kw in ["complain", "refund", "cancel"]):
                priority = random.choice(["high", "critical"])

            # --- SYNTHETIC fields ---
            vip_tier = _synthetic_vip_tier()
            handle_time = _synthetic_handle_time()
            churned = _synthetic_churn(priority, vip_tier)

            case = CaseBundle(
                case_id=_make_case_id("bitext", i),
                ticket_text=ticket_text,
                conversation_snippet=conversation,
                email_thread=[],
                vip_tier=vip_tier,
                priority=priority,
                handle_time_minutes=handle_time,
                churned_within_30d=churned,
                source_dataset="bitext_dialogues" + (" (synthetic)" if is_synthetic else " (real)"),
                language="en",
            )
            cases.append(normalize_case(case))

    real_count = sum(1 for c in cases if "(real)" in c.source_dataset)
    synth_count = sum(1 for c in cases if "(synthetic)" in c.source_dataset)
    print(f"Built {len(cases)} cases from Bitext dialogues ({real_count} real, {synth_count} synthetic)")
    return cases


def _build_from_samsum_legacy(max_cases: int = 15) -> list[CaseBundle]:
    """Fallback: build from legacy samsum_conversations.jsonl if bitext is unavailable.

    All fields here are synthetic apart from the dialogue/summary text.
    """
    samsum_path = RAW_DIR / "samsum_conversations.jsonl"
    if not samsum_path.exists():
        print("Warning: No dialogue data found. Run scripts/ingest_data.py first.")
        return []

    cases = []
    with open(samsum_path, encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i >= max_cases:
                break
            row = json.loads(line)
            # `or ""` guards against explicit nulls in the JSON rows.
            dialogue = row.get("dialogue") or ""
            summary = row.get("summary") or ""
            if not dialogue.strip():
                continue

            vip_tier = _synthetic_vip_tier()
            priority = _synthetic_priority()
            handle_time = _synthetic_handle_time()

            case = CaseBundle(
                case_id=_make_case_id("samsum", i),
                ticket_text=f"Customer conversation summary: {summary}",
                conversation_snippet=dialogue,
                email_thread=[],
                vip_tier=vip_tier,
                priority=priority,
                handle_time_minutes=handle_time,
                churned_within_30d=_synthetic_churn(priority, vip_tier),
                source_dataset="samsum (synthetic)",
                language="en",
            )
            cases.append(normalize_case(case))

    print(f"Built {len(cases)} cases from SAMSum (legacy, all synthetic)")
    return cases


# ---------------------------------------------------------------------------
# Main builder
# ---------------------------------------------------------------------------

def build_all_cases() -> list[CaseBundle]:
    """Build all case bundles and save to data/cases/.

    Clears any previously saved cases first so the output directory always
    reflects exactly one build run.
    """
    CASES_DIR.mkdir(parents=True, exist_ok=True)

    # Clear existing cases
    for old in CASES_DIR.glob("*.json"):
        old.unlink()

    all_cases = []
    all_cases.extend(build_from_tickets(max_cases=25))
    all_cases.extend(build_from_bitext(max_cases=15))

    if not all_cases:
        print("ERROR: No cases built. Ensure raw data exists in data/raw/.")
        print("Run: python scripts/ingest_data.py")
        return []

    for case in all_cases:
        save_case_bundle(case, CASES_DIR)

    # Summary
    real_count = sum(1 for c in all_cases if "(real)" in c.source_dataset)
    synth_count = sum(1 for c in all_cases if "(synthetic)" in c.source_dataset)
    print(f"\nTotal: {len(all_cases)} case bundles saved to {CASES_DIR}/")
    print(f"  Real source data: {real_count}")
    print(f"  Synthetic fallback: {synth_count}")
    return all_cases


if __name__ == "__main__":
    build_all_cases()