File size: 43,622 Bytes
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c5f2
 
 
 
 
 
 
 
 
 
 
 
 
 
ac326a6
 
 
 
 
 
 
 
7c2c5f2
 
ac326a6
 
 
 
 
 
 
 
7c2c5f2
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c5f2
 
ac326a6
 
 
 
 
 
 
7c2c5f2
ac326a6
 
 
 
 
 
 
 
 
 
 
7c2c5f2
 
ac326a6
 
 
 
 
 
 
 
7c2c5f2
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c5f2
 
 
 
 
 
 
 
 
 
 
 
 
 
ac326a6
 
 
 
 
 
 
7c2c5f2
 
ac326a6
 
 
 
 
 
 
7c2c5f2
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c5f2
 
 
 
 
 
 
 
 
 
 
 
 
 
ac326a6
 
 
 
 
 
 
7c2c5f2
 
ac326a6
 
 
 
 
 
 
7c2c5f2
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c5f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac326a6
 
 
 
 
 
 
7c2c5f2
 
ac326a6
 
 
 
 
 
 
7c2c5f2
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
"""Task specs and deterministic cleaning operations for CleanOps."""

from __future__ import annotations

import copy
import re
from dataclasses import dataclass
from datetime import datetime
from decimal import ROUND_HALF_UP, Decimal, InvalidOperation
from typing import Callable

from cleanops_env.models import IssueCard

# A table set: table name -> list of rows; each row maps column name -> cell string.
Tables = dict[str, list[dict[str, str]]]
# A cleaning operation: takes a table set and returns the transformed table set.
TransformFn = Callable[[Tables], Tables]

# Canonical-format patterns used by PatternRule checks.
# EMAIL_RE only accepts lowercase addresses, so un-normalized emails fail it.
EMAIL_RE = re.compile(r"^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$")
# US phone in "(AAA) BBB-CCCC" form.
PHONE_RE = re.compile(r"^\(\d{3}\) \d{3}-\d{4}$")
# YYYY-MM-DD shape only; digit ranges (month 13, day 40) are not checked here.
ISO_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
# Non-negative amount with exactly two fractional digits (no sign, no separators).
AMOUNT_RE = re.compile(r"^\d+\.\d{2}$")
# The 50 US state codes (DC and territories are not included).
STATE_CODES = {
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL",
    "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT",
    "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI",
    "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY",
}
# Lower-cased state names / abbreviations -> canonical code. Only the states that
# appear in the task data are listed; normalize_state upper-cases anything else.
STATE_ALIASES = {
    "tennessee": "TN",
    "tn": "TN",
    "texas": "TX",
    "tx": "TX",
    "california": "CA",
    "ca": "CA",
}
# Preference order when merging duplicate rows: for status-like columns the
# value with the higher rank wins (rank_value falls back to 1 for unknown keys).
STATUS_RANK = {
    "active": 5,
    "trial": 4,
    "pending": 3,
    "past_due": 2,
    "returned": 2,
    "churn_risk": 1,
    "inactive": 0,
    "cancelled": 0,
}


@dataclass(frozen=True)
class RequiredRule:
    """Declares that ``table_name.column_name`` is a required column.

    How "required" is checked (e.g. non-blank after trimming) is decided by the
    validator, which is not shown here.
    """

    table_name: str
    column_name: str
    # Free-form severity label; the tasks use "high", "medium" and "low".
    severity: str = "high"


@dataclass(frozen=True)
class PatternRule:
    """Declares that values of ``table_name.column_name`` should match ``pattern``."""

    table_name: str
    column_name: str
    # Compiled regex the cell value is expected to match.
    pattern: re.Pattern[str]
    # Human-readable explanation of the expected format.
    message: str
    severity: str = "medium"


@dataclass(frozen=True)
class EnumRule:
    """Declares that values of ``table_name.column_name`` must be one of ``allowed``."""

    table_name: str
    column_name: str
    # Exact (case-sensitive as written) set of permitted values.
    allowed: tuple[str, ...]
    severity: str = "medium"


@dataclass(frozen=True)
class UniqueRule:
    """Declares that the combination of ``columns`` should be unique in ``table_name``.

    How blanks or case differences are treated is up to the validator (not shown here).
    """

    table_name: str
    columns: tuple[str, ...]
    severity: str = "high"


@dataclass(frozen=True)
class ForeignKeyRule:
    """Declares that ``table_name.column_name`` references ``ref_table_name.ref_column_name``."""

    # Child side of the relationship.
    table_name: str
    column_name: str
    # Parent side whose values the child column should point at.
    ref_table_name: str
    ref_column_name: str
    severity: str = "high"


# Union of every rule type a TaskSpec may list in ``validation_rules``.
# Evaluated at runtime (not just as an annotation), so this needs Python 3.10+.
ValidationRule = RequiredRule | PatternRule | EnumRule | UniqueRule | ForeignKeyRule


@dataclass(frozen=True)
class OperationSpec:
    """A named cleaning operation the agent can choose to apply."""

    # Key under which the operation is registered in TaskSpec.operations.
    operation_id: str
    title: str
    # Grouping label, e.g. "standardize_text", "deduplicate", "destructive_filter".
    category: str
    # Risk label used by the tasks: "safe", "review" or "destructive".
    risk: str
    # Tables the transform reads/writes.
    tables_affected: tuple[str, ...]
    description: str
    why_it_matters: str
    # Function implementing the operation on a full table set.
    transform: TransformFn


@dataclass(frozen=True)
class ReviewCaseSpec:
    """A scripted human-review case and its canned answer."""

    review_id: str
    # Kind of entity under review, e.g. "customer" or "order".
    entity_type: str
    # Primary key of the reviewed row, e.g. "C005".
    entity_id: str
    # Machine-readable reason, e.g. "possible_duplicate".
    reason_code: str
    title: str
    detail: str
    # Machine-readable outcome code, e.g. "merge_confirmed_keep_c001".
    resolution: str
    # Free-text answer; presumably what the reviewer reports back (confirm in env).
    response_summary: str
    evidence_summary: str
    # Operations the reviewer endorses for this case.
    recommended_operation_ids: tuple[str, ...] = ()


@dataclass(frozen=True)
class TaskSpec:
    """Complete definition of one cleaning task (data, rules, operations, answer)."""

    task_id: str
    title: str
    # Difficulty label: "easy", "medium" or "hard".
    difficulty: str
    objective: str
    dataset_context: str
    # NOTE(review): step/review limits are enforced by the environment, not here.
    max_steps: int
    review_budget: int
    # Downstream systems named by the task, e.g. ("crm",) or ("crm", "billing").
    sync_targets: tuple[str, ...]
    # Table name -> primary-key column.
    primary_keys: dict[str, str]
    # Table name -> columns that identify the same entity across duplicate rows.
    duplicate_identity_columns: dict[str, tuple[str, ...]]
    # Raw input tables as handed to the agent.
    dirty_tables: Tables
    # Expected clean result (derived from the solution ops by _task_from_solution).
    gold_tables: Tables
    validation_rules: tuple[ValidationRule, ...]
    # operation_id -> OperationSpec, including distractor operations.
    operations: dict[str, OperationSpec]
    # Ordered operation ids that produce gold_tables from dirty_tables.
    solution_operation_ids: tuple[str, ...]
    issue_cards: tuple[IssueCard, ...]
    # review_id -> ReviewCaseSpec.
    review_cases: dict[str, ReviewCaseSpec]


def clone_tables(tables: Tables) -> Tables:
    """Return a copy of ``tables`` whose row dicts can be mutated freely.

    Each row dict is shallow-copied, so changing a cell in the result never
    touches the input. Cell values themselves are shared, which is safe for
    immutable strings. Table order and row order are preserved.
    """
    copied: Tables = {}
    for name, source_rows in tables.items():
        copied[name] = list(map(dict, source_rows))
    return copied


def normalize_whitespace(value: object) -> str:
    """Trim ``value`` and collapse every internal whitespace run to one space.

    Falsy inputs (``None``, ``""``, ``0`` ...) yield ``""``; anything else is
    stringified with ``str()`` first. Non-breaking spaces count as whitespace.
    """
    text = str(value) if value else ""
    tokens = text.replace("\u00a0", " ").split()
    return " ".join(tokens)


def normalize_name(value: object) -> str:
    """Title-case a personal name word by word after whitespace cleanup.

    Every word is passed through ``str.capitalize`` (first character upper,
    the rest lower), e.g. ``" alice  JOHNSON "`` -> ``"Alice Johnson"``.
    """
    words = normalize_whitespace(value).split()
    return " ".join(map(str.capitalize, words))


def normalize_email(value: object) -> str:
    """Return ``value`` with whitespace collapsed/trimmed and lower-cased."""
    collapsed = normalize_whitespace(value)
    return collapsed.lower()


def normalize_phone_us(value: object, default_area_code: str = "615") -> str:
    """Format a US phone number as ``(AAA) BBB-CCCC``.

    All non-digit characters are discarded first. An 11-digit number with a
    leading ``1`` country code loses that prefix, and a bare 7-digit local
    number is prefixed with ``default_area_code``. If the result is not exactly
    ten digits, the original value is returned whitespace-normalized (not
    reformatted) so validation can flag it.

    Args:
        value: Raw phone cell; falsy values are treated as ``""``.
        default_area_code: Area code assumed for 7-digit local numbers.
            Defaults to ``"615"``, the value previously hard-coded here.

    Returns:
        The formatted number, or the whitespace-normalized input.
    """
    digits = re.sub(r"\D", "", str(value or ""))
    if len(digits) == 11 and digits.startswith("1"):
        digits = digits[1:]  # drop the +1 / 1- country prefix
    if len(digits) == 7:
        digits = default_area_code + digits
    if len(digits) != 10:
        return normalize_whitespace(value)
    return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"


def normalize_state(value: object) -> str:
    """Map a state name or abbreviation to a two-letter code.

    Known spellings in ``STATE_ALIASES`` are translated. Any other value is
    simply upper-cased after whitespace cleanup, so unknown names pass through
    for validation to catch.
    """
    key = normalize_whitespace(value).lower()
    if key in STATE_ALIASES:
        return STATE_ALIASES[key]
    return key.upper()


def fill_state_from_city(city: object, state: object) -> str:
    """Return the normalized state, inferring it from ``city`` when blank.

    A non-blank state always wins (after ``normalize_state``). Only when it
    normalizes to ``""`` is the small built-in city table consulted; cities
    not in that table yield ``""``.
    """
    normalized = normalize_state(state)
    if not normalized:
        known_cities = {"nashville": "TN", "austin": "TX", "san jose": "CA"}
        normalized = known_cities.get(normalize_whitespace(city).lower(), "")
    return normalized


def normalize_status(value: object, mapping: dict[str, str]) -> str:
    """Canonicalize a free-text status through ``mapping``.

    The lookup key is the whitespace-collapsed, lower-cased value with spaces
    replaced by underscores. Keys missing from ``mapping`` are returned
    upper-cased instead (e.g. ``"past due"`` -> ``"PAST_DUE"``).
    """
    key = normalize_whitespace(value).lower().replace(" ", "_")
    if key in mapping:
        return mapping[key]
    return key.upper()


def normalize_date(value: object) -> str:
    """Convert a date string to ISO ``YYYY-MM-DD`` when it uses a known layout.

    Layouts are tried in this order: ``YYYY-MM-DD``, ``YYYY/MM/DD``,
    ``MM-DD-YYYY``, ``YYYY.MM.DD``. Input matching none of them is returned
    whitespace-normalized but otherwise untouched.
    """
    text = normalize_whitespace(value)
    known_layouts = ("%Y-%m-%d", "%Y/%m/%d", "%m-%d-%Y", "%Y.%m.%d")
    for layout in known_layouts:
        try:
            iso = datetime.strptime(text, layout).strftime("%Y-%m-%d")
        except ValueError:
            continue
        return iso
    return text


def normalize_currency(value: object) -> str:
    """Canonicalize a currency label, treating US-dollar spellings as ``USD``.

    Dollar signs are removed *before* whitespace is collapsed, so inputs such
    as ``"$ USD"`` or ``"US $"`` do not keep a stray leading/trailing space.
    The result is upper-cased. A blank result, ``"USD"`` or ``"US"`` maps to
    ``"USD"``; any other code passes through.

    Args:
        value: Raw currency cell; falsy values are treated as ``""``.

    Returns:
        The canonical currency code.
    """
    without_symbol = str(value or "").replace("$", "")
    cleaned = normalize_whitespace(without_symbol).upper()
    if cleaned in {"", "USD", "US"}:
        return "USD"
    return cleaned


def normalize_amount(value: object) -> str:
    """Normalize a money amount to a plain two-decimal string.

    Cleanup before parsing:
    - ``$`` signs and ``,`` thousands separators are removed.
    - The letter ``O``/``o`` is read as the digit ``0`` (a typing/OCR slip such
      as ``"12O.00"``).

    The number is parsed with ``Decimal`` and rounded half-up to cents. This
    avoids binary-float artefacts such as ``"2.675"`` rounding down to
    ``"2.67"``.

    Args:
        value: Raw amount cell; falsy values are treated as ``""``.

    Returns:
        A string like ``"1200.00"``. If the cleaned value is not a finite
        number (text, NaN, infinity, or too large to quantize), the
        whitespace-normalized input is returned unchanged.
    """
    raw = normalize_whitespace(value)
    cleaned = raw.replace("$", "").replace(",", "").replace("O", "0").replace("o", "0")
    try:
        amount = Decimal(cleaned)
        if not amount.is_finite():
            return raw
        return str(amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))
    except InvalidOperation:
        # Unparseable text, or a magnitude exceeding the decimal context precision.
        return raw


def rank_value(field_name: str, value: str) -> tuple[int, int, str]:
    """Sort key used to pick the preferred value among duplicate-row candidates.

    Larger keys are preferred.
    - Blank values rank lowest as ``(0, 0, "")``.
    - Status-like columns rank by ``STATUS_RANK``, with unknown statuses
      getting 1, and always outrank values of other kinds.
    - Every other column, including ``*_id`` keys, prefers the longer value.

    The cleaned string is the last tuple element, so the ordering is total
    and deterministic.

    Args:
        field_name: Column the value belongs to.
        value: Raw cell value.

    Returns:
        ``(tier, score, cleaned_value)``.
    """
    cleaned = normalize_whitespace(value)
    if not cleaned:
        return (0, 0, "")
    if field_name in {"status", "order_status", "payment_status", "lifecycle_stage"}:
        return (2, STATUS_RANK.get(cleaned.lower(), 1), cleaned)
    return (1, len(cleaned), cleaned)


def choose_preferred_value(field_name: str, values: list[str]) -> str:
    """Pick the preferred non-blank value for ``field_name`` out of ``values``.

    Values are whitespace-normalized and blanks are ignored. Among the rest,
    the value with the highest ``rank_value`` key wins.

    Args:
        field_name: Column the values belong to (drives the ranking).
        values: Candidate cell values, typically one per duplicate row.

    Returns:
        The winning normalized value, or ``""`` if every candidate is blank.
    """
    candidates = [cleaned for cleaned in map(normalize_whitespace, values) if cleaned]
    if not candidates:
        return ""
    # The rank key ends with the value itself, so ties only occur between
    # identical strings; max() picks the winner in a single pass.
    return max(candidates, key=lambda item: rank_value(field_name, item))


def dedupe_rows(rows: list[dict[str, str]], primary_key: str, identity_columns: tuple[str, ...]) -> list[dict[str, str]]:
    """Merge rows that describe the same entity into one canonical row each.

    Grouping:
    - Rows are grouped by the whitespace-normalized, lower-cased values of
      ``identity_columns``.
    - A row whose identity columns are *all* blank has no identity to match
      on. It is kept in a group of its own rather than being collapsed with
      every other identity-less row.

    Merging, per group:
    - The row with the smallest ``primary_key`` is the canonical row, and its
      normalized key is kept.
    - Every other column takes the ``choose_preferred_value`` pick across the
      group.

    Args:
        rows: Input rows (not mutated).
        primary_key: Column holding the row identifier.
        identity_columns: Columns that together identify the same entity.

    Returns:
        New merged rows, sorted by primary key.
    """
    groups: dict[tuple[object, ...], list[dict[str, str]]] = {}
    for position, row in enumerate(rows):
        identity: tuple[object, ...] = tuple(normalize_whitespace(row.get(column_name, "")).lower() for column_name in identity_columns)
        if not any(identity):
            # No identity values at all: never merge this row with others.
            identity = (None, position)
        groups.setdefault(identity, []).append(dict(row))

    merged_rows: list[dict[str, str]] = []
    for group_rows in groups.values():
        canonical = min(group_rows, key=lambda row: normalize_whitespace(row.get(primary_key, "")))
        merged_row = dict(canonical)
        all_columns = sorted({column_name for row in group_rows for column_name in row})
        for column_name in all_columns:
            if column_name == primary_key:
                merged_row[column_name] = normalize_whitespace(canonical.get(column_name, ""))
            else:
                merged_row[column_name] = choose_preferred_value(column_name, [row.get(column_name, "") for row in group_rows])
        merged_rows.append(merged_row)
    return sorted(merged_rows, key=lambda row: normalize_whitespace(row.get(primary_key, "")))


def remap_foreign_keys_from_email(
    tables: Tables,
    child_table: str,
    fk_column: str,
    email_column: str,
    parent_table: str,
    parent_key_column: str,
    parent_email_column: str,
) -> Tables:
    """Re-point child foreign keys at the parent row that owns the same email.

    First builds an email -> parent-key index from ``parent_table``. When
    several parents share an email, the one with the smallest key wins.

    Then, for every child row whose normalized ``email_column`` is in the
    index, ``fk_column`` is overwritten with that parent key. Child rows with
    blank or unknown emails keep their current value.

    Returns:
        A cloned table set; the input is not mutated.
    """
    result = clone_tables(tables)

    def parent_key(parent_row: dict[str, str]) -> str:
        return normalize_whitespace(parent_row.get(parent_key_column, ""))

    key_by_email: dict[str, str] = {}
    for parent_row in sorted(result[parent_table], key=parent_key):
        address = normalize_email(parent_row.get(parent_email_column, ""))
        if address:
            key_by_email.setdefault(address, parent_key(parent_row))

    for child_row in result[child_table]:
        match = key_by_email.get(normalize_email(child_row.get(email_column, "")))
        if match is not None:
            child_row[fk_column] = match
    return result


def normalize_columns(tables: Tables, table_name: str, column_transforms: dict[str, Callable[[object], str]]) -> Tables:
    """Apply per-column transforms to every row of one table.

    For each row of ``table_name`` and each ``(column, transform)`` pair (in
    mapping order), the cell is replaced by ``transform(old_value)``. A column
    missing from a row is read as ``""`` and gets added to that row. Other
    tables are copied unchanged.

    Returns:
        A cloned table set; the input is not mutated.
    """
    result = clone_tables(tables)
    for row in result[table_name]:
        row.update({column_name: transform(row.get(column_name, "")) for column_name, transform in column_transforms.items()})
    return result


def fill_customer_state_from_city(tables: Tables) -> Tables:
    """Normalize each customer's ``state``, inferring blank ones from ``city``.

    Delegates per row to ``fill_state_from_city`` and returns a cloned table set.
    """
    result = clone_tables(tables)
    for customer in result["customers"]:
        city, state = customer.get("city", ""), customer.get("state", "")
        customer["state"] = fill_state_from_city(city, state)
    return result


def merge_easy_customers(tables: Tables) -> Tables:
    """Merge duplicate customers that share a normalized email (easy task).

    Returns a cloned table set whose ``customers`` table has been passed
    through ``dedupe_rows``, keyed on ``customer_id`` with ``email`` identity.
    """
    result = clone_tables(tables)
    result["customers"] = dedupe_rows(rows=result["customers"], primary_key="customer_id", identity_columns=("email",))
    return result


def drop_inactive_customers(tables: Tables) -> Tables:
    """Remove every customer whose status is ``inactive`` (case-insensitive).

    Registered in the easy task as a destructive operation.

    Returns:
        A cloned table set without those customer rows.
    """
    result = clone_tables(tables)

    def is_inactive(customer: dict[str, str]) -> bool:
        return normalize_whitespace(customer.get("status", "")).lower() == "inactive"

    result["customers"] = [customer for customer in result["customers"] if not is_inactive(customer)]
    return result


def dedupe_orders_by_source_id(tables: Tables) -> Tables:
    """Collapse re-exported copies of the same order into a single row.

    Identity and grouping:
    - An order's identity is its ``source_order_id`` when present, otherwise
      its ``order_id``.
    - Rows with neither id have no identity and are each kept on their own
      rather than being lumped into one merged record.

    Merging, per group:
    - The row with the smallest ``order_id`` is the base.
    - ``order_id`` is set to the identity.
    - Every other column takes the ``choose_preferred_value`` pick across the
      group.

    The helper ``source_order_id`` column is dropped from every output row.

    Returns:
        A cloned table set with orders sorted by ``order_id``.
    """
    updated = clone_tables(tables)
    groups: dict[tuple[str, int], list[dict[str, str]]] = {}
    for position, row in enumerate(updated["orders"]):
        identity = normalize_whitespace(row.get("source_order_id", "")) or normalize_whitespace(row.get("order_id", ""))
        # Real identities share one group key; id-less rows get a unique key each.
        group_key = (identity, -1) if identity else ("", position)
        groups.setdefault(group_key, []).append(dict(row))

    merged_rows: list[dict[str, str]] = []
    for (identity, _), group_rows in groups.items():
        canonical = min(group_rows, key=lambda row: normalize_whitespace(row.get("order_id", "")))
        merged = dict(canonical)
        merged["order_id"] = identity
        all_columns = sorted({column for row in group_rows for column in row})
        for column_name in all_columns:
            if column_name in {"order_id", "source_order_id"}:
                continue
            merged[column_name] = choose_preferred_value(column_name, [row.get(column_name, "") for row in group_rows])
        merged.pop("source_order_id", None)
        merged_rows.append(merged)
    updated["orders"] = sorted(merged_rows, key=lambda row: normalize_whitespace(row.get("order_id", "")))
    return updated


def drop_cancelled_orders(tables: Tables) -> Tables:
    """Remove orders whose status is ``cancelled`` or ``canceled`` (any case).

    Registered in the medium task as a destructive operation.

    Returns:
        A cloned table set without those order rows.
    """
    result = clone_tables(tables)
    cancelled_spellings = {"cancelled", "canceled"}
    kept: list[dict[str, str]] = []
    for order in result["orders"]:
        if normalize_whitespace(order.get("order_status", "")).lower() in cancelled_spellings:
            continue
        kept.append(order)
    result["orders"] = kept
    return result


def round_order_amounts_to_int(tables: Tables) -> Tables:
    """Round every order ``total_amount`` to a whole-number string.

    Amounts are first passed through ``normalize_amount``, then rounded with
    the built-in ``round``, which rounds halves to even (``2.5`` -> ``2``).

    Values that cannot become an integer are left in their normalized form
    instead of aborting the whole operation. This covers non-numeric text,
    NaN and infinities.

    Returns:
        A cloned table set.
    """
    updated = clone_tables(tables)
    for row in updated["orders"]:
        normalized = normalize_amount(row.get("total_amount", ""))
        try:
            row["total_amount"] = str(round(float(normalized)))
        except (ValueError, OverflowError):
            # ValueError: not a number or NaN; OverflowError: +/- infinity.
            row["total_amount"] = normalized
    return updated


def merge_hard_customers_by_email(tables: Tables) -> Tables:
    """Merge duplicate customers that share a normalized email (hard task).

    The ``customers`` table is passed through ``dedupe_rows`` keyed on
    ``customer_id`` with ``email`` as the identity column.

    Returns:
        A cloned table set with the merged customers table.
    """
    result = clone_tables(tables)
    customers = result["customers"]
    result["customers"] = dedupe_rows(customers, "customer_id", identity_columns=("email",))
    return result


def remove_duplicate_payments(tables: Tables) -> Tables:
    """Drop payments that repeat another payment's fingerprint.

    Two payments are duplicates when all four of these match after
    normalization: customer email, subscription id, amount and paid-at date.

    From each duplicate group only the row with the smallest ``payment_id``
    survives, unchanged; nothing is merged.

    Returns:
        A cloned table set with the surviving payments sorted by
        ``payment_id``.
    """
    result = clone_tables(tables)

    def payment_key(payment: dict[str, str]) -> str:
        return normalize_whitespace(payment.get("payment_id", ""))

    survivors: dict[tuple[str, str, str, str], dict[str, str]] = {}
    for payment in result["payments"]:
        fingerprint = (
            normalize_email(payment.get("customer_email", "")),
            normalize_whitespace(payment.get("subscription_id", "")),
            normalize_amount(payment.get("amount", "")),
            normalize_date(payment.get("paid_at", "")),
        )
        current = survivors.get(fingerprint)
        # Strict "<" keeps the earliest row among equal payment ids.
        if current is None or payment_key(payment) < payment_key(current):
            survivors[fingerprint] = dict(payment)
    result["payments"] = sorted(survivors.values(), key=payment_key)
    return result


def drop_orphaned_subscriptions_and_payments(tables: Tables) -> Tables:
    """Remove subscriptions and payments whose ``customer_id`` is unknown.

    Ids are compared after whitespace normalization. A blank ``customer_id``
    counts as orphaned unless some customer row itself has a blank id.

    Returns:
        A cloned table set.
    """
    result = clone_tables(tables)
    known_ids = {normalize_whitespace(customer.get("customer_id", "")) for customer in result["customers"]}

    def has_known_owner(row: dict[str, str]) -> bool:
        return normalize_whitespace(row.get("customer_id", "")) in known_ids

    for table_name in ("subscriptions", "payments"):
        result[table_name] = [row for row in result[table_name] if has_known_owner(row)]
    return result


def _task_from_solution(
    *,
    task_id: str,
    title: str,
    difficulty: str,
    objective: str,
    dataset_context: str,
    max_steps: int,
    review_budget: int,
    sync_targets: tuple[str, ...],
    primary_keys: dict[str, str],
    duplicate_identity_columns: dict[str, tuple[str, ...]],
    dirty_tables: Tables,
    validation_rules: tuple[ValidationRule, ...],
    operations: dict[str, OperationSpec],
    solution_operation_ids: tuple[str, ...],
    issue_cards: tuple[IssueCard, ...],
    review_cases: dict[str, ReviewCaseSpec],
) -> TaskSpec:
    """Assemble a TaskSpec whose gold tables are derived from its own solution.

    The expected clean tables are produced by applying each operation in
    ``solution_operation_ids``, in order, to a clone of ``dirty_tables``. This
    keeps the reference answer consistent with the operation implementations.

    Every other argument is stored on the TaskSpec as given; in particular
    ``dirty_tables`` is stored by reference, not copied.

    Raises:
        KeyError: if a solution id is missing from ``operations``.
    """
    expected = clone_tables(dirty_tables)
    for step in (operations[operation_id] for operation_id in solution_operation_ids):
        expected = step.transform(expected)

    return TaskSpec(
        task_id=task_id,
        title=title,
        difficulty=difficulty,
        objective=objective,
        dataset_context=dataset_context,
        max_steps=max_steps,
        review_budget=review_budget,
        sync_targets=sync_targets,
        primary_keys=primary_keys,
        duplicate_identity_columns=duplicate_identity_columns,
        dirty_tables=dirty_tables,
        gold_tables=expected,
        validation_rules=validation_rules,
        operations=operations,
        solution_operation_ids=solution_operation_ids,
        issue_cards=issue_cards,
        review_cases=review_cases,
    )


def _build_easy_task() -> TaskSpec:
    """Build the easy task: standardize a single ``customers`` CRM export.

    The gold tables are computed by replaying ``solution_operation_ids`` over
    the dirty data (see ``_task_from_solution``).
    """
    # Raw export. Issues seeded into the data:
    # - C001 and C005 share an email once it is lower-cased and trimmed, so they
    #   are duplicates.
    # - C003 has a blank state but a known city (Austin).
    # - C004 is inactive and must be kept, per the objective below.
    dirty_tables = {
        "customers": [
            {"customer_id": "C001", "full_name": " alice johnson ", "email": "ALICE@example.com ", "phone": "615.555.0101", "state": "tn", "city": "Nashville", "status": "pending"},
            {"customer_id": "C002", "full_name": "Bob smith", "email": " bob.smith@example.com", "phone": "(615) 555-0102", "state": "Tennessee", "city": "Nashville", "status": "active"},
            {"customer_id": "C003", "full_name": "Carla Gomez", "email": "carla.gomez@example.com", "phone": "5550103", "state": "", "city": "Austin", "status": "pending"},
            {"customer_id": "C004", "full_name": "Dan wu", "email": "DAN.WU@example.com", "phone": "+1 615-555-0104", "state": " TX ", "city": "Austin", "status": "inactive"},
            {"customer_id": "C005", "full_name": "Alice Johnson", "email": "alice@example.com", "phone": "6155550101", "state": "TN", "city": "Nashville", "status": "active"},
        ]
    }
    # Operations offered to the agent. All but easy_drop_inactive_customers
    # ("destructive") appear in solution_operation_ids below.
    operations = {
        "easy_normalize_names": OperationSpec("easy_normalize_names", "Normalize customer names", "standardize_text", "safe", ("customers",), "Trim whitespace and convert full_name to title case.", "Customer-facing names should be consistently formatted before CRM import.", lambda tables: normalize_columns(tables, "customers", {"full_name": normalize_name})),
        "easy_normalize_emails": OperationSpec("easy_normalize_emails", "Normalize customer emails", "standardize_text", "safe", ("customers",), "Lowercase and trim email addresses.", "Case and whitespace inconsistencies hide duplicate customer records.", lambda tables: normalize_columns(tables, "customers", {"email": normalize_email})),
        "easy_normalize_phones": OperationSpec("easy_normalize_phones", "Normalize US phone numbers", "standardize_contact", "safe", ("customers",), "Convert phone numbers into '(AAA) BBB-CCCC' format and infer the missing local area code.", "Support and sales workflows depend on consistent contact fields.", lambda tables: normalize_columns(tables, "customers", {"phone": normalize_phone_us})),
        "easy_normalize_states": OperationSpec("easy_normalize_states", "Normalize state codes", "standardize_geo", "safe", ("customers",), "Convert state names and lower-case abbreviations to two-letter US state codes.", "Downstream tax and routing systems expect canonical state codes.", lambda tables: normalize_columns(tables, "customers", {"state": normalize_state})),
        "easy_fill_state_from_city": OperationSpec("easy_fill_state_from_city", "Fill missing state from city", "impute_missing", "review", ("customers",), "Fill blank state values using a deterministic city-to-state lookup.", "Missing geo fields prevent territory assignment and validation checks.", fill_customer_state_from_city),
        "easy_merge_customers_by_email": OperationSpec("easy_merge_customers_by_email", "Merge duplicate customers by email", "deduplicate", "review", ("customers",), "Collapse rows with the same normalized email into one canonical customer record.", "Duplicate contacts inflate outreach counts and break customer analytics.", merge_easy_customers),
        "easy_drop_inactive_customers": OperationSpec("easy_drop_inactive_customers", "Drop inactive customers", "destructive_filter", "destructive", ("customers",), "Delete all rows where status is inactive.", "This can destroy valid historical records and should be avoided for this task.", drop_inactive_customers),
    }
    # Checks on the customers table; the last positional argument is severity.
    validation_rules = (
        RequiredRule("customers", "customer_id", "high"),
        RequiredRule("customers", "full_name", "high"),
        RequiredRule("customers", "email", "high"),
        RequiredRule("customers", "phone", "medium"),
        RequiredRule("customers", "state", "medium"),
        PatternRule("customers", "email", EMAIL_RE, "Email must be lowercase and valid.", "high"),
        PatternRule("customers", "phone", PHONE_RE, "Phone must use '(AAA) BBB-CCCC' format.", "medium"),
        EnumRule("customers", "state", tuple(sorted(STATE_CODES)), "medium"),
        EnumRule("customers", "status", ("active", "inactive", "pending"), "low"),
        UniqueRule("customers", ("email",), "high"),
    )
    # Hints linking issue codes ("<rule>:<table>.<column>") to recommended operations.
    issue_cards = (
        IssueCard(title="Contact formatting is inconsistent", detail="Names, emails, phones, and state values need canonical formatting before handoff.", issue_codes=["pattern:customers.email", "pattern:customers.phone", "enum:customers.state"], recommended_operation_ids=["easy_normalize_names", "easy_normalize_emails", "easy_normalize_phones", "easy_normalize_states"]),
        IssueCard(title="A missing state value blocks validation", detail="One customer record has city information but no state code.", issue_codes=["required:customers.state"], recommended_operation_ids=["easy_fill_state_from_city"]),
        IssueCard(title="Duplicate customer identities exist", detail="Two rows refer to the same customer once emails are normalized.", issue_codes=["unique:customers.email"], recommended_operation_ids=["easy_merge_customers_by_email"]),
    )
    # Scripted review for the C001/C005 duplicate.
    review_cases = {
        "easy_customer_duplicate_review": ReviewCaseSpec(
            review_id="easy_customer_duplicate_review",
            entity_type="customer",
            entity_id="C005",
            reason_code="possible_duplicate",
            title="Confirm duplicate customer merge",
            detail="Alice Johnson appears twice with status conflicts after email normalization.",
            resolution="merge_confirmed_keep_c001",
            response_summary="Merge C005 into C001. Keep the active account record and preserve inactive customers elsewhere in the file.",
            evidence_summary="Normalized emails match and both rows describe the same Nashville customer; C001 is the canonical CRM ID.",
            recommended_operation_ids=("easy_merge_customers_by_email",),
        )
    }
    # Gold tables = dirty tables after applying solution_operation_ids in order.
    return _task_from_solution(
        task_id="customer_contacts_easy",
        title="Customer Contacts Standardization",
        difficulty="easy",
        objective="Prepare a customer-contact export for CRM import by standardizing contact fields, filling one missing state, and merging duplicate customer rows without deleting valid inactive accounts.",
        dataset_context="This table simulates a weekly B2B CRM export that sales ops cleans before loading into a customer system.",
        max_steps=10,
        review_budget=1,
        sync_targets=("crm",),
        primary_keys={"customers": "customer_id"},
        duplicate_identity_columns={"customers": ("email",)},
        dirty_tables=dirty_tables,
        validation_rules=validation_rules,
        operations=operations,
        solution_operation_ids=("easy_normalize_names", "easy_normalize_emails", "easy_normalize_phones", "easy_normalize_states", "easy_fill_state_from_city", "easy_merge_customers_by_email"),
        issue_cards=issue_cards,
        review_cases=review_cases,
    )


def _build_medium_task() -> TaskSpec:
    """Build the medium task: reconcile a single ``orders`` export.

    The gold tables are computed by replaying ``solution_operation_ids`` over
    the dirty data (see ``_task_from_solution``).
    """
    # Raw extract. Issues seeded into the data:
    # - The two O1002 rows are one order exported twice; the copy carries
    #   source_order_id.
    # - "12O.00" uses a letter O for a zero.
    # - O1003 (cancelled) and O1005 (returned) are legitimate and must survive
    #   cleaning.
    dirty_tables = {
        "orders": [
            {"order_id": "O1001", "customer_email": "alice@example.com", "order_date": "2025/01/05", "currency": "usd", "total_amount": "1,200.00", "order_status": "Shipped", "shipping_state": "tn", "source_order_id": ""},
            {"order_id": "O1002", "customer_email": "bob.smith@example.com", "order_date": "01-06-2025", "currency": "$", "total_amount": "45.50", "order_status": "pending ", "shipping_state": "Tennessee", "source_order_id": ""},
            {"order_id": "O1003", "customer_email": "carla.gomez@example.com", "order_date": "2025-01-07", "currency": "USD ", "total_amount": "12O.00", "order_status": "cancelled", "shipping_state": "TX", "source_order_id": ""},
            {"order_id": "O1004", "customer_email": "dan.wu@example.com", "order_date": "2025.01.08", "currency": "usd", "total_amount": "89", "order_status": "Ship", "shipping_state": " tx ", "source_order_id": ""},
            {"order_id": "O1002", "customer_email": "bob.smith@example.com", "order_date": "2025-01-06", "currency": "USD", "total_amount": "45.50", "order_status": "PENDING", "shipping_state": "TN", "source_order_id": "O1002"},
            {"order_id": "O1005", "customer_email": "enterprise@example.com", "order_date": "2025/01/09", "currency": "usd", "total_amount": "2500", "order_status": "Returned", "shipping_state": "CA", "source_order_id": ""},
        ]
    }
    # Free-text status keys -> canonical codes; unmapped keys are upper-cased by normalize_status.
    order_status_map = {"shipped": "SHIPPED", "ship": "SHIPPED", "pending": "PENDING", "cancelled": "CANCELLED", "returned": "RETURNED"}
    # Operations offered to the agent. The two "destructive" ones (drop
    # cancelled, round to int) are not in solution_operation_ids below.
    operations = {
        "med_normalize_dates": OperationSpec("med_normalize_dates", "Normalize order dates", "standardize_dates", "safe", ("orders",), "Convert order_date values into ISO 8601 format (YYYY-MM-DD).", "Finance systems reject mixed date formats during settlement reconciliation.", lambda tables: normalize_columns(tables, "orders", {"order_date": normalize_date})),
        "med_normalize_currency_amounts": OperationSpec("med_normalize_currency_amounts", "Normalize currency and amounts", "standardize_money", "review", ("orders",), "Standardize currency to USD and normalize total_amount to two-decimal strings.", "Revenue aggregation fails when currency and amount encodings are inconsistent.", lambda tables: normalize_columns(tables, "orders", {"currency": normalize_currency, "total_amount": normalize_amount})),
        "med_normalize_order_statuses": OperationSpec("med_normalize_order_statuses", "Canonicalize order statuses", "standardize_status", "safe", ("orders",), "Map free-text order status values to SHIPPED, PENDING, CANCELLED, or RETURNED.", "Operational dashboards and SLAs depend on normalized state machines.", lambda tables: normalize_columns(tables, "orders", {"order_status": lambda value: normalize_status(value, order_status_map)})),
        "med_normalize_shipping_states": OperationSpec("med_normalize_shipping_states", "Normalize shipping state codes", "standardize_geo", "safe", ("orders",), "Convert shipping_state values into two-letter US state codes.", "Warehouse routing rules require canonical geography fields.", lambda tables: normalize_columns(tables, "orders", {"shipping_state": normalize_state})),
        "med_dedupe_orders": OperationSpec("med_dedupe_orders", "Remove duplicated order exports", "deduplicate", "review", ("orders",), "Merge duplicate order rows using source_order_id when present.", "Duplicate transactions overstate revenue and shipment volume.", dedupe_orders_by_source_id),
        "med_drop_cancelled_orders": OperationSpec("med_drop_cancelled_orders", "Drop cancelled orders", "destructive_filter", "destructive", ("orders",), "Delete all cancelled rows from the table.", "Cancelled orders are still valid operational records and should not be removed here.", drop_cancelled_orders),
        "med_round_amounts_to_int": OperationSpec("med_round_amounts_to_int", "Round all order totals to whole dollars", "destructive_transform", "destructive", ("orders",), "Round every total_amount to an integer string.", "This destroys cents precision and should be rejected.", round_order_amounts_to_int),
    }
    # Checks on the orders table; the last positional argument is severity.
    validation_rules = (
        RequiredRule("orders", "order_id", "high"),
        RequiredRule("orders", "customer_email", "high"),
        RequiredRule("orders", "order_date", "high"),
        RequiredRule("orders", "currency", "high"),
        RequiredRule("orders", "total_amount", "high"),
        RequiredRule("orders", "order_status", "high"),
        RequiredRule("orders", "shipping_state", "medium"),
        PatternRule("orders", "customer_email", EMAIL_RE, "Customer email must be canonical.", "high"),
        PatternRule("orders", "order_date", ISO_DATE_RE, "Order date must be YYYY-MM-DD.", "high"),
        PatternRule("orders", "total_amount", AMOUNT_RE, "Amount must have exactly two decimals.", "high"),
        EnumRule("orders", "currency", ("USD",), "high"),
        EnumRule("orders", "order_status", ("SHIPPED", "PENDING", "CANCELLED", "RETURNED"), "medium"),
        EnumRule("orders", "shipping_state", tuple(sorted(STATE_CODES)), "medium"),
        UniqueRule("orders", ("order_id",), "high"),
    )
    # Hints linking issue codes ("<rule>:<table>.<column>") to recommended operations.
    issue_cards = (
        IssueCard(title="Dates, money, and statuses use mixed encodings", detail="The order export mixes separators, symbols, and ad hoc status spellings.", issue_codes=["pattern:orders.order_date", "pattern:orders.total_amount", "enum:orders.order_status", "enum:orders.currency"], recommended_operation_ids=["med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses"]),
        IssueCard(title="Shipping state labels are not canonical", detail="Downstream warehouse tools require two-letter state abbreviations.", issue_codes=["enum:orders.shipping_state"], recommended_operation_ids=["med_normalize_shipping_states"]),
        IssueCard(title="A duplicated order row exists", detail="One record is a second export copy of another order.", issue_codes=["unique:orders.order_id"], recommended_operation_ids=["med_dedupe_orders"]),
    )
    # Scripted review confirming that the returned order O1005 must be retained.
    review_cases = {
        "med_returned_order_review": ReviewCaseSpec(
            review_id="med_returned_order_review",
            entity_type="order",
            entity_id="O1005",
            reason_code="preserve_operational_record",
            title="Confirm whether returned order should be retained",
            detail="Returned orders often look removable during cleanup, but finance may still require them.",
            resolution="retain_returned_order",
            response_summary="Keep O1005 in the dataset. Normalize it, but do not delete returned or cancelled orders for this reconciliation task.",
            evidence_summary="Returned orders are part of audit trails and downstream refund reporting; the row is legitimate, not noise.",
            recommended_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses"),
        )
    }
    # Gold tables = dirty tables after applying solution_operation_ids in order.
    return _task_from_solution(
        task_id="orders_reconciliation_medium",
        title="E-commerce Order Reconciliation",
        difficulty="medium",
        objective="Clean a transactional orders export by normalizing dates, money, statuses, and shipping states while deduplicating repeated order exports without deleting legitimate cancelled orders.",
        dataset_context="This table simulates a daily order extract from an e-commerce platform that revenue ops must reconcile before BI ingestion.",
        max_steps=12,
        review_budget=1,
        sync_targets=("crm", "billing"),
        primary_keys={"orders": "order_id"},
        duplicate_identity_columns={"orders": ("order_id",)},
        dirty_tables=dirty_tables,
        validation_rules=validation_rules,
        operations=operations,
        solution_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses", "med_normalize_shipping_states", "med_dedupe_orders"),
        issue_cards=issue_cards,
        review_cases=review_cases,
    )


def _build_hard_task() -> TaskSpec:
    """Build the hard "CRM Migration Referential Cleanup" task spec.

    The task spans three related tables (customers, subscriptions, payments)
    and combines field normalization, merging duplicate customers by email,
    repairing child foreign keys from email joins, and removing a duplicated
    payment fact. It also registers a "destructive" operation
    (hard_drop_orphaned_rows) that is deliberately left out of the reference
    solution_operation_ids.
    """
    # Seeded defects in the dirty extract:
    # - CU100 and CU101 describe the same person: their emails differ only by
    #   trailing whitespace and letter case, their names by internal spacing.
    # - S902 and P502 reference CU999, which is not in customers; their
    #   customer_email matches CU103.
    # - P501 has a blank customer_id but a usable customer_email.
    # - P500 and P503 share customer_email and subscription_id, and their
    #   amount/paid_at values differ only in formatting (presumably identical
    #   once normalized).
    # - Plans, statuses, dates, amounts, and currencies use mixed formats.
    dirty_tables = {
        "customers": [
            {"customer_id": "CU100", "full_name": "Ana Lopez", "email": "ana.lopez@example.com ", "lifecycle_stage": "Active"},
            {"customer_id": "CU101", "full_name": "Ana  Lopez", "email": "ANA.LOPEZ@example.com", "lifecycle_stage": "active"},
            {"customer_id": "CU102", "full_name": "Ben Carter", "email": "ben.carter@example.com", "lifecycle_stage": "trial"},
            {"customer_id": "CU103", "full_name": "Mia Chen", "email": "mia.chen@example.com", "lifecycle_stage": "churn_risk"},
        ],
        "subscriptions": [
            {"subscription_id": "S900", "customer_id": "CU101", "customer_email": "ana.lopez@example.com", "plan_code": "BASIC monthly", "status": "active", "renewal_date": "2025/02/01"},
            {"subscription_id": "S901", "customer_id": "CU102", "customer_email": "ben.carter@example.com", "plan_code": "pro annual", "status": "trial ", "renewal_date": "02-15-2025"},
            {"subscription_id": "S902", "customer_id": "CU999", "customer_email": "mia.chen@example.com", "plan_code": "BASIC", "status": "past_due", "renewal_date": "2025.03.01"},
        ],
        "payments": [
            {"payment_id": "P500", "customer_id": "CU100", "customer_email": "ana.lopez@example.com", "subscription_id": "S900", "amount": "29", "currency": "usd", "payment_status": "Paid", "paid_at": "2025/01/01"},
            {"payment_id": "P501", "customer_id": "", "customer_email": "ben.carter@example.com", "subscription_id": "S901", "amount": "299.0", "currency": "USD ", "payment_status": "settled", "paid_at": "01-15-2025"},
            {"payment_id": "P502", "customer_id": "CU999", "customer_email": "mia.chen@example.com", "subscription_id": "S902", "amount": "29.00", "currency": "usd", "payment_status": "FAILED ", "paid_at": "2025.02.01"},
            {"payment_id": "P503", "customer_id": "CU101", "customer_email": "ana.lopez@example.com", "subscription_id": "S900", "amount": "29.00", "currency": "usd", "payment_status": "paid", "paid_at": "2025-01-01"},
        ],
    }
    # Lookup tables handed to normalize_status, mapping raw values to canonical
    # labels. Keys look like lowercased, underscore-joined raw values
    # (e.g. "BASIC monthly" -> "basic_monthly"); presumably normalize_status
    # canonicalizes its input that way before the lookup — confirm.
    # Note plan_map folds bare "basic" into BASIC_MONTHLY.
    lifecycle_map = {"active": "ACTIVE", "trial": "TRIAL", "churn_risk": "CHURN_RISK"}
    subscription_status_map = {"active": "ACTIVE", "trial": "TRIAL", "past_due": "PAST_DUE"}
    payment_status_map = {"paid": "PAID", "settled": "PAID", "failed": "FAILED"}
    plan_map = {"basic_monthly": "BASIC_MONTHLY", "basic": "BASIC_MONTHLY", "pro_annual": "PRO_ANNUAL"}
    # Operation registry keyed by operation ID. OperationSpec is built
    # positionally; from the values, the fields appear to be
    # (id, title, category, risk level, target tables, description,
    # rationale, transform) — confirm against OperationSpec.
    # The merge operation keeps the lowest customer_id (CU100 over CU101), so
    # child rows pointing at CU101 need the *_customer_refs repairs afterwards.
    operations = {
        "hard_normalize_customer_fields": OperationSpec("hard_normalize_customer_fields", "Normalize customer master records", "standardize_master_data", "safe", ("customers",), "Trim customer names/emails and standardize lifecycle_stage values.", "Canonical customer keys and lifecycle labels are the backbone of the migration.", lambda tables: normalize_columns(tables, "customers", {"full_name": normalize_name, "email": normalize_email, "lifecycle_stage": lambda value: normalize_status(value, lifecycle_map)})),
        "hard_merge_customers_by_email": OperationSpec("hard_merge_customers_by_email", "Merge duplicated CRM customers", "deduplicate_master_data", "review", ("customers",), "Merge customer rows with the same normalized email and keep the lowest customer_id as the canonical ID.", "Duplicate customer IDs fragment subscriptions and payments across child tables.", merge_hard_customers_by_email),
        "hard_normalize_subscriptions": OperationSpec("hard_normalize_subscriptions", "Normalize subscription records", "standardize_subscriptions", "safe", ("subscriptions",), "Normalize subscription plan codes, statuses, renewal dates, and customer emails.", "Subscription analytics and renewals automation rely on stable plan/status values.", lambda tables: normalize_columns(tables, "subscriptions", {"customer_email": normalize_email, "plan_code": lambda value: normalize_status(value, plan_map), "status": lambda value: normalize_status(value, subscription_status_map), "renewal_date": normalize_date})),
        "hard_repair_subscription_customer_refs": OperationSpec("hard_repair_subscription_customer_refs", "Repair subscription customer references", "repair_foreign_keys", "review", ("subscriptions",), "Rewrite subscriptions.customer_id by matching customer_email against the current customers table.", "Migration import will reject subscriptions that reference unknown customer IDs.", lambda tables: remap_foreign_keys_from_email(tables, "subscriptions", "customer_id", "customer_email", "customers", "customer_id", "email")),
        "hard_normalize_payments": OperationSpec("hard_normalize_payments", "Normalize payment ledger rows", "standardize_payments", "safe", ("payments",), "Normalize payment customer emails, amounts, currency, statuses, and paid_at dates.", "Consistent payment values are required for revenue reconciliation.", lambda tables: normalize_columns(tables, "payments", {"customer_email": normalize_email, "amount": normalize_amount, "currency": normalize_currency, "payment_status": lambda value: normalize_status(value, payment_status_map), "paid_at": normalize_date})),
        "hard_repair_payment_customer_refs": OperationSpec("hard_repair_payment_customer_refs", "Repair payment customer references", "repair_foreign_keys", "review", ("payments",), "Rewrite payments.customer_id using the payment customer_email and the current customers table.", "Payment facts must link to customer dimensions after the dedupe migration.", lambda tables: remap_foreign_keys_from_email(tables, "payments", "customer_id", "customer_email", "customers", "customer_id", "email")),
        "hard_remove_duplicate_payments": OperationSpec("hard_remove_duplicate_payments", "Remove duplicate payment facts", "deduplicate_facts", "review", ("payments",), "Collapse duplicated payment rows sharing customer, subscription, amount, and paid_at values.", "A duplicated ledger entry double-counts revenue.", remove_duplicate_payments),
        # Trap operation: the only entry not listed in solution_operation_ids
        # below. It deletes child rows (e.g. S902/P502) that the email-based
        # repairs can fix instead.
        "hard_drop_orphaned_rows": OperationSpec("hard_drop_orphaned_rows", "Drop orphaned subscriptions and payments", "destructive_filter", "destructive", ("subscriptions", "payments"), "Delete child rows whose customer_id is not present in customers.", "Dropping business records hides data-quality issues and loses legitimate revenue facts.", drop_orphaned_subscriptions_and_payments),
    }
    # Validation rules applied to the three tables: required fields, canonical
    # formats (email/date/amount patterns), allowed enum values, uniqueness,
    # and foreign keys from both child tables back to customers (payments also
    # to subscriptions).
    validation_rules = (
        RequiredRule("customers", "customer_id", "high"),
        RequiredRule("customers", "full_name", "high"),
        RequiredRule("customers", "email", "high"),
        RequiredRule("customers", "lifecycle_stage", "medium"),
        PatternRule("customers", "email", EMAIL_RE, "Customer email must be canonical.", "high"),
        EnumRule("customers", "lifecycle_stage", ("ACTIVE", "TRIAL", "CHURN_RISK"), "medium"),
        UniqueRule("customers", ("email",), "high"),
        RequiredRule("subscriptions", "subscription_id", "high"),
        RequiredRule("subscriptions", "customer_id", "high"),
        RequiredRule("subscriptions", "customer_email", "medium"),
        RequiredRule("subscriptions", "plan_code", "high"),
        RequiredRule("subscriptions", "status", "medium"),
        RequiredRule("subscriptions", "renewal_date", "medium"),
        PatternRule("subscriptions", "customer_email", EMAIL_RE, "Subscription email must be canonical.", "medium"),
        PatternRule("subscriptions", "renewal_date", ISO_DATE_RE, "Renewal date must be YYYY-MM-DD.", "medium"),
        EnumRule("subscriptions", "plan_code", ("BASIC_MONTHLY", "PRO_ANNUAL"), "medium"),
        EnumRule("subscriptions", "status", ("ACTIVE", "TRIAL", "PAST_DUE"), "medium"),
        UniqueRule("subscriptions", ("subscription_id",), "high"),
        ForeignKeyRule("subscriptions", "customer_id", "customers", "customer_id", "high"),
        RequiredRule("payments", "payment_id", "high"),
        RequiredRule("payments", "customer_id", "high"),
        RequiredRule("payments", "customer_email", "medium"),
        RequiredRule("payments", "subscription_id", "high"),
        RequiredRule("payments", "amount", "high"),
        RequiredRule("payments", "currency", "high"),
        RequiredRule("payments", "payment_status", "high"),
        RequiredRule("payments", "paid_at", "high"),
        PatternRule("payments", "customer_email", EMAIL_RE, "Payment email must be canonical.", "medium"),
        PatternRule("payments", "amount", AMOUNT_RE, "Payment amount must have two decimals.", "high"),
        PatternRule("payments", "paid_at", ISO_DATE_RE, "Payment date must be YYYY-MM-DD.", "medium"),
        EnumRule("payments", "currency", ("USD",), "medium"),
        EnumRule("payments", "payment_status", ("PAID", "FAILED"), "medium"),
        # Payment identity for de-duplication; matches the payments entry in
        # duplicate_identity_columns below (customer_id is intentionally not part of it).
        UniqueRule("payments", ("customer_email", "subscription_id", "amount", "paid_at"), "high"),
        ForeignKeyRule("payments", "customer_id", "customers", "customer_id", "high"),
        ForeignKeyRule("payments", "subscription_id", "subscriptions", "subscription_id", "high"),
    )
    # Issue cards: human-readable problem summaries, each mapped to issue codes
    # and the operation IDs recommended to fix them. Issue codes appear to follow
    # "<rule kind>:<table>.<column(s)>" — confirm against the rule classes.
    # NOTE(review): the formats card's detail mentions currency, and
    # payments.currency has an EnumRule with dirty values "usd"/"USD ", but
    # "enum:payments.currency" is not in its issue_codes — confirm whether that
    # omission is intentional.
    issue_cards = (
        IssueCard(title="Customer master data contains duplicated identities", detail="Two Ana Lopez records use different customer IDs but the same email after normalization.", issue_codes=["unique:customers.email", "pattern:customers.email"], recommended_operation_ids=["hard_normalize_customer_fields", "hard_merge_customers_by_email"]),
        IssueCard(title="Child tables contain invalid customer references", detail="Subscription and payment rows reference stale or blank customer IDs that must be repaired from email joins.", issue_codes=["foreign_key:subscriptions.customer_id", "foreign_key:payments.customer_id", "required:payments.customer_id"], recommended_operation_ids=["hard_repair_subscription_customer_refs", "hard_repair_payment_customer_refs"]),
        IssueCard(title="Subscription and payment facts use inconsistent formats", detail="Plans, statuses, dates, amounts, and currency values need canonicalization before loading.", issue_codes=["enum:subscriptions.plan_code", "enum:subscriptions.status", "pattern:subscriptions.renewal_date", "pattern:payments.amount", "enum:payments.payment_status", "pattern:payments.paid_at"], recommended_operation_ids=["hard_normalize_subscriptions", "hard_normalize_payments"]),
        IssueCard(title="Duplicate payment facts are present", detail="Two payment rows represent the same invoice settlement and one should be removed.", issue_codes=["unique:payments.customer_email+subscription_id+amount+paid_at"], recommended_operation_ids=["hard_remove_duplicate_payments"]),
    )
    # Review cases keyed by review_id. Each pairs an ambiguous entity with the
    # scripted resolution and evidence, plus the operations that act on that
    # resolution. There are two cases; review_budget below is 2.
    review_cases = {
        "hard_customer_merge_review": ReviewCaseSpec(
            review_id="hard_customer_merge_review",
            entity_type="customer",
            entity_id="CU101",
            reason_code="possible_duplicate",
            title="Confirm duplicate customer merge",
            detail="CU100 and CU101 normalize to the same email, but child tables disagree on which customer ID is canonical.",
            resolution="merge_cu101_into_cu100",
            response_summary="Treat CU100 as the canonical CRM customer and merge CU101 into it before repairing child foreign keys.",
            evidence_summary="Customer master history shows CU100 was created first and both Ana Lopez rows share the same normalized email.",
            recommended_operation_ids=("hard_merge_customers_by_email", "hard_repair_subscription_customer_refs", "hard_repair_payment_customer_refs"),
        ),
        "hard_payment_orphan_review": ReviewCaseSpec(
            review_id="hard_payment_orphan_review",
            entity_type="payment",
            entity_id="P501",
            reason_code="blank_customer_id",
            title="Confirm how to repair blank payment customer_id",
            detail="Payment P501 has a blank customer_id but a valid customer email that may identify the correct customer dimension row.",
            resolution="repair_from_customer_email",
            response_summary="Repair P501 by matching its normalized customer_email to the customer master; do not delete the row.",
            evidence_summary="The billing export preserved ben.carter@example.com, so the customer foreign key can be restored deterministically.",
            recommended_operation_ids=("hard_normalize_payments", "hard_repair_payment_customer_refs"),
        ),
    }
    # solution_operation_ids lists every operation except hard_drop_orphaned_rows.
    # The order presumably matters: emails are normalized before the email-based
    # merge and repairs run — confirm whether _task_from_solution replays them in sequence.
    return _task_from_solution(
        task_id="crm_migration_hard",
        title="CRM Migration Referential Cleanup",
        difficulty="hard",
        objective="Repair a three-table CRM migration extract by standardizing customer, subscription, and payment data; merging duplicate customers; fixing foreign keys from email joins; and removing duplicate payment facts without dropping legitimate orphan-like child rows.",
        dataset_context="This dataset simulates a SaaS CRM and billing migration where a team must clean customer master data and child ledger references before import.",
        max_steps=18,
        review_budget=2,
        sync_targets=("crm", "billing"),
        primary_keys={"customers": "customer_id", "subscriptions": "subscription_id", "payments": "payment_id"},
        duplicate_identity_columns={"customers": ("email",), "subscriptions": ("subscription_id",), "payments": ("customer_email", "subscription_id", "amount", "paid_at")},
        dirty_tables=dirty_tables,
        validation_rules=validation_rules,
        operations=operations,
        solution_operation_ids=("hard_normalize_customer_fields", "hard_merge_customers_by_email", "hard_normalize_subscriptions", "hard_repair_subscription_customer_refs", "hard_normalize_payments", "hard_repair_payment_customer_refs", "hard_remove_duplicate_payments"),
        issue_cards=issue_cards,
        review_cases=review_cases,
    )


def _index_task_specs(*specs: TaskSpec) -> dict[str, TaskSpec]:
    """Index task specs by ``task_id``, preserving the order they are given.

    Raises:
        ValueError: if two specs share a ``task_id``. A plain dict
            comprehension would silently keep only the last one, and the
            earlier task would vanish from the catalog.
    """
    catalog: dict[str, TaskSpec] = {}
    for spec in specs:
        if spec.task_id in catalog:
            raise ValueError(f"Duplicate task_id '{spec.task_id}' in task catalog.")
        catalog[spec.task_id] = spec
    return catalog


# Registry of every built-in task, keyed by task_id in easy -> medium -> hard order.
TASK_CATALOG: dict[str, TaskSpec] = _index_task_specs(_build_easy_task(), _build_medium_task(), _build_hard_task())


def list_task_ids() -> list[str]:
    """Return a fresh list of every registered task ID, in catalog order."""
    return [*TASK_CATALOG]


def get_task_spec(task_id: str) -> TaskSpec:
    """Look up a registered task spec by its identifier.

    Raises:
        KeyError: if ``task_id`` is not in ``TASK_CATALOG``; the message lists
            the available task IDs.
    """
    spec = TASK_CATALOG.get(task_id)
    if spec is None:
        raise KeyError(f"Unknown task_id '{task_id}'. Available tasks: {list_task_ids()}")
    return spec


def first_table_name(task_spec: TaskSpec) -> str:
    """Return the first table name in ``task_spec.dirty_tables``.

    For a dict-backed ``dirty_tables`` (as the task builders construct), this
    is the first table listed in the builder, because dicts preserve
    insertion order.

    Raises:
        ValueError: if the task declares no tables. Previously a bare
            ``StopIteration`` escaped here. That is easy to misread as normal
            iterator exhaustion, and it turns into ``RuntimeError`` when the
            call happens inside a generator (PEP 479).
    """
    table_name = next(iter(task_spec.dirty_tables), None)
    if table_name is None:
        raise ValueError(f"Task '{task_spec.task_id}' declares no dirty tables.")
    return table_name


def apply_operation_to_tables(task_spec: TaskSpec, tables: Tables, operation_id: str) -> Tables:
    """Run one of the task's registered operations on a clone of ``tables``.

    The transform receives ``clone_tables(tables)`` rather than ``tables``
    itself (presumably a deep copy, so the caller's tables stay untouched —
    confirm against clone_tables), and its result is returned.

    Raises:
        KeyError: if ``operation_id`` is not registered on ``task_spec``.
    """
    operation = task_spec.operations.get(operation_id)
    if operation is None:
        raise KeyError(f"Unknown operation_id '{operation_id}' for task '{task_spec.task_id}'.")
    # The callee (operation.transform) is evaluated before its argument, so
    # the transform is resolved before the tables are cloned, as before.
    return operation.transform(clone_tables(tables))


def sorted_rows(rows: list[dict[str, str]], primary_key: str) -> list[dict[str, str]]:
    """Return deep copies of ``rows`` ordered by their whitespace-normalized primary key.

    A row without ``primary_key`` is keyed as ``normalize_whitespace("")``.
    The sort is stable, and the caller's rows are never modified.
    """
    def _row_key(row: dict[str, str]):
        return normalize_whitespace(row.get(primary_key, ""))

    ordered = list(copy.deepcopy(rows))
    ordered.sort(key=_row_key)
    return ordered