"""Task specs and deterministic cleaning operations for CleanOps.""" from __future__ import annotations import copy import re from dataclasses import dataclass from datetime import datetime from typing import Callable from cleanops_env.models import IssueCard Tables = dict[str, list[dict[str, str]]] TransformFn = Callable[[Tables], Tables] EMAIL_RE = re.compile(r"^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$") PHONE_RE = re.compile(r"^\(\d{3}\) \d{3}-\d{4}$") ISO_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") AMOUNT_RE = re.compile(r"^\d+\.\d{2}$") STATE_CODES = { "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY", } STATE_ALIASES = { "tennessee": "TN", "tn": "TN", "texas": "TX", "tx": "TX", "california": "CA", "ca": "CA", } STATUS_RANK = { "active": 5, "trial": 4, "pending": 3, "past_due": 2, "returned": 2, "churn_risk": 1, "inactive": 0, "cancelled": 0, } @dataclass(frozen=True) class RequiredRule: table_name: str column_name: str severity: str = "high" @dataclass(frozen=True) class PatternRule: table_name: str column_name: str pattern: re.Pattern[str] message: str severity: str = "medium" @dataclass(frozen=True) class EnumRule: table_name: str column_name: str allowed: tuple[str, ...] severity: str = "medium" @dataclass(frozen=True) class UniqueRule: table_name: str columns: tuple[str, ...] severity: str = "high" @dataclass(frozen=True) class ForeignKeyRule: table_name: str column_name: str ref_table_name: str ref_column_name: str severity: str = "high" ValidationRule = RequiredRule | PatternRule | EnumRule | UniqueRule | ForeignKeyRule @dataclass(frozen=True) class OperationSpec: operation_id: str title: str category: str risk: str tables_affected: tuple[str, ...] description: str why_it_matters: str transform: TransformFn @dataclass(frozen=True) class ReviewCaseSpec: review_id: str entity_type: str entity_id: str reason_code: str title: str detail: str resolution: str response_summary: str evidence_summary: str recommended_operation_ids: tuple[str, ...] = () @dataclass(frozen=True) class TaskSpec: task_id: str title: str difficulty: str objective: str dataset_context: str max_steps: int review_budget: int sync_targets: tuple[str, ...] primary_keys: dict[str, str] duplicate_identity_columns: dict[str, tuple[str, ...]] dirty_tables: Tables gold_tables: Tables validation_rules: tuple[ValidationRule, ...] operations: dict[str, OperationSpec] solution_operation_ids: tuple[str, ...] issue_cards: tuple[IssueCard, ...] review_cases: dict[str, ReviewCaseSpec] def clone_tables(tables: Tables) -> Tables: return {table_name: [dict(row) for row in rows] for table_name, rows in tables.items()} def normalize_whitespace(value: object) -> str: return " ".join(str(value or "").replace("\u00a0", " ").split()) def normalize_name(value: object) -> str: cleaned = normalize_whitespace(value) return " ".join(part.capitalize() for part in cleaned.split()) def normalize_email(value: object) -> str: return normalize_whitespace(value).lower() def normalize_phone_us(value: object) -> str: digits = re.sub(r"\D", "", str(value or "")) if len(digits) == 11 and digits.startswith("1"): digits = digits[1:] if len(digits) == 7: digits = "615" + digits if len(digits) != 10: return normalize_whitespace(value) return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}" def normalize_state(value: object) -> str: cleaned = normalize_whitespace(value).lower() return STATE_ALIASES.get(cleaned, cleaned.upper()) def fill_state_from_city(city: object, state: object) -> str: current = normalize_state(state) if current: return current city_lookup = {"nashville": "TN", "austin": "TX", "san jose": "CA"} return city_lookup.get(normalize_whitespace(city).lower(), "") def normalize_status(value: object, mapping: dict[str, str]) -> str: cleaned = normalize_whitespace(value).lower().replace(" ", "_") return mapping.get(cleaned, cleaned.upper()) def normalize_date(value: object) -> str: cleaned = normalize_whitespace(value) for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%m-%d-%Y", "%Y.%m.%d"): try: return datetime.strptime(cleaned, fmt).strftime("%Y-%m-%d") except ValueError: continue return cleaned def normalize_currency(value: object) -> str: cleaned = normalize_whitespace(value).replace("$", "").upper() if cleaned in {"", "USD", "US"}: return "USD" return cleaned def normalize_amount(value: object) -> str: cleaned = normalize_whitespace(value) cleaned = cleaned.replace("$", "").replace(",", "").replace("O", "0").replace("o", "0") try: return f"{float(cleaned):.2f}" except ValueError: return normalize_whitespace(value) def rank_value(field_name: str, value: str) -> tuple[int, int, str]: cleaned = normalize_whitespace(value) if not cleaned: return (0, 0, "") if field_name in {"status", "order_status", "payment_status", "lifecycle_stage"}: return (2, STATUS_RANK.get(cleaned.lower(), 1), cleaned) if field_name.endswith("id") or field_name.endswith("_id"): return (1, len(cleaned), cleaned) return (1, len(cleaned), cleaned) def choose_preferred_value(field_name: str, values: list[str]) -> str: candidates = [normalize_whitespace(value) for value in values if normalize_whitespace(value)] if not candidates: return "" return sorted(candidates, key=lambda item: rank_value(field_name, item), reverse=True)[0] def dedupe_rows(rows: list[dict[str, str]], primary_key: str, identity_columns: tuple[str, ...]) -> list[dict[str, str]]: groups: dict[tuple[str, ...], list[dict[str, str]]] = {} for row in rows: identity = tuple(normalize_whitespace(row.get(column_name, "")).lower() for column_name in identity_columns) groups.setdefault(identity, []).append(dict(row)) merged_rows: list[dict[str, str]] = [] for group_rows in groups.values(): canonical = sorted(group_rows, key=lambda row: normalize_whitespace(row.get(primary_key, "")))[0] merged_row = dict(canonical) all_columns = sorted({column_name for row in group_rows for column_name in row}) for column_name in all_columns: if column_name == primary_key: merged_row[column_name] = normalize_whitespace(canonical.get(column_name, "")) continue merged_row[column_name] = choose_preferred_value(column_name, [row.get(column_name, "") for row in group_rows]) merged_rows.append(merged_row) return sorted(merged_rows, key=lambda row: normalize_whitespace(row.get(primary_key, ""))) def remap_foreign_keys_from_email( tables: Tables, child_table: str, fk_column: str, email_column: str, parent_table: str, parent_key_column: str, parent_email_column: str, ) -> Tables: updated = clone_tables(tables) parent_lookup: dict[str, str] = {} for row in sorted(updated[parent_table], key=lambda item: normalize_whitespace(item.get(parent_key_column, ""))): email = normalize_email(row.get(parent_email_column, "")) if email and email not in parent_lookup: parent_lookup[email] = normalize_whitespace(row.get(parent_key_column, "")) for row in updated[child_table]: email = normalize_email(row.get(email_column, "")) if email in parent_lookup: row[fk_column] = parent_lookup[email] return updated def normalize_columns(tables: Tables, table_name: str, column_transforms: dict[str, Callable[[object], str]]) -> Tables: updated = clone_tables(tables) for row in updated[table_name]: for column_name, transform in column_transforms.items(): row[column_name] = transform(row.get(column_name, "")) return updated def fill_customer_state_from_city(tables: Tables) -> Tables: updated = clone_tables(tables) for row in updated["customers"]: row["state"] = fill_state_from_city(row.get("city", ""), row.get("state", "")) return updated def merge_easy_customers(tables: Tables) -> Tables: updated = clone_tables(tables) updated["customers"] = dedupe_rows(updated["customers"], "customer_id", ("email",)) return updated def drop_inactive_customers(tables: Tables) -> Tables: updated = clone_tables(tables) updated["customers"] = [row for row in updated["customers"] if normalize_whitespace(row.get("status", "")).lower() != "inactive"] return updated def dedupe_orders_by_source_id(tables: Tables) -> Tables: updated = clone_tables(tables) groups: dict[str, list[dict[str, str]]] = {} for row in updated["orders"]: identity = normalize_whitespace(row.get("source_order_id", "")) or normalize_whitespace(row.get("order_id", "")) groups.setdefault(identity, []).append(dict(row)) merged_rows: list[dict[str, str]] = [] for identity, group_rows in groups.items(): canonical = sorted(group_rows, key=lambda row: normalize_whitespace(row.get("order_id", "")))[0] merged = dict(canonical) merged["order_id"] = identity all_columns = sorted({column for row in group_rows for column in row}) for column_name in all_columns: if column_name in {"order_id", "source_order_id"}: continue merged[column_name] = choose_preferred_value(column_name, [row.get(column_name, "") for row in group_rows]) merged.pop("source_order_id", None) merged_rows.append(merged) updated["orders"] = sorted(merged_rows, key=lambda row: normalize_whitespace(row.get("order_id", ""))) return updated def drop_cancelled_orders(tables: Tables) -> Tables: updated = clone_tables(tables) updated["orders"] = [row for row in updated["orders"] if normalize_whitespace(row.get("order_status", "")).lower() not in {"cancelled", "canceled"}] return updated def round_order_amounts_to_int(tables: Tables) -> Tables: updated = clone_tables(tables) for row in updated["orders"]: normalized = normalize_amount(row.get("total_amount", "")) try: row["total_amount"] = str(int(round(float(normalized)))) except ValueError: row["total_amount"] = normalized return updated def merge_hard_customers_by_email(tables: Tables) -> Tables: updated = clone_tables(tables) updated["customers"] = dedupe_rows(updated["customers"], "customer_id", ("email",)) return updated def remove_duplicate_payments(tables: Tables) -> Tables: updated = clone_tables(tables) groups: dict[tuple[str, str, str, str], list[dict[str, str]]] = {} for row in updated["payments"]: identity = ( normalize_email(row.get("customer_email", "")), normalize_whitespace(row.get("subscription_id", "")), normalize_amount(row.get("amount", "")), normalize_date(row.get("paid_at", "")), ) groups.setdefault(identity, []).append(dict(row)) deduped = [sorted(group_rows, key=lambda row: normalize_whitespace(row.get("payment_id", "")))[0] for group_rows in groups.values()] updated["payments"] = sorted(deduped, key=lambda row: normalize_whitespace(row.get("payment_id", ""))) return updated def drop_orphaned_subscriptions_and_payments(tables: Tables) -> Tables: updated = clone_tables(tables) valid_customer_ids = {normalize_whitespace(row.get("customer_id", "")) for row in updated["customers"]} updated["subscriptions"] = [row for row in updated["subscriptions"] if normalize_whitespace(row.get("customer_id", "")) in valid_customer_ids] updated["payments"] = [row for row in updated["payments"] if normalize_whitespace(row.get("customer_id", "")) in valid_customer_ids] return updated def _task_from_solution( *, task_id: str, title: str, difficulty: str, objective: str, dataset_context: str, max_steps: int, review_budget: int, sync_targets: tuple[str, ...], primary_keys: dict[str, str], duplicate_identity_columns: dict[str, tuple[str, ...]], dirty_tables: Tables, validation_rules: tuple[ValidationRule, ...], operations: dict[str, OperationSpec], solution_operation_ids: tuple[str, ...], issue_cards: tuple[IssueCard, ...], review_cases: dict[str, ReviewCaseSpec], ) -> TaskSpec: gold_tables = clone_tables(dirty_tables) for operation_id in solution_operation_ids: gold_tables = operations[operation_id].transform(gold_tables) return TaskSpec( task_id=task_id, title=title, difficulty=difficulty, objective=objective, dataset_context=dataset_context, max_steps=max_steps, review_budget=review_budget, sync_targets=sync_targets, primary_keys=primary_keys, duplicate_identity_columns=duplicate_identity_columns, dirty_tables=dirty_tables, gold_tables=gold_tables, validation_rules=validation_rules, operations=operations, solution_operation_ids=solution_operation_ids, issue_cards=issue_cards, review_cases=review_cases, ) def _build_easy_task() -> TaskSpec: dirty_tables = { "customers": [ {"customer_id": "C001", "full_name": " alice johnson ", "email": "ALICE@example.com ", "phone": "615.555.0101", "state": "tn", "city": "Nashville", "status": "pending"}, {"customer_id": "C002", "full_name": "Bob smith", "email": " bob.smith@example.com", "phone": "(615) 555-0102", "state": "Tennessee", "city": "Nashville", "status": "active"}, {"customer_id": "C003", "full_name": "Carla Gomez", "email": "carla.gomez@example.com", "phone": "5550103", "state": "", "city": "Austin", "status": "pending"}, {"customer_id": "C004", "full_name": "Dan wu", "email": "DAN.WU@example.com", "phone": "+1 615-555-0104", "state": " TX ", "city": "Austin", "status": "inactive"}, {"customer_id": "C005", "full_name": "Alice Johnson", "email": "alice@example.com", "phone": "6155550101", "state": "TN", "city": "Nashville", "status": "active"}, ] } operations = { "easy_normalize_names": OperationSpec("easy_normalize_names", "Normalize customer names", "standardize_text", "safe", ("customers",), "Trim whitespace and convert full_name to title case.", "Customer-facing names should be consistently formatted before CRM import.", lambda tables: normalize_columns(tables, "customers", {"full_name": normalize_name})), "easy_normalize_emails": OperationSpec("easy_normalize_emails", "Normalize customer emails", "standardize_text", "safe", ("customers",), "Lowercase and trim email addresses.", "Case and whitespace inconsistencies hide duplicate customer records.", lambda tables: normalize_columns(tables, "customers", {"email": normalize_email})), "easy_normalize_phones": OperationSpec("easy_normalize_phones", "Normalize US phone numbers", "standardize_contact", "safe", ("customers",), "Convert phone numbers into '(AAA) BBB-CCCC' format and infer the missing local area code.", "Support and sales workflows depend on consistent contact fields.", lambda tables: normalize_columns(tables, "customers", {"phone": normalize_phone_us})), "easy_normalize_states": OperationSpec("easy_normalize_states", "Normalize state codes", "standardize_geo", "safe", ("customers",), "Convert state names and lower-case abbreviations to two-letter US state codes.", "Downstream tax and routing systems expect canonical state codes.", lambda tables: normalize_columns(tables, "customers", {"state": normalize_state})), "easy_fill_state_from_city": OperationSpec("easy_fill_state_from_city", "Fill missing state from city", "impute_missing", "review", ("customers",), "Fill blank state values using a deterministic city-to-state lookup.", "Missing geo fields prevent territory assignment and validation checks.", fill_customer_state_from_city), "easy_merge_customers_by_email": OperationSpec("easy_merge_customers_by_email", "Merge duplicate customers by email", "deduplicate", "review", ("customers",), "Collapse rows with the same normalized email into one canonical customer record.", "Duplicate contacts inflate outreach counts and break customer analytics.", merge_easy_customers), "easy_drop_inactive_customers": OperationSpec("easy_drop_inactive_customers", "Drop inactive customers", "destructive_filter", "destructive", ("customers",), "Delete all rows where status is inactive.", "This can destroy valid historical records and should be avoided for this task.", drop_inactive_customers), } validation_rules = ( RequiredRule("customers", "customer_id", "high"), RequiredRule("customers", "full_name", "high"), RequiredRule("customers", "email", "high"), RequiredRule("customers", "phone", "medium"), RequiredRule("customers", "state", "medium"), PatternRule("customers", "email", EMAIL_RE, "Email must be lowercase and valid.", "high"), PatternRule("customers", "phone", PHONE_RE, "Phone must use '(AAA) BBB-CCCC' format.", "medium"), EnumRule("customers", "state", tuple(sorted(STATE_CODES)), "medium"), EnumRule("customers", "status", ("active", "inactive", "pending"), "low"), UniqueRule("customers", ("email",), "high"), ) issue_cards = ( IssueCard(title="Contact formatting is inconsistent", detail="Names, emails, phones, and state values need canonical formatting before handoff.", issue_codes=["pattern:customers.email", "pattern:customers.phone", "enum:customers.state"], recommended_operation_ids=["easy_normalize_names", "easy_normalize_emails", "easy_normalize_phones", "easy_normalize_states"]), IssueCard(title="A missing state value blocks validation", detail="One customer record has city information but no state code.", issue_codes=["required:customers.state"], recommended_operation_ids=["easy_fill_state_from_city"]), IssueCard(title="Duplicate customer identities exist", detail="Two rows refer to the same customer once emails are normalized.", issue_codes=["unique:customers.email"], recommended_operation_ids=["easy_merge_customers_by_email"]), ) review_cases = { "easy_customer_duplicate_review": ReviewCaseSpec( review_id="easy_customer_duplicate_review", entity_type="customer", entity_id="C005", reason_code="possible_duplicate", title="Confirm duplicate customer merge", detail="Alice Johnson appears twice with status conflicts after email normalization.", resolution="merge_confirmed_keep_c001", response_summary="Merge C005 into C001. Keep the active account record and preserve inactive customers elsewhere in the file.", evidence_summary="Normalized emails match and both rows describe the same Nashville customer; C001 is the canonical CRM ID.", recommended_operation_ids=("easy_merge_customers_by_email",), ) } return _task_from_solution( task_id="customer_contacts_easy", title="Customer Contacts Standardization", difficulty="easy", objective="Prepare a customer-contact export for CRM import by standardizing contact fields, filling one missing state, and merging duplicate customer rows without deleting valid inactive accounts.", dataset_context="This table simulates a weekly B2B CRM export that sales ops cleans before loading into a customer system.", max_steps=10, review_budget=1, sync_targets=("crm",), primary_keys={"customers": "customer_id"}, duplicate_identity_columns={"customers": ("email",)}, dirty_tables=dirty_tables, validation_rules=validation_rules, operations=operations, solution_operation_ids=("easy_normalize_names", "easy_normalize_emails", "easy_normalize_phones", "easy_normalize_states", "easy_fill_state_from_city", "easy_merge_customers_by_email"), issue_cards=issue_cards, review_cases=review_cases, ) def _build_medium_task() -> TaskSpec: dirty_tables = { "orders": [ {"order_id": "O1001", "customer_email": "alice@example.com", "order_date": "2025/01/05", "currency": "usd", "total_amount": "1,200.00", "order_status": "Shipped", "shipping_state": "tn", "source_order_id": ""}, {"order_id": "O1002", "customer_email": "bob.smith@example.com", "order_date": "01-06-2025", "currency": "$", "total_amount": "45.50", "order_status": "pending ", "shipping_state": "Tennessee", "source_order_id": ""}, {"order_id": "O1003", "customer_email": "carla.gomez@example.com", "order_date": "2025-01-07", "currency": "USD ", "total_amount": "12O.00", "order_status": "cancelled", "shipping_state": "TX", "source_order_id": ""}, {"order_id": "O1004", "customer_email": "dan.wu@example.com", "order_date": "2025.01.08", "currency": "usd", "total_amount": "89", "order_status": "Ship", "shipping_state": " tx ", "source_order_id": ""}, {"order_id": "O1002", "customer_email": "bob.smith@example.com", "order_date": "2025-01-06", "currency": "USD", "total_amount": "45.50", "order_status": "PENDING", "shipping_state": "TN", "source_order_id": "O1002"}, {"order_id": "O1005", "customer_email": "enterprise@example.com", "order_date": "2025/01/09", "currency": "usd", "total_amount": "2500", "order_status": "Returned", "shipping_state": "CA", "source_order_id": ""}, ] } order_status_map = {"shipped": "SHIPPED", "ship": "SHIPPED", "pending": "PENDING", "cancelled": "CANCELLED", "returned": "RETURNED"} operations = { "med_normalize_dates": OperationSpec("med_normalize_dates", "Normalize order dates", "standardize_dates", "safe", ("orders",), "Convert order_date values into ISO 8601 format (YYYY-MM-DD).", "Finance systems reject mixed date formats during settlement reconciliation.", lambda tables: normalize_columns(tables, "orders", {"order_date": normalize_date})), "med_normalize_currency_amounts": OperationSpec("med_normalize_currency_amounts", "Normalize currency and amounts", "standardize_money", "review", ("orders",), "Standardize currency to USD and normalize total_amount to two-decimal strings.", "Revenue aggregation fails when currency and amount encodings are inconsistent.", lambda tables: normalize_columns(tables, "orders", {"currency": normalize_currency, "total_amount": normalize_amount})), "med_normalize_order_statuses": OperationSpec("med_normalize_order_statuses", "Canonicalize order statuses", "standardize_status", "safe", ("orders",), "Map free-text order status values to SHIPPED, PENDING, CANCELLED, or RETURNED.", "Operational dashboards and SLAs depend on normalized state machines.", lambda tables: normalize_columns(tables, "orders", {"order_status": lambda value: normalize_status(value, order_status_map)})), "med_normalize_shipping_states": OperationSpec("med_normalize_shipping_states", "Normalize shipping state codes", "standardize_geo", "safe", ("orders",), "Convert shipping_state values into two-letter US state codes.", "Warehouse routing rules require canonical geography fields.", lambda tables: normalize_columns(tables, "orders", {"shipping_state": normalize_state})), "med_dedupe_orders": OperationSpec("med_dedupe_orders", "Remove duplicated order exports", "deduplicate", "review", ("orders",), "Merge duplicate order rows using source_order_id when present.", "Duplicate transactions overstate revenue and shipment volume.", dedupe_orders_by_source_id), "med_drop_cancelled_orders": OperationSpec("med_drop_cancelled_orders", "Drop cancelled orders", "destructive_filter", "destructive", ("orders",), "Delete all cancelled rows from the table.", "Cancelled orders are still valid operational records and should not be removed here.", drop_cancelled_orders), "med_round_amounts_to_int": OperationSpec("med_round_amounts_to_int", "Round all order totals to whole dollars", "destructive_transform", "destructive", ("orders",), "Round every total_amount to an integer string.", "This destroys cents precision and should be rejected.", round_order_amounts_to_int), } validation_rules = ( RequiredRule("orders", "order_id", "high"), RequiredRule("orders", "customer_email", "high"), RequiredRule("orders", "order_date", "high"), RequiredRule("orders", "currency", "high"), RequiredRule("orders", "total_amount", "high"), RequiredRule("orders", "order_status", "high"), RequiredRule("orders", "shipping_state", "medium"), PatternRule("orders", "customer_email", EMAIL_RE, "Customer email must be canonical.", "high"), PatternRule("orders", "order_date", ISO_DATE_RE, "Order date must be YYYY-MM-DD.", "high"), PatternRule("orders", "total_amount", AMOUNT_RE, "Amount must have exactly two decimals.", "high"), EnumRule("orders", "currency", ("USD",), "high"), EnumRule("orders", "order_status", ("SHIPPED", "PENDING", "CANCELLED", "RETURNED"), "medium"), EnumRule("orders", "shipping_state", tuple(sorted(STATE_CODES)), "medium"), UniqueRule("orders", ("order_id",), "high"), ) issue_cards = ( IssueCard(title="Dates, money, and statuses use mixed encodings", detail="The order export mixes separators, symbols, and ad hoc status spellings.", issue_codes=["pattern:orders.order_date", "pattern:orders.total_amount", "enum:orders.order_status", "enum:orders.currency"], recommended_operation_ids=["med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses"]), IssueCard(title="Shipping state labels are not canonical", detail="Downstream warehouse tools require two-letter state abbreviations.", issue_codes=["enum:orders.shipping_state"], recommended_operation_ids=["med_normalize_shipping_states"]), IssueCard(title="A duplicated order row exists", detail="One record is a second export copy of another order.", issue_codes=["unique:orders.order_id"], recommended_operation_ids=["med_dedupe_orders"]), ) review_cases = { "med_returned_order_review": ReviewCaseSpec( review_id="med_returned_order_review", entity_type="order", entity_id="O1005", reason_code="preserve_operational_record", title="Confirm whether returned order should be retained", detail="Returned orders often look removable during cleanup, but finance may still require them.", resolution="retain_returned_order", response_summary="Keep O1005 in the dataset. Normalize it, but do not delete returned or cancelled orders for this reconciliation task.", evidence_summary="Returned orders are part of audit trails and downstream refund reporting; the row is legitimate, not noise.", recommended_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses"), ) } return _task_from_solution( task_id="orders_reconciliation_medium", title="E-commerce Order Reconciliation", difficulty="medium", objective="Clean a transactional orders export by normalizing dates, money, statuses, and shipping states while deduplicating repeated order exports without deleting legitimate cancelled orders.", dataset_context="This table simulates a daily order extract from an e-commerce platform that revenue ops must reconcile before BI ingestion.", max_steps=12, review_budget=1, sync_targets=("crm", "billing"), primary_keys={"orders": "order_id"}, duplicate_identity_columns={"orders": ("order_id",)}, dirty_tables=dirty_tables, validation_rules=validation_rules, operations=operations, solution_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses", "med_normalize_shipping_states", "med_dedupe_orders"), issue_cards=issue_cards, review_cases=review_cases, ) def _build_hard_task() -> TaskSpec: dirty_tables = { "customers": [ {"customer_id": "CU100", "full_name": "Ana Lopez", "email": "ana.lopez@example.com ", "lifecycle_stage": "Active"}, {"customer_id": "CU101", "full_name": "Ana Lopez", "email": "ANA.LOPEZ@example.com", "lifecycle_stage": "active"}, {"customer_id": "CU102", "full_name": "Ben Carter", "email": "ben.carter@example.com", "lifecycle_stage": "trial"}, {"customer_id": "CU103", "full_name": "Mia Chen", "email": "mia.chen@example.com", "lifecycle_stage": "churn_risk"}, ], "subscriptions": [ {"subscription_id": "S900", "customer_id": "CU101", "customer_email": "ana.lopez@example.com", "plan_code": "BASIC monthly", "status": "active", "renewal_date": "2025/02/01"}, {"subscription_id": "S901", "customer_id": "CU102", "customer_email": "ben.carter@example.com", "plan_code": "pro annual", "status": "trial ", "renewal_date": "02-15-2025"}, {"subscription_id": "S902", "customer_id": "CU999", "customer_email": "mia.chen@example.com", "plan_code": "BASIC", "status": "past_due", "renewal_date": "2025.03.01"}, ], "payments": [ {"payment_id": "P500", "customer_id": "CU100", "customer_email": "ana.lopez@example.com", "subscription_id": "S900", "amount": "29", "currency": "usd", "payment_status": "Paid", "paid_at": "2025/01/01"}, {"payment_id": "P501", "customer_id": "", "customer_email": "ben.carter@example.com", "subscription_id": "S901", "amount": "299.0", "currency": "USD ", "payment_status": "settled", "paid_at": "01-15-2025"}, {"payment_id": "P502", "customer_id": "CU999", "customer_email": "mia.chen@example.com", "subscription_id": "S902", "amount": "29.00", "currency": "usd", "payment_status": "FAILED ", "paid_at": "2025.02.01"}, {"payment_id": "P503", "customer_id": "CU101", "customer_email": "ana.lopez@example.com", "subscription_id": "S900", "amount": "29.00", "currency": "usd", "payment_status": "paid", "paid_at": "2025-01-01"}, ], } lifecycle_map = {"active": "ACTIVE", "trial": "TRIAL", "churn_risk": "CHURN_RISK"} subscription_status_map = {"active": "ACTIVE", "trial": "TRIAL", "past_due": "PAST_DUE"} payment_status_map = {"paid": "PAID", "settled": "PAID", "failed": "FAILED"} plan_map = {"basic_monthly": "BASIC_MONTHLY", "basic": "BASIC_MONTHLY", "pro_annual": "PRO_ANNUAL"} operations = { "hard_normalize_customer_fields": OperationSpec("hard_normalize_customer_fields", "Normalize customer master records", "standardize_master_data", "safe", ("customers",), "Trim customer names/emails and standardize lifecycle_stage values.", "Canonical customer keys and lifecycle labels are the backbone of the migration.", lambda tables: normalize_columns(tables, "customers", {"full_name": normalize_name, "email": normalize_email, "lifecycle_stage": lambda value: normalize_status(value, lifecycle_map)})), "hard_merge_customers_by_email": OperationSpec("hard_merge_customers_by_email", "Merge duplicated CRM customers", "deduplicate_master_data", "review", ("customers",), "Merge customer rows with the same normalized email and keep the lowest customer_id as the canonical ID.", "Duplicate customer IDs fragment subscriptions and payments across child tables.", merge_hard_customers_by_email), "hard_normalize_subscriptions": OperationSpec("hard_normalize_subscriptions", "Normalize subscription records", "standardize_subscriptions", "safe", ("subscriptions",), "Normalize subscription plan codes, statuses, renewal dates, and customer emails.", "Subscription analytics and renewals automation rely on stable plan/status values.", lambda tables: normalize_columns(tables, "subscriptions", {"customer_email": normalize_email, "plan_code": lambda value: normalize_status(value, plan_map), "status": lambda value: normalize_status(value, subscription_status_map), "renewal_date": normalize_date})), "hard_repair_subscription_customer_refs": OperationSpec("hard_repair_subscription_customer_refs", "Repair subscription customer references", "repair_foreign_keys", "review", ("subscriptions",), "Rewrite subscriptions.customer_id by matching customer_email against the current customers table.", "Migration import will reject subscriptions that reference unknown customer IDs.", lambda tables: remap_foreign_keys_from_email(tables, "subscriptions", "customer_id", "customer_email", "customers", "customer_id", "email")), "hard_normalize_payments": OperationSpec("hard_normalize_payments", "Normalize payment ledger rows", "standardize_payments", "safe", ("payments",), "Normalize payment customer emails, amounts, currency, statuses, and paid_at dates.", "Consistent payment values are required for revenue reconciliation.", lambda tables: normalize_columns(tables, "payments", {"customer_email": normalize_email, "amount": normalize_amount, "currency": normalize_currency, "payment_status": lambda value: normalize_status(value, payment_status_map), "paid_at": normalize_date})), "hard_repair_payment_customer_refs": OperationSpec("hard_repair_payment_customer_refs", "Repair payment customer references", "repair_foreign_keys", "review", ("payments",), "Rewrite payments.customer_id using the payment customer_email and the current customers table.", "Payment facts must link to customer dimensions after the dedupe migration.", lambda tables: remap_foreign_keys_from_email(tables, "payments", "customer_id", "customer_email", "customers", "customer_id", "email")), "hard_remove_duplicate_payments": OperationSpec("hard_remove_duplicate_payments", "Remove duplicate payment facts", "deduplicate_facts", "review", ("payments",), "Collapse duplicated payment rows sharing customer, subscription, amount, and paid_at values.", "A duplicated ledger entry double-counts revenue.", remove_duplicate_payments), "hard_drop_orphaned_rows": OperationSpec("hard_drop_orphaned_rows", "Drop orphaned subscriptions and payments", "destructive_filter", "destructive", ("subscriptions", "payments"), "Delete child rows whose customer_id is not present in customers.", "Dropping business records hides data-quality issues and loses legitimate revenue facts.", drop_orphaned_subscriptions_and_payments), } validation_rules = ( RequiredRule("customers", "customer_id", "high"), RequiredRule("customers", "full_name", "high"), RequiredRule("customers", "email", "high"), RequiredRule("customers", "lifecycle_stage", "medium"), PatternRule("customers", "email", EMAIL_RE, "Customer email must be canonical.", "high"), EnumRule("customers", "lifecycle_stage", ("ACTIVE", "TRIAL", "CHURN_RISK"), "medium"), UniqueRule("customers", ("email",), "high"), RequiredRule("subscriptions", "subscription_id", "high"), RequiredRule("subscriptions", "customer_id", "high"), RequiredRule("subscriptions", "customer_email", "medium"), RequiredRule("subscriptions", "plan_code", "high"), RequiredRule("subscriptions", "status", "medium"), RequiredRule("subscriptions", "renewal_date", "medium"), PatternRule("subscriptions", "customer_email", EMAIL_RE, "Subscription email must be canonical.", "medium"), PatternRule("subscriptions", "renewal_date", ISO_DATE_RE, "Renewal date must be YYYY-MM-DD.", "medium"), EnumRule("subscriptions", "plan_code", ("BASIC_MONTHLY", "PRO_ANNUAL"), "medium"), EnumRule("subscriptions", "status", ("ACTIVE", "TRIAL", "PAST_DUE"), "medium"), UniqueRule("subscriptions", ("subscription_id",), "high"), ForeignKeyRule("subscriptions", "customer_id", "customers", "customer_id", "high"), RequiredRule("payments", "payment_id", "high"), RequiredRule("payments", "customer_id", "high"), RequiredRule("payments", "customer_email", "medium"), RequiredRule("payments", "subscription_id", "high"), RequiredRule("payments", "amount", "high"), RequiredRule("payments", "currency", "high"), RequiredRule("payments", "payment_status", "high"), RequiredRule("payments", "paid_at", "high"), PatternRule("payments", "customer_email", EMAIL_RE, "Payment email must be canonical.", "medium"), PatternRule("payments", "amount", AMOUNT_RE, "Payment amount must have two decimals.", "high"), PatternRule("payments", "paid_at", ISO_DATE_RE, "Payment date must be YYYY-MM-DD.", "medium"), EnumRule("payments", "currency", ("USD",), "medium"), EnumRule("payments", "payment_status", ("PAID", "FAILED"), "medium"), UniqueRule("payments", ("customer_email", "subscription_id", "amount", "paid_at"), "high"), ForeignKeyRule("payments", "customer_id", "customers", "customer_id", "high"), ForeignKeyRule("payments", "subscription_id", "subscriptions", "subscription_id", "high"), ) issue_cards = ( IssueCard(title="Customer master data contains duplicated identities", detail="Two Ana Lopez records use different customer IDs but the same email after normalization.", issue_codes=["unique:customers.email", "pattern:customers.email"], recommended_operation_ids=["hard_normalize_customer_fields", "hard_merge_customers_by_email"]), IssueCard(title="Child tables contain invalid customer references", detail="Subscription and payment rows reference stale or blank customer IDs that must be repaired from email joins.", issue_codes=["foreign_key:subscriptions.customer_id", "foreign_key:payments.customer_id", "required:payments.customer_id"], recommended_operation_ids=["hard_repair_subscription_customer_refs", "hard_repair_payment_customer_refs"]), IssueCard(title="Subscription and payment facts use inconsistent formats", detail="Plans, statuses, dates, amounts, and currency values need canonicalization before loading.", issue_codes=["enum:subscriptions.plan_code", "enum:subscriptions.status", "pattern:subscriptions.renewal_date", "pattern:payments.amount", "enum:payments.payment_status", "pattern:payments.paid_at"], recommended_operation_ids=["hard_normalize_subscriptions", "hard_normalize_payments"]), IssueCard(title="Duplicate payment facts are present", detail="Two payment rows represent the same invoice settlement and one should be removed.", issue_codes=["unique:payments.customer_email+subscription_id+amount+paid_at"], recommended_operation_ids=["hard_remove_duplicate_payments"]), ) review_cases = { "hard_customer_merge_review": ReviewCaseSpec( review_id="hard_customer_merge_review", entity_type="customer", entity_id="CU101", reason_code="possible_duplicate", title="Confirm duplicate customer merge", detail="CU100 and CU101 normalize to the same email, but child tables disagree on which customer ID is canonical.", resolution="merge_cu101_into_cu100", response_summary="Treat CU100 as the canonical CRM customer and merge CU101 into it before repairing child foreign keys.", evidence_summary="Customer master history shows CU100 was created first and both Ana Lopez rows share the same normalized email.", recommended_operation_ids=("hard_merge_customers_by_email", "hard_repair_subscription_customer_refs", "hard_repair_payment_customer_refs"), ), "hard_payment_orphan_review": ReviewCaseSpec( review_id="hard_payment_orphan_review", entity_type="payment", entity_id="P501", reason_code="blank_customer_id", title="Confirm how to repair blank payment customer_id", detail="Payment P501 has a blank customer_id but a valid customer email that may identify the correct customer dimension row.", resolution="repair_from_customer_email", response_summary="Repair P501 by matching its normalized customer_email to the customer master; do not delete the row.", evidence_summary="The billing export preserved ben.carter@example.com, so the customer foreign key can be restored deterministically.", recommended_operation_ids=("hard_normalize_payments", "hard_repair_payment_customer_refs"), ), } return _task_from_solution( task_id="crm_migration_hard", title="CRM Migration Referential Cleanup", difficulty="hard", objective="Repair a three-table CRM migration extract by standardizing customer, subscription, and payment data; merging duplicate customers; fixing foreign keys from email joins; and removing duplicate payment facts without dropping legitimate orphan-like child rows.", dataset_context="This dataset simulates a SaaS CRM and billing migration where a team must clean customer master data and child ledger references before import.", max_steps=18, review_budget=2, sync_targets=("crm", "billing"), primary_keys={"customers": "customer_id", "subscriptions": "subscription_id", "payments": "payment_id"}, duplicate_identity_columns={"customers": ("email",), "subscriptions": ("subscription_id",), "payments": ("customer_email", "subscription_id", "amount", "paid_at")}, dirty_tables=dirty_tables, validation_rules=validation_rules, operations=operations, solution_operation_ids=("hard_normalize_customer_fields", "hard_merge_customers_by_email", "hard_normalize_subscriptions", "hard_repair_subscription_customer_refs", "hard_normalize_payments", "hard_repair_payment_customer_refs", "hard_remove_duplicate_payments"), issue_cards=issue_cards, review_cases=review_cases, ) TASK_CATALOG: dict[str, TaskSpec] = {spec.task_id: spec for spec in (_build_easy_task(), _build_medium_task(), _build_hard_task())} def list_task_ids() -> list[str]: return list(TASK_CATALOG.keys()) def get_task_spec(task_id: str) -> TaskSpec: if task_id not in TASK_CATALOG: raise KeyError(f"Unknown task_id '{task_id}'. Available tasks: {list_task_ids()}") return TASK_CATALOG[task_id] def first_table_name(task_spec: TaskSpec) -> str: return next(iter(task_spec.dirty_tables)) def apply_operation_to_tables(task_spec: TaskSpec, tables: Tables, operation_id: str) -> Tables: if operation_id not in task_spec.operations: raise KeyError(f"Unknown operation_id '{operation_id}' for task '{task_spec.task_id}'.") transform = task_spec.operations[operation_id].transform return transform(clone_tables(tables)) def sorted_rows(rows: list[dict[str, str]], primary_key: str) -> list[dict[str, str]]: return sorted(copy.deepcopy(rows), key=lambda row: normalize_whitespace(row.get(primary_key, "")))