harsharajkumar273's picture
Merge V2 review and dry-run mechanics
7c2c5f2
"""Task specs and deterministic cleaning operations for CleanOps."""
from __future__ import annotations

import copy
import re
from dataclasses import dataclass
from datetime import datetime
from decimal import ROUND_HALF_UP, Decimal, InvalidOperation
from typing import Callable

from cleanops_env.models import IssueCard
# In-memory dataset: table name -> list of rows, each row mapping column -> string value.
Tables = dict[str, list[dict[str, str]]]
# A cleaning operation: takes a set of tables and returns the (new) cleaned tables.
TransformFn = Callable[[Tables], Tables]

# Validation patterns. EMAIL_RE only accepts lower-case addresses, so emails must be
# normalized (lower-cased and trimmed) before they validate.
EMAIL_RE = re.compile(r"^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$")
# US phone number in "(AAA) BBB-CCCC" form.
PHONE_RE = re.compile(r"^\(\d{3}\) \d{3}-\d{4}$")
# Shape of an ISO 8601 calendar date (YYYY-MM-DD); month/day ranges are not checked.
ISO_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
# Unsigned amount with exactly two decimal places.
AMOUNT_RE = re.compile(r"^\d+\.\d{2}$")

# The 50 US state postal codes (DC and territories are not included).
STATE_CODES = {
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL",
    "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT",
    "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI",
    "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY",
}

# Lower-cased, whitespace-collapsed state spellings -> postal code.
# Every full state name is listed so normalize_state can canonicalize any of them;
# unknown spellings fall through to normalize_state's upper-casing.
STATE_ALIASES = {
    "alabama": "AL",
    "alaska": "AK",
    "arizona": "AZ",
    "arkansas": "AR",
    "california": "CA",
    "colorado": "CO",
    "connecticut": "CT",
    "delaware": "DE",
    "florida": "FL",
    "georgia": "GA",
    "hawaii": "HI",
    "idaho": "ID",
    "illinois": "IL",
    "indiana": "IN",
    "iowa": "IA",
    "kansas": "KS",
    "kentucky": "KY",
    "louisiana": "LA",
    "maine": "ME",
    "maryland": "MD",
    "massachusetts": "MA",
    "michigan": "MI",
    "minnesota": "MN",
    "mississippi": "MS",
    "missouri": "MO",
    "montana": "MT",
    "nebraska": "NE",
    "nevada": "NV",
    "new hampshire": "NH",
    "new jersey": "NJ",
    "new mexico": "NM",
    "new york": "NY",
    "north carolina": "NC",
    "north dakota": "ND",
    "ohio": "OH",
    "oklahoma": "OK",
    "oregon": "OR",
    "pennsylvania": "PA",
    "rhode island": "RI",
    "south carolina": "SC",
    "south dakota": "SD",
    "tennessee": "TN",
    "texas": "TX",
    "utah": "UT",
    "vermont": "VT",
    "virginia": "VA",
    "washington": "WA",
    "west virginia": "WV",
    "wisconsin": "WI",
    "wyoming": "WY",
    # Lower-case abbreviations kept for callers that read this table directly;
    # normalize_state's upper-case fallback would give the same result.
    "tn": "TN",
    "tx": "TX",
    "ca": "CA",
}

# Preference order used when merging duplicate rows: for status-like fields the value
# with the higher rank wins (see rank_value / choose_preferred_value).
STATUS_RANK = {
    "active": 5,
    "trial": 4,
    "pending": 3,
    "past_due": 2,
    "returned": 2,
    "churn_risk": 1,
    "inactive": 0,
    "cancelled": 0,
}
@dataclass(frozen=True)
class RequiredRule:
    """Validation rule declaring ``table_name.column_name`` a required column.

    This class only carries the rule's parameters; the code that evaluates rules is
    not in this section (presumably it flags blank values — confirm in the validator).
    """

    table_name: str
    column_name: str
    # Severity label; the tasks in this file use "high", "medium" and "low".
    severity: str = "high"


@dataclass(frozen=True)
class PatternRule:
    """Validation rule: values of ``table_name.column_name`` are expected to match ``pattern``.

    Constructed positionally in this file as (table, column, regex, message, severity).
    """

    table_name: str
    column_name: str
    # Compiled regex, e.g. EMAIL_RE, PHONE_RE, ISO_DATE_RE, AMOUNT_RE.
    pattern: re.Pattern[str]
    # Human-readable description of the expected format.
    message: str
    severity: str = "medium"


@dataclass(frozen=True)
class EnumRule:
    """Validation rule: values of ``table_name.column_name`` are expected to be one of ``allowed``."""

    table_name: str
    column_name: str
    # Permitted values, compared as given (e.g. upper-case state codes, "USD").
    allowed: tuple[str, ...]
    severity: str = "medium"


@dataclass(frozen=True)
class UniqueRule:
    """Validation rule over ``columns`` of ``table_name`` (presumably: their combined values must be unique per row)."""

    table_name: str
    columns: tuple[str, ...]
    severity: str = "high"


@dataclass(frozen=True)
class ForeignKeyRule:
    """Validation rule linking ``table_name.column_name`` to ``ref_table_name.ref_column_name``.

    Presumably each non-blank value must exist in the referenced column — confirm in
    the validator.
    """

    table_name: str
    column_name: str
    ref_table_name: str
    ref_column_name: str
    severity: str = "high"


# Union of every rule type a TaskSpec may list in ``validation_rules``.
# This is evaluated at runtime (it is not an annotation), so it needs Python 3.10+.
ValidationRule = RequiredRule | PatternRule | EnumRule | UniqueRule | ForeignKeyRule
@dataclass(frozen=True)
class OperationSpec:
    """A named cleaning operation that can be applied to a task's tables.

    Instances in this file are built positionally, so the field order below is part
    of the interface.
    """

    # Key under which the operation is registered in TaskSpec.operations.
    operation_id: str
    # Short human-readable name.
    title: str
    # Kind of change, e.g. "standardize_text", "deduplicate", "destructive_filter".
    category: str
    # Risk label; this file uses "safe", "review" and "destructive".
    risk: str
    # Names of the tables the transform rewrites.
    tables_affected: tuple[str, ...]
    # What the operation does.
    description: str
    # Business rationale (for destructive operations: why it should be avoided).
    why_it_matters: str
    # Function producing the new tables; every transform in this file copies its
    # input with clone_tables and returns the copy.
    transform: TransformFn
@dataclass(frozen=True)
class ReviewCaseSpec:
    """A human-review case attached to a task, together with its canned answer."""

    # Key under which the case is registered in TaskSpec.review_cases.
    review_id: str
    # Kind of record under review, e.g. "customer" or "order".
    entity_type: str
    # Identifier of that record, e.g. "C005".
    entity_id: str
    # Machine-readable reason, e.g. "possible_duplicate".
    reason_code: str
    title: str
    # Description of the situation that needs a decision.
    detail: str
    # Resolution code, e.g. "merge_confirmed_keep_c001".
    resolution: str
    # Text answer for the review (presumably returned to the agent — confirm).
    response_summary: str
    # Justification backing the resolution.
    evidence_summary: str
    # Operations suggested by this review; empty by default.
    recommended_operation_ids: tuple[str, ...] = ()
@dataclass(frozen=True)
class TaskSpec:
    """Complete definition of one cleaning task.

    Built via ``_task_from_solution``, which derives ``gold_tables`` by applying the
    operations in ``solution_operation_ids`` in order to a copy of ``dirty_tables``.
    The dataclass is frozen, but the dicts/lists it holds are ordinary mutable objects.
    """

    # Stable identifier, e.g. "customer_contacts_easy".
    task_id: str
    title: str
    # Difficulty label, e.g. "easy" or "medium".
    difficulty: str
    # Goal text describing what a clean result means for this task.
    objective: str
    # Narrative about where the data comes from.
    dataset_context: str
    # Step limit for an episode (presumably enforced by the environment).
    max_steps: int
    # Presumably how many review cases may be consulted — confirm in the environment.
    review_budget: int
    # Downstream systems, e.g. ("crm",) or ("crm", "billing").
    sync_targets: tuple[str, ...]
    # Table name -> primary-key column.
    primary_keys: dict[str, str]
    # Table name -> columns whose values identify duplicate rows.
    duplicate_identity_columns: dict[str, tuple[str, ...]]
    # Starting (dirty) data.
    dirty_tables: Tables
    # Expected data after running the reference solution.
    gold_tables: Tables
    validation_rules: tuple[ValidationRule, ...]
    # operation_id -> operation.
    operations: dict[str, OperationSpec]
    # Reference solution, in application order.
    solution_operation_ids: tuple[str, ...]
    # Issue summaries linking issue codes to recommended operations.
    issue_cards: tuple[IssueCard, ...]
    # review_id -> review case.
    review_cases: dict[str, ReviewCaseSpec]
def clone_tables(tables: Tables) -> Tables:
    """Return a copy of ``tables`` with fresh table lists and fresh row dicts.

    The copy is one level deep per row: row dicts are new objects, but the cell
    values themselves are shared (the ``Tables`` alias types them as ``str``).
    """
    copied: Tables = {}
    for name, rows in tables.items():
        copied[name] = list(map(dict, rows))
    return copied
def normalize_whitespace(value: object) -> str:
    """Return ``value`` as a string with runs of whitespace collapsed to single spaces.

    Falsy inputs (``None``, ``""``, ``0``, ...) become ``""``. Non-breaking spaces are
    treated as ordinary spaces, and leading/trailing whitespace is dropped.
    """
    text = str(value) if value else ""
    tokens = text.replace("\u00a0", " ").split()
    return " ".join(tokens)
def normalize_name(value: object) -> str:
    """Collapse whitespace in ``value`` and capitalize every word.

    ``str.capitalize`` upper-cases the first character of each word and lower-cases
    the rest, e.g. ``" alice  JOHNSON "`` -> ``"Alice Johnson"``. Note that this also
    lower-cases inner capitals (``"McDonald"`` -> ``"Mcdonald"``).
    """
    words = normalize_whitespace(value).split()
    return " ".join(map(str.capitalize, words))
def normalize_email(value: object) -> str:
    """Return ``value`` trimmed, with whitespace collapsed and all letters lower-cased."""
    collapsed = normalize_whitespace(value)
    return collapsed.lower()
def normalize_phone_us(value: object, default_area_code: str = "615") -> str:
    """Format a US phone number as ``(AAA) BBB-CCCC``.

    Every non-digit character is discarded first. An 11-digit number starting with
    the country code ``1`` loses that prefix, and a 7-digit local number is prefixed
    with ``default_area_code``. Anything that does not end up as exactly 10 digits is
    returned whitespace-normalized but otherwise unchanged, so it still fails phone
    validation instead of being silently mangled.

    Args:
        value: Raw phone value; ``None`` or empty input yields ``""``.
        default_area_code: Three-digit area code assumed for 7-digit local numbers.
            Defaults to ``"615"`` (the previously hard-coded value). Pass ``""`` to
            disable the inference.

    Raises:
        ValueError: if ``default_area_code`` is non-empty and not exactly three
            ASCII digits.
    """
    if default_area_code and not re.fullmatch(r"[0-9]{3}", default_area_code):
        raise ValueError(f"default_area_code must be three digits, got {default_area_code!r}")
    digits = re.sub(r"\D", "", str(value or ""))
    if len(digits) == 11 and digits.startswith("1"):
        digits = digits[1:]
    if len(digits) == 7 and default_area_code:
        digits = default_area_code + digits
    if len(digits) != 10:
        return normalize_whitespace(value)
    return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
def normalize_state(value: object) -> str:
    """Map a state name or abbreviation to a two-letter state code.

    The value is whitespace-collapsed and lower-cased, then looked up in
    ``STATE_ALIASES``. Anything not listed there is returned upper-cased, so unknown
    spellings pass through for validation to flag. Blank input yields ``""``.
    """
    key = normalize_whitespace(value).lower()
    alias = STATE_ALIASES.get(key)
    return key.upper() if alias is None else alias
def fill_state_from_city(city: object, state: object) -> str:
    """Return the normalized ``state``, inferring it from ``city`` when it is blank.

    Only Nashville (TN), Austin (TX) and San Jose (CA) are known; any other city with
    a blank state yields ``""``. City matching ignores case and extra whitespace.
    """
    normalized_state = normalize_state(state)
    if normalized_state:
        return normalized_state
    known_cities = {"nashville": "TN", "austin": "TX", "san jose": "CA"}
    city_key = normalize_whitespace(city).lower()
    return known_cities.get(city_key, "")
def normalize_status(value: object, mapping: dict[str, str]) -> str:
    """Canonicalize a free-text status using ``mapping``.

    The lookup key is the value with whitespace collapsed, lower-cased and spaces
    replaced by underscores (``"Past Due"`` -> ``"past_due"``). Keys missing from
    ``mapping`` fall back to the upper-cased key.
    """
    key = "_".join(normalize_whitespace(value).lower().split(" "))
    if key in mapping:
        return mapping[key]
    return key.upper()
def normalize_date(value: object) -> str:
    """Convert a date string to ISO ``YYYY-MM-DD``.

    Accepted input layouts, tried in order: ``YYYY-MM-DD``, ``YYYY/MM/DD``,
    ``MM-DD-YYYY`` and ``YYYY.MM.DD``. If none parses, the whitespace-normalized input
    is returned unchanged so validation can flag it.
    """
    text = normalize_whitespace(value)
    for layout in ("%Y-%m-%d", "%Y/%m/%d", "%m-%d-%Y", "%Y.%m.%d"):
        try:
            parsed = datetime.strptime(text, layout)
        except ValueError:
            continue
        return parsed.strftime("%Y-%m-%d")
    return text
def normalize_currency(value: object) -> str:
    """Upper-case a currency code, treating ``$``, ``US`` and blank input as ``USD``.

    Dollar signs are stripped first, so ``"$"`` and ``""`` both become ``"USD"``.
    Other values are returned upper-cased (``"eur"`` -> ``"EUR"``).
    """
    code = normalize_whitespace(value).replace("$", "").upper()
    return "USD" if code in ("", "USD", "US") else code
def normalize_amount(value: object) -> str:
    """Normalize a monetary amount to a plain string with exactly two decimals.

    Clean-up before parsing:

    * ``$`` signs and ``,`` thousands separators are removed.
    * The letters ``O``/``o`` are read as the digit ``0`` (a common typing/OCR slip,
      e.g. ``"12O.00"`` -> ``"120.00"``).

    The amount is parsed as a :class:`decimal.Decimal` and rounded half-up to cents.
    Decimal inputs therefore round as written (``"2.675"`` -> ``"2.68"``) instead of
    inheriting binary floating-point error, which ``float`` formatting would give
    (``"2.67"``).

    Returns the whitespace-normalized input unchanged when the value cannot be used as
    an amount, so validation still flags it. That covers unparseable text,
    non-finite values (``"NaN"``, ``"Infinity"``), and magnitudes beyond the decimal
    context precision.
    """
    original = normalize_whitespace(value)
    candidate = original.replace("$", "").replace(",", "").replace("O", "0").replace("o", "0")
    try:
        amount = Decimal(candidate)
    except InvalidOperation:
        return original
    if not amount.is_finite():
        return original
    try:
        cents = amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    except InvalidOperation:
        # Too many digits for the current decimal context precision.
        return original
    # Fixed-point formatting guarantees no exponent notation in the output.
    return f"{cents:f}"
def rank_value(field_name: str, value: str) -> tuple[int, int, str]:
    """Score a candidate cell value for duplicate merging; higher tuples win.

    * Blank values score ``(0, 0, "")``, so any non-blank value beats them.
    * Status-like fields (``status``, ``order_status``, ``payment_status``,
      ``lifecycle_stage``) score ``(2, rank, value)``. ``rank`` comes from
      ``STATUS_RANK``, and statuses not listed there get rank 1.
    * Every other field scores ``(1, len(value), value)``. The longest value wins,
      and ties go to the lexicographically greatest string.

    Status lookups use the same key form as ``normalize_status`` (lower-case, spaces
    replaced by underscores), so un-normalized values such as ``"Past Due"`` rank the
    same as ``"past_due"``.
    """
    cleaned = normalize_whitespace(value)
    if not cleaned:
        return (0, 0, "")
    if field_name in {"status", "order_status", "payment_status", "lifecycle_stage"}:
        status_key = cleaned.lower().replace(" ", "_")
        return (2, STATUS_RANK.get(status_key, 1), cleaned)
    # Identifier-like and free-text columns are ranked identically.
    return (1, len(cleaned), cleaned)
def choose_preferred_value(field_name: str, values: list[str]) -> str:
    """Pick the best non-blank value among ``values`` for column ``field_name``.

    Values are whitespace-normalized and blanks are discarded. The candidate with
    the highest ``rank_value`` score is returned, or ``""`` if nothing remains.
    """
    candidates = [text for text in map(normalize_whitespace, values) if text]
    if not candidates:
        return ""
    return max(candidates, key=lambda text: rank_value(field_name, text))
def dedupe_rows(rows: list[dict[str, str]], primary_key: str, identity_columns: tuple[str, ...]) -> list[dict[str, str]]:
    """Merge rows that share an identity and return the result sorted by primary key.

    Rows are grouped by the lower-cased, whitespace-normalized values of
    ``identity_columns``. Each group collapses into one row:

    * ``primary_key`` is the smallest (whitespace-normalized) key in the group.
    * Every other column present in any member takes the value chosen by
      ``choose_preferred_value``.

    Rows whose identity columns are all blank are never merged. A missing identity
    is no evidence that two rows describe the same entity, so each such row keeps its
    own group. This also covers the degenerate case of an empty ``identity_columns``.

    The input rows are not mutated; the returned rows are new dicts.
    """
    keyed_groups: dict[tuple[str, ...], list[dict[str, str]]] = {}
    groups: list[list[dict[str, str]]] = []
    for row in rows:
        identity = tuple(normalize_whitespace(row.get(column_name, "")).lower() for column_name in identity_columns)
        has_identity = any(identity)
        members = keyed_groups.get(identity) if has_identity else None
        if members is None:
            members = []
            groups.append(members)
            if has_identity:
                keyed_groups[identity] = members
        members.append(dict(row))

    merged_rows: list[dict[str, str]] = []
    for group_rows in groups:
        canonical = min(group_rows, key=lambda member: normalize_whitespace(member.get(primary_key, "")))
        merged_row = dict(canonical)
        all_columns = sorted({column_name for member in group_rows for column_name in member})
        for column_name in all_columns:
            if column_name == primary_key:
                merged_row[column_name] = normalize_whitespace(canonical.get(column_name, ""))
                continue
            merged_row[column_name] = choose_preferred_value(column_name, [member.get(column_name, "") for member in group_rows])
        merged_rows.append(merged_row)
    return sorted(merged_rows, key=lambda row: normalize_whitespace(row.get(primary_key, "")))
def remap_foreign_keys_from_email(
    tables: Tables,
    child_table: str,
    fk_column: str,
    email_column: str,
    parent_table: str,
    parent_key_column: str,
    parent_email_column: str,
) -> Tables:
    """Re-point child foreign keys at the parent row that owns the same email.

    Parents are scanned in ascending (whitespace-normalized) key order, and each
    normalized email maps to the first, i.e. smallest, parent key that has it. Parents
    with a blank email are ignored. Every child row whose normalized ``email_column``
    appears in that map gets ``fk_column`` overwritten; other child rows are left
    as-is. Returns new tables; the input is not modified.
    """
    result = clone_tables(tables)

    def parent_key(parent: dict[str, str]) -> str:
        return normalize_whitespace(parent.get(parent_key_column, ""))

    email_to_key: dict[str, str] = {}
    for parent in sorted(result[parent_table], key=parent_key):
        parent_email = normalize_email(parent.get(parent_email_column, ""))
        if parent_email:
            email_to_key.setdefault(parent_email, parent_key(parent))

    for child in result[child_table]:
        child_email = normalize_email(child.get(email_column, ""))
        if child_email in email_to_key:
            child[fk_column] = email_to_key[child_email]
    return result
def normalize_columns(tables: Tables, table_name: str, column_transforms: dict[str, Callable[[object], str]]) -> Tables:
    """Apply per-column normalizers to every row of ``table_name``.

    Each ``column_transforms`` entry maps a column name to a function of that
    column's current value. A missing column is passed as ``""``, and the result is
    written back, so the column is created if absent. Returns new tables; the input
    is not modified.
    """
    result = clone_tables(tables)
    for row in result[table_name]:
        row.update({column: fn(row.get(column, "")) for column, fn in column_transforms.items()})
    return result
def fill_customer_state_from_city(tables: Tables) -> Tables:
    """Normalize every customer's ``state`` and fill blank ones from ``city``.

    Each row is passed through ``fill_state_from_city``. Returns new tables; the
    input is not modified.
    """
    result = clone_tables(tables)
    for customer in result["customers"]:
        city, state = customer.get("city", ""), customer.get("state", "")
        customer["state"] = fill_state_from_city(city, state)
    return result
def merge_easy_customers(tables: Tables) -> Tables:
    """Return a copy of ``tables`` whose ``customers`` are deduplicated by email.

    Delegates to ``dedupe_rows`` with ``customer_id`` as the primary key and
    ``email`` as the identity. Other tables are copied unchanged.
    """
    result = clone_tables(tables)
    customers = result["customers"]
    result["customers"] = dedupe_rows(customers, primary_key="customer_id", identity_columns=("email",))
    return result
def drop_inactive_customers(tables: Tables) -> Tables:
    """Return a copy of ``tables`` without the customers whose status is ``inactive``.

    Matching ignores case and surrounding whitespace. In the easy task this
    operation is registered with risk ``"destructive"`` and is described as one to
    avoid.
    """
    result = clone_tables(tables)

    def is_inactive(customer: dict[str, str]) -> bool:
        return normalize_whitespace(customer.get("status", "")).lower() == "inactive"

    result["customers"] = [customer for customer in result["customers"] if not is_inactive(customer)]
    return result
def dedupe_orders_by_source_id(tables: Tables) -> Tables:
    """Collapse duplicated order exports into one row per order.

    An order's identity is its ``source_order_id`` when present, otherwise its own
    ``order_id``. Rows sharing an identity are merged:

    * the merged row's ``order_id`` becomes that identity;
    * every other column takes the value chosen by ``choose_preferred_value``;
    * the ``source_order_id`` column is dropped.

    Rows that have neither id are kept as separate rows instead of being merged with
    each other. Returns new tables with ``orders`` sorted by ``order_id``; the input
    is not modified.
    """
    updated = clone_tables(tables)
    keyed_groups: dict[str, list[dict[str, str]]] = {}
    groups: list[tuple[str, list[dict[str, str]]]] = []
    for row in updated["orders"]:
        identity = normalize_whitespace(row.get("source_order_id", "")) or normalize_whitespace(row.get("order_id", ""))
        # A blank identity is no evidence of duplication, so such rows never share a group.
        members = keyed_groups.get(identity) if identity else None
        if members is None:
            members = []
            groups.append((identity, members))
            if identity:
                keyed_groups[identity] = members
        members.append(dict(row))

    merged_rows: list[dict[str, str]] = []
    for identity, group_rows in groups:
        canonical = min(group_rows, key=lambda member: normalize_whitespace(member.get("order_id", "")))
        merged = dict(canonical)
        merged["order_id"] = identity
        all_columns = sorted({column for member in group_rows for column in member})
        for column_name in all_columns:
            if column_name in {"order_id", "source_order_id"}:
                continue
            merged[column_name] = choose_preferred_value(column_name, [member.get(column_name, "") for member in group_rows])
        merged.pop("source_order_id", None)
        merged_rows.append(merged)
    updated["orders"] = sorted(merged_rows, key=lambda row: normalize_whitespace(row.get("order_id", "")))
    return updated
def drop_cancelled_orders(tables: Tables) -> Tables:
    """Return a copy of ``tables`` without cancelled orders.

    Both the ``cancelled`` and ``canceled`` spellings match, ignoring case and
    surrounding whitespace. In the medium task this operation is registered with
    risk ``"destructive"``.
    """
    result = clone_tables(tables)
    cancelled_spellings = {"cancelled", "canceled"}

    def is_cancelled(order: dict[str, str]) -> bool:
        return normalize_whitespace(order.get("order_status", "")).lower() in cancelled_spellings

    result["orders"] = [order for order in result["orders"] if not is_cancelled(order)]
    return result
def round_order_amounts_to_int(tables: Tables) -> Tables:
    """Round every order ``total_amount`` to a whole-number string (destructive).

    Each amount is first passed through ``normalize_amount`` and then rounded with
    Python's ``round()``, which rounds halves to even (``2.5`` -> ``2``). Amounts
    that cannot be rounded are left in their ``normalize_amount`` form instead of
    aborting the operation. That covers unparseable text, NaN and infinities.

    Returns new tables; the input is not modified.
    """
    updated = clone_tables(tables)
    for row in updated["orders"]:
        normalized = normalize_amount(row.get("total_amount", ""))
        try:
            # round() without ndigits already returns an int. float() raises ValueError
            # for non-numeric text; round() raises ValueError for NaN and
            # OverflowError for +/-infinity.
            row["total_amount"] = str(round(float(normalized)))
        except (ValueError, OverflowError):
            row["total_amount"] = normalized
    return updated
def merge_hard_customers_by_email(tables: Tables) -> Tables:
    """Deduplicate ``customers`` by normalized email, returning new tables.

    This is the same transformation as ``merge_easy_customers``. Rows whose emails
    match (ignoring case and whitespace) collapse into one row that keeps the
    group's smallest ``customer_id``. Other tables are copied unchanged.
    """
    cleaned = clone_tables(tables)
    cleaned["customers"] = dedupe_rows(
        cleaned["customers"],
        "customer_id",
        identity_columns=("email",),
    )
    return cleaned
def remove_duplicate_payments(tables: Tables) -> Tables:
    """Drop payments that repeat an earlier payment's identity.

    Two payments count as duplicates when all of the following match after
    normalization: customer email, subscription id, amount and payment date. From
    each set of duplicates only the row with the smallest ``payment_id`` is kept, the
    first such row on ties. The survivors are returned sorted by ``payment_id``.
    Returns new tables; the input is not modified.
    """
    result = clone_tables(tables)

    def payment_key(payment: dict[str, str]) -> str:
        return normalize_whitespace(payment.get("payment_id", ""))

    survivors: dict[tuple[str, str, str, str], dict[str, str]] = {}
    for payment in result["payments"]:
        fingerprint = (
            normalize_email(payment.get("customer_email", "")),
            normalize_whitespace(payment.get("subscription_id", "")),
            normalize_amount(payment.get("amount", "")),
            normalize_date(payment.get("paid_at", "")),
        )
        current = survivors.get(fingerprint)
        if current is None or payment_key(payment) < payment_key(current):
            survivors[fingerprint] = dict(payment)

    result["payments"] = sorted(survivors.values(), key=payment_key)
    return result
def drop_orphaned_subscriptions_and_payments(tables: Tables) -> Tables:
    """Remove subscriptions and payments whose ``customer_id`` matches no customer.

    IDs are compared after whitespace normalization. A row with a blank
    ``customer_id`` is dropped unless some customer row also has a blank id. Returns
    new tables; the input is not modified.
    """
    result = clone_tables(tables)
    known_ids = set(normalize_whitespace(customer.get("customer_id", "")) for customer in result["customers"])

    def has_known_customer(row: dict[str, str]) -> bool:
        return normalize_whitespace(row.get("customer_id", "")) in known_ids

    for dependent_table in ("subscriptions", "payments"):
        result[dependent_table] = list(filter(has_known_customer, result[dependent_table]))
    return result
def _task_from_solution(
    *,
    task_id: str,
    title: str,
    difficulty: str,
    objective: str,
    dataset_context: str,
    max_steps: int,
    review_budget: int,
    sync_targets: tuple[str, ...],
    primary_keys: dict[str, str],
    duplicate_identity_columns: dict[str, tuple[str, ...]],
    dirty_tables: Tables,
    validation_rules: tuple[ValidationRule, ...],
    operations: dict[str, OperationSpec],
    solution_operation_ids: tuple[str, ...],
    issue_cards: tuple[IssueCard, ...],
    review_cases: dict[str, ReviewCaseSpec],
) -> TaskSpec:
    """Assemble a TaskSpec, deriving its gold tables from the reference solution.

    ``gold_tables`` starts as a copy of ``dirty_tables``. It is then fed through the
    ``transform`` of each operation named in ``solution_operation_ids``, in order.
    Every other argument is stored on the spec as given; ``dirty_tables`` itself is
    stored without copying.

    Raises:
        KeyError: if a solution operation id is not a key of ``operations``.
    """
    gold_tables = clone_tables(dirty_tables)
    for step_id in solution_operation_ids:
        step = operations[step_id]
        gold_tables = step.transform(gold_tables)

    return TaskSpec(
        task_id=task_id,
        title=title,
        difficulty=difficulty,
        objective=objective,
        dataset_context=dataset_context,
        max_steps=max_steps,
        review_budget=review_budget,
        sync_targets=sync_targets,
        primary_keys=primary_keys,
        duplicate_identity_columns=duplicate_identity_columns,
        dirty_tables=dirty_tables,
        gold_tables=gold_tables,
        validation_rules=validation_rules,
        operations=operations,
        solution_operation_ids=solution_operation_ids,
        issue_cards=issue_cards,
        review_cases=review_cases,
    )
def _build_easy_task() -> TaskSpec:
    """Build the ``customer_contacts_easy`` task over a single ``customers`` table.

    The agent must standardize names, emails, phones and states, fill one missing
    state, and merge one duplicate customer without deleting the inactive account.
    ``_task_from_solution`` produces the gold tables by replaying
    ``solution_operation_ids`` in order on ``dirty_tables``. The destructive
    ``easy_drop_inactive_customers`` operation is offered but is not part of that
    solution.
    """
    # Dirty CRM export:
    # - C001 and C005 share an email once case and whitespace are normalized.
    # - C003 has a blank state but a known city.
    # - C004 is inactive and must survive cleaning.
    # NOTE(review): C003 is in Austin, yet its 7-digit phone is completed with the
    # "615" area code by normalize_phone_us; confirm that is the intended gold value.
    dirty_tables = {
        "customers": [
            {"customer_id": "C001", "full_name": " alice johnson ", "email": "ALICE@example.com ", "phone": "615.555.0101", "state": "tn", "city": "Nashville", "status": "pending"},
            {"customer_id": "C002", "full_name": "Bob smith", "email": " bob.smith@example.com", "phone": "(615) 555-0102", "state": "Tennessee", "city": "Nashville", "status": "active"},
            {"customer_id": "C003", "full_name": "Carla Gomez", "email": "carla.gomez@example.com", "phone": "5550103", "state": "", "city": "Austin", "status": "pending"},
            {"customer_id": "C004", "full_name": "Dan wu", "email": "DAN.WU@example.com", "phone": "+1 615-555-0104", "state": " TX ", "city": "Austin", "status": "inactive"},
            {"customer_id": "C005", "full_name": "Alice Johnson", "email": "alice@example.com", "phone": "6155550101", "state": "TN", "city": "Nashville", "status": "active"},
        ]
    }
    # OperationSpec fields are passed positionally: (operation_id, title, category, risk,
    # tables_affected, description, why_it_matters, transform).
    operations = {
        "easy_normalize_names": OperationSpec("easy_normalize_names", "Normalize customer names", "standardize_text", "safe", ("customers",), "Trim whitespace and convert full_name to title case.", "Customer-facing names should be consistently formatted before CRM import.", lambda tables: normalize_columns(tables, "customers", {"full_name": normalize_name})),
        "easy_normalize_emails": OperationSpec("easy_normalize_emails", "Normalize customer emails", "standardize_text", "safe", ("customers",), "Lowercase and trim email addresses.", "Case and whitespace inconsistencies hide duplicate customer records.", lambda tables: normalize_columns(tables, "customers", {"email": normalize_email})),
        "easy_normalize_phones": OperationSpec("easy_normalize_phones", "Normalize US phone numbers", "standardize_contact", "safe", ("customers",), "Convert phone numbers into '(AAA) BBB-CCCC' format and infer the missing local area code.", "Support and sales workflows depend on consistent contact fields.", lambda tables: normalize_columns(tables, "customers", {"phone": normalize_phone_us})),
        "easy_normalize_states": OperationSpec("easy_normalize_states", "Normalize state codes", "standardize_geo", "safe", ("customers",), "Convert state names and lower-case abbreviations to two-letter US state codes.", "Downstream tax and routing systems expect canonical state codes.", lambda tables: normalize_columns(tables, "customers", {"state": normalize_state})),
        "easy_fill_state_from_city": OperationSpec("easy_fill_state_from_city", "Fill missing state from city", "impute_missing", "review", ("customers",), "Fill blank state values using a deterministic city-to-state lookup.", "Missing geo fields prevent territory assignment and validation checks.", fill_customer_state_from_city),
        "easy_merge_customers_by_email": OperationSpec("easy_merge_customers_by_email", "Merge duplicate customers by email", "deduplicate", "review", ("customers",), "Collapse rows with the same normalized email into one canonical customer record.", "Duplicate contacts inflate outreach counts and break customer analytics.", merge_easy_customers),
        # Destructive decoy: not in solution_operation_ids below.
        "easy_drop_inactive_customers": OperationSpec("easy_drop_inactive_customers", "Drop inactive customers", "destructive_filter", "destructive", ("customers",), "Delete all rows where status is inactive.", "This can destroy valid historical records and should be avoided for this task.", drop_inactive_customers),
    }
    # Rules the cleaned customers table is checked against (severity is the last argument).
    validation_rules = (
        RequiredRule("customers", "customer_id", "high"),
        RequiredRule("customers", "full_name", "high"),
        RequiredRule("customers", "email", "high"),
        RequiredRule("customers", "phone", "medium"),
        RequiredRule("customers", "state", "medium"),
        PatternRule("customers", "email", EMAIL_RE, "Email must be lowercase and valid.", "high"),
        PatternRule("customers", "phone", PHONE_RE, "Phone must use '(AAA) BBB-CCCC' format.", "medium"),
        EnumRule("customers", "state", tuple(sorted(STATE_CODES)), "medium"),
        EnumRule("customers", "status", ("active", "inactive", "pending"), "low"),
        UniqueRule("customers", ("email",), "high"),
    )
    # Issue cards link issue codes to the operations expected to resolve them. The codes
    # appear to follow "<rule kind>:<table>.<column>".
    issue_cards = (
        IssueCard(title="Contact formatting is inconsistent", detail="Names, emails, phones, and state values need canonical formatting before handoff.", issue_codes=["pattern:customers.email", "pattern:customers.phone", "enum:customers.state"], recommended_operation_ids=["easy_normalize_names", "easy_normalize_emails", "easy_normalize_phones", "easy_normalize_states"]),
        IssueCard(title="A missing state value blocks validation", detail="One customer record has city information but no state code.", issue_codes=["required:customers.state"], recommended_operation_ids=["easy_fill_state_from_city"]),
        IssueCard(title="Duplicate customer identities exist", detail="Two rows refer to the same customer once emails are normalized.", issue_codes=["unique:customers.email"], recommended_operation_ids=["easy_merge_customers_by_email"]),
    )
    # The single review case (review_budget=1 below) confirms the C001/C005 merge.
    review_cases = {
        "easy_customer_duplicate_review": ReviewCaseSpec(
            review_id="easy_customer_duplicate_review",
            entity_type="customer",
            entity_id="C005",
            reason_code="possible_duplicate",
            title="Confirm duplicate customer merge",
            detail="Alice Johnson appears twice with status conflicts after email normalization.",
            resolution="merge_confirmed_keep_c001",
            response_summary="Merge C005 into C001. Keep the active account record and preserve inactive customers elsewhere in the file.",
            evidence_summary="Normalized emails match and both rows describe the same Nashville customer; C001 is the canonical CRM ID.",
            recommended_operation_ids=("easy_merge_customers_by_email",),
        )
    }
    return _task_from_solution(
        task_id="customer_contacts_easy",
        title="Customer Contacts Standardization",
        difficulty="easy",
        objective="Prepare a customer-contact export for CRM import by standardizing contact fields, filling one missing state, and merging duplicate customer rows without deleting valid inactive accounts.",
        dataset_context="This table simulates a weekly B2B CRM export that sales ops cleans before loading into a customer system.",
        max_steps=10,
        review_budget=1,
        sync_targets=("crm",),
        primary_keys={"customers": "customer_id"},
        duplicate_identity_columns={"customers": ("email",)},
        dirty_tables=dirty_tables,
        validation_rules=validation_rules,
        operations=operations,
        # Reference solution, applied in this order to derive the gold tables.
        solution_operation_ids=("easy_normalize_names", "easy_normalize_emails", "easy_normalize_phones", "easy_normalize_states", "easy_fill_state_from_city", "easy_merge_customers_by_email"),
        issue_cards=issue_cards,
        review_cases=review_cases,
    )
def _build_medium_task() -> TaskSpec:
    """Build the ``orders_reconciliation_medium`` task over a single ``orders`` table.

    The agent must normalize dates, currency/amounts, statuses and shipping states,
    and merge one duplicated export row, while keeping cancelled and returned orders.
    ``_task_from_solution`` produces the gold tables by replaying
    ``solution_operation_ids`` in order on ``dirty_tables``. The two destructive
    operations are offered but are not part of that solution.
    """
    # Dirty order extract:
    # - O1002 appears twice; the second copy carries source_order_id "O1002".
    # - O1003's amount contains a letter O ("12O.00").
    # - O1003 (cancelled) and O1005 (returned) are legitimate records that must stay.
    dirty_tables = {
        "orders": [
            {"order_id": "O1001", "customer_email": "alice@example.com", "order_date": "2025/01/05", "currency": "usd", "total_amount": "1,200.00", "order_status": "Shipped", "shipping_state": "tn", "source_order_id": ""},
            {"order_id": "O1002", "customer_email": "bob.smith@example.com", "order_date": "01-06-2025", "currency": "$", "total_amount": "45.50", "order_status": "pending ", "shipping_state": "Tennessee", "source_order_id": ""},
            {"order_id": "O1003", "customer_email": "carla.gomez@example.com", "order_date": "2025-01-07", "currency": "USD ", "total_amount": "12O.00", "order_status": "cancelled", "shipping_state": "TX", "source_order_id": ""},
            {"order_id": "O1004", "customer_email": "dan.wu@example.com", "order_date": "2025.01.08", "currency": "usd", "total_amount": "89", "order_status": "Ship", "shipping_state": " tx ", "source_order_id": ""},
            {"order_id": "O1002", "customer_email": "bob.smith@example.com", "order_date": "2025-01-06", "currency": "USD", "total_amount": "45.50", "order_status": "PENDING", "shipping_state": "TN", "source_order_id": "O1002"},
            {"order_id": "O1005", "customer_email": "enterprise@example.com", "order_date": "2025/01/09", "currency": "usd", "total_amount": "2500", "order_status": "Returned", "shipping_state": "CA", "source_order_id": ""},
        ]
    }
    # Lower-cased, underscore-joined status -> canonical status (used via normalize_status).
    # NOTE(review): the US spelling "canceled" is not mapped here, although
    # drop_cancelled_orders accepts it; such a value would become "CANCELED" and fail
    # the order_status enum rule. Consider adding "canceled": "CANCELLED".
    order_status_map = {"shipped": "SHIPPED", "ship": "SHIPPED", "pending": "PENDING", "cancelled": "CANCELLED", "returned": "RETURNED"}
    # OperationSpec fields are passed positionally: (operation_id, title, category, risk,
    # tables_affected, description, why_it_matters, transform).
    operations = {
        "med_normalize_dates": OperationSpec("med_normalize_dates", "Normalize order dates", "standardize_dates", "safe", ("orders",), "Convert order_date values into ISO 8601 format (YYYY-MM-DD).", "Finance systems reject mixed date formats during settlement reconciliation.", lambda tables: normalize_columns(tables, "orders", {"order_date": normalize_date})),
        "med_normalize_currency_amounts": OperationSpec("med_normalize_currency_amounts", "Normalize currency and amounts", "standardize_money", "review", ("orders",), "Standardize currency to USD and normalize total_amount to two-decimal strings.", "Revenue aggregation fails when currency and amount encodings are inconsistent.", lambda tables: normalize_columns(tables, "orders", {"currency": normalize_currency, "total_amount": normalize_amount})),
        "med_normalize_order_statuses": OperationSpec("med_normalize_order_statuses", "Canonicalize order statuses", "standardize_status", "safe", ("orders",), "Map free-text order status values to SHIPPED, PENDING, CANCELLED, or RETURNED.", "Operational dashboards and SLAs depend on normalized state machines.", lambda tables: normalize_columns(tables, "orders", {"order_status": lambda value: normalize_status(value, order_status_map)})),
        "med_normalize_shipping_states": OperationSpec("med_normalize_shipping_states", "Normalize shipping state codes", "standardize_geo", "safe", ("orders",), "Convert shipping_state values into two-letter US state codes.", "Warehouse routing rules require canonical geography fields.", lambda tables: normalize_columns(tables, "orders", {"shipping_state": normalize_state})),
        "med_dedupe_orders": OperationSpec("med_dedupe_orders", "Remove duplicated order exports", "deduplicate", "review", ("orders",), "Merge duplicate order rows using source_order_id when present.", "Duplicate transactions overstate revenue and shipment volume.", dedupe_orders_by_source_id),
        # Destructive decoys: neither appears in solution_operation_ids below.
        "med_drop_cancelled_orders": OperationSpec("med_drop_cancelled_orders", "Drop cancelled orders", "destructive_filter", "destructive", ("orders",), "Delete all cancelled rows from the table.", "Cancelled orders are still valid operational records and should not be removed here.", drop_cancelled_orders),
        "med_round_amounts_to_int": OperationSpec("med_round_amounts_to_int", "Round all order totals to whole dollars", "destructive_transform", "destructive", ("orders",), "Round every total_amount to an integer string.", "This destroys cents precision and should be rejected.", round_order_amounts_to_int),
    }
    # Rules the cleaned orders table is checked against (severity is the last argument).
    validation_rules = (
        RequiredRule("orders", "order_id", "high"),
        RequiredRule("orders", "customer_email", "high"),
        RequiredRule("orders", "order_date", "high"),
        RequiredRule("orders", "currency", "high"),
        RequiredRule("orders", "total_amount", "high"),
        RequiredRule("orders", "order_status", "high"),
        RequiredRule("orders", "shipping_state", "medium"),
        PatternRule("orders", "customer_email", EMAIL_RE, "Customer email must be canonical.", "high"),
        PatternRule("orders", "order_date", ISO_DATE_RE, "Order date must be YYYY-MM-DD.", "high"),
        PatternRule("orders", "total_amount", AMOUNT_RE, "Amount must have exactly two decimals.", "high"),
        EnumRule("orders", "currency", ("USD",), "high"),
        EnumRule("orders", "order_status", ("SHIPPED", "PENDING", "CANCELLED", "RETURNED"), "medium"),
        EnumRule("orders", "shipping_state", tuple(sorted(STATE_CODES)), "medium"),
        UniqueRule("orders", ("order_id",), "high"),
    )
    # Issue cards link issue codes to the operations expected to resolve them.
    issue_cards = (
        IssueCard(title="Dates, money, and statuses use mixed encodings", detail="The order export mixes separators, symbols, and ad hoc status spellings.", issue_codes=["pattern:orders.order_date", "pattern:orders.total_amount", "enum:orders.order_status", "enum:orders.currency"], recommended_operation_ids=["med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses"]),
        IssueCard(title="Shipping state labels are not canonical", detail="Downstream warehouse tools require two-letter state abbreviations.", issue_codes=["enum:orders.shipping_state"], recommended_operation_ids=["med_normalize_shipping_states"]),
        IssueCard(title="A duplicated order row exists", detail="One record is a second export copy of another order.", issue_codes=["unique:orders.order_id"], recommended_operation_ids=["med_dedupe_orders"]),
    )
    # The single review case confirms that returned order O1005 must be kept.
    review_cases = {
        "med_returned_order_review": ReviewCaseSpec(
            review_id="med_returned_order_review",
            entity_type="order",
            entity_id="O1005",
            reason_code="preserve_operational_record",
            title="Confirm whether returned order should be retained",
            detail="Returned orders often look removable during cleanup, but finance may still require them.",
            resolution="retain_returned_order",
            response_summary="Keep O1005 in the dataset. Normalize it, but do not delete returned or cancelled orders for this reconciliation task.",
            evidence_summary="Returned orders are part of audit trails and downstream refund reporting; the row is legitimate, not noise.",
            recommended_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses"),
        )
    }
    return _task_from_solution(
        task_id="orders_reconciliation_medium",
        title="E-commerce Order Reconciliation",
        difficulty="medium",
        objective="Clean a transactional orders export by normalizing dates, money, statuses, and shipping states while deduplicating repeated order exports without deleting legitimate cancelled orders.",
        dataset_context="This table simulates a daily order extract from an e-commerce platform that revenue ops must reconcile before BI ingestion.",
        max_steps=12,
        review_budget=1,
        sync_targets=("crm", "billing"),
        primary_keys={"orders": "order_id"},
        duplicate_identity_columns={"orders": ("order_id",)},
        dirty_tables=dirty_tables,
        validation_rules=validation_rules,
        operations=operations,
        # Reference solution, applied in this order to derive the gold tables.
        solution_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses", "med_normalize_shipping_states", "med_dedupe_orders"),
        issue_cards=issue_cards,
        review_cases=review_cases,
    )
def _build_hard_task() -> TaskSpec:
    """Build the ``crm_migration_hard`` task spec.

    The dirty data spans three related tables:

    * ``customers``: CU100/CU101 are the same person (Ana Lopez) whose emails
      differ only by case and trailing whitespace.
    * ``subscriptions`` and ``payments``: these reference customers by
      ``customer_id``, but some IDs are stale (``CU999``) or blank (P501).
    * ``payments``: P500 and P503 are the duplicated payment described by the
      "Duplicate payment facts" issue card. They differ only in amount and
      date formatting.

    Every issue card lists the issue codes that the validation rules below
    raise on this data, so agents can map each failing rule to a recommended
    operation.
    """
    dirty_tables = {
        "customers": [
            {"customer_id": "CU100", "full_name": "Ana Lopez", "email": "ana.lopez@example.com ", "lifecycle_stage": "Active"},
            {"customer_id": "CU101", "full_name": "Ana Lopez", "email": "ANA.LOPEZ@example.com", "lifecycle_stage": "active"},
            {"customer_id": "CU102", "full_name": "Ben Carter", "email": "ben.carter@example.com", "lifecycle_stage": "trial"},
            {"customer_id": "CU103", "full_name": "Mia Chen", "email": "mia.chen@example.com", "lifecycle_stage": "churn_risk"},
        ],
        "subscriptions": [
            {"subscription_id": "S900", "customer_id": "CU101", "customer_email": "ana.lopez@example.com", "plan_code": "BASIC monthly", "status": "active", "renewal_date": "2025/02/01"},
            {"subscription_id": "S901", "customer_id": "CU102", "customer_email": "ben.carter@example.com", "plan_code": "pro annual", "status": "trial ", "renewal_date": "02-15-2025"},
            # CU999 does not exist in customers; the email identifies CU103.
            {"subscription_id": "S902", "customer_id": "CU999", "customer_email": "mia.chen@example.com", "plan_code": "BASIC", "status": "past_due", "renewal_date": "2025.03.01"},
        ],
        "payments": [
            {"payment_id": "P500", "customer_id": "CU100", "customer_email": "ana.lopez@example.com", "subscription_id": "S900", "amount": "29", "currency": "usd", "payment_status": "Paid", "paid_at": "2025/01/01"},
            # Blank customer_id: must be repaired from the email, not deleted.
            {"payment_id": "P501", "customer_id": "", "customer_email": "ben.carter@example.com", "subscription_id": "S901", "amount": "299.0", "currency": "USD ", "payment_status": "settled", "paid_at": "01-15-2025"},
            {"payment_id": "P502", "customer_id": "CU999", "customer_email": "mia.chen@example.com", "subscription_id": "S902", "amount": "29.00", "currency": "usd", "payment_status": "FAILED ", "paid_at": "2025.02.01"},
            # Same customer email, subscription, and amount/date as P500 once the formats are normalized.
            {"payment_id": "P503", "customer_id": "CU101", "customer_email": "ana.lopez@example.com", "subscription_id": "S900", "amount": "29.00", "currency": "usd", "payment_status": "paid", "paid_at": "2025-01-01"},
        ],
    }
    # Lookup tables passed to normalize_status for each categorical column.
    lifecycle_map = {"active": "ACTIVE", "trial": "TRIAL", "churn_risk": "CHURN_RISK"}
    subscription_status_map = {"active": "ACTIVE", "trial": "TRIAL", "past_due": "PAST_DUE"}
    payment_status_map = {"paid": "PAID", "settled": "PAID", "failed": "FAILED"}
    plan_map = {"basic_monthly": "BASIC_MONTHLY", "basic": "BASIC_MONTHLY", "pro_annual": "PRO_ANNUAL"}
    operations = {
        "hard_normalize_customer_fields": OperationSpec("hard_normalize_customer_fields", "Normalize customer master records", "standardize_master_data", "safe", ("customers",), "Trim customer names/emails and standardize lifecycle_stage values.", "Canonical customer keys and lifecycle labels are the backbone of the migration.", lambda tables: normalize_columns(tables, "customers", {"full_name": normalize_name, "email": normalize_email, "lifecycle_stage": lambda value: normalize_status(value, lifecycle_map)})),
        "hard_merge_customers_by_email": OperationSpec("hard_merge_customers_by_email", "Merge duplicated CRM customers", "deduplicate_master_data", "review", ("customers",), "Merge customer rows with the same normalized email and keep the lowest customer_id as the canonical ID.", "Duplicate customer IDs fragment subscriptions and payments across child tables.", merge_hard_customers_by_email),
        "hard_normalize_subscriptions": OperationSpec("hard_normalize_subscriptions", "Normalize subscription records", "standardize_subscriptions", "safe", ("subscriptions",), "Normalize subscription plan codes, statuses, renewal dates, and customer emails.", "Subscription analytics and renewals automation rely on stable plan/status values.", lambda tables: normalize_columns(tables, "subscriptions", {"customer_email": normalize_email, "plan_code": lambda value: normalize_status(value, plan_map), "status": lambda value: normalize_status(value, subscription_status_map), "renewal_date": normalize_date})),
        "hard_repair_subscription_customer_refs": OperationSpec("hard_repair_subscription_customer_refs", "Repair subscription customer references", "repair_foreign_keys", "review", ("subscriptions",), "Rewrite subscriptions.customer_id by matching customer_email against the current customers table.", "Migration import will reject subscriptions that reference unknown customer IDs.", lambda tables: remap_foreign_keys_from_email(tables, "subscriptions", "customer_id", "customer_email", "customers", "customer_id", "email")),
        "hard_normalize_payments": OperationSpec("hard_normalize_payments", "Normalize payment ledger rows", "standardize_payments", "safe", ("payments",), "Normalize payment customer emails, amounts, currency, statuses, and paid_at dates.", "Consistent payment values are required for revenue reconciliation.", lambda tables: normalize_columns(tables, "payments", {"customer_email": normalize_email, "amount": normalize_amount, "currency": normalize_currency, "payment_status": lambda value: normalize_status(value, payment_status_map), "paid_at": normalize_date})),
        "hard_repair_payment_customer_refs": OperationSpec("hard_repair_payment_customer_refs", "Repair payment customer references", "repair_foreign_keys", "review", ("payments",), "Rewrite payments.customer_id using the payment customer_email and the current customers table.", "Payment facts must link to customer dimensions after the dedupe migration.", lambda tables: remap_foreign_keys_from_email(tables, "payments", "customer_id", "customer_email", "customers", "customer_id", "email")),
        "hard_remove_duplicate_payments": OperationSpec("hard_remove_duplicate_payments", "Remove duplicate payment facts", "deduplicate_facts", "review", ("payments",), "Collapse duplicated payment rows sharing customer, subscription, amount, and paid_at values.", "A duplicated ledger entry double-counts revenue.", remove_duplicate_payments),
        # Destructive distractor: deliberately absent from solution_operation_ids below.
        "hard_drop_orphaned_rows": OperationSpec("hard_drop_orphaned_rows", "Drop orphaned subscriptions and payments", "destructive_filter", "destructive", ("subscriptions", "payments"), "Delete child rows whose customer_id is not present in customers.", "Dropping business records hides data-quality issues and loses legitimate revenue facts.", drop_orphaned_subscriptions_and_payments),
    }
    validation_rules = (
        RequiredRule("customers", "customer_id", "high"),
        RequiredRule("customers", "full_name", "high"),
        RequiredRule("customers", "email", "high"),
        RequiredRule("customers", "lifecycle_stage", "medium"),
        PatternRule("customers", "email", EMAIL_RE, "Customer email must be canonical.", "high"),
        EnumRule("customers", "lifecycle_stage", ("ACTIVE", "TRIAL", "CHURN_RISK"), "medium"),
        UniqueRule("customers", ("email",), "high"),
        RequiredRule("subscriptions", "subscription_id", "high"),
        RequiredRule("subscriptions", "customer_id", "high"),
        RequiredRule("subscriptions", "customer_email", "medium"),
        RequiredRule("subscriptions", "plan_code", "high"),
        RequiredRule("subscriptions", "status", "medium"),
        RequiredRule("subscriptions", "renewal_date", "medium"),
        PatternRule("subscriptions", "customer_email", EMAIL_RE, "Subscription email must be canonical.", "medium"),
        PatternRule("subscriptions", "renewal_date", ISO_DATE_RE, "Renewal date must be YYYY-MM-DD.", "medium"),
        EnumRule("subscriptions", "plan_code", ("BASIC_MONTHLY", "PRO_ANNUAL"), "medium"),
        EnumRule("subscriptions", "status", ("ACTIVE", "TRIAL", "PAST_DUE"), "medium"),
        UniqueRule("subscriptions", ("subscription_id",), "high"),
        ForeignKeyRule("subscriptions", "customer_id", "customers", "customer_id", "high"),
        RequiredRule("payments", "payment_id", "high"),
        RequiredRule("payments", "customer_id", "high"),
        RequiredRule("payments", "customer_email", "medium"),
        RequiredRule("payments", "subscription_id", "high"),
        RequiredRule("payments", "amount", "high"),
        RequiredRule("payments", "currency", "high"),
        RequiredRule("payments", "payment_status", "high"),
        RequiredRule("payments", "paid_at", "high"),
        PatternRule("payments", "customer_email", EMAIL_RE, "Payment email must be canonical.", "medium"),
        PatternRule("payments", "amount", AMOUNT_RE, "Payment amount must have two decimals.", "high"),
        PatternRule("payments", "paid_at", ISO_DATE_RE, "Payment date must be YYYY-MM-DD.", "medium"),
        EnumRule("payments", "currency", ("USD",), "medium"),
        EnumRule("payments", "payment_status", ("PAID", "FAILED"), "medium"),
        UniqueRule("payments", ("customer_email", "subscription_id", "amount", "paid_at"), "high"),
        ForeignKeyRule("payments", "customer_id", "customers", "customer_id", "high"),
        ForeignKeyRule("payments", "subscription_id", "subscriptions", "subscription_id", "high"),
    )
    # Each card lists the codes its recommended operations resolve.
    # lifecycle_stage ("Active"/"active"/...) and currency ("usd"/"USD ")
    # violate their EnumRules in the dirty data, so both codes are listed too.
    issue_cards = (
        IssueCard(title="Customer master data contains duplicated identities", detail="Two Ana Lopez records use different customer IDs but the same email after normalization.", issue_codes=["unique:customers.email", "pattern:customers.email", "enum:customers.lifecycle_stage"], recommended_operation_ids=["hard_normalize_customer_fields", "hard_merge_customers_by_email"]),
        IssueCard(title="Child tables contain invalid customer references", detail="Subscription and payment rows reference stale or blank customer IDs that must be repaired from email joins.", issue_codes=["foreign_key:subscriptions.customer_id", "foreign_key:payments.customer_id", "required:payments.customer_id"], recommended_operation_ids=["hard_repair_subscription_customer_refs", "hard_repair_payment_customer_refs"]),
        IssueCard(title="Subscription and payment facts use inconsistent formats", detail="Plans, statuses, dates, amounts, and currency values need canonicalization before loading.", issue_codes=["enum:subscriptions.plan_code", "enum:subscriptions.status", "pattern:subscriptions.renewal_date", "pattern:payments.amount", "enum:payments.currency", "enum:payments.payment_status", "pattern:payments.paid_at"], recommended_operation_ids=["hard_normalize_subscriptions", "hard_normalize_payments"]),
        IssueCard(title="Duplicate payment facts are present", detail="Two payment rows represent the same invoice settlement and one should be removed.", issue_codes=["unique:payments.customer_email+subscription_id+amount+paid_at"], recommended_operation_ids=["hard_remove_duplicate_payments"]),
    )
    review_cases = {
        "hard_customer_merge_review": ReviewCaseSpec(
            review_id="hard_customer_merge_review",
            entity_type="customer",
            entity_id="CU101",
            reason_code="possible_duplicate",
            title="Confirm duplicate customer merge",
            detail="CU100 and CU101 normalize to the same email, but child tables disagree on which customer ID is canonical.",
            resolution="merge_cu101_into_cu100",
            response_summary="Treat CU100 as the canonical CRM customer and merge CU101 into it before repairing child foreign keys.",
            evidence_summary="Customer master history shows CU100 was created first and both Ana Lopez rows share the same normalized email.",
            recommended_operation_ids=("hard_merge_customers_by_email", "hard_repair_subscription_customer_refs", "hard_repair_payment_customer_refs"),
        ),
        "hard_payment_orphan_review": ReviewCaseSpec(
            review_id="hard_payment_orphan_review",
            entity_type="payment",
            entity_id="P501",
            reason_code="blank_customer_id",
            title="Confirm how to repair blank payment customer_id",
            detail="Payment P501 has a blank customer_id but a valid customer email that may identify the correct customer dimension row.",
            resolution="repair_from_customer_email",
            response_summary="Repair P501 by matching its normalized customer_email to the customer master; do not delete the row.",
            evidence_summary="The billing export preserved ben.carter@example.com, so the customer foreign key can be restored deterministically.",
            recommended_operation_ids=("hard_normalize_payments", "hard_repair_payment_customer_refs"),
        ),
    }
    return _task_from_solution(
        task_id="crm_migration_hard",
        title="CRM Migration Referential Cleanup",
        difficulty="hard",
        objective="Repair a three-table CRM migration extract by standardizing customer, subscription, and payment data; merging duplicate customers; fixing foreign keys from email joins; and removing duplicate payment facts without dropping legitimate orphan-like child rows.",
        dataset_context="This dataset simulates a SaaS CRM and billing migration where a team must clean customer master data and child ledger references before import.",
        max_steps=18,
        review_budget=2,
        sync_targets=("crm", "billing"),
        primary_keys={"customers": "customer_id", "subscriptions": "subscription_id", "payments": "payment_id"},
        duplicate_identity_columns={"customers": ("email",), "subscriptions": ("subscription_id",), "payments": ("customer_email", "subscription_id", "amount", "paid_at")},
        dirty_tables=dirty_tables,
        validation_rules=validation_rules,
        operations=operations,
        # Order matters: normalize and merge customers before repairing child foreign keys.
        solution_operation_ids=("hard_normalize_customer_fields", "hard_merge_customers_by_email", "hard_normalize_subscriptions", "hard_repair_subscription_customer_refs", "hard_normalize_payments", "hard_repair_payment_customer_refs", "hard_remove_duplicate_payments"),
        issue_cards=issue_cards,
        review_cases=review_cases,
    )
# Registry of every built-in task, keyed by its task_id. The builders run once
# at import time in easy, medium, hard order, which is also the iteration order.
TASK_CATALOG: dict[str, TaskSpec] = {
    built.task_id: built
    for built in (builder() for builder in (_build_easy_task, _build_medium_task, _build_hard_task))
}
def list_task_ids() -> list[str]:
    """Return every registered task_id, in TASK_CATALOG insertion order."""
    return [*TASK_CATALOG]
def get_task_spec(task_id: str) -> TaskSpec:
    """Return the TaskSpec registered under ``task_id``.

    Raises:
        KeyError: if ``task_id`` is not a key of ``TASK_CATALOG``. The message
            lists the available task IDs.
    """
    try:
        return TASK_CATALOG[task_id]
    except KeyError:
        raise KeyError(f"Unknown task_id '{task_id}'. Available tasks: {list_task_ids()}") from None
def first_table_name(task_spec: TaskSpec) -> str:
    """Return the first key of ``task_spec.dirty_tables`` (dict insertion order).

    If ``dirty_tables`` is empty, ``StopIteration`` is raised. This matches
    calling ``next()`` on an exhausted iterator.
    """
    for table_name in task_spec.dirty_tables:
        return table_name
    raise StopIteration
def apply_operation_to_tables(task_spec: TaskSpec, tables: Tables, operation_id: str) -> Tables:
    """Run one of ``task_spec``'s operations and return the transformed tables.

    The transform receives ``clone_tables(tables)`` rather than ``tables``
    itself. Presumably that is a copy, so the caller's tables stay untouched;
    this depends on clone_tables, which is defined elsewhere.

    Raises:
        KeyError: if ``operation_id`` is not in ``task_spec.operations``.
    """
    try:
        operation = task_spec.operations[operation_id]
    except KeyError:
        raise KeyError(f"Unknown operation_id '{operation_id}' for task '{task_spec.task_id}'.") from None
    return operation.transform(clone_tables(tables))
def sorted_rows(rows: list[dict[str, str]], primary_key: str) -> list[dict[str, str]]:
    """Return a deep copy of ``rows`` ordered by each row's ``primary_key`` value.

    Key values are passed through ``normalize_whitespace`` before comparison.
    A row without the key is treated as having ``""``. The sort is stable,
    and neither ``rows`` nor its dicts are modified.
    """
    def _row_key(row: dict[str, str]):
        return normalize_whitespace(row.get(primary_key, ""))

    detached = copy.deepcopy(rows)
    return sorted(detached, key=_row_key)