"""Evaluation metrics for the resume extraction and matching tasks."""
| import json | |
| import numpy as np | |
| from sklearn.metrics import precision_recall_fscore_support, accuracy_score | |
| from typing import List, Dict | |
| import re | |
| import os | |
class EvaluationMetrics:
    """Evaluate model performance on both tasks (extraction and matching)."""

    @staticmethod
    def evaluate_extraction(predictions: List[Dict], ground_truth: List[Dict]) -> Dict:
        """Evaluate extraction task performance.

        Compares predicted resume fields against ground truth, field by field.
        List/dict values are compared structurally (``json.dumps`` with
        ``sort_keys`` makes dict key order irrelevant); scalar values are
        compared case-insensitively as strings.

        Args:
            predictions: Predicted resume records, one dict per sample.
            ground_truth: Reference records, aligned with ``predictions``.

        Returns:
            Dict with ``overall_accuracy``, per-field ``field_accuracies``,
            and ``total_samples``.
        """
        metrics = {
            "overall_accuracy": 0,
            "field_accuracies": {},
            "total_samples": len(predictions)
        }
        field_correct = {}
        field_counts = {}

        # Fields expected in an extracted resume record.
        fields = ["name", "email", "phone", "skills", "experience", "education", "certifications"]
        for field in fields:
            field_correct[field] = 0
            field_counts[field] = 0

        for pred, truth in zip(predictions, ground_truth):
            for field in fields:
                # Only score fields present on both sides.
                if field in pred and field in truth:
                    field_counts[field] += 1
                    if isinstance(pred[field], (list, dict)):
                        # Structural comparison for nested values.
                        if json.dumps(pred[field], sort_keys=True) == json.dumps(truth[field], sort_keys=True):
                            field_correct[field] += 1
                    else:
                        # Case-insensitive comparison for scalars.
                        if str(pred[field]).lower() == str(truth[field]).lower():
                            field_correct[field] += 1

        # Per-field accuracy, only for fields that were actually compared.
        for field in fields:
            if field_counts[field] > 0:
                metrics["field_accuracies"][field] = field_correct[field] / field_counts[field]

        # Overall accuracy across all compared fields.
        total_fields = sum(field_counts.values())
        if total_fields > 0:
            metrics["overall_accuracy"] = sum(field_correct.values()) / total_fields
        return metrics

    @staticmethod
    def evaluate_matching(predictions: List[Dict], ground_truth: List[Dict]) -> Dict:
        """Evaluate matching task performance.

        Computes match-score RMSE/MAE, recommendation accuracy, and
        corpus-level precision/recall over the pooled sets of matching skills.

        Args:
            predictions: Predicted match records, one dict per sample.
            ground_truth: Reference records, aligned with ``predictions``.

        Returns:
            Dict of scalar metrics plus ``total_samples``.
        """
        metrics = {
            "score_rmse": 0,
            "score_mae": 0,
            "skill_matching_precision": 0,
            "skill_matching_recall": 0,
            "recommendation_accuracy": 0,
            "total_samples": len(predictions)
        }
        score_errors = []
        correct_recommendations = 0
        all_matching_skills = []
        all_pred_matching_skills = []

        for pred, truth in zip(predictions, ground_truth):
            # Absolute score error per sample.
            if "match_score" in pred and "match_score" in truth:
                score_errors.append(abs(pred["match_score"] - truth["match_score"]))
            # Case-insensitive recommendation match.
            if "recommendation" in pred and "recommendation" in truth:
                if pred["recommendation"].lower() == truth["recommendation"].lower():
                    correct_recommendations += 1
            # Pool skills across samples for corpus-level precision/recall.
            if "matching_skills" in pred and "matching_skills" in truth:
                all_pred_matching_skills.extend(pred.get("matching_skills", []))
                all_matching_skills.extend(truth.get("matching_skills", []))

        if score_errors:
            # Cast numpy scalars to plain floats so the metrics dict stays
            # JSON-serializable (json.dump rejects np.float64).
            metrics["score_rmse"] = float(np.sqrt(np.mean(np.array(score_errors) ** 2)))
            metrics["score_mae"] = float(np.mean(score_errors))
        if len(predictions) > 0:
            metrics["recommendation_accuracy"] = correct_recommendations / len(predictions)

        # Skill matching metrics (set-based precision/recall over pooled lists).
        if all_matching_skills or all_pred_matching_skills:
            correct_skills = len(set(all_pred_matching_skills) & set(all_matching_skills))
            if all_pred_matching_skills:
                metrics["skill_matching_precision"] = correct_skills / len(set(all_pred_matching_skills))
            if all_matching_skills:
                metrics["skill_matching_recall"] = correct_skills / len(set(all_matching_skills))
        return metrics

    @staticmethod
    def print_metrics(metrics: Dict, task: str):
        """Pretty-print a metrics dict, formatting floats to four decimals."""
        print(f"\n{'='*50}")
        print(f"EVALUATION RESULTS - {task.upper()}")
        print(f"{'='*50}")
        for key, value in metrics.items():
            if isinstance(value, float):
                print(f"{key}: {value:.4f}")
            elif isinstance(value, dict):
                # Nested dicts (e.g. field_accuracies) get an indented section.
                print(f"\n{key}:")
                for sub_key, sub_value in value.items():
                    if isinstance(sub_value, float):
                        print(f"  {sub_key}: {sub_value:.4f}")
                    else:
                        print(f"  {sub_key}: {sub_value}")
            else:
                print(f"{key}: {value}")
def evaluate_on_test_set(test_path: str = "data/processed/test.json",
                         model_path: str = "models/checkpoints/final"):
    """Evaluate the model on a test set and save metrics to results/.

    Supports test files stored as a JSON array or as JSONL. Samples are split
    by their ``task`` field into extraction and matching subsets; if no sample
    is tagged, everything is treated as matching data.

    Args:
        test_path: Path to the test set (JSON array or JSONL).
        model_path: Path to the model checkpoint directory.

    Returns:
        Tuple of (extraction_metrics, matching_metrics).
    """
    # Prefer package-relative import; fall back to absolute when executed as a script.
    try:
        from .inference import ResumeInferenceEngine
    except ImportError as e:
        if "attempted relative import" in str(e).lower():
            from src.inference import ResumeInferenceEngine
        else:
            raise

    def _load_json_or_jsonl(path: str):
        # Accept either a single JSON array or one JSON object per line.
        with open(path, "r", encoding="utf-8") as f:
            content = f.read().strip()
        if not content:
            return []
        if content[0] == "[":
            return json.loads(content)
        rows = []
        for line in content.splitlines():
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))
        return rows

    def _safe_json_loads(text: str):
        # Return None instead of raising on malformed JSON.
        try:
            return json.loads(text)
        except Exception:
            return None

    def _parse_match_score(text: str):
        # Accept formats like "Match Score: 0.82" or JSON {"match_score": 82}
        if not isinstance(text, str):
            return None
        match = re.search(r"match\s*score\s*[:=]\s*([0-9]*\.?[0-9]+)", text, flags=re.IGNORECASE)
        if not match:
            return None
        value = float(match.group(1))
        # Normalize to 0-100 if it looks like a 0-1 fraction.
        if value <= 1.0:
            value *= 100.0
        return value

    # Load test data (supports JSON array or JSONL).
    test_data = _load_json_or_jsonl(test_path)
    # Initialize inference engine.
    engine = ResumeInferenceEngine(model_path)

    # Separate by task (fallback: treat everything as matching).
    extraction_samples = [s for s in test_data if s.get("task") == "extraction"]
    matching_samples = [s for s in test_data if s.get("task") == "matching"]
    if not extraction_samples and not matching_samples:
        matching_samples = list(test_data)

    print(f"Evaluating on {len(extraction_samples)} extraction samples...")
    print(f"Evaluating on {len(matching_samples)} matching samples...")

    # ---- Extraction task -------------------------------------------------
    extraction_preds = []
    extraction_truth = []
    for sample in extraction_samples:
        try:
            pred = engine.extract_resume(sample["input"])
            truth = _safe_json_loads(sample.get("output", ""))
        except Exception as e:
            # BUGFIX: on failure, fall through to the shared appends below so
            # preds and truth stay the same length and stay aligned (the old
            # code appended only to the predictions list).
            print(f"Error on extraction sample: {e}")
            pred, truth = {}, {}
        extraction_preds.append(pred)
        extraction_truth.append(truth if isinstance(truth, dict) else {})

    extraction_metrics = EvaluationMetrics.evaluate_extraction(extraction_preds, extraction_truth)
    EvaluationMetrics.print_metrics(extraction_metrics, "extraction")

    # ---- Matching task ---------------------------------------------------
    matching_preds = []
    matching_truth = []
    for sample in matching_samples:
        try:
            input_text = sample.get("input", "")
            # Try to parse the expected delimiter; otherwise treat entire input as resume text.
            parts = input_text.split("\n\nJob Description:\n")
            if len(parts) == 2:
                resume = parts[0].replace("Resume:\n", "").strip()
                job = parts[1].strip()
            else:
                resume = input_text.strip()
                job = ""
            pred = engine.match_resume_to_job(resume, job) if job else engine.extract_resume(resume)

            truth_obj = _safe_json_loads(sample.get("output", ""))
            if isinstance(truth_obj, dict):
                if "match_score" in truth_obj and isinstance(truth_obj["match_score"], (int, float)):
                    # Normalize to 0-100 if needed.
                    if truth_obj["match_score"] <= 1.0:
                        truth_obj["match_score"] *= 100.0
                truth_entry = truth_obj
            else:
                # Fallback: parse numeric score from plain text outputs like "Match Score: 0.82"
                score = _parse_match_score(sample.get("output", ""))
                truth_entry = {"match_score": score} if score is not None else {}
        except Exception as e:
            # BUGFIX: keep both lists the same length on failure (old code
            # appended only to matching_preds, misaligning all later samples).
            print(f"Error on matching sample: {e}")
            pred, truth_entry = {}, {}
        matching_preds.append(pred)
        matching_truth.append(truth_entry)

    matching_metrics = EvaluationMetrics.evaluate_matching(matching_preds, matching_truth)
    EvaluationMetrics.print_metrics(matching_metrics, "matching")

    # Save results.
    results = {
        "extraction": extraction_metrics,
        "matching": matching_metrics
    }
    os.makedirs("results", exist_ok=True)
    with open("results/evaluation_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print("\n✅ Results saved to results/evaluation_results.json")

    return extraction_metrics, matching_metrics
if __name__ == "__main__":
    # Script entry point: forward CLI paths to evaluate_on_test_set.
    # NOTE: the redundant local `import os` was dropped (os is imported at
    # module level) and the duplicate `os.makedirs("results")` removed —
    # evaluate_on_test_set already creates the directory before saving.
    import argparse

    parser = argparse.ArgumentParser(description="Evaluate the resume model on a test set.")
    parser.add_argument("--test-path", default="data/processed/test.json",
                        help="Path to the test set (JSON array or JSONL).")
    parser.add_argument("--model-path", default="models/checkpoints/final",
                        help="Path to the model checkpoint directory.")
    args = parser.parse_args()
    evaluate_on_test_set(args.test_path, args.model_path)