File size: 25,036 Bytes
16dc556
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
"""Engine regression tests: profiler detection, every executor op, end-to-end."""

import math

import pandas as pd
import pytest

from scrubdata import apply_plan, mock_plan, profile_dataframe
from scrubdata import detect
from eval.metrics import is_valid


def _col_plan(name, ops):
    return {"table_operations": [], "flags": [],
            "columns": [{"name": name, "operations": ops}]}


def _apply(series_vals, ops, col="x"):
    df = pd.DataFrame({col: series_vals})
    out, _ = apply_plan(df, _col_plan(col, ops))
    return out[col].tolist()


# ---- value-level ops --------------------------------------------------------

def test_strip_whitespace():
    assert _apply(["  a  b ", "c"], [{"op": "strip_whitespace"}]) == ["a b", "c"]


def test_parse_currency_us_eu_accounting():
    out = _apply(["$1,200.50", "1.200,50", "(500)", "950"], [{"op": "parse_currency"}])
    assert out == [1200.50, 1200.50, -500.0, 950.0]


def test_parse_percent():
    out = _apply(["12.5%", "100%"], [{"op": "parse_percent"}])
    assert out[0] == pytest.approx(0.125) and out[1] == pytest.approx(1.0)


def test_parse_date_formats():
    out = _apply(["2023-01-05", "1/6/2023", "5 Jan 2023", "44931"],
                 [{"op": "parse_date"}])
    assert out[0] == "2023-01-05"
    assert out[1] == "2023-01-06"
    assert out[2] == "2023-01-05"
    assert out[3] == "2023-01-05"  # excel serial


def test_standardize_boolean():
    out = _apply(["Yes", "Y", "TRUE", "1", "No", "N", "FALSE", "0"],
                 [{"op": "standardize_boolean"}])
    assert out == [True, True, True, True, False, False, False, False]


def test_standardize_phone_us():
    assert _apply(["555.123.4567"], [{"op": "standardize_phone"}]) == ["(555) 123-4567"]


def test_normalize_email():
    assert _apply([" Bob@X.COM "], [{"op": "normalize_email"}]) == ["bob@x.com"]


def test_standardize_case():
    assert _apply(["hello WORLD"], [{"op": "standardize_case", "case": "title"}]) == ["Hello World"]


def test_normalize_disguised_nulls():
    out = _apply(["N/A", "-", "real"], [{"op": "normalize_disguised_nulls"}])
    assert out[0] is None or pd.isna(out[0])
    assert out[2] == "real"


def test_canonicalize_categories():
    out = _apply(["usa", "U.S.A"], [{"op": "canonicalize_categories",
                  "mapping": {"usa": "United States", "U.S.A": "United States"}}])
    assert out == ["United States", "United States"]


# ---- table-level + end-to-end ----------------------------------------------

def test_drop_dupes_empty_rows_cols():
    df = pd.DataFrame({"a": ["1", "1", ""], "junk": ["", "", ""]})
    plan = {"table_operations": [
        {"op": "drop_empty_columns", "columns": ["junk"]},
        {"op": "drop_empty_rows"}, {"op": "drop_exact_duplicates"}],
        "columns": [], "flags": []}
    out, _ = apply_plan(df, plan)
    assert list(out.columns) == ["a"]
    assert out["a"].tolist() == ["1"]


def test_sample_end_to_end():
    df = pd.read_csv("samples/dirty_contacts.csv", dtype=str, keep_default_na=False)
    before = profile_dataframe(df)
    plan = mock_plan(df, before)
    cleaned, log = apply_plan(df, plan)
    assert "notes2" not in cleaned.columns          # empty col dropped
    assert len(cleaned) == 13                        # 16 -> dedup/empty -> 13
    assert is_valid(plan)                            # plan conforms to schema


def test_phone_conservatism():
    # consistent format -> heuristic should NOT emit standardize_phone
    df = pd.DataFrame({"phone": ["5551234567", "5559876543", "5550001111"]})
    plan = mock_plan(df)
    ops = [o["op"] for c in plan["columns"] for o in c["operations"]]
    assert "standardize_phone" not in ops


def test_detect_types():
    assert detect.detect_semantic_type("email", ["a@b.com", "c@d.com"]) == "email"
    assert detect.detect_semantic_type("x", ["Yes", "No", "Y"]) == "boolean"


def test_batched_planner():
    # agentic column-batching wrapper merges per-batch plans + deterministic table ops
    from scrubdata.model_planner import make_batched_planner
    from scrubdata.planner import mock_plan
    df = pd.read_csv("samples/dirty_contacts.csv", dtype=str, keep_default_na=False)
    plan = make_batched_planner(mock_plan, batch_size=3)(df)
    names = {c["name"] for c in plan["columns"]}
    assert {"country", "amount"} <= names                       # all columns covered
    assert any(o["op"] == "drop_empty_columns" for o in plan["table_operations"])
    assert is_valid(plan)


def test_reconcile_grounds_and_abstains():
    from scrubdata.reconcile import default_index
    idx = default_index()
    assert idx.reconcile("USA", "country")[0] == "United States"
    assert idx.reconcile("Germny", "country")[0] == "Germany"      # fuzzy
    assert idx.reconcile("Xyzzylandia", "country") is None          # ABSTAIN
    assert idx.reconcile("Califrnia", "state")[0] == "California"


def test_grounded_planner_no_wrong_merge():
    # 'guntxrsvillx' (a town not in the reference) must NOT be merged into a similar real
    # city — the structural fix for guntxrsvillx->huntsville.
    df = pd.DataFrame({"loc": ["birminghxm", "Birmingham", "guntxrsvillx", "Chicago",
                               "Chcago", "Birmingham", "Chicago", "Birmingham"]})
    plan = mock_plan(df)
    mapping = {k: v for c in plan["columns"] for o in c["operations"]
               if o["op"] == "canonicalize_categories" for k, v in o["mapping"].items()}
    assert mapping.get("birminghxm") == "Birmingham"
    assert mapping.get("guntxrsvillx", "") != "Huntsville"


def test_grounded_wrapper_overrides_model_overcorrection():
    from scrubdata.grounded import make_grounded_planner
    # a model that over-corrects (invents canonicals + wrong-merges)
    def fake_model(df, *a):
        return {"table_operations": [], "flags": [], "columns": [
            {"name": "city", "detected_semantic_type": "categorical", "issues": [],
             "operations": [{"op": "canonicalize_categories",
                             "mapping": {"birminghxm": "Birmingham City USA",
                                         "guntxrsvillx": "Huntsville"}}]}]}
    df = pd.DataFrame({"city": ["birminghxm", "Birmingham", "guntxrsvillx", "Chicago",
                               "Chcago", "Birmingham", "Chicago", "Birmingham"]})
    plan = make_grounded_planner(fake_model)(df)
    m = {k: v for c in plan["columns"] for o in c["operations"]
         if o["op"] == "canonicalize_categories" for k, v in o["mapping"].items()}
    assert m.get("birminghxm") == "Birmingham"            # grounded, not "Birmingham City USA"
    assert m.get("guntxrsvillx", "") != "Huntsville"      # wrong-merge blocked
    assert any(f["column"] == "city" for f in plan["flags"])   # abstained -> review flag


def test_pii_validators():
    from scrubdata.pii import luhn_ok, _is_credit_card, _is_iban
    assert luhn_ok("4532015112830366")
    assert not luhn_ok("4532015112830367")
    assert _is_credit_card("4532-0151-1283-0366")
    assert not _is_credit_card("1234567890123456")          # fails Luhn
    assert _is_iban("DE89370400440532013000")
    assert not _is_iban("DE89370400440532013001")           # fails mod-97


def test_pii_column_detection_and_negatives():
    from scrubdata.pii import detect_column_pii
    cards = ["4532015112830366", "4716461583322103", "5425233430109903", "4024007103939509"]
    r = detect_column_pii("card", cards)
    assert r and r["pii_type"] == "credit_card" and r["checksum"]
    r = detect_column_pii("ssn", ["123-45-6789", "987-65-4321", "111-22-3333"])
    assert r and r["pii_type"] == "ssn"
    assert detect_column_pii("city", ["Boston", "Chicago", "Dallas", "Boston"]) is None
    assert detect_column_pii("qty", ["1", "2", "3", "4", "5"]) is None


def test_pii_planner_masks_and_never_reformats_identifiers():
    df = pd.DataFrame({
        "card": ["4532015112830366", "4716461583322103", "5425233430109903",
                 "4024007103939509", "370434978549371"],   # last one fails Luhn (80% rate)
        "email": ["ana@corp.io", "luis@mail.com", "sofia@test.org", "raul@corp.io",
                  "mia@mail.com"],
        "city": ["Boston", "Chicago", "Boston", "Dallas", "Chicago"],
    })
    plan = mock_plan(df)
    assert is_valid(plan)
    ops = {c["name"]: [o["op"] for o in c["operations"]] for c in plan["columns"]}
    # checksum-confirmed at 80% coverage -> still auto-masked, never parse_number'd
    assert ops["card"] == ["flag_pii", "mask_pii"]
    assert "flag_pii" in ops["email"] and "mask_pii" not in ops["email"]
    assert "city" not in ops or not any("pii" in o for o in ops.get("city", []))
    cleaned, _ = apply_plan(df, plan)
    from scrubdata.pii import detect_column_pii
    assert detect_column_pii("card", cleaned["card"].tolist()) is None   # leak-free
    assert cleaned["card"][0].endswith("0366") and cleaned["card"][0].startswith("*")
    assert cleaned["email"][0] == "ana@corp.io"                          # flagged, not destroyed


def test_pii_hash_and_pseudonymize_deterministic():
    from scrubdata.pii import hash_value, pseudonymize_value
    assert hash_value("4532015112830366", "s1") == hash_value("4532015112830366", "s1")
    assert hash_value("4532015112830366", "s1") != hash_value("4532015112830366", "s2")
    p1 = pseudonymize_value("ana@corp.io", "s1", "email")
    assert p1 == pseudonymize_value("ana@corp.io", "s1", "email")   # join-stable
    assert p1.startswith("EMAIL_") and "ana" not in p1


def test_active_planner_defaults_to_heuristic(monkeypatch):
    monkeypatch.delenv("SCRUBDATA_MODEL", raising=False)
    from scrubdata.active import get_planner
    from scrubdata.planner import mock_plan
    assert get_planner() is mock_plan


def test_union_plans_model_wins_and_heuristic_extends():
    from scrubdata.verifier import union_plans
    primary = {"columns": [{"name": "city", "operations": [
        {"op": "canonicalize_categories", "mapping": {"bostn": "Boston"}}]}], "flags": []}
    secondary = {"columns": [
        {"name": "city", "operations": [{"op": "canonicalize_categories",
                                         "mapping": {"bostn": "BOSTON", "chcago": "Chicago"}}]},
        {"name": "state", "operations": [{"op": "canonicalize_categories",
                                          "mapping": {"texs": "Texas"}}]},
    ]}
    out = union_plans(primary, secondary)
    maps = {c["name"]: c["operations"][0]["mapping"] for c in out["columns"]}
    assert maps["city"]["bostn"] == "Boston"          # primary wins on conflict
    assert maps["city"]["chcago"] == "Chicago"        # secondary extends coverage
    assert maps["state"] == {"texs": "Texas"}         # secondary-only column added
    assert primary["columns"][0]["operations"][0]["mapping"] == {"bostn": "Boston"}  # no mutation


def test_active_planner_is_verified_union(monkeypatch):
    monkeypatch.setenv("SCRUBDATA_MODEL", "test-model")
    from scrubdata.active import get_planner
    planner = get_planner()
    # the model backend isn't reachable in tests -> every batch falls back to the
    # heuristic; get_planner must return the verified-union wrapper (only it emits this
    # honest label) and tag the plan as fallback rather than claiming the model ran.
    df = pd.DataFrame({"city": ["Boston", "Boston", "Bostn", "Chicago", "Chicago"]})
    plan = planner(df)
    assert plan["_generated_by"] == "deterministic (model unavailable, fell back)"
    assert is_valid(plan)


def test_convention_gates_regression():
    from scrubdata import detect
    from scrubdata.executor import _parse_percent, _standardize_phone
    # date gate: uniform slash / uniform month-name = consistent; mixed = not
    assert detect.date_formats_consistent(["1/4/2016", "12/23/2015", "3/7/2014"])
    assert detect.date_formats_consistent(["28 July 2016", "4 May 2015"])
    assert not detect.date_formats_consistent(["1/4/2016", "2015-12-23", "3/7/2014",
                                               "2014-01-02"])
    # 90% boundary: 1 stray in 20 stays consistent
    assert detect.date_formats_consistent(["1/4/2016"] * 19 + ["2016-01-04"])
    # percent gate: uniform-% gated; one stray of 20 still gated (no cliff)
    assert detect.percent_formats_consistent(["10%", "20%", "30%"])
    assert detect.percent_formats_consistent(["10%"] * 19 + ["0.6"])
    assert not detect.percent_formats_consistent(["10%", "0.2", "0.3"])
    # parse_percent abstains on bare values instead of /100 corruption
    assert _parse_percent("0.6") == "0.6"
    assert _parse_percent("45%") == 0.45
    # zip guard + Excel-serial name gate + 7-digit phone
    assert detect.detect_semantic_type("zipcode(long)", ["40231", "40213"] * 10) == "text"
    assert detect.detect_semantic_type("zcta", ["48371", "48380"] * 10) == "text"
    assert detect.detect_semantic_type("record_id", ["40231", "40213"] * 10) == "number"
    assert _standardize_phone("454.1763") == "454-1763"
    # end-to-end: consistent date column -> NO parse_date op + minority flagged
    df = pd.DataFrame({"issue_date": ["1/4/2016"] * 18 + ["1/5/2016", "2016-01-04"]})
    plan = mock_plan(df)
    ops = [o["op"] for c in plan["columns"] for o in c["operations"]]
    assert "parse_date" not in ops
    assert any(f["issue"] == "off_convention_dates" for f in plan["flags"])
    # mixed date column -> op present
    df2 = pd.DataFrame({"start": ["1/4/2016", "2015-12-23", "Apr-2014", "04/16/23"] * 5})
    ops2 = [o["op"] for c in mock_plan(df2)["columns"] for o in c["operations"]]
    assert "parse_date" in ops2


def test_verifier_gates_model_format_ops():
    from scrubdata.verifier import verify_plan
    df = pd.DataFrame({"d": ["1/4/2016", "2/5/2016", "3/6/2016"] * 4,
                       "p": ["10%", "20%", "30%"] * 4})
    model_plan = {"table_operations": [], "flags": [], "columns": [
        {"name": "d", "operations": [{"op": "parse_date", "rationale": "x"}]},
        {"name": "p", "operations": [{"op": "parse_percent", "rationale": "x"}]},
    ]}
    out = verify_plan(df, model_plan, tau=0.5)
    ops = [o["op"] for c in out["columns"] for o in c["operations"]]
    assert "parse_date" not in ops and "parse_percent" not in ops
    assert sum(1 for f in out["flags"] if f["issue"] == "convention_preserved") == 2


def test_voting_guards_regression():
    from scrubdata.planner import detect_entity_groups
    from scrubdata.executor import apply_plan
    # numeric votable column: detection excludes it; executor never crashes
    rows = []
    for f in range(25):
        for s in range(5):
            rows.append({"sku": f"SKU-{f}", "src": f"s{s}",
                         "label": ("ok" if not (f == 2 and s == 1) else "okk")
                                  + str(f % 4),
                         "qty": f * 10 + s})
    df = pd.DataFrame(rows)
    df["qty"] = df["qty"].astype("int64")
    eg = detect_entity_groups(df)
    if eg:
        assert "qty" not in eg[1]
    apply_plan(df, mock_plan(df))                  # must not raise
    # missing-like keys never form an entity group
    plan = {"table_operations": [{"op": "resolve_by_majority", "key_column": "k",
                                  "columns": ["v"]}], "columns": [], "flags": []}
    df2 = pd.DataFrame({"k": ["N/A"] * 6 + ["X-1"] * 3,
                        "v": ["a", "a", "a", "a", "b", "c", "z", "z", "y"]})
    cleaned, _ = apply_plan(df2, plan)
    assert list(cleaned["v"][:6]) == ["a", "a", "a", "a", "b", "c"]   # untouched
    # plan params are clamped: model-emitted min_share=0 cannot force rewrites
    plan2 = {"table_operations": [{"op": "resolve_by_majority", "key_column": "k",
                                   "columns": ["v"], "min_share": 0.0,
                                   "min_group": 1}], "columns": [], "flags": []}
    df3 = pd.DataFrame({"k": ["G1"] * 4, "v": ["a", "b", "b", "c"]})   # 50% max
    cleaned3, _ = apply_plan(df3, plan2)
    assert list(cleaned3["v"]) == ["a", "b", "b", "c"]
    # false-consensus guard: fat minorities (1 of 4 = legitimate updates) decline;
    # thin minorities (1 of 10 = reporting errors) proceed
    df4 = pd.DataFrame({"k": [f"G{i//4}" for i in range(40)],
                        "v": ["m", "m", "m", "x"] * 10})
    plan4 = {"table_operations": [{"op": "resolve_by_majority", "key_column": "k",
                                   "columns": ["v"]}], "columns": [], "flags": []}
    cleaned4, log4 = apply_plan(df4, plan4)
    entry = next(e for e in log4 if e["op"] == "resolve_by_majority")
    assert entry["cells_changed"] == 0 and "declined" in entry["detail"]
    df5 = pd.DataFrame({"k": [f"G{i//10}" for i in range(40)],
                        "v": (["m"] * 9 + ["x"]) * 4})
    cleaned5, log5 = apply_plan(df5, plan4)
    entry5 = next(e for e in log5 if e["op"] == "resolve_by_majority")
    assert entry5["cells_changed"] == 4                  # thin dissenters resolved
    # date-shaped keys are rejected
    rows = [{"date": f"2024-01-{d+1:02d}", "site": f"site-{r % 3}", "crew": f"c{r % 4}",
             "reading": f"v{r}"} for d in range(25) for r in range(5)]
    assert detect_entity_groups(pd.DataFrame(rows)) is None


def test_union_inherits_vote_op_and_preserves_op_order():
    from scrubdata.verifier import union_plans
    primary = {"table_operations": [], "columns": [], "flags": []}
    secondary = {"table_operations": [{"op": "resolve_by_majority", "key_column": "k",
                                       "columns": ["v"], "rationale": "vote"}],
                 "columns": [{"name": "t", "operations": [
                     {"op": "fix_encoding", "rationale": "enc"},
                     {"op": "normalize_punctuation", "rationale": "punct"},
                 ]}], "flags": []}
    out = union_plans(primary, secondary)
    assert any(o["op"] == "resolve_by_majority" for o in out["table_operations"])
    t_ops = [o["op"] for c in out["columns"] if c["name"] == "t"
             for o in c["operations"]]
    assert t_ops.index("fix_encoding") < t_ops.index("normalize_punctuation")


def test_fix_encoding_op():
    from scrubdata.executor import _fix_encoding
    assert _fix_encoding("café".encode("utf-8").decode("cp1252")) == "café"
    assert _fix_encoding("naïve résumé".encode("utf-8").decode("latin-1")) == "naïve résumé"
    assert _fix_encoding("plain text") == "plain text"        # untouched
    df = pd.DataFrame({"title": ["café latte", "normal row"] * 6})
    plan = mock_plan(df)
    ops = [o["op"] for c in plan["columns"] for o in c["operations"]]
    assert "fix_encoding" in ops
    cleaned, _ = apply_plan(df, plan)
    assert cleaned["title"][0] == "café latte"


def test_resolve_by_majority_voting():
    rows = []
    for f in range(25):                       # 25 flights x 5 source reports
        for s in range(5):
            dep = f"{(f % 12) + 1}:58 p.m."
            arr = f"{(f % 11) + 1}:10 a.m."
            if (f, s) in ((3, 4), (9, 1)):
                dep = "7:59 p.m."             # two corrupted reports, two groups
            if (f, s) in ((7, 2), (12, 0)):
                arr = "9:40 a.m."
            rows.append({"flight": f"AA-{1000+f}", "src": f"src{s}", "dep": dep,
                         "arr": arr, "gate": f"G{f}"})
    df = pd.DataFrame(rows)
    plan = mock_plan(df)
    vote = [o for o in plan["table_operations"] if o["op"] == "resolve_by_majority"]
    assert vote and vote[0]["key_column"] == "flight"
    cleaned, log = apply_plan(df, plan)
    assert set(cleaned[cleaned["flight"] == "AA-1003"]["dep"]) == {"4:58 p.m."}
    entry = next(e for e in log if e["op"] == "resolve_by_majority")
    assert entry["cells_changed"] >= 1        # the minority report was resolved
    # no key regime -> no vote op
    df2 = pd.DataFrame({"a": [str(i) for i in range(40)], "b": ["x"] * 40})
    assert not any(o["op"] == "resolve_by_majority"
                   for o in mock_plan(df2)["table_operations"])


def test_suspects_visibility_high_cardinality():
    from scrubdata.profiler import profile_column
    # high-card "text" column: 60 unique names + one near-dup of a repeated one
    names = [f"unique business {i}" for i in range(57)]
    col = names + ["acme holdings", "acme holdings", "acme holdngs"]
    prof = profile_column(pd.Series(col, name="business"))
    assert prof["detected_semantic_type"] == "text"
    sus = {s["raw"]: s["candidates"] for s in prof["suspect_values"]}
    assert "acme holdngs" in sus and "acme holdings" in sus["acme holdngs"]
    assert len(prof["suspect_values"]) <= 25            # bounded
    # heuristic now repairs it (verifier-gated), where before it emitted nothing
    df = pd.DataFrame({"business": col})
    plan = mock_plan(df)
    maps = {r: c for col_ in plan["columns"] for o in col_["operations"]
            if o["op"] == "canonicalize_categories" for r, c in o["mapping"].items()}
    assert maps.get("acme holdngs") == "acme holdings"
    cleaned, _ = apply_plan(df, plan)
    assert "acme holdngs" not in set(cleaned["business"])
    # garbage suspect-free value stays put; plan still schema-valid
    assert is_valid(plan)


def test_suspects_garbage_flagged_not_mapped():
    from scrubdata.profiler import profile_column
    col = [f"item {i}" for i in range(40)] + ["it€m ’junk", "it€m ’junk"]
    df = pd.DataFrame({"thing": col})
    plan = mock_plan(df)
    maps = {r for c in plan["columns"] for o in c["operations"]
            if o["op"] == "canonicalize_categories" for r in o["mapping"]}
    assert "it€m ’junk" not in maps                     # no invented target
    assert is_valid(plan)


def test_normalize_punctuation_op():
    df = pd.DataFrame({"name": ["palm’s thai", "joe‘s “grill”", "a–b — c", "plain's ok"]})
    plan = mock_plan(df)
    ops = [o["op"] for c in plan["columns"] for o in c["operations"]]
    assert "normalize_punctuation" in ops
    cleaned, _ = apply_plan(df, plan)
    assert cleaned["name"][0] == "palm's thai"
    assert cleaned["name"][1] == 'joe\'s "grill"'
    assert cleaned["name"][2] == "a-b - c"
    # a clean column must NOT get the op
    plan2 = mock_plan(pd.DataFrame({"name": ["plain's ok", "also fine"]}))
    ops2 = [o["op"] for c in plan2["columns"] for o in c["operations"]]
    assert "normalize_punctuation" not in ops2


def test_pair_profile_candidates_and_constraint():
    from scrubdata.pair_profile import candidate_pairs, constrain_plan
    col = ["Boston"] * 8 + ["Chicago"] * 6 + ["Bostn", "Chcago", "Qwortelby"]
    pairs = candidate_pairs(col)
    by_raw = {p["raw"]: [c["canon"] for c in p["candidates"]] for p in pairs}
    assert "Boston" in by_raw.get("Bostn", [])
    assert "Chicago" in by_raw.get("Chcago", [])
    assert "Qwortelby" not in by_raw                  # garbage gets no candidates
    assert "Boston" not in by_raw                     # frequent values are not suspicious
    plan = {"columns": [{"name": "city", "operations": [{
        "op": "canonicalize_categories", "rationale": "typos",
        "mapping": {"Bostn": "Boston", "Chcago": "Dallas", "Qwortelby": "Boston"}}]}],
        "flags": []}
    out = constrain_plan(plan, {"city": [{"raw": p["raw"],
                                          "candidates": [c["canon"] for c in p["candidates"]]}
                                         for p in pairs]})
    kept = out["columns"][0]["operations"][0]["mapping"]
    assert kept == {"Bostn": "Boston"}                # off-candidate + garbage dropped
    assert out["flags"] and out["flags"][0]["issue"] == "outside_candidate_pairs"


def test_jellyfish_prompt_construction():
    from eval.baselines_learned import di_prompt, ed_prompt, parse_di, parse_ed
    rec = {"city": "Bostn", "state": "MA"}
    ed = ed_prompt(rec, "city")
    assert "Record [city: Bostn, state: MA]" in ed
    assert "Attribute for Verification: [city: Bostn]" in ed
    assert ed.endswith("### Response:\n\n")
    di = di_prompt(rec, "city", "geography")
    assert "Record: [state: MA]" in di          # flagged attribute removed
    assert "city" in di and "Bostn" not in di   # model infers, never copies
    assert parse_ed("Yes, there is an error") and not parse_ed("No.")
    assert parse_di(" Boston ", "Bostn") == "Boston"
    assert parse_di("", "Bostn") == "Bostn"                      # abstain on empty
    assert parse_di("The value is\nBoston", "Bostn") == "Bostn"  # abstain on rambling


def test_value_counts_profile():
    df = pd.DataFrame({"country": ["USA", "USA", "usa", "Canada"]})
    prof = profile_dataframe(df)
    vc = dict((v, n) for v, n in prof["columns"][0]["value_counts"])
    assert vc["USA"] == 2 and "value_counts" in prof["columns"][0]


def test_cli(tmp_path):
    from scrubdata.cli import main
    out = tmp_path / "clean.csv"
    plan = tmp_path / "plan.json"
    rc = main(["samples/dirty_contacts.csv", "-o", str(out), "--plan", str(plan), "--quiet"])
    assert rc == 0
    assert out.exists() and plan.exists()
    cleaned = pd.read_csv(out)
    assert "notes2" not in cleaned.columns and len(cleaned) == 13