Rs_mini_projrct / src /evaluate_pipeline.py
Harshilforworks's picture
Upload 33 files
4f10909 verified
"""Evaluation utilities for the tech -> FAISS recommender pipeline.
Produces: precision@k, recall@k, F1@k, RMSE, MAE, confusion matrix (top-1), and plots.
Usage:
python backend/src/evaluate_pipeline.py --test-csv backend/Dataset/test_cases.csv --tech-col tech --true-col true_ids --k 5
Test CSV should contain one row per case with a tech input string and a ground-truth job id or list.
true_ids may be a single integer, a JSON array string ("[1,2]") or comma/semicolon separated ids.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import List, Dict, Any
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
try:
from src.pipeline_tech_match import pipeline_match_from_tech
except Exception:
pipeline_match_from_tech = None # type: ignore
def _parse_true_ids(val) -> List[int]:
if pd.isna(val):
return []
if isinstance(val, (list, tuple)):
return [int(x) for x in val]
s = str(val).strip()
if not s:
return []
# try JSON
try:
parsed = json.loads(s)
if isinstance(parsed, list):
return [int(x) for x in parsed]
except Exception:
pass
# split by common separators
sep = "," if "," in s else ";" if ";" in s else " "
parts = [p.strip() for p in s.split(sep) if p.strip()]
ids = []
for p in parts:
try:
ids.append(int(p))
except Exception:
pass
return ids
def evaluate_cases(predictions: List[Dict[str, Any]], truths: List[List[int]], k: int) -> Dict[str, Any]:
n = len(predictions)
precisions = [] # Precision@K for each query
recalls = [] # Recall@K for each query
f1s = [] # F1@K for each query
aps = [] # AP@K for each query
y_true_all = []
y_score_all = []
top1_pred = []
top1_true = []
for pred, true_ids in zip(predictions, truths):
matches = pred.get("matches", [])[:k]
pred_ids = [m.get("id") for m in matches]
scores = [m.get("score", 0.0) for m in matches]
# For each rank position, is it relevant (1) or not (0)
rels = [1 if (pid in true_ids) else 0 for pid in pred_ids]
n_true = len(true_ids) # Total relevant items
# Calculate TP@K (number of relevant items in top K)
tp = sum(rels)
# Precision@K = TP@K / K
prec = tp / k if k > 0 else 0.0
# Recall@K = TP@K / Total Relevant Items
rec = tp / n_true if n_true > 0 else 0.0
# F1@K = 2 * (Precision@K * Recall@K) / (Precision@K + Recall@K)
f1 = (2 * prec * rec / (prec + rec)) if (prec + rec) > 0 else 0.0
# Calculate AP@K (Average Precision at K)
# For each relevant item found, calculate precision up to that rank
cum_tp = 0 # Cumulative true positives
precisions_at_rel = [] # Precisions at ranks where relevant items found
for i, rel in enumerate(rels, 1):
if rel == 1:
cum_tp += 1
prec_at_i = cum_tp / i
precisions_at_rel.append(prec_at_i)
# AP@K is sum of precisions at relevant ranks divided by total relevant items
ap = sum(precisions_at_rel) / n_true if n_true > 0 else 0.0
# Store metrics for this query
precisions.append(prec)
recalls.append(rec)
f1s.append(f1)
aps.append(ap)
# For confusion matrix
top1_pred.append(pred_ids[0] if pred_ids else None)
top1_true.append(true_ids[0] if true_ids else None)
# For RMSE/MAE
y_true_all.extend(rels)
y_score_all.extend(scores)
metrics: Dict[str, Any] = {
"n_cases": n,
# Mean metrics across all queries
"precision_at_k_mean": float(np.mean(precisions)) if precisions else 0.0,
"recall_at_k_mean": float(np.mean(recalls)) if recalls else 0.0,
"f1_at_k_mean": float(np.mean(f1s)) if f1s else 0.0,
"map_at_k": float(np.mean(aps)) if aps else 0.0, # Mean Average Precision@K
# Standard deviations
"precision_at_k_std": float(np.std(precisions)) if precisions else 0.0,
"recall_at_k_std": float(np.std(recalls)) if recalls else 0.0,
"f1_at_k_std": float(np.std(f1s)) if f1s else 0.0,
"ap_at_k_std": float(np.std(aps)) if aps else 0.0,
}
if y_true_all:
# Some sklearn versions don't accept the `squared` kwarg; compute RMSE as sqrt(MSE)
mse = mean_squared_error(y_true_all, y_score_all)
metrics["rmse_all_candidates"] = float(np.sqrt(mse))
metrics["mae_all_candidates"] = float(mean_absolute_error(y_true_all, y_score_all))
else:
metrics["rmse_all_candidates"] = None
metrics["mae_all_candidates"] = None
# confusion matrix for top-1 (only works if labels are small in number otherwise it's large)
try:
valid_pairs = [(t, p) for t, p in zip(top1_true, top1_pred) if t is not None and p is not None]
if valid_pairs:
y_t = [t for t, _ in valid_pairs]
y_p = [p for _, p in valid_pairs]
cm = confusion_matrix(y_t, y_p)
metrics["confusion_matrix"] = cm.tolist()
else:
metrics["confusion_matrix"] = None
except Exception:
metrics["confusion_matrix"] = None
metrics["per_case_precision"] = precisions
metrics["per_case_recall"] = recalls
metrics["per_case_f1"] = f1s
return metrics
def plot_and_save(precisions: List[float], recalls: List[float], confusion: Any, out_dir: Path):
out_dir.mkdir(parents=True, exist_ok=True)
plt.figure(figsize=(8, 4))
sns.histplot(precisions, bins=20)
plt.title("Precision@K distribution")
plt.xlabel("Precision@K")
plt.savefig(out_dir / "precision_dist.png")
plt.close()
plt.figure(figsize=(8, 4))
sns.histplot(recalls, bins=20)
plt.title("Recall@K distribution")
plt.xlabel("Recall@K")
plt.savefig(out_dir / "recall_dist.png")
plt.close()
if confusion is not None:
plt.figure(figsize=(8, 6))
sns.heatmap(np.array(confusion), annot=True, fmt="d", cmap="Blues")
plt.title("Top-1 Confusion Matrix")
plt.savefig(out_dir / "confusion_top1.png")
plt.close()
def main():
# Fixed parameters
k = 5
ks = [k]
max_k = k
# Setup paths
base_dir = Path(__file__).resolve().parent.parent
csv_path = base_dir / "Dataset" / "test_cases_preprocessed.csv" # Using preprocessed file
tech_col = "tech_stack"
out_dir = base_dir / "eval_outputs"
# Ensure output directory exists
out_dir.mkdir(parents=True, exist_ok=True)
if not csv_path.exists():
raise FileNotFoundError(f"Test CSV not found: {csv_path}")
df = pd.read_csv(csv_path)
if tech_col not in df.columns:
raise KeyError(f"Tech column '{tech_col}' not found in {csv_path}. Available columns: {list(df.columns)}")
techs = df[tech_col].astype(str).tolist()
# Determine ground-truth ids
truths: List[List[int]] = []
if "Title" in df.columns:
# Try to derive numeric ids from FAISS metadata by matching Title -> id
# locate metadata (try same candidates as pipeline)
meta_path = Path(__file__).resolve().parent.parent / "Vector_db" / "faiss_metadata.json"
if not meta_path.exists():
# try other likely locations
candidates = [
Path(__file__).resolve().parent.parent / "Vector_db" / "faiss_metadata.json",
Path(__file__).resolve().parent / "faiss_metadata.json",
Path.cwd() / "backend" / "Vector_db" / "faiss_metadata.json",
Path.cwd() / "backend" / "faiss_metadata.json",
Path.cwd() / "Vector_db" / "faiss_metadata.json",
]
found = False
for c in candidates:
if c.exists():
meta_path = c
found = True
break
if not found:
meta_path = None
title_to_id: Dict[str, int] = {}
if meta_path and meta_path.exists():
meta = json.loads(meta_path.read_text(encoding="utf-8"))
# meta is id -> title; invert for lookup
for k, v in meta.items():
if v is None:
continue
title_to_id[str(v).strip()] = int(k)
for t in df["Title"].astype(str).tolist():
tid = title_to_id.get(t.strip())
truths.append([tid] if tid is not None else [])
else:
truths = [[] for _ in techs]
# Attempt a batched evaluation: load model + index once and search all queries in one batch
predictions: List[Dict[str, Any]] = []
use_fast_path = True
try:
from sentence_transformers import SentenceTransformer
import faiss
# import token helper from pipeline for consistent tokenization
try:
from src.pipeline_tech_match import tokens_from_text
except Exception:
from pipeline_tech_match import tokens_from_text # type: ignore
except Exception:
SentenceTransformer = None # type: ignore
faiss = None # type: ignore
tokens_from_text = None # type: ignore
use_fast_path = False
if use_fast_path and pipeline_match_from_tech is not None:
# Use pipeline directly instead of fast path
for tech in techs:
# Call pipeline directly with raw tech stack
result = pipeline_match_from_tech(tech, top_k=max_k)
predictions.append(result)
use_fast_path = False # Skip the rest of fast path processing
elif use_fast_path and SentenceTransformer is not None and faiss is not None:
# Fallback to direct embedding if pipeline is not available
model_name = "all-MiniLM-L6-v2"
model = SentenceTransformer(model_name)
# Pass raw tech stack strings directly to pipeline
skills_texts = techs
emb = model.encode(skills_texts, convert_to_numpy=True)
vecs = np.asarray(emb, dtype=np.float32)
try:
faiss.normalize_L2(vecs)
except Exception:
pass
# locate FAISS index and metadata
idx_path = Path(__file__).resolve().parent.parent / "Vector_db" / "faiss_index.faiss"
meta_path = Path(__file__).resolve().parent.parent / "Vector_db" / "faiss_metadata.json"
if not idx_path.exists():
candidates = [
Path(__file__).resolve().parent.parent / "Vector_db" / "faiss_index.faiss",
Path(__file__).resolve().parent / "faiss_index.faiss",
Path.cwd() / "backend" / "Vector_db" / "faiss_index.faiss",
Path.cwd() / "backend" / "faiss_index.faiss",
Path.cwd() / "Vector_db" / "faiss_index.faiss",
]
for c in candidates:
if c.exists():
idx_path = c
break
if not meta_path.exists():
candidates = [
Path(__file__).resolve().parent.parent / "Vector_db" / "faiss_metadata.json",
Path(__file__).resolve().parent / "faiss_metadata.json",
Path.cwd() / "backend" / "Vector_db" / "faiss_metadata.json",
Path.cwd() / "backend" / "faiss_metadata.json",
Path.cwd() / "Vector_db" / "faiss_metadata.json",
]
for c in candidates:
if c.exists():
meta_path = c
break
if not idx_path.exists() or not meta_path.exists():
# fallback to per-query pipeline if index/metadata not found
use_fast_path = False
if use_fast_path:
index = faiss.read_index(str(idx_path))
meta = json.loads(meta_path.read_text(encoding="utf-8"))
# batch search using max_k (we will slice down for smaller Ks)
D, I = index.search(vecs, max_k)
for row_scores, row_ids, skills_text in zip(D.tolist(), I.tolist(), skills_texts):
matches = []
for score, iid in zip(row_scores, row_ids):
if iid < 0:
continue
key = str(iid)
title = meta.get(key)
matches.append({"id": int(iid), "title": title, "score": float(score)})
predictions.append({"skills_text": skills_text, "matches": matches})
else:
# Fallback: call pipeline_match_from_tech per-case (slower)
if pipeline_match_from_tech is None:
raise RuntimeError("pipeline_match_from_tech not importable and batch path unavailable. Ensure deps are installed.")
for tech in techs:
out = pipeline_match_from_tech(tech, top_k=max_k)
predictions.append(out)
# Create output directory
out_dir.mkdir(parents=True, exist_ok=True)
# Save predictions for inspection
(out_dir / "predictions.json").write_text(json.dumps(predictions, ensure_ascii=False, indent=2), encoding="utf-8")
# Print detailed results for first result as example
if predictions:
idx = 0
single_pred = predictions[idx]
single_truth = truths[idx] if idx < len(truths) else []
# Determine rank of first ground-truth id (if present)
found_rank = None
found_any = False
found_top1 = False
pred_ids = [m.get("id") for m in single_pred.get("matches", [])]
for pos, pid in enumerate(pred_ids, start=1):
if pid in single_truth:
found_any = True
found_rank = pos
break
if pred_ids:
found_top1 = (pred_ids[0] in single_truth) if single_truth else False
single_out = {
"row_index": idx,
"tech": df.iloc[idx].get(tech_col) if tech_col in df.columns else None,
"title": df.iloc[idx].get("Title") if "Title" in df.columns else None,
"ground_truth_ids": single_truth,
"found_any": found_any,
"found_rank": found_rank,
"found_top1": found_top1,
"matches": single_pred.get("matches", []),
}
# Save example prediction
single_path = out_dir / f"example_case.json"
single_path.write_text(json.dumps(single_out, ensure_ascii=False, indent=2), encoding="utf-8")
# Evaluate for each requested K (Precision@K, Recall@K, F1@K)
metrics_per_k: Dict[int, Dict[str, Any]] = {}
out_dir.mkdir(parents=True, exist_ok=True)
for k in ks:
m = evaluate_cases(predictions, truths, k)
metrics_per_k[int(k)] = m
# save per-K distribution plots in subfolder k{K}
plot_and_save(m.get("per_case_precision", []), m.get("per_case_recall", []), m.get("confusion_matrix") if k == 1 else None, out_dir / f"k{k}")
# write combined summary
summary = {"ks": ks, "metrics": metrics_per_k}
(out_dir / "metrics_summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
# Print readable summary
print("\n=== Evaluation Results ===")
print(f"Number of test cases: {len(predictions)}")
for k in ks:
metrics = metrics_per_k[k]
print(f"\nResults for K={k}:")
print("Aggregated Metrics:")
print(f" Average Precision: {metrics['precision_at_k_mean']:.3f} (std: {metrics['precision_at_k_std']:.3f})")
print(f" Average Recall: {metrics['recall_at_k_mean']:.3f} (std: {metrics['recall_at_k_std']:.3f})")
print(f" Average F1 Score: {metrics['f1_at_k_mean']:.3f} (std: {metrics['f1_at_k_std']:.3f})")
print(f" Mean Average Precision: {metrics['map_at_k']:.3f} (std: {metrics['ap_at_k_std']:.3f})")
print("\nPer-Case Results:")
for i, (pred, truth) in enumerate(zip(predictions, truths)):
title = df.iloc[i]["Title"]
matches = pred.get("matches", [])[:3] # Only show top 3 for each case
print(f"\nCase {i+1}: {title}")
print("Ground Truth IDs:", truth)
print("Top 3 Recommendations:")
for j, match in enumerate(matches, 1):
score = match.get("score", 0.0)
matched_title = match.get("title", "Unknown")
is_correct = "✓" if match.get("id") in truth else " "
print(f" {j}. {matched_title} (score: {score:.3f}) {is_correct}")
if __name__ == "__main__":
main()