| |
| """Detect skills that have similar node_code values but different skill_id values. |
| |
| By default, the script detects conflicts after normalizing node_code values |
| (uppercasing and removing punctuation differences). It can also perform optional |
| fuzzy matching on normalized compact node codes. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import re |
| from difflib import SequenceMatcher |
| from itertools import combinations |
| from pathlib import Path |
|
|
| import pandas as pd |
|
|
|
|
| DEFAULT_SKILLS_PATH = Path(__file__).resolve().parent.parent / "Data" / "Skills.csv" |
| DEFAULT_OUTPUT_PATH = ( |
| Path(__file__).resolve().parent.parent |
| / "Results" |
| / "similar_node_code_conflicts.csv" |
| ) |
|
|
|
|
| def parse_args() -> argparse.Namespace: |
| parser = argparse.ArgumentParser( |
| description=( |
| "Detect skills whose node_code values are similar but map to " |
| "different skill_id values." |
| ) |
| ) |
| parser.add_argument( |
| "--skills-path", |
| type=Path, |
| default=DEFAULT_SKILLS_PATH, |
| help="Path to Skills.csv.", |
| ) |
| parser.add_argument( |
| "--output-path", |
| type=Path, |
| default=DEFAULT_OUTPUT_PATH, |
| help="Path to save the detected conflicts as CSV.", |
| ) |
| parser.add_argument( |
| "--include-fuzzy", |
| action="store_true", |
| help="Also run fuzzy matching across compact node_code values.", |
| ) |
| parser.add_argument( |
| "--similarity-threshold", |
| type=float, |
| default=0.9, |
| help="Minimum SequenceMatcher ratio for fuzzy matches (0.0 to 1.0).", |
| ) |
| parser.add_argument( |
| "--max-fuzzy-pairs", |
| type=int, |
| default=200, |
| help="Maximum number of fuzzy match pairs to keep after sorting.", |
| ) |
| parser.add_argument( |
| "--print-limit", |
| type=int, |
| default=20, |
| help="Maximum number of rows to print for each conflict section.", |
| ) |
| return parser.parse_args() |
|
|
|
|
def normalize_node_code(node_code: str) -> str:
    """Canonicalize a raw node code.

    Uppercases the input, collapses every run of non-alphanumeric characters
    into a single dot, and drops any leading/trailing dots, so that codes that
    differ only in punctuation or case compare equal.
    """
    uppercased = node_code.upper().strip()
    # After upper(), lowercase letters cannot occur, so A-Z0-9 suffices.
    dotted = re.sub(r"[^A-Z0-9]+", ".", uppercased)
    return re.sub(r"\.+", ".", dotted).strip(".")
|
|
|
|
def compact_node_code(canonical_node_code: str) -> str:
    """Return the canonical node code with its dot separators removed."""
    return "".join(canonical_node_code.split("."))
|
|
|
|
def unique_sorted_strings(series: pd.Series) -> list[str]:
    """Return the distinct non-empty stripped string values of *series*, sorted."""
    texts = {str(value).strip() for value in series.dropna()}
    # An all-whitespace entry strips to "" and must not be reported.
    texts.discard("")
    return sorted(texts)
|
|
|
|
def unique_sorted_ints(series: pd.Series) -> list[int]:
    """Return the distinct values of *series* as a sorted list of ints."""
    return sorted({int(value) for value in series.dropna()})
|
|
|
|
def join_pipe(values: list[str]) -> str:
    """Render *values* as a single pipe-separated display string."""
    separator = " | "
    return separator.join(values)
|
|
|
|
def join_csv_ints(values: list[int]) -> str:
    """Render *values* as a comma-separated string of integers."""
    return ",".join(map(str, values))
|
|
|
|
def load_skills(skills_path: Path) -> pd.DataFrame:
    """Load Skills.csv and attach canonical/compact node-code columns.

    Rows with a missing problem_id, skill_id, or node_code (or a blank
    node_code) are discarded; ids are coerced to plain ints.
    """
    columns = ["problem_id", "skill_id", "node_code", "node_name"]
    frame = pd.read_csv(skills_path, usecols=columns, low_memory=False)

    # Non-numeric ids become NaN and are dropped together with missing codes.
    for id_column in ("problem_id", "skill_id"):
        frame[id_column] = pd.to_numeric(frame[id_column], errors="coerce")

    frame = frame.dropna(subset=["problem_id", "skill_id", "node_code"]).copy()
    frame["problem_id"] = frame["problem_id"].astype(int)
    frame["skill_id"] = frame["skill_id"].astype(int)

    frame["node_code"] = frame["node_code"].astype(str).str.strip()
    frame["node_name"] = frame["node_name"].fillna("").astype(str).str.strip()
    frame = frame.loc[frame["node_code"] != ""].copy()

    frame["node_code_canonical"] = frame["node_code"].map(normalize_node_code)
    frame["node_code_compact"] = frame["node_code_canonical"].map(compact_node_code)
    return frame
|
|
|
|
def summarize_compact_codes(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate the skills table into one row per compact node code."""
    grouped = df.groupby("node_code_compact", sort=True)
    summary = grouped.agg(
        canonical_node_codes=("node_code_canonical", unique_sorted_strings),
        raw_node_codes=("node_code", unique_sorted_strings),
        skill_ids=("skill_id", unique_sorted_ints),
        node_names=("node_name", unique_sorted_strings),
        problem_count=("problem_id", "nunique"),
        mapping_count=("skill_id", "size"),
    )
    summary = summary.reset_index().rename(
        columns={"node_code_compact": "compact_node_code"}
    )

    # Count of distinct skill_ids mapped onto each compact code; >1 marks a conflict.
    summary["n_skill_ids"] = summary["skill_ids"].map(len)
    return summary
|
|
|
|
def build_normalized_conflicts(summary: pd.DataFrame) -> pd.DataFrame:
    """Select compact codes mapped to multiple skill_ids, formatted for CSV output."""
    conflicts = summary.loc[summary["n_skill_ids"] > 1].copy()
    if conflicts.empty:
        return conflicts

    conflicts.insert(0, "conflict_type", "normalized_match")
    # Flatten the list-valued aggregate columns into delimited strings.
    conflicts["skill_ids"] = conflicts["skill_ids"].map(join_csv_ints)
    for pipe_column in ("canonical_node_codes", "raw_node_codes", "node_names"):
        conflicts[pipe_column] = conflicts[pipe_column].map(join_pipe)

    # Worst conflicts (most skill_ids) first, then alphabetical by code.
    return conflicts.sort_values(
        by=["n_skill_ids", "compact_node_code"], ascending=[False, True]
    )
|
|
|
|
def build_fuzzy_conflicts(
    summary: pd.DataFrame,
    threshold: float,
    max_pairs: int,
) -> pd.DataFrame:
    """Pair distinct compact codes whose similarity meets *threshold*.

    Only pairs whose skill_id sets differ are reported; results are sorted by
    descending similarity and trimmed to *max_pairs* rows when it is positive.
    """
    entries = summary.to_dict(orient="records")
    found: list[dict[str, object]] = []

    for first, second in combinations(entries, 2):
        code_a = str(first["compact_node_code"])
        code_b = str(second["compact_node_code"])
        if code_a == code_b:
            continue

        ratio = SequenceMatcher(None, code_a, code_b).ratio()
        if ratio < threshold:
            continue

        skills_a = set(first["skill_ids"])
        skills_b = set(second["skill_ids"])
        # Identical skill sets are not a conflict, however similar the codes.
        if skills_a == skills_b:
            continue

        found.append(
            {
                "conflict_type": "fuzzy_match",
                "similarity": round(ratio, 4),
                "left_compact_node_code": code_a,
                "right_compact_node_code": code_b,
                "left_canonical_node_codes": join_pipe(first["canonical_node_codes"]),
                "right_canonical_node_codes": join_pipe(second["canonical_node_codes"]),
                "left_skill_ids": join_csv_ints(first["skill_ids"]),
                "right_skill_ids": join_csv_ints(second["skill_ids"]),
                "overlap_skill_ids": join_csv_ints(
                    sorted(skills_a & skills_b)
                ),
            }
        )

    result = pd.DataFrame(found)
    if result.empty:
        return result

    result = result.sort_values(
        by=["similarity", "left_compact_node_code", "right_compact_node_code"],
        ascending=[False, True, True],
    )
    # max_pairs <= 0 means "keep everything".
    return result.head(max_pairs).copy() if max_pairs > 0 else result
|
|
|
|
def print_section(title: str, df: pd.DataFrame, print_limit: int) -> None:
    """Print *title* followed by at most *print_limit* rows of *df*.

    Empty frames are rendered as ' None'; any truncated rows are summarized
    with a trailing count.
    """
    print(f"\n{title}")
    if df.empty:
        print(" None")
        return

    shown = df.head(print_limit)
    print(shown.to_string(index=False))
    hidden = len(df) - len(shown)
    if hidden > 0:
        print(f" ... ({hidden} more rows)")
|
|
|
|
def main() -> int:
    """Entry point: load skills, detect node_code conflicts, save and report.

    Returns:
        Process exit status (0 on success).

    Raises:
        ValueError: If the similarity threshold lies outside [0.0, 1.0].
        FileNotFoundError: If the skills CSV does not exist.
    """
    args = parse_args()

    if args.similarity_threshold < 0.0 or args.similarity_threshold > 1.0:
        raise ValueError("--similarity-threshold must be in [0.0, 1.0].")

    if not args.skills_path.exists():
        raise FileNotFoundError(f"Skills file not found: {args.skills_path}")

    skills_df = load_skills(args.skills_path)
    summary_df = summarize_compact_codes(skills_df)
    normalized_conflicts = build_normalized_conflicts(summary_df)

    # Fuzzy matching is opt-in; keep an empty frame otherwise so later code
    # can treat both modes uniformly.
    fuzzy_conflicts = pd.DataFrame()
    if args.include_fuzzy:
        fuzzy_conflicts = build_fuzzy_conflicts(
            summary_df,
            threshold=args.similarity_threshold,
            max_pairs=args.max_fuzzy_pairs,
        )

    frames = [normalized_conflicts]
    if args.include_fuzzy:
        frames.append(fuzzy_conflicts)
    combined = pd.concat(frames, ignore_index=True, sort=False)

    args.output_path.parent.mkdir(parents=True, exist_ok=True)
    combined.to_csv(args.output_path, index=False)

    print("Loaded rows:", len(skills_df))
    print("Unique compact node codes:", len(summary_df))
    print("Normalized conflicts:", len(normalized_conflicts))
    if args.include_fuzzy:
        print(
            f"Fuzzy conflicts (threshold {args.similarity_threshold:.2f}):",
            len(fuzzy_conflicts),
        )

    print_section(
        "Normalized node_code conflicts (same compact code, different skill_id):",
        normalized_conflicts,
        args.print_limit,
    )
    if args.include_fuzzy:
        print_section(
            "Fuzzy node_code conflicts (near compact codes, different skill_id):",
            fuzzy_conflicts,
            args.print_limit,
        )

    print(f"\nSaved conflicts to: {args.output_path}")
    return 0
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())
|
|