#!/usr/bin/env python3
"""Export rows that have duplicated problem_id values.
Default input:
- ../Data/Problems.csv
Default output:
- ../Results/Problems_duplicated_problem_id.csv
The output includes all original columns plus:
- duplicate_group_id
- duplicate_problem_id_count
- distinct_problem_body_count
"""
from __future__ import annotations
import argparse
import csv
import html
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Set
_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def normalize_body(text: str, strip_html: bool, collapse_whitespace: bool) -> str:
    """Return *text* normalized for distinct-body comparison.

    HTML entities are always unescaped; tag removal and whitespace
    collapsing (plus edge-strip) happen only when the matching flag is set.
    A ``None``/empty input yields an empty string.
    """
    normalized = html.unescape(text if text else "")
    if strip_html:
        normalized = _TAG_RE.sub("", normalized)
    if not collapse_whitespace:
        return normalized
    return _WS_RE.sub(" ", normalized).strip()
# Columns this script appends to every exported row; reserved in the input header.
_ADDED_COLUMNS = [
    "duplicate_group_id",
    "duplicate_problem_id_count",
    "distinct_problem_body_count",
]


def _parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface."""
    parser = argparse.ArgumentParser(
        description="Find rows in Problems.csv where problem_id is duplicated."
    )
    parser.add_argument(
        "--input-csv",
        type=Path,
        default=Path("../Data/Problems.csv"),
        help="Path to Problems.csv",
    )
    parser.add_argument(
        "--output-csv",
        type=Path,
        default=Path("../Results/Problems_duplicated_problem_id.csv"),
        help="Output CSV path",
    )
    parser.add_argument(
        "--id-column",
        type=str,
        default="problem_id",
        help="Column name for problem identifier",
    )
    parser.add_argument(
        "--body-column",
        type=str,
        default="Problem Body",
        help="Column name for problem statement text",
    )
    parser.add_argument(
        "--strip-html",
        action="store_true",
        help="Strip HTML tags before counting distinct problem bodies",
    )
    parser.add_argument(
        "--collapse-whitespace",
        action="store_true",
        help="Collapse runs of whitespace before counting distinct problem bodies",
    )
    return parser.parse_args()


def _load_rows(
    input_csv: Path, id_column: str, body_column: str
) -> tuple[List[str], List[dict]]:
    """Read the input CSV and return (header fieldnames, all data rows).

    Raises:
        ValueError: if either required column is missing from the header,
            or if the header already contains one of the columns this
            script appends (the original value would otherwise be
            silently clobbered and the output header duplicated).
    """
    with input_csv.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames or []
        if id_column not in fieldnames:
            raise ValueError(f"Missing id column '{id_column}' in {input_csv}")
        if body_column not in fieldnames:
            raise ValueError(f"Missing body column '{body_column}' in {input_csv}")
        for added in _ADDED_COLUMNS:
            if added in fieldnames:
                raise ValueError(
                    f"Input already contains reserved column '{added}' in {input_csv}"
                )
        return fieldnames, list(reader)


def _group_row_indices(rows: List[dict], id_column: str) -> Dict[str, List[int]]:
    """Group row indices by their stripped problem id; empty ids are skipped.

    ``or ""`` guards against the ``None`` restval DictReader supplies for
    short rows — ``str(None)`` alone would create the bogus id "None".
    """
    groups: Dict[str, List[int]] = defaultdict(list)
    for idx, row in enumerate(rows):
        problem_id = str(row.get(id_column) or "").strip()
        if problem_id:
            groups[problem_id].append(idx)
    return groups


def _build_output_rows(
    rows: List[dict],
    fieldnames: List[str],
    groups: Dict[str, List[int]],
    duplicate_ids: List[str],
    body_column: str,
    strip_html: bool,
    collapse_whitespace: bool,
) -> List[dict]:
    """Annotate each row of every duplicated-id group with group metadata."""
    output_rows: List[dict] = []
    for group_num, pid in enumerate(duplicate_ids, start=1):
        row_idxs = groups[pid]
        # One normalized-body set per group drives distinct_problem_body_count.
        distinct_bodies: Set[str] = {
            normalize_body(
                rows[row_idx].get(body_column) or "",
                strip_html=strip_html,
                collapse_whitespace=collapse_whitespace,
            )
            for row_idx in row_idxs
        }
        for row_idx in row_idxs:
            # Copy only header columns: overflow cells from ragged rows sit
            # under DictReader's None restkey and would make DictWriter
            # raise an opaque ValueError if carried through.
            out_row = {name: rows[row_idx].get(name, "") for name in fieldnames}
            out_row["duplicate_group_id"] = str(group_num)
            out_row["duplicate_problem_id_count"] = str(len(row_idxs))
            out_row["distinct_problem_body_count"] = str(len(distinct_bodies))
            output_rows.append(out_row)
    return output_rows


def main() -> None:
    """Read the input CSV, find duplicated ids, and export annotated rows."""
    args = _parse_args()
    input_csv = args.input_csv.resolve()
    output_csv = args.output_csv.resolve()

    fieldnames, rows = _load_rows(input_csv, args.id_column, args.body_column)
    groups = _group_row_indices(rows, args.id_column)

    duplicate_ids = [pid for pid, row_idxs in groups.items() if len(row_idxs) > 1]
    # Preserve first-seen order of duplicate groups.
    duplicate_ids.sort(key=lambda pid: groups[pid][0])

    output_rows = _build_output_rows(
        rows,
        fieldnames,
        groups,
        duplicate_ids,
        args.body_column,
        args.strip_html,
        args.collapse_whitespace,
    )

    output_csv.parent.mkdir(parents=True, exist_ok=True)
    with output_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames + _ADDED_COLUMNS)
        writer.writeheader()
        writer.writerows(output_rows)

    print(f"Input rows: {len(rows)}")
    print(f"Duplicated {args.id_column} groups: {len(duplicate_ids)}")
    print(f"Output rows: {len(output_rows)}")
    print(f"Wrote: {output_csv}")


if __name__ == "__main__":
    main()