"""Evaluation metrics and a test-set driver for the extraction and matching tasks."""

import json
import math
import os
import re
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

try:
    # Nothing in this module calls these helpers; the import is kept so the
    # names stay available, but it is optional so that the evaluator still
    # imports when scikit-learn is not installed.
    from sklearn.metrics import precision_recall_fscore_support, accuracy_score
except ImportError:
    precision_recall_fscore_support = None
    accuracy_score = None


# Fields compared by EvaluationMetrics.evaluate_extraction, in report order.
_EXTRACTION_FIELDS = (
    "name",
    "email",
    "phone",
    "skills",
    "experience",
    "education",
    "certifications",
)

# Matches plain-text scores such as "Match Score: 0.82" or "match score = 82".
_MATCH_SCORE_RE = re.compile(
    r"match\s*score\s*[:=]\s*([0-9]*\.?[0-9]+)", flags=re.IGNORECASE
)

# Delimiter / prefix used to split a matching sample's input into its parts.
_JOB_DELIMITER = "\n\nJob Description:\n"
_RESUME_PREFIX = "Resume:\n"

_DEFAULT_RESULTS_PATH = "results/evaluation_results.json"


def _as_dict(value: Any) -> Dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _to_float(value: Any) -> Optional[float]:
    """Return *value* as a finite float, or None when it is not numeric."""
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if math.isfinite(number) else None


def _values_match(predicted: Any, expected: Any) -> bool:
    """Compare one extracted field value against its ground truth.

    Lists and dicts are compared through their key-sorted JSON form; anything
    else is compared as a case-insensitive string.
    """
    if isinstance(predicted, (list, dict)):
        return json.dumps(predicted, sort_keys=True) == json.dumps(
            expected, sort_keys=True
        )
    return str(predicted).lower() == str(expected).lower()


def _skill_set(value: Any) -> set:
    """Return the distinct skills in *value*; non-sequence values yield an empty set."""
    if not isinstance(value, (list, tuple, set, frozenset)):
        # A bare string would otherwise be consumed character by character.
        return set()
    skills = set()
    for item in value:
        try:
            hash(item)
        except TypeError:
            # Unhashable entries (dicts/lists) are compared by their JSON form.
            item = json.dumps(item, sort_keys=True, default=str)
        skills.add(item)
    return skills


class EvaluationMetrics:
    """Evaluate model performance on both tasks (extraction and matching)."""

    @staticmethod
    def evaluate_extraction(predictions: List[Dict], ground_truth: List[Dict]) -> Dict:
        """Evaluate extraction task performance.

        Predictions and ground truth are paired positionally. A field is only
        scored for a pair when it is present on both sides.

        Args:
            predictions: One dict of extracted fields per sample. Non-dict
                entries are treated as empty.
            ground_truth: The expected dict for each sample, in the same order.

        Returns:
            A dict with ``overall_accuracy`` (correct fields / scored fields),
            ``field_accuracies`` (per-field accuracy, only for fields scored at
            least once) and ``total_samples`` (``len(predictions)``).
        """
        metrics = {
            "overall_accuracy": 0,
            "field_accuracies": {},
            "total_samples": len(predictions),
        }

        field_correct = dict.fromkeys(_EXTRACTION_FIELDS, 0)
        field_counts = dict.fromkeys(_EXTRACTION_FIELDS, 0)

        for pred, truth in zip(predictions, ground_truth):
            pred, truth = _as_dict(pred), _as_dict(truth)
            for field in _EXTRACTION_FIELDS:
                if field not in pred or field not in truth:
                    continue
                field_counts[field] += 1
                if _values_match(pred[field], truth[field]):
                    field_correct[field] += 1

        for field in _EXTRACTION_FIELDS:
            if field_counts[field] > 0:
                metrics["field_accuracies"][field] = (
                    field_correct[field] / field_counts[field]
                )

        total_fields = sum(field_counts.values())
        if total_fields > 0:
            metrics["overall_accuracy"] = sum(field_correct.values()) / total_fields

        return metrics

    @staticmethod
    def evaluate_matching(predictions: List[Dict], ground_truth: List[Dict]) -> Dict:
        """Evaluate matching task performance.

        Predictions and ground truth are paired positionally.

        Args:
            predictions: One dict per sample; the keys read are ``match_score``,
                ``recommendation`` and ``matching_skills``. Non-dict entries
                are treated as empty.
            ground_truth: The expected dict for each sample, in the same order.

        Returns:
            A dict with:
              * ``score_rmse`` / ``score_mae``: error over pairs where both
                sides carry a numeric ``match_score``.
              * ``skill_matching_precision`` / ``skill_matching_recall``:
                micro-averaged over pairs where both sides carry
                ``matching_skills``; overlap is counted per sample.
              * ``recommendation_accuracy``: case-insensitive agreement over
                the pairs whose ground truth has a ``recommendation`` (a
                missing prediction counts as wrong).
              * ``total_samples``: ``len(predictions)``.
            Metrics that could not be computed stay at 0.
        """
        metrics = {
            "score_rmse": 0,
            "score_mae": 0,
            "skill_matching_precision": 0,
            "skill_matching_recall": 0,
            "recommendation_accuracy": 0,
            "total_samples": len(predictions),
        }

        score_errors = []
        recommendation_total = 0
        recommendation_correct = 0
        skill_hits = 0
        predicted_skill_total = 0
        expected_skill_total = 0

        for pred, truth in zip(predictions, ground_truth):
            pred, truth = _as_dict(pred), _as_dict(truth)

            # Score error: skip pairs whose score is missing or non-numeric
            # instead of aborting the whole evaluation.
            # NOTE(review): no scale conversion is applied here; both sides
            # are assumed to already use the same scale - confirm with caller.
            if "match_score" in pred and "match_score" in truth:
                predicted_score = _to_float(pred["match_score"])
                expected_score = _to_float(truth["match_score"])
                if predicted_score is not None and expected_score is not None:
                    score_errors.append(abs(predicted_score - expected_score))

            # Recommendation accuracy: only samples with a labelled
            # recommendation can be judged.
            if "recommendation" in truth:
                recommendation_total += 1
                if "recommendation" in pred and (
                    str(pred["recommendation"]).lower()
                    == str(truth["recommendation"]).lower()
                ):
                    recommendation_correct += 1

            # Skill matching: count overlap within the sample so that a skill
            # predicted for one sample cannot match another sample's truth.
            if "matching_skills" in pred and "matching_skills" in truth:
                predicted_skills = _skill_set(pred["matching_skills"])
                expected_skills = _skill_set(truth["matching_skills"])
                skill_hits += len(predicted_skills & expected_skills)
                predicted_skill_total += len(predicted_skills)
                expected_skill_total += len(expected_skills)

        if score_errors:
            errors = np.asarray(score_errors, dtype=float)
            # Cast to built-in floats so the metrics serialise as plain JSON.
            metrics["score_rmse"] = float(np.sqrt(np.mean(errors ** 2)))
            metrics["score_mae"] = float(np.mean(errors))

        if recommendation_total > 0:
            metrics["recommendation_accuracy"] = (
                recommendation_correct / recommendation_total
            )

        if predicted_skill_total > 0:
            metrics["skill_matching_precision"] = skill_hits / predicted_skill_total
        if expected_skill_total > 0:
            metrics["skill_matching_recall"] = skill_hits / expected_skill_total

        return metrics

    @staticmethod
    def print_metrics(metrics: Dict, task: str):
        """Pretty print metrics.

        Floats are shown with four decimals; nested dicts are printed as an
        indented sub-section; everything else is printed as-is.
        """
        banner = "=" * 50
        print(f"\n{banner}")
        print(f"EVALUATION RESULTS - {task.upper()}")
        print(f"{banner}")

        for key, value in metrics.items():
            if isinstance(value, float):
                print(f"{key}: {value:.4f}")
            elif isinstance(value, dict):
                print(f"\n{key}:")
                for sub_key, sub_value in value.items():
                    if isinstance(sub_value, float):
                        print(f" {sub_key}: {sub_value:.4f}")
                    else:
                        print(f" {sub_key}: {sub_value}")
            else:
                print(f"{key}: {value}")


def _load_json_or_jsonl(path: str) -> list:
    """Load samples from a JSON document or a JSONL file.

    A JSON array is returned as-is, a single JSON value is wrapped in a list,
    and anything that is not one valid JSON document is parsed line by line.
    """
    with open(path, "r", encoding="utf-8") as f:
        content = f.read().strip()
    if not content:
        return []
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        # More than one document in the file: treat it as JSONL.
        return [json.loads(line) for line in content.splitlines() if line.strip()]
    return data if isinstance(data, list) else [data]


def _safe_json_loads(text: Any) -> Any:
    """Parse *text* as JSON, returning None when it is not valid JSON text."""
    try:
        return json.loads(text)
    except (TypeError, ValueError):
        return None


def _parse_match_score(text: Any) -> Optional[float]:
    """Extract a score from text like "Match Score: 0.82", scaled to 0-100."""
    if not isinstance(text, str):
        return None
    match = _MATCH_SCORE_RE.search(text)
    if not match:
        return None
    value = float(match.group(1))
    # Normalize to 0-100 if it looks like 0-1
    if value <= 1.0:
        value *= 100.0
    return value


def _parse_extraction_truth(output: Any) -> Dict:
    """Return the expected extraction dict for a sample's ``output``."""
    truth = output if isinstance(output, dict) else _safe_json_loads(output)
    return truth if isinstance(truth, dict) else {}


def _parse_matching_truth(output: Any) -> Dict:
    """Return the expected matching dict for a sample's ``output``.

    JSON objects are used directly, with a numeric ``match_score`` <= 1.0
    rescaled to 0-100. Plain text falls back to a regex score lookup.
    """
    truth = output if isinstance(output, dict) else _safe_json_loads(output)
    if isinstance(truth, dict):
        truth = dict(truth)  # do not mutate the caller's sample
        score = truth.get("match_score")
        if isinstance(score, (int, float)) and score <= 1.0:
            truth["match_score"] = score * 100.0
        return truth
    score = _parse_match_score(output)
    return {"match_score": score} if score is not None else {}


def _split_resume_and_job(input_text: str) -> Tuple[str, str]:
    """Split a matching input into (resume, job); job is "" without the delimiter."""
    parts = input_text.split(_JOB_DELIMITER)
    if len(parts) == 2:
        return parts[0].replace(_RESUME_PREFIX, "").strip(), parts[1].strip()
    return input_text.strip(), ""


def evaluate_on_test_set(
    test_path: str = "data/processed/test.json",
    model_path: str = "models/checkpoints/final",
    *,
    engine: Optional[Any] = None,
    results_path: str = _DEFAULT_RESULTS_PATH,
):
    """Evaluate model on test set.

    Runs every test sample through the inference engine, scores the
    extraction and matching tasks, prints both reports and writes them to
    *results_path* as JSON.

    Args:
        test_path: JSON or JSONL file of samples. Each sample is a dict with
            ``input`` and ``output`` and optionally ``task`` ("extraction" or
            "matching"). If no sample is tagged with either task, all samples
            are treated as matching samples.
        model_path: Passed to ``ResumeInferenceEngine`` when *engine* is None.
        engine: Optional ready-made engine exposing ``extract_resume(text)``
            and ``match_resume_to_job(resume, job)``. When given, no engine is
            imported or constructed and *model_path* is ignored.
        results_path: Where the JSON results are written; the parent
            directory is created if needed.

    Returns:
        ``(extraction_metrics, matching_metrics)``.
    """
    if engine is None:
        # Prefer package-relative import; fall back to absolute when executed
        # as a script.
        try:
            from .inference import ResumeInferenceEngine
        except ImportError as e:
            if "attempted relative import" in str(e).lower():
                from src.inference import ResumeInferenceEngine
            else:
                raise
        engine = ResumeInferenceEngine(model_path)

    # Load test data (supports JSON array or JSONL); only dict rows are usable.
    test_data = [
        row for row in _load_json_or_jsonl(test_path) if isinstance(row, dict)
    ]

    # Separate by task (fallback: treat everything as matching)
    extraction_samples = [s for s in test_data if s.get("task") == "extraction"]
    matching_samples = [s for s in test_data if s.get("task") == "matching"]
    if not extraction_samples and not matching_samples:
        matching_samples = list(test_data)

    print(f"Evaluating on {len(extraction_samples)} extraction samples...")
    print(f"Evaluating on {len(matching_samples)} matching samples...")

    # Evaluate extraction. The truth is parsed before the engine runs and both
    # lists are always appended together, so a failed sample can never shift
    # the prediction/truth pairing of the samples after it.
    extraction_preds = []
    extraction_truth = []
    for sample in extraction_samples:
        truth = _parse_extraction_truth(sample.get("output", ""))
        try:
            pred = engine.extract_resume(sample["input"])
        except Exception as e:  # best effort: one bad sample must not stop the run
            print(f"Error on extraction sample: {e}")
            pred = {}
        extraction_preds.append(pred)
        extraction_truth.append(truth)

    extraction_metrics = EvaluationMetrics.evaluate_extraction(
        extraction_preds, extraction_truth
    )
    EvaluationMetrics.print_metrics(extraction_metrics, "extraction")

    # Evaluate matching (same pairing guarantee as above).
    matching_preds = []
    matching_truth = []
    for sample in matching_samples:
        truth = _parse_matching_truth(sample.get("output", ""))
        try:
            resume, job = _split_resume_and_job(sample.get("input", ""))
            # Without a job description there is nothing to match against, so
            # the sample is run through plain extraction instead.
            if job:
                pred = engine.match_resume_to_job(resume, job)
            else:
                pred = engine.extract_resume(resume)
        except Exception as e:  # best effort: one bad sample must not stop the run
            print(f"Error on matching sample: {e}")
            pred = {}
        matching_preds.append(pred)
        matching_truth.append(truth)

    matching_metrics = EvaluationMetrics.evaluate_matching(
        matching_preds, matching_truth
    )
    EvaluationMetrics.print_metrics(matching_metrics, "matching")

    # Save results
    results = {
        "extraction": extraction_metrics,
        "matching": matching_metrics,
    }

    os.makedirs(os.path.dirname(results_path) or ".", exist_ok=True)
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    print(f"\n✅ Results saved to {results_path}")

    return extraction_metrics, matching_metrics


def main() -> None:
    """Command-line entry point: evaluate the given model on the given test set."""
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--test-path", default="data/processed/test.json")
    parser.add_argument("--model-path", default="models/checkpoints/final")
    args = parser.parse_args()

    evaluate_on_test_set(args.test_path, args.model_path)


if __name__ == "__main__":
    main()