# Source: FoundationalASSIST/Code/find_duplicate_problem_body.py
# (uploaded via huggingface_hub by martinakaduc, revision 6256eb9)
#!/usr/bin/env python3
"""Export rows that share the same Problem Body but have different problem_id.
Default input:
- ../Data/Problems.csv
Default output:
- ../Results/Problems_same_body_different_problem_id.csv
The output includes all original columns plus:
- duplicate_group_id
- distinct_problem_id_count
- distinct_problem_ids
"""
from __future__ import annotations
import argparse
import csv
import html
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Set
# Matches a single HTML/XML tag: "<" followed by anything up to the next ">".
_TAG_RE = re.compile(r"<[^>]+>")
# Matches one or more consecutive whitespace characters (for collapsing runs).
_WS_RE = re.compile(r"\s+")
def normalize_body(text: str, strip_html: bool, collapse_whitespace: bool) -> str:
    """Return *text* canonicalized so equal bodies group together.

    HTML entities are always unescaped; tag removal and whitespace
    collapsing are applied only when the corresponding flag is set.
    """
    result = html.unescape(text if text else "")
    if strip_html:
        # Drop anything shaped like an HTML/XML tag.
        result = _TAG_RE.sub("", result)
    if collapse_whitespace:
        # Squeeze whitespace runs to single spaces and trim the ends.
        result = _WS_RE.sub(" ", result).strip()
    return result
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description=(
            "Find rows in Problems.csv where the same Problem Body is mapped "
            "to different problem_id values."
        )
    )
    parser.add_argument(
        "--input-csv",
        type=Path,
        default=Path("../Data/Problems.csv"),
        help="Path to Problems.csv",
    )
    parser.add_argument(
        "--output-csv",
        type=Path,
        default=Path("../Results/Problems_same_body_different_problem_id.csv"),
        help="Output CSV path",
    )
    parser.add_argument(
        "--body-column",
        type=str,
        default="Problem Body",
        help="Column name for problem statement text",
    )
    parser.add_argument(
        "--id-column",
        type=str,
        default="problem_id",
        help="Column name for problem identifier",
    )
    parser.add_argument(
        "--strip-html",
        action="store_true",
        help="Strip HTML tags before grouping",
    )
    parser.add_argument(
        "--collapse-whitespace",
        action="store_true",
        help="Collapse runs of whitespace before grouping",
    )
    return parser


def _group_by_body(
    rows: List[dict],
    body_column: str,
    id_column: str,
    strip_html: bool,
    collapse_whitespace: bool,
) -> "tuple[Dict[str, List[int]], Dict[str, Set[str]]]":
    """Group rows by normalized body text.

    Returns (groups, group_ids) where ``groups`` maps each normalized body
    to the indices of rows carrying it, and ``group_ids`` maps it to the
    set of distinct (non-empty, stripped) problem ids seen for that body.
    Rows whose normalized body is empty are skipped entirely.
    """
    groups: Dict[str, List[int]] = defaultdict(list)
    group_ids: Dict[str, Set[str]] = defaultdict(set)
    for idx, row in enumerate(rows):
        body_key = normalize_body(
            row.get(body_column, ""),
            strip_html=strip_html,
            collapse_whitespace=collapse_whitespace,
        )
        if not body_key:
            continue
        groups[body_key].append(idx)
        problem_id = str(row.get(id_column, "")).strip()
        if problem_id:
            group_ids[body_key].add(problem_id)
    return groups, group_ids


def main(argv: "List[str] | None" = None) -> None:
    """Run the export.

    :param argv: CLI arguments; ``None`` (the default) reads ``sys.argv``,
        so existing callers of ``main()`` are unaffected. Passing a list
        makes the script drivable from tests or other code.
    :raises ValueError: if the body or id column is missing from the input.
    """
    args = _build_parser().parse_args(argv)
    input_csv = args.input_csv.resolve()
    output_csv = args.output_csv.resolve()

    with input_csv.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames or []
        # Fail fast with a clear message rather than KeyError-ing per row.
        if args.body_column not in fieldnames:
            raise ValueError(f"Missing body column '{args.body_column}' in {input_csv}")
        if args.id_column not in fieldnames:
            raise ValueError(f"Missing id column '{args.id_column}' in {input_csv}")
        rows: List[dict] = list(reader)

    groups, group_ids = _group_by_body(
        rows,
        body_column=args.body_column,
        id_column=args.id_column,
        strip_html=args.strip_html,
        collapse_whitespace=args.collapse_whitespace,
    )

    # A body is "duplicated" only when it maps to two or more distinct ids.
    duplicate_keys = [k for k, ids in group_ids.items() if len(ids) > 1]
    # Preserve first-seen order of duplicate groups.
    duplicate_keys.sort(key=lambda k: groups[k][0])

    output_rows: List[dict] = []
    for group_num, key in enumerate(duplicate_keys, start=1):
        ids_sorted = sorted(group_ids[key])
        ids_joined = ";".join(ids_sorted)
        for row_idx in groups[key]:
            out_row = dict(rows[row_idx])
            out_row["duplicate_group_id"] = str(group_num)
            out_row["distinct_problem_id_count"] = str(len(ids_sorted))
            out_row["distinct_problem_ids"] = ids_joined
            output_rows.append(out_row)

    output_csv.parent.mkdir(parents=True, exist_ok=True)
    output_fieldnames = fieldnames + [
        "duplicate_group_id",
        "distinct_problem_id_count",
        "distinct_problem_ids",
    ]
    with output_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=output_fieldnames)
        writer.writeheader()
        writer.writerows(output_rows)

    print(f"Input rows: {len(rows)}")
    print(f"Duplicate body groups (different problem_id): {len(duplicate_keys)}")
    print(f"Output rows: {len(output_rows)}")
    print(f"Wrote: {output_csv}")
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()