forward-deployed-ai-sim / scripts /build_cases.py
bobaoxu2001
Deploy forward-deployed AI simulation dashboard
c4fe0a4
"""Build 20-40 case bundles from raw datasets.
Each case bundle = one customer/incident/problem chain.
Field provenance (real vs synthetic):
REAL from Tobi-Bueck/customer-support-tickets:
- ticket_text (from body)
- conversation_snippet (from answer; email_thread is always left empty)
- priority (from priority field)
- language (from language field)
- source_dataset tags: tag_1..tag_8, queue, type
REAL from Bitext dataset:
- conversation_snippet (from instruction + response)
- ticket_text (constructed from category + instruction)
SYNTHETIC (always):
- vip_tier — no real VIP labels available
- handle_time_minutes — no real handle times available
- churned_within_30d — no real churn labels available
Synthetic logic is deterministic (seed=42) and explicitly documented.
"""
import json
import random
import hashlib
from pathlib import Path
from pipeline.schemas import CaseBundle
from pipeline.normalize import normalize_case, detect_language
from pipeline.loaders import save_case_bundle
# Where the ingested raw datasets live and where generated bundles are written.
RAW_DIR = Path("data") / "raw"
CASES_DIR = Path("data") / "cases"

# Seed the shared RNG once at import time so every synthetic field is
# reproducible from run to run.
random.seed(42)

# ---------------------------------------------------------------------------
# Synthetic augmentation (only for fields that have no real source)
# ---------------------------------------------------------------------------

# Sampling pools for random.choice: a value that appears more than once is
# proportionally more likely to be drawn.
VIP_TIERS = ["standard"] * 3 + ["vip", "unknown"]
PRIORITIES = ["low"] + ["medium"] * 2 + ["high", "critical"]
def _synthetic_vip_tier() -> str:
    """SYNTHETIC: pick a VIP tier at random (the source data has no VIP labels).

    Draws from ``VIP_TIERS`` using the module-level ``random`` generator.
    """
    tier = random.choice(VIP_TIERS)
    return tier
def _synthetic_priority() -> str:
    """SYNTHETIC: pick a priority at random; used only when no real one exists.

    Draws from ``PRIORITIES`` using the module-level ``random`` generator.
    """
    level = random.choice(PRIORITIES)
    return level
def _synthetic_handle_time() -> float:
    """SYNTHETIC: invent a handle time in minutes (none exist in the source data).

    Returns a value drawn uniformly between 3.0 and 90.0, rounded to one
    decimal place.
    """
    minutes = random.uniform(3.0, 90.0)
    return round(minutes, 1)
def _synthetic_churn(priority: str, vip_tier: str) -> bool:
    """SYNTHETIC: invent a churn flag (the source data has no churn labels).

    The churn probability starts at 0.1, gains 0.2 when ``priority`` is
    "high" or "critical", and gains a further 0.15 when ``vip_tier`` is "vip".
    A single random draw is compared against that probability.
    """
    urgent_bump = 0.2 if priority in ("high", "critical") else 0.0
    vip_bump = 0.15 if vip_tier == "vip" else 0.0
    churn_probability = 0.1 + urgent_bump + vip_bump
    return random.random() < churn_probability
def _make_case_id(source: str, index: int) -> str:
    """Derive a stable case ID from a source name and a row index.

    The ID is "case-" followed by the first eight hex characters of the MD5
    digest of "<source>:<index>", so the same inputs always give the same ID.
    """
    digest = hashlib.md5(f"{source}:{index}".encode()).hexdigest()
    return "case-" + digest[:8]
# ---------------------------------------------------------------------------
# Build from support tickets (Dataset 1)
# ---------------------------------------------------------------------------
def build_from_tickets(max_cases: int = 25) -> list[CaseBundle]:
    """Build case bundles from the support-ticket JSONL file.

    Only the first ``max_cases`` lines of ``RAW_DIR / "support_tickets.jsonl"``
    are considered. Blank lines and rows with no body/subject text are skipped
    but still count toward that limit, so a case ID always maps to the line
    index it came from.

    Real fields used: body, subject, answer, priority, language
    Synthetic fields: vip_tier, handle_time_minutes, churned_within_30d
    (priority and language are filled in only when the row lacks a usable one).

    The queue/type/tag_1..tag_8 columns are not copied onto the bundle.

    Args:
        max_cases: Number of leading lines of the file to consider.

    Returns:
        The bundles after ``normalize_case``; an empty list if the file is
        missing.
    """
    tickets_path = RAW_DIR / "support_tickets.jsonl"
    if not tickets_path.exists():
        print(f"Warning: {tickets_path} not found. Run scripts/ingest_data.py first.")
        return []

    valid_priorities = ("low", "medium", "high", "critical")
    cases = []
    with open(tickets_path, encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i >= max_cases:
                break
            # A stray blank line is not a record; json.loads would raise on it.
            if not line.strip():
                continue
            row = json.loads(line)
            is_synthetic = row.get("_synthetic", False)

            # --- REAL fields ---
            ticket_text = row.get("body") or row.get("subject") or ""
            if not ticket_text.strip():
                continue

            # Use real priority if valid, otherwise synthesize
            priority = (row.get("priority") or "").lower().strip()
            if priority not in valid_priorities:
                priority = _synthetic_priority()

            # Use real language from dataset, falling back to detection
            language = (row.get("language") or "").lower().strip()
            if not language:
                language = detect_language(ticket_text)

            # Use agent answer as conversation context (real). `or ""` also
            # covers an explicit JSON null, which .get's default would not.
            answer = row.get("answer") or ""

            # Prefix the subject line for richer ticket text
            subject = row.get("subject") or ""
            if subject and subject not in ticket_text:
                ticket_text = f"[{subject}]\n{ticket_text}"

            # --- SYNTHETIC fields (explicitly marked) ---
            # The draw order (vip, handle time, churn) is part of the seeded
            # output; do not reorder these calls.
            vip_tier = _synthetic_vip_tier()
            handle_time = _synthetic_handle_time()
            churned = _synthetic_churn(priority, vip_tier)

            case = CaseBundle(
                case_id=_make_case_id("ticket", i),
                ticket_text=ticket_text,
                conversation_snippet=answer,
                email_thread=[],
                vip_tier=vip_tier,
                priority=priority,
                handle_time_minutes=handle_time,
                churned_within_30d=churned,
                source_dataset="support_tickets" + (" (synthetic)" if is_synthetic else " (real)"),
                language=language,
            )
            cases.append(normalize_case(case))

    real_count = sum(1 for c in cases if "(real)" in c.source_dataset)
    synth_count = sum(1 for c in cases if "(synthetic)" in c.source_dataset)
    print(f"Built {len(cases)} cases from support tickets ({real_count} real, {synth_count} synthetic)")
    return cases
# ---------------------------------------------------------------------------
# Build from Bitext dialogues (Dataset 2)
# ---------------------------------------------------------------------------
def build_from_bitext(max_cases: int = 15) -> list[CaseBundle]:
    """Build case bundles from the Bitext dialogue JSONL file.

    Only the first ``max_cases`` lines of ``RAW_DIR / "bitext_dialogues.jsonl"``
    are considered. Blank lines and rows without an instruction are skipped
    but still count toward that limit. If the file does not exist, the legacy
    SAMSum builder is used instead.

    Real fields used: instruction, response, category, intent
    Synthetic fields: vip_tier, handle_time_minutes, churned_within_30d, priority

    Args:
        max_cases: Number of leading lines of the file to consider.

    Returns:
        The bundles after ``normalize_case`` (or whatever the legacy builder
        returns when the Bitext file is missing).
    """
    bitext_path = RAW_DIR / "bitext_dialogues.jsonl"
    if not bitext_path.exists():
        print(f"Info: {bitext_path} not found. Trying legacy samsum_conversations.jsonl...")
        return _build_from_samsum_legacy(max_cases)

    escalation_keywords = ("complain", "refund", "cancel")
    cases = []
    with open(bitext_path, encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i >= max_cases:
                break
            # A stray blank line is not a record; json.loads would raise on it.
            if not line.strip():
                continue
            row = json.loads(line)
            is_synthetic = row.get("_synthetic", False)

            # --- REAL fields ---
            # `or ""` also covers explicit JSON nulls, which would otherwise
            # crash on .lower()/.strip() or be rendered as the text "None".
            instruction = row.get("instruction") or ""
            response = row.get("response") or ""
            category = (row.get("category") or "").lower()
            intent = (row.get("intent") or "").lower()
            if not instruction.strip():
                continue

            # Build ticket text from real category + instruction
            ticket_text = f"[{category.upper()}] {instruction}"
            # Build conversation from instruction/response pair
            conversation = f"Customer: {instruction}\nAgent: {response}"

            # Priority heuristic keyed on the intent: complaint/refund/cancel
            # intents are escalated. The baseline draw is always made, even
            # when it is then overridden, so the seeded random sequence (and
            # therefore every later synthetic value) stays unchanged.
            priority = _synthetic_priority()
            if any(kw in intent for kw in escalation_keywords):
                priority = random.choice(["high", "critical"])

            # --- SYNTHETIC fields ---
            vip_tier = _synthetic_vip_tier()
            handle_time = _synthetic_handle_time()
            churned = _synthetic_churn(priority, vip_tier)

            case = CaseBundle(
                case_id=_make_case_id("bitext", i),
                ticket_text=ticket_text,
                conversation_snippet=conversation,
                email_thread=[],
                vip_tier=vip_tier,
                priority=priority,
                handle_time_minutes=handle_time,
                churned_within_30d=churned,
                source_dataset="bitext_dialogues" + (" (synthetic)" if is_synthetic else " (real)"),
                language="en",
            )
            cases.append(normalize_case(case))

    real_count = sum(1 for c in cases if "(real)" in c.source_dataset)
    synth_count = sum(1 for c in cases if "(synthetic)" in c.source_dataset)
    print(f"Built {len(cases)} cases from Bitext dialogues ({real_count} real, {synth_count} synthetic)")
    return cases
def _build_from_samsum_legacy(max_cases: int = 15) -> list[CaseBundle]:
    """Fallback: build from legacy samsum_conversations.jsonl if bitext is unavailable.

    Only the first ``max_cases`` lines of the file are considered. Blank lines
    and rows without a dialogue are skipped but still count toward that limit.
    Every bundle is tagged "samsum (synthetic)"; vip_tier, priority,
    handle_time_minutes and churned_within_30d are all synthesized.

    Args:
        max_cases: Number of leading lines of the file to consider.

    Returns:
        The bundles after ``normalize_case``; an empty list if the file is
        missing.
    """
    samsum_path = RAW_DIR / "samsum_conversations.jsonl"
    if not samsum_path.exists():
        print("Warning: No dialogue data found. Run scripts/ingest_data.py first.")
        return []

    cases = []
    with open(samsum_path, encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i >= max_cases:
                break
            # A stray blank line is not a record; json.loads would raise on it.
            if not line.strip():
                continue
            row = json.loads(line)

            # `or ""` also covers explicit JSON nulls, which would otherwise
            # crash on .strip() or end up as the text "None" in the ticket.
            dialogue = row.get("dialogue") or ""
            summary = row.get("summary") or ""
            if not dialogue.strip():
                continue

            # Draw order (vip, priority, handle time, churn) is part of the
            # seeded output; do not reorder these calls.
            vip_tier = _synthetic_vip_tier()
            priority = _synthetic_priority()
            handle_time = _synthetic_handle_time()

            case = CaseBundle(
                case_id=_make_case_id("samsum", i),
                ticket_text=f"Customer conversation summary: {summary}",
                conversation_snippet=dialogue,
                email_thread=[],
                vip_tier=vip_tier,
                priority=priority,
                handle_time_minutes=handle_time,
                churned_within_30d=_synthetic_churn(priority, vip_tier),
                source_dataset="samsum (synthetic)",
                language="en",
            )
            cases.append(normalize_case(case))

    print(f"Built {len(cases)} cases from SAMSum (legacy, all synthetic)")
    return cases
# ---------------------------------------------------------------------------
# Main builder
# ---------------------------------------------------------------------------
def build_all_cases() -> list[CaseBundle]:
    """Build all case bundles and save them to data/cases/.

    Up to 25 ticket lines and up to 15 dialogue lines are processed. Existing
    ``*.json`` files in the output directory are removed only after the new
    bundles have been built, so a run that produces nothing (for example
    because the raw data is missing) leaves the previous bundles in place.

    Returns:
        Every bundle that was built and saved; an empty list if none could
        be built.
    """
    CASES_DIR.mkdir(parents=True, exist_ok=True)

    all_cases = []
    all_cases.extend(build_from_tickets(max_cases=25))
    all_cases.extend(build_from_bitext(max_cases=15))

    if not all_cases:
        print("ERROR: No cases built. Ensure raw data exists in data/raw/.")
        print("Run: python scripts/ingest_data.py")
        return []

    # Clear the old bundles only now that replacements exist; clearing up
    # front would wipe the directory whenever the build fails or comes up empty.
    for old in CASES_DIR.glob("*.json"):
        old.unlink()

    for case in all_cases:
        save_case_bundle(case, CASES_DIR)

    # Summary
    real_count = sum(1 for c in all_cases if "(real)" in c.source_dataset)
    synth_count = sum(1 for c in all_cases if "(synthetic)" in c.source_dataset)
    print(f"\nTotal: {len(all_cases)} case bundles saved to {CASES_DIR}/")
    print(f" Real source data: {real_count}")
    print(f" Synthetic fallback: {synth_count}")
    return all_cases
# Script entry point: rebuild every case bundle when the file is run directly.
if __name__ == "__main__":
    build_all_cases()