FoundationalASSIST / Code /detect_similar_node_codes.py
martinakaduc's picture
Upload folder using huggingface_hub
6256eb9 verified
#!/usr/bin/env python3
"""Detect skills that have similar node_code values but different skill_id values.
By default, the script detects conflicts after normalizing node_code values
(uppercasing and removing punctuation differences). It can also perform optional
fuzzy matching on normalized compact node codes.
"""
from __future__ import annotations
import argparse
import re
from difflib import SequenceMatcher
from itertools import combinations
from pathlib import Path
import pandas as pd
# Directory two levels above this script's resolved location; both default
# paths below are anchored to it.
_SCRIPT_DIR = Path(__file__).resolve().parent
_BASE_DIR = _SCRIPT_DIR.parent

# Default location of the skills table read by the script.
DEFAULT_SKILLS_PATH = _BASE_DIR.joinpath("Data", "Skills.csv")

# Default location of the CSV report the script writes.
DEFAULT_OUTPUT_PATH = _BASE_DIR.joinpath(
    "Results", "similar_node_code_conflicts.csv"
)
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        The parsed namespace with ``skills_path``, ``output_path``,
        ``include_fuzzy``, ``similarity_threshold``, ``max_fuzzy_pairs`` and
        ``print_limit`` attributes.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Detect skills whose node_code values are similar but map to "
            "different skill_id values."
        )
    )

    # Each entry is (flag, keyword arguments for add_argument); the options
    # are registered in this order, which is also the order shown by --help.
    option_specs = [
        (
            "--skills-path",
            {
                "type": Path,
                "default": DEFAULT_SKILLS_PATH,
                "help": "Path to Skills.csv.",
            },
        ),
        (
            "--output-path",
            {
                "type": Path,
                "default": DEFAULT_OUTPUT_PATH,
                "help": "Path to save the detected conflicts as CSV.",
            },
        ),
        (
            "--include-fuzzy",
            {
                "action": "store_true",
                "help": "Also run fuzzy matching across compact node_code values.",
            },
        ),
        (
            "--similarity-threshold",
            {
                "type": float,
                "default": 0.9,
                "help": "Minimum SequenceMatcher ratio for fuzzy matches (0.0 to 1.0).",
            },
        ),
        (
            "--max-fuzzy-pairs",
            {
                "type": int,
                "default": 200,
                "help": "Maximum number of fuzzy match pairs to keep after sorting.",
            },
        ),
        (
            "--print-limit",
            {
                "type": int,
                "default": 20,
                "help": "Maximum number of rows to print for each conflict section.",
            },
        ),
    ]
    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    return parser.parse_args()
def normalize_node_code(node_code: str) -> str:
    """Return the canonical dotted, upper-case form of *node_code*.

    The code is upper-cased, every run of characters other than ASCII letters
    and digits is treated as a separator, and the remaining alphanumeric
    tokens are joined with single dots. The result never starts or ends with
    a dot; a code with no alphanumeric characters yields an empty string.
    """
    tokens = re.split(r"[^A-Za-z0-9]+", node_code.upper())
    # Leading/trailing separators produce empty tokens; skip them so the
    # result has no dangling dots.
    return ".".join(token for token in tokens if token)
def compact_node_code(canonical_node_code: str) -> str:
    """Return *canonical_node_code* with every dot removed."""
    segments = canonical_node_code.split(".")
    return "".join(segments)
def unique_sorted_strings(series: pd.Series) -> list[str]:
    """Return the distinct non-blank values of *series* as sorted strings.

    Missing values are skipped; every other value is converted with ``str``
    and stripped of surrounding whitespace, and values that are empty after
    stripping are discarded.
    """
    stripped = (str(item).strip() for item in series.dropna())
    return sorted({text for text in stripped if text})
def unique_sorted_ints(series: pd.Series) -> list[int]:
    """Return the distinct non-missing values of *series* as sorted ints.

    Each value is converted with ``int``, so values that ``int`` cannot
    convert raise the corresponding exception.
    """
    return sorted({int(item) for item in series.dropna()})
def join_pipe(values: list[str]) -> str:
    """Join *values* into one string separated by ``" | "``."""
    separator = " | "
    return separator.join(values)
def join_csv_ints(values: list[int]) -> str:
    """Render *values* as a comma-separated string with no spaces."""
    return ",".join(map(str, values))
def load_skills(skills_path: Path) -> pd.DataFrame:
    """Load the skills table and attach normalized node-code columns.

    Only the ``problem_id``, ``skill_id``, ``node_code`` and ``node_name``
    columns are read from the CSV at *skills_path*.

    Rows are dropped when:
      * ``problem_id`` or ``skill_id`` is missing or not numeric,
      * ``node_code`` is missing or blank after stripping whitespace,
      * ``node_code`` contains no letters or digits, so that its normalized
        form is empty. Without this filter every punctuation-only code
        (for example ``"-"`` and ``"--"``) would collapse into one empty
        compact code and be reported as a spurious conflict.

    Args:
        skills_path: Path to the skills CSV file.

    Returns:
        A DataFrame with integer ``problem_id`` / ``skill_id``, stripped
        string ``node_code`` / ``node_name`` (missing names become ``""``),
        plus ``node_code_canonical`` and ``node_code_compact`` columns.
    """
    required_columns = ["problem_id", "skill_id", "node_code", "node_name"]
    df = pd.read_csv(skills_path, usecols=required_columns, low_memory=False)

    # Non-numeric ids become NaN here so the dropna below removes them.
    id_columns = ["problem_id", "skill_id"]
    for column in id_columns:
        df[column] = pd.to_numeric(df[column], errors="coerce")
    df = df.dropna(subset=[*id_columns, "node_code"]).copy()
    for column in id_columns:
        df[column] = df[column].astype(int)

    df["node_code"] = df["node_code"].astype(str).str.strip()
    df["node_name"] = df["node_name"].fillna("").astype(str).str.strip()
    df = df[df["node_code"] != ""].copy()

    df["node_code_canonical"] = df["node_code"].apply(normalize_node_code)
    df["node_code_compact"] = df["node_code_canonical"].apply(compact_node_code)

    # A code made only of punctuation normalizes to "", which carries no
    # identity; keeping it would group unrelated skills under one empty key.
    df = df[df["node_code_compact"] != ""].copy()
    return df
def summarize_compact_codes(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate the skills table into one row per compact node code.

    Args:
        df: Skills table containing ``node_code_compact``,
            ``node_code_canonical``, ``node_code``, ``skill_id``,
            ``node_name`` and ``problem_id`` columns.

    Returns:
        A DataFrame keyed by ``compact_node_code`` with the sorted distinct
        canonical codes, raw codes, skill ids and node names of each group,
        the number of distinct problems (``problem_count``), the number of
        rows in the group (``mapping_count``) and the number of distinct
        skill ids (``n_skill_ids``).
    """
    # Output column name -> (source column, aggregation); dict order sets the
    # column order of the result.
    aggregations = {
        "canonical_node_codes": ("node_code_canonical", unique_sorted_strings),
        "raw_node_codes": ("node_code", unique_sorted_strings),
        "skill_ids": ("skill_id", unique_sorted_ints),
        "node_names": ("node_name", unique_sorted_strings),
        "problem_count": ("problem_id", "nunique"),
        "mapping_count": ("skill_id", "size"),
    }
    grouped = df.groupby("node_code_compact", sort=True)
    summary = grouped.agg(**aggregations).reset_index()
    summary = summary.rename(columns={"node_code_compact": "compact_node_code"})
    summary["n_skill_ids"] = summary["skill_ids"].apply(len)
    return summary
def build_normalized_conflicts(summary: pd.DataFrame) -> pd.DataFrame:
    """Select compact node codes that map to more than one skill id.

    Args:
        summary: Per-compact-code summary with list-valued ``skill_ids``,
            ``canonical_node_codes``, ``raw_node_codes`` and ``node_names``
            columns and an ``n_skill_ids`` count.

    Returns:
        The conflicting rows with a leading ``conflict_type`` column set to
        ``"normalized_match"`` and the list columns flattened to strings,
        ordered by descending ``n_skill_ids`` then ascending
        ``compact_node_code``. When nothing conflicts, the empty selection is
        returned as-is (no ``conflict_type`` column, lists not flattened).
    """
    has_multiple_skills = summary["n_skill_ids"] > 1
    conflicts = summary[has_multiple_skills].copy()
    if conflicts.empty:
        return conflicts

    conflicts.insert(0, "conflict_type", "normalized_match")
    conflicts["skill_ids"] = conflicts["skill_ids"].apply(join_csv_ints)
    for text_column in ("canonical_node_codes", "raw_node_codes", "node_names"):
        conflicts[text_column] = conflicts[text_column].apply(join_pipe)

    return conflicts.sort_values(
        by=["n_skill_ids", "compact_node_code"],
        ascending=[False, True],
    )
def build_fuzzy_conflicts(
    summary: pd.DataFrame,
    threshold: float,
    max_pairs: int,
) -> pd.DataFrame:
    """Find pairs of distinct compact node codes that are nearly identical.

    Every unordered pair of rows in *summary* is compared. A pair is reported
    when the two compact codes differ, their ``SequenceMatcher`` ratio is at
    least *threshold*, and the two rows do not carry exactly the same set of
    skill ids.

    Args:
        summary: Per-compact-code summary with ``compact_node_code`` and
            list-valued ``canonical_node_codes`` and ``skill_ids`` columns.
        threshold: Minimum similarity ratio for a pair to be reported.
        max_pairs: Maximum number of pairs kept after sorting; a value of
            zero or less keeps every pair.

    Returns:
        One row per reported pair, sorted by descending similarity and then
        by the left and right compact codes. An empty DataFrame is returned
        when no pair qualifies.
    """
    # Convert each record's code and skill ids once rather than once per pair.
    prepared = [
        (str(record["compact_node_code"]), set(record["skill_ids"]), record)
        for record in summary.to_dict(orient="records")
    ]

    rows: list[dict[str, object]] = []
    for left_item, right_item in combinations(prepared, 2):
        left_code, left_skills, left = left_item
        right_code, right_skills, right = right_item

        # Cheap exclusions first: identical codes are not "fuzzy" matches and
        # identical skill sets are not conflicts.
        if left_code == right_code or left_skills == right_skills:
            continue

        matcher = SequenceMatcher(None, left_code, right_code)
        # real_quick_ratio() and quick_ratio() are upper bounds on ratio(), so
        # a pair that fails either bound cannot reach the threshold and the
        # expensive exact ratio can be skipped without changing the result.
        if (
            matcher.real_quick_ratio() < threshold
            or matcher.quick_ratio() < threshold
        ):
            continue
        similarity = matcher.ratio()
        if similarity < threshold:
            continue

        rows.append(
            {
                "conflict_type": "fuzzy_match",
                "similarity": round(similarity, 4),
                "left_compact_node_code": left_code,
                "right_compact_node_code": right_code,
                "left_canonical_node_codes": join_pipe(left["canonical_node_codes"]),
                "right_canonical_node_codes": join_pipe(right["canonical_node_codes"]),
                "left_skill_ids": join_csv_ints(left["skill_ids"]),
                "right_skill_ids": join_csv_ints(right["skill_ids"]),
                "overlap_skill_ids": join_csv_ints(
                    sorted(left_skills.intersection(right_skills))
                ),
            }
        )

    fuzzy = pd.DataFrame(rows)
    if fuzzy.empty:
        return fuzzy

    fuzzy = fuzzy.sort_values(
        ["similarity", "left_compact_node_code", "right_compact_node_code"],
        ascending=[False, True, True],
    )
    if max_pairs > 0:
        fuzzy = fuzzy.head(max_pairs).copy()
    return fuzzy
def print_section(title: str, df: pd.DataFrame, print_limit: int) -> None:
    """Print *title* followed by a preview of *df*.

    An empty frame prints a "None" placeholder. Otherwise the rows selected
    by ``df.head(print_limit)`` are printed without the index, followed by a
    note giving the number of rows not shown, if any.
    """
    print(f"\n{title}")
    if df.empty:
        print(" None")
        return

    preview = df.head(print_limit)
    print(preview.to_string(index=False))

    hidden_rows = len(df) - len(preview)
    if hidden_rows > 0:
        print(f" ... ({hidden_rows} more rows)")
def main() -> int:
    """Run the conflict detection end to end and report the results.

    Parses the command line, loads and summarizes the skills table, detects
    normalized (and optionally fuzzy) conflicts, writes them to the output
    CSV, and prints a summary plus a preview of each conflict section.

    Returns:
        ``0`` when the run completes.

    Raises:
        ValueError: If the similarity threshold is below 0.0 or above 1.0.
        FileNotFoundError: If the skills file does not exist.
    """
    args = parse_args()

    threshold = args.similarity_threshold
    if threshold < 0.0 or threshold > 1.0:
        raise ValueError("--similarity-threshold must be in [0.0, 1.0].")

    skills_path = args.skills_path
    if not skills_path.exists():
        raise FileNotFoundError(f"Skills file not found: {skills_path}")

    skills_df = load_skills(skills_path)
    summary_df = summarize_compact_codes(skills_df)
    normalized_conflicts = build_normalized_conflicts(summary_df)

    # Fuzzy matching is opt-in; without it only the normalized conflicts are
    # written to the report.
    frames = [normalized_conflicts]
    fuzzy_conflicts = pd.DataFrame()
    if args.include_fuzzy:
        fuzzy_conflicts = build_fuzzy_conflicts(
            summary_df,
            threshold=threshold,
            max_pairs=args.max_fuzzy_pairs,
        )
        frames.append(fuzzy_conflicts)

    output_path = args.output_path
    combined_output = pd.concat(frames, ignore_index=True, sort=False)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    combined_output.to_csv(output_path, index=False)

    print("Loaded rows:", len(skills_df))
    print("Unique compact node codes:", len(summary_df))
    print("Normalized conflicts:", len(normalized_conflicts))
    if args.include_fuzzy:
        print(
            f"Fuzzy conflicts (threshold {threshold:.2f}):",
            len(fuzzy_conflicts),
        )

    print_section(
        "Normalized node_code conflicts (same compact code, different skill_id):",
        normalized_conflicts,
        args.print_limit,
    )
    if args.include_fuzzy:
        print_section(
            "Fuzzy node_code conflicts (near compact codes, different skill_id):",
            fuzzy_conflicts,
            args.print_limit,
        )

    print(f"\nSaved conflicts to: {output_path}")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)