#!/usr/bin/env python3
"""Detect skills that have similar node_code values but different skill_id values.

By default, the script detects conflicts after normalizing node_code values
(uppercasing and removing punctuation differences). It can also perform optional
fuzzy matching on normalized compact node codes.
"""

from __future__ import annotations

import argparse
import re
from difflib import SequenceMatcher
from itertools import combinations
from pathlib import Path

import pandas as pd

DEFAULT_SKILLS_PATH = Path(__file__).resolve().parent.parent / "Data" / "Skills.csv"
DEFAULT_OUTPUT_PATH = (
    Path(__file__).resolve().parent.parent
    / "Results"
    / "similar_node_code_conflicts.csv"
)

# One maximal run of characters that are not ASCII letters or digits.
_NON_ALNUM_RUN = re.compile(r"[^A-Za-z0-9]+")

# Column layout of the fuzzy-match table; declared once so that an empty
# result carries the same header as a populated one.
_FUZZY_COLUMNS = [
    "conflict_type",
    "similarity",
    "left_compact_node_code",
    "right_compact_node_code",
    "left_canonical_node_codes",
    "right_canonical_node_codes",
    "left_skill_ids",
    "right_skill_ids",
    "overlap_skill_ids",
]


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The populated argparse namespace.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Detect skills whose node_code values are similar but map to "
            "different skill_id values."
        )
    )
    parser.add_argument(
        "--skills-path",
        type=Path,
        default=DEFAULT_SKILLS_PATH,
        help="Path to Skills.csv.",
    )
    parser.add_argument(
        "--output-path",
        type=Path,
        default=DEFAULT_OUTPUT_PATH,
        help="Path to save the detected conflicts as CSV.",
    )
    parser.add_argument(
        "--include-fuzzy",
        action="store_true",
        help="Also run fuzzy matching across compact node_code values.",
    )
    parser.add_argument(
        "--similarity-threshold",
        type=float,
        default=0.9,
        help="Minimum SequenceMatcher ratio for fuzzy matches (0.0 to 1.0).",
    )
    parser.add_argument(
        "--max-fuzzy-pairs",
        type=int,
        default=200,
        help="Maximum number of fuzzy match pairs to keep after sorting.",
    )
    parser.add_argument(
        "--print-limit",
        type=int,
        default=20,
        help="Maximum number of rows to print for each conflict section.",
    )
    return parser.parse_args(argv)


def normalize_node_code(node_code: str) -> str:
    """Return the canonical form of *node_code*.

    The code is uppercased, every run of characters other than ASCII letters
    and digits becomes a single ``.``, and leading/trailing dots are removed.
    """
    # A single substitution already collapses each separator run (whitespace
    # and dots included) into one dot, so no second pass is required.
    return _NON_ALNUM_RUN.sub(".", node_code.upper()).strip(".")


def compact_node_code(canonical_node_code: str) -> str:
    """Return the canonical node code with all dot separators removed."""
    return canonical_node_code.replace(".", "")


def unique_sorted_strings(series: pd.Series) -> list[str]:
    """Return the distinct non-blank values of *series* as sorted, stripped strings."""
    stripped = (str(value).strip() for value in series.dropna())
    return sorted({text for text in stripped if text})


def unique_sorted_ints(series: pd.Series) -> list[int]:
    """Return the distinct non-null values of *series* as sorted ints."""
    return sorted({int(value) for value in series.dropna()})


def join_pipe(values: list[str]) -> str:
    """Join *values* with ``" | "``."""
    return " | ".join(values)


def join_csv_ints(values: list[int]) -> str:
    """Join *values* with commas, without spaces."""
    return ",".join(str(v) for v in values)


def load_skills(skills_path: Path) -> pd.DataFrame:
    """Load the skills CSV and add the normalized node-code columns.

    Only ``problem_id``, ``skill_id``, ``node_code`` and ``node_name`` are
    read. Rows whose ``problem_id`` or ``skill_id`` is not numeric, or whose
    ``node_code`` is missing or blank, are dropped.

    Returns:
        The cleaned frame with two extra columns, ``node_code_canonical`` and
        ``node_code_compact``.
    """
    required_columns = ["problem_id", "skill_id", "node_code", "node_name"]
    df = pd.read_csv(skills_path, usecols=required_columns, low_memory=False)

    # Non-numeric ids become NaN here and are removed by the dropna below.
    df["problem_id"] = pd.to_numeric(df["problem_id"], errors="coerce")
    df["skill_id"] = pd.to_numeric(df["skill_id"], errors="coerce")
    df = df.dropna(subset=["problem_id", "skill_id", "node_code"]).copy()

    df["problem_id"] = df["problem_id"].astype(int)
    df["skill_id"] = df["skill_id"].astype(int)
    df["node_code"] = df["node_code"].astype(str).str.strip()
    df["node_name"] = df["node_name"].fillna("").astype(str).str.strip()
    df = df[df["node_code"] != ""].copy()

    df["node_code_canonical"] = df["node_code"].apply(normalize_node_code)
    df["node_code_compact"] = df["node_code_canonical"].apply(compact_node_code)
    return df


def summarize_compact_codes(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate the skills frame to one row per compact node code.

    Each row lists the distinct canonical codes, raw codes, skill ids and
    node names seen for that compact code, the number of distinct problems,
    the number of mapping rows, and ``n_skill_ids`` (how many distinct skill
    ids share the code).
    """
    summary = (
        df.groupby("node_code_compact", sort=True)
        .agg(
            canonical_node_codes=("node_code_canonical", unique_sorted_strings),
            raw_node_codes=("node_code", unique_sorted_strings),
            skill_ids=("skill_id", unique_sorted_ints),
            node_names=("node_name", unique_sorted_strings),
            problem_count=("problem_id", "nunique"),
            mapping_count=("skill_id", "size"),
        )
        .reset_index()
        .rename(columns={"node_code_compact": "compact_node_code"})
    )
    summary["n_skill_ids"] = summary["skill_ids"].apply(len)
    return summary


def build_normalized_conflicts(summary: pd.DataFrame) -> pd.DataFrame:
    """Return the compact codes that map to more than one skill id.

    List-valued columns are flattened to delimited strings. The result is
    sorted by ``n_skill_ids`` (descending) and then ``compact_node_code``.
    The ``conflict_type`` column is present even when there are no conflicts,
    so the output header does not depend on the data.
    """
    conflicts = summary[summary["n_skill_ids"] > 1].copy()
    conflicts.insert(0, "conflict_type", "normalized_match")
    if conflicts.empty:
        return conflicts

    conflicts["skill_ids"] = conflicts["skill_ids"].apply(join_csv_ints)
    for column in ("canonical_node_codes", "raw_node_codes", "node_names"):
        conflicts[column] = conflicts[column].apply(join_pipe)

    return conflicts.sort_values(
        ["n_skill_ids", "compact_node_code"], ascending=[False, True]
    )


def build_fuzzy_conflicts(
    summary: pd.DataFrame,
    threshold: float,
    max_pairs: int,
) -> pd.DataFrame:
    """Return pairs of distinct compact codes that look alike but differ in skills.

    Args:
        summary: Output of ``summarize_compact_codes``.
        threshold: Minimum ``SequenceMatcher`` ratio for a pair to be reported.
        max_pairs: Keep at most this many pairs after sorting; zero or a
            negative value keeps every pair.

    Returns:
        One row per reported pair, sorted by similarity (descending) and then
        by the two compact codes. An empty result still carries every column.
    """
    candidates = [
        (str(record["compact_node_code"]), frozenset(record["skill_ids"]), record)
        for record in summary.to_dict(orient="records")
    ]

    rows: list[dict[str, object]] = []
    for left_item, right_item in combinations(candidates, 2):
        left_code, left_skills, left = left_item
        right_code, right_skills, right = right_item

        # Cheap rejections first: identical codes or identical skill sets
        # can never be a conflict, whatever the similarity is.
        if left_code == right_code or left_skills == right_skills:
            continue

        matcher = SequenceMatcher(None, left_code, right_code)
        # Both quick ratios are upper bounds on ratio(), so a pair failing
        # either bound cannot reach the threshold and the expensive ratio()
        # call is skipped.
        if (
            matcher.real_quick_ratio() < threshold
            or matcher.quick_ratio() < threshold
        ):
            continue
        similarity = matcher.ratio()
        if similarity < threshold:
            continue

        rows.append(
            {
                "conflict_type": "fuzzy_match",
                "similarity": round(similarity, 4),
                "left_compact_node_code": left_code,
                "right_compact_node_code": right_code,
                "left_canonical_node_codes": join_pipe(
                    left["canonical_node_codes"]
                ),
                "right_canonical_node_codes": join_pipe(
                    right["canonical_node_codes"]
                ),
                "left_skill_ids": join_csv_ints(left["skill_ids"]),
                "right_skill_ids": join_csv_ints(right["skill_ids"]),
                "overlap_skill_ids": join_csv_ints(
                    sorted(left_skills & right_skills)
                ),
            }
        )

    fuzzy = pd.DataFrame(rows, columns=_FUZZY_COLUMNS)
    if fuzzy.empty:
        return fuzzy

    fuzzy = fuzzy.sort_values(
        ["similarity", "left_compact_node_code", "right_compact_node_code"],
        ascending=[False, True, True],
    )
    if max_pairs > 0:
        fuzzy = fuzzy.head(max_pairs).copy()
    return fuzzy


def combine_conflicts(frames: list[pd.DataFrame]) -> pd.DataFrame:
    """Stack conflict tables into one frame with a data-independent header.

    The result's columns are the ordered union of the columns of *frames*,
    whether or not any frame has rows. Empty frames are left out of the
    concatenation itself so they cannot influence the result dtypes.
    """
    columns = list(
        dict.fromkeys(column for frame in frames for column in frame.columns)
    )
    populated = [frame for frame in frames if not frame.empty]
    if not populated:
        return pd.DataFrame(columns=columns)
    combined = pd.concat(populated, ignore_index=True, sort=False)
    return combined.reindex(columns=columns)


def print_section(title: str, df: pd.DataFrame, print_limit: int) -> None:
    """Print *title* followed by at most *print_limit* rows of *df*."""
    print(f"\n{title}")
    if df.empty:
        print(" None")
        return

    to_show = df.head(print_limit)
    print(to_show.to_string(index=False))
    if len(df) > len(to_show):
        print(f" ... ({len(df) - len(to_show)} more rows)")


def main(argv: list[str] | None = None) -> int:
    """Run the conflict detection and write the results as CSV.

    Args:
        argv: Argument strings; ``None`` (the default) uses ``sys.argv[1:]``.

    Returns:
        ``0`` on success.

    Raises:
        ValueError: If ``--similarity-threshold`` is outside ``[0.0, 1.0]``
            (NaN included).
        FileNotFoundError: If the skills file does not exist.
    """
    args = parse_args(argv)

    # The chained comparison is False for NaN, so NaN is rejected as well;
    # separate "< 0.0 or > 1.0" tests would let it through.
    if not 0.0 <= args.similarity_threshold <= 1.0:
        raise ValueError("--similarity-threshold must be in [0.0, 1.0].")
    if not args.skills_path.exists():
        raise FileNotFoundError(f"Skills file not found: {args.skills_path}")

    skills_df = load_skills(args.skills_path)
    summary_df = summarize_compact_codes(skills_df)
    normalized_conflicts = build_normalized_conflicts(summary_df)

    frames = [normalized_conflicts]
    fuzzy_conflicts = pd.DataFrame()
    if args.include_fuzzy:
        fuzzy_conflicts = build_fuzzy_conflicts(
            summary_df,
            threshold=args.similarity_threshold,
            max_pairs=args.max_fuzzy_pairs,
        )
        frames.append(fuzzy_conflicts)

    combined_output = combine_conflicts(frames)
    args.output_path.parent.mkdir(parents=True, exist_ok=True)
    combined_output.to_csv(args.output_path, index=False)

    print("Loaded rows:", len(skills_df))
    print("Unique compact node codes:", len(summary_df))
    print("Normalized conflicts:", len(normalized_conflicts))
    if args.include_fuzzy:
        print(
            f"Fuzzy conflicts (threshold {args.similarity_threshold:.2f}):",
            len(fuzzy_conflicts),
        )

    print_section(
        "Normalized node_code conflicts (same compact code, different skill_id):",
        normalized_conflicts,
        args.print_limit,
    )
    if args.include_fuzzy:
        print_section(
            "Fuzzy node_code conflicts (near compact codes, different skill_id):",
            fuzzy_conflicts,
            args.print_limit,
        )

    print(f"\nSaved conflicts to: {args.output_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())