#!/usr/bin/env python3
"""Export rows that have duplicated problem_id values.

Default input:
- ../Data/Problems.csv

Default output:
- ../Results/Problems_duplicated_problem_id.csv

The output includes all original columns plus:
- duplicate_group_id
- duplicate_problem_id_count
- distinct_problem_body_count
"""

from __future__ import annotations

import argparse
import csv
import html
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Set

# Pre-compiled patterns used by normalize_body (hoisted out of the per-row loop).
_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def normalize_body(text: str, strip_html: bool, collapse_whitespace: bool) -> str:
    """Normalize Problem Body text for distinct-body counting.

    Args:
        text: Raw body text; ``None`` or empty is treated as ``""``.
        strip_html: Remove ``<...>`` tag spans (after HTML-entity unescaping).
        collapse_whitespace: Collapse whitespace runs to single spaces and
            strip leading/trailing whitespace.

    Returns:
        The normalized body string.
    """
    value = html.unescape(text or "")
    if strip_html:
        value = _TAG_RE.sub("", value)
    if collapse_whitespace:
        value = _WS_RE.sub(" ", value).strip()
    return value


def _parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface."""
    parser = argparse.ArgumentParser(
        description="Find rows in Problems.csv where problem_id is duplicated."
    )
    parser.add_argument(
        "--input-csv",
        type=Path,
        default=Path("../Data/Problems.csv"),
        help="Path to Problems.csv",
    )
    parser.add_argument(
        "--output-csv",
        type=Path,
        default=Path("../Results/Problems_duplicated_problem_id.csv"),
        help="Output CSV path",
    )
    parser.add_argument(
        "--id-column",
        type=str,
        default="problem_id",
        help="Column name for problem identifier",
    )
    parser.add_argument(
        "--body-column",
        type=str,
        default="Problem Body",
        help="Column name for problem statement text",
    )
    parser.add_argument(
        "--strip-html",
        action="store_true",
        help="Strip HTML tags before counting distinct problem bodies",
    )
    parser.add_argument(
        "--collapse-whitespace",
        action="store_true",
        help="Collapse runs of whitespace before counting distinct problem bodies",
    )
    return parser.parse_args()


def _build_groups(rows: List[dict], id_column: str) -> Dict[str, List[int]]:
    """Map each non-empty problem id to the indices of the rows carrying it.

    Rows whose id cell is missing or blank are skipped entirely.
    """
    groups: Dict[str, List[int]] = defaultdict(list)
    for idx, row in enumerate(rows):
        # BUGFIX: csv.DictReader yields None for cells missing from short
        # rows; the original str(row.get(id_column, "")) turned that None
        # into the literal string "None", silently lumping every short row
        # into one bogus "duplicate" group.
        problem_id = (row.get(id_column) or "").strip()
        if problem_id:
            groups[problem_id].append(idx)
    return groups


def main() -> None:
    """Read the input CSV, annotate duplicated-id rows, and write the report.

    Raises:
        ValueError: If the configured id or body column is absent from the
            input CSV header.
    """
    args = _parse_args()
    input_csv = args.input_csv.resolve()
    output_csv = args.output_csv.resolve()

    with input_csv.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames or []
        if args.id_column not in fieldnames:
            raise ValueError(f"Missing id column '{args.id_column}' in {input_csv}")
        if args.body_column not in fieldnames:
            raise ValueError(f"Missing body column '{args.body_column}' in {input_csv}")
        rows: List[dict] = list(reader)

    groups = _build_groups(rows, args.id_column)

    duplicate_ids = [pid for pid, row_idxs in groups.items() if len(row_idxs) > 1]
    # Preserve first-seen order of duplicate groups.
    duplicate_ids.sort(key=lambda pid: groups[pid][0])

    output_rows: List[dict] = []
    for group_num, pid in enumerate(duplicate_ids, start=1):
        row_idxs = groups[pid]
        # How many genuinely different (normalized) bodies share this id.
        distinct_bodies: Set[str] = {
            normalize_body(
                rows[row_idx].get(args.body_column, ""),
                strip_html=args.strip_html,
                collapse_whitespace=args.collapse_whitespace,
            )
            for row_idx in row_idxs
        }
        for row_idx in row_idxs:
            out_row = dict(rows[row_idx])
            out_row["duplicate_group_id"] = str(group_num)
            out_row["duplicate_problem_id_count"] = str(len(row_idxs))
            out_row["distinct_problem_body_count"] = str(len(distinct_bodies))
            output_rows.append(out_row)

    output_csv.parent.mkdir(parents=True, exist_ok=True)
    output_fieldnames = fieldnames + [
        "duplicate_group_id",
        "duplicate_problem_id_count",
        "distinct_problem_body_count",
    ]
    with output_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=output_fieldnames)
        writer.writeheader()
        writer.writerows(output_rows)

    print(f"Input rows: {len(rows)}")
    print(f"Duplicated {args.id_column} groups: {len(duplicate_ids)}")
    print(f"Output rows: {len(output_rows)}")
    print(f"Wrote: {output_csv}")


if __name__ == "__main__":
    main()