#!/usr/bin/env python3
"""Export rows that share the same Problem Body but have different problem_id.
Default input:
- ../Data/Problems.csv
Default output:
- ../Results/Problems_same_body_different_problem_id.csv
The output includes all original columns plus:
- duplicate_group_id
- distinct_problem_id_count
- distinct_problem_ids
"""
from __future__ import annotations
import argparse
import csv
import html
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Set
# Matches any HTML tag (used when tag stripping is requested).
_TAG_RE = re.compile(r"<[^>]+>")
# Matches runs of whitespace (used when whitespace collapsing is requested).
_WS_RE = re.compile(r"\s+")


def normalize_body(text: str, strip_html: bool, collapse_whitespace: bool) -> str:
    """Return *text* normalized so equivalent bodies compare equal for grouping.

    HTML entities are always unescaped; tag removal and whitespace
    collapsing are optional, controlled by the two boolean flags.
    """
    cleaned = html.unescape(text or "")
    if strip_html:
        cleaned = _TAG_RE.sub("", cleaned)
    if not collapse_whitespace:
        return cleaned
    return _WS_RE.sub(" ", cleaned).strip()
def main() -> None:
parser = argparse.ArgumentParser(
description=(
"Find rows in Problems.csv where the same Problem Body is mapped "
"to different problem_id values."
)
)
parser.add_argument(
"--input-csv",
type=Path,
default=Path("../Data/Problems.csv"),
help="Path to Problems.csv",
)
parser.add_argument(
"--output-csv",
type=Path,
default=Path("../Results/Problems_same_body_different_problem_id.csv"),
help="Output CSV path",
)
parser.add_argument(
"--body-column",
type=str,
default="Problem Body",
help="Column name for problem statement text",
)
parser.add_argument(
"--id-column",
type=str,
default="problem_id",
help="Column name for problem identifier",
)
parser.add_argument(
"--strip-html",
action="store_true",
help="Strip HTML tags before grouping",
)
parser.add_argument(
"--collapse-whitespace",
action="store_true",
help="Collapse runs of whitespace before grouping",
)
args = parser.parse_args()
input_csv = args.input_csv.resolve()
output_csv = args.output_csv.resolve()
with input_csv.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
fieldnames = reader.fieldnames or []
if args.body_column not in fieldnames:
raise ValueError(f"Missing body column '{args.body_column}' in {input_csv}")
if args.id_column not in fieldnames:
raise ValueError(f"Missing id column '{args.id_column}' in {input_csv}")
rows: List[dict] = list(reader)
groups: Dict[str, List[int]] = defaultdict(list)
group_ids: Dict[str, Set[str]] = defaultdict(set)
for idx, row in enumerate(rows):
body_raw = row.get(args.body_column, "")
body_key = normalize_body(
body_raw,
strip_html=args.strip_html,
collapse_whitespace=args.collapse_whitespace,
)
if not body_key:
continue
problem_id = str(row.get(args.id_column, "")).strip()
groups[body_key].append(idx)
if problem_id:
group_ids[body_key].add(problem_id)
duplicate_keys = [k for k, ids in group_ids.items() if len(ids) > 1]
# Preserve first-seen order of duplicate groups.
duplicate_keys.sort(key=lambda k: groups[k][0])
output_rows: List[dict] = []
for group_num, key in enumerate(duplicate_keys, start=1):
ids_sorted = sorted(group_ids[key])
ids_joined = ";".join(ids_sorted)
for row_idx in groups[key]:
out_row = dict(rows[row_idx])
out_row["duplicate_group_id"] = str(group_num)
out_row["distinct_problem_id_count"] = str(len(ids_sorted))
out_row["distinct_problem_ids"] = ids_joined
output_rows.append(out_row)
output_csv.parent.mkdir(parents=True, exist_ok=True)
output_fieldnames = fieldnames + [
"duplicate_group_id",
"distinct_problem_id_count",
"distinct_problem_ids",
]
with output_csv.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=output_fieldnames)
writer.writeheader()
writer.writerows(output_rows)
print(f"Input rows: {len(rows)}")
print(f"Duplicate body groups (different problem_id): {len(duplicate_keys)}")
print(f"Output rows: {len(output_rows)}")
print(f"Wrote: {output_csv}")
if __name__ == "__main__":
main()