File size: 4,528 Bytes
6256eb9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
"""Export rows that share the same Problem Body but have different problem_id.

Default input:
- ../Data/Problems.csv

Default output:
- ../Results/Problems_same_body_different_problem_id.csv

The output includes all original columns plus:
- duplicate_group_id
- distinct_problem_id_count
- distinct_problem_ids
"""

from __future__ import annotations

import argparse
import csv
import html
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Set

# Matches a single HTML/XML tag (e.g. "<p>", "</div>") for optional stripping.
_TAG_RE = re.compile(r"<[^>]+>")
# Matches a run of one or more whitespace characters for optional collapsing.
_WS_RE = re.compile(r"\s+")


def normalize_body(text: str, strip_html: bool, collapse_whitespace: bool) -> str:
    """Return the grouping key derived from a Problem Body value.

    HTML entities are always decoded; tag stripping and whitespace
    collapsing are opt-in and controlled by the caller's flags.
    """
    normalized = html.unescape(text or "")
    if strip_html:
        # Remove anything that looks like an HTML/XML tag.
        normalized = _TAG_RE.sub("", normalized)
    # Fold whitespace runs to single spaces and trim the ends when requested.
    return _WS_RE.sub(" ", normalized).strip() if collapse_whitespace else normalized


# Annotation columns appended to every exported row.
_EXTRA_COLUMNS = [
    "duplicate_group_id",
    "distinct_problem_id_count",
    "distinct_problem_ids",
]


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line interface for the duplicate-body report."""
    parser = argparse.ArgumentParser(
        description=(
            "Find rows in Problems.csv where the same Problem Body is mapped "
            "to different problem_id values."
        )
    )
    parser.add_argument(
        "--input-csv",
        type=Path,
        default=Path("../Data/Problems.csv"),
        help="Path to Problems.csv",
    )
    parser.add_argument(
        "--output-csv",
        type=Path,
        default=Path("../Results/Problems_same_body_different_problem_id.csv"),
        help="Output CSV path",
    )
    parser.add_argument(
        "--body-column",
        type=str,
        default="Problem Body",
        help="Column name for problem statement text",
    )
    parser.add_argument(
        "--id-column",
        type=str,
        default="problem_id",
        help="Column name for problem identifier",
    )
    parser.add_argument(
        "--strip-html",
        action="store_true",
        help="Strip HTML tags before grouping",
    )
    parser.add_argument(
        "--collapse-whitespace",
        action="store_true",
        help="Collapse runs of whitespace before grouping",
    )
    return parser


def _read_rows(
    input_csv: Path, body_column: str, id_column: str
) -> tuple[List[str], List[dict]]:
    """Read *input_csv* fully into memory and return (fieldnames, rows).

    Raises:
        ValueError: if either required column is missing from the header.
    """
    with input_csv.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = list(reader.fieldnames or [])

        if body_column not in fieldnames:
            raise ValueError(f"Missing body column '{body_column}' in {input_csv}")
        if id_column not in fieldnames:
            raise ValueError(f"Missing id column '{id_column}' in {input_csv}")

        return fieldnames, list(reader)


def _group_by_body(
    rows: List[dict],
    body_column: str,
    id_column: str,
    strip_html: bool,
    collapse_whitespace: bool,
) -> tuple[Dict[str, List[int]], Dict[str, Set[str]]]:
    """Group row indices and distinct ids by normalized body text.

    Rows whose normalized body is empty are skipped entirely; empty ids are
    still grouped but do not count toward the distinct-id set.
    """
    groups: Dict[str, List[int]] = defaultdict(list)
    group_ids: Dict[str, Set[str]] = defaultdict(set)

    for idx, row in enumerate(rows):
        body_key = normalize_body(
            row.get(body_column, ""),
            strip_html=strip_html,
            collapse_whitespace=collapse_whitespace,
        )
        if not body_key:
            continue

        problem_id = str(row.get(id_column, "")).strip()
        groups[body_key].append(idx)
        if problem_id:
            group_ids[body_key].add(problem_id)

    return groups, group_ids


def _build_output_rows(
    rows: List[dict],
    groups: Dict[str, List[int]],
    group_ids: Dict[str, Set[str]],
) -> tuple[List[str], List[dict]]:
    """Return (duplicate_keys, annotated output rows).

    Only bodies mapped to more than one distinct problem_id are kept;
    groups are numbered in first-seen input order.
    """
    duplicate_keys = [k for k, ids in group_ids.items() if len(ids) > 1]
    # Preserve first-seen order of duplicate groups.
    duplicate_keys.sort(key=lambda k: groups[k][0])

    output_rows: List[dict] = []
    for group_num, key in enumerate(duplicate_keys, start=1):
        ids_sorted = sorted(group_ids[key])
        ids_joined = ";".join(ids_sorted)

        for row_idx in groups[key]:
            out_row = dict(rows[row_idx])
            out_row["duplicate_group_id"] = str(group_num)
            out_row["distinct_problem_id_count"] = str(len(ids_sorted))
            out_row["distinct_problem_ids"] = ids_joined
            output_rows.append(out_row)

    return duplicate_keys, output_rows


def _write_output(
    output_csv: Path, fieldnames: List[str], output_rows: List[dict]
) -> None:
    """Write *output_rows* to *output_csv*, creating parent dirs as needed."""
    output_csv.parent.mkdir(parents=True, exist_ok=True)
    # Fix: only append annotation columns not already present in the input,
    # otherwise the output header would contain duplicate column names.
    output_fieldnames = fieldnames + [c for c in _EXTRA_COLUMNS if c not in fieldnames]

    with output_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=output_fieldnames)
        writer.writeheader()
        writer.writerows(output_rows)


def main() -> None:
    """CLI entry point: read, group, annotate, and export duplicate-body rows."""
    args = _build_arg_parser().parse_args()

    input_csv = args.input_csv.resolve()
    output_csv = args.output_csv.resolve()

    fieldnames, rows = _read_rows(input_csv, args.body_column, args.id_column)
    groups, group_ids = _group_by_body(
        rows,
        body_column=args.body_column,
        id_column=args.id_column,
        strip_html=args.strip_html,
        collapse_whitespace=args.collapse_whitespace,
    )
    duplicate_keys, output_rows = _build_output_rows(rows, groups, group_ids)
    _write_output(output_csv, fieldnames, output_rows)

    print(f"Input rows: {len(rows)}")
    print(f"Duplicate body groups (different problem_id): {len(duplicate_keys)}")
    print(f"Output rows: {len(output_rows)}")
    print(f"Wrote: {output_csv}")


# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()