File size: 4,497 Bytes
6256eb9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
"""Export rows that have duplicated problem_id values.

Default input:
- ../Data/Problems.csv

Default output:
- ../Results/Problems_duplicated_problem_id.csv

The output includes all original columns plus:
- duplicate_group_id
- duplicate_problem_id_count
- distinct_problem_body_count
"""

from __future__ import annotations

import argparse
import csv
import html
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Set

# Pre-compiled patterns used when normalizing problem bodies.
_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def normalize_body(text: str, strip_html: bool, collapse_whitespace: bool) -> str:
    """Normalize Problem Body text for distinct-body counting.

    HTML entities are always unescaped; tag removal and whitespace
    collapsing are applied only when the matching flag is True.
    A falsy *text* (None or "") normalizes to the empty string.
    """
    normalized = html.unescape(text if text else "")
    if strip_html:
        normalized = _TAG_RE.sub("", normalized)
    if not collapse_whitespace:
        return normalized
    return _WS_RE.sub(" ", normalized).strip()


def _build_parser() -> argparse.ArgumentParser:
    """Construct the CLI argument parser for the duplicate-id export."""
    parser = argparse.ArgumentParser(
        description="Find rows in Problems.csv where problem_id is duplicated."
    )
    parser.add_argument(
        "--input-csv",
        type=Path,
        default=Path("../Data/Problems.csv"),
        help="Path to Problems.csv",
    )
    parser.add_argument(
        "--output-csv",
        type=Path,
        default=Path("../Results/Problems_duplicated_problem_id.csv"),
        help="Output CSV path",
    )
    parser.add_argument(
        "--id-column",
        type=str,
        default="problem_id",
        help="Column name for problem identifier",
    )
    parser.add_argument(
        "--body-column",
        type=str,
        default="Problem Body",
        help="Column name for problem statement text",
    )
    parser.add_argument(
        "--strip-html",
        action="store_true",
        help="Strip HTML tags before counting distinct problem bodies",
    )
    parser.add_argument(
        "--collapse-whitespace",
        action="store_true",
        help="Collapse runs of whitespace before counting distinct problem bodies",
    )
    return parser


def _load_rows(input_csv: Path, id_column: str, body_column: str):
    """Read all rows from *input_csv* and validate the required columns.

    Returns (fieldnames, rows). Raises ValueError when either column
    is missing from the header.
    """
    with input_csv.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames or []

        if id_column not in fieldnames:
            raise ValueError(f"Missing id column '{id_column}' in {input_csv}")
        if body_column not in fieldnames:
            raise ValueError(f"Missing body column '{body_column}' in {input_csv}")

        return fieldnames, list(reader)


def _group_indices(rows: List[dict], id_column: str) -> Dict[str, List[int]]:
    """Map each non-empty id value to the indices of the rows carrying it."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for idx, row in enumerate(rows):
        problem_id = str(row.get(id_column, "")).strip()
        if not problem_id:
            continue  # blank ids cannot meaningfully be duplicates
        groups[problem_id].append(idx)
    return groups


def _annotate_duplicates(
    rows: List[dict],
    groups: Dict[str, List[int]],
    body_column: str,
    strip_html: bool,
    collapse_whitespace: bool,
):
    """Build the annotated output rows for every duplicated id.

    Returns (output_rows, duplicate_group_count). Groups are numbered
    in first-seen order of their id within the input.
    """
    duplicate_ids = [pid for pid, row_idxs in groups.items() if len(row_idxs) > 1]

    # Preserve first-seen order of duplicate groups.
    duplicate_ids.sort(key=lambda pid: groups[pid][0])

    output_rows: List[dict] = []
    for group_num, pid in enumerate(duplicate_ids, start=1):
        row_idxs = groups[pid]

        # Distinct normalized bodies tell apart "same id, same problem"
        # from "same id, different problems".
        distinct_bodies: Set[str] = {
            normalize_body(
                rows[row_idx].get(body_column, ""),
                strip_html=strip_html,
                collapse_whitespace=collapse_whitespace,
            )
            for row_idx in row_idxs
        }

        for row_idx in row_idxs:
            out_row = dict(rows[row_idx])
            out_row["duplicate_group_id"] = str(group_num)
            out_row["duplicate_problem_id_count"] = str(len(row_idxs))
            out_row["distinct_problem_body_count"] = str(len(distinct_bodies))
            output_rows.append(out_row)

    return output_rows, len(duplicate_ids)


def _write_output(output_csv: Path, fieldnames: List[str], output_rows: List[dict]) -> None:
    """Write the annotated rows, creating the output directory if needed."""
    output_csv.parent.mkdir(parents=True, exist_ok=True)
    output_fieldnames = fieldnames + [
        "duplicate_group_id",
        "duplicate_problem_id_count",
        "distinct_problem_body_count",
    ]

    with output_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=output_fieldnames)
        writer.writeheader()
        writer.writerows(output_rows)


def main() -> None:
    """CLI entry point: load the CSV, find duplicated ids, export annotated rows."""
    args = _build_parser().parse_args()

    input_csv = args.input_csv.resolve()
    output_csv = args.output_csv.resolve()

    fieldnames, rows = _load_rows(input_csv, args.id_column, args.body_column)
    groups = _group_indices(rows, args.id_column)
    output_rows, group_count = _annotate_duplicates(
        rows,
        groups,
        args.body_column,
        args.strip_html,
        args.collapse_whitespace,
    )
    _write_output(output_csv, fieldnames, output_rows)

    print(f"Input rows: {len(rows)}")
    print(f"Duplicated {args.id_column} groups: {group_count}")
    print(f"Output rows: {len(output_rows)}")
    print(f"Wrote: {output_csv}")


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()