# resume-llm-api / src/evaluate.py
# Uploaded via huggingface_hub by mhr-212 (commit 7e0c689, verified)
import json
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from typing import List, Dict
import re
import os
class EvaluationMetrics:
    """Compute evaluation metrics for the extraction and matching tasks.

    All methods are static. ``predictions`` and ``ground_truth`` are
    parallel lists of dicts, one dict per sample; pairs are formed with
    ``zip``, so extra samples in the longer list are ignored.
    """

    # Resume fields scored by evaluate_extraction.
    EXTRACTION_FIELDS = ["name", "email", "phone", "skills", "experience",
                         "education", "certifications"]

    @staticmethod
    def evaluate_extraction(predictions: List[Dict], ground_truth: List[Dict]) -> Dict:
        """Evaluate the resume-extraction task field by field.

        A field is scored only when present in BOTH the prediction and the
        ground truth. List/dict values are compared via canonical JSON
        (dict keys sorted; note list order still matters), scalar values
        case-insensitively as strings.

        Returns:
            Dict with "overall_accuracy" (micro-averaged over all scored
            fields), "field_accuracies" (per-field), and "total_samples".
        """
        fields = EvaluationMetrics.EXTRACTION_FIELDS
        metrics = {
            "overall_accuracy": 0,
            "field_accuracies": {},
            "total_samples": len(predictions),
        }
        field_correct = {field: 0 for field in fields}
        field_counts = {field: 0 for field in fields}
        for pred, truth in zip(predictions, ground_truth):
            for field in fields:
                if field not in pred or field not in truth:
                    continue
                field_counts[field] += 1
                if isinstance(pred[field], (list, dict)):
                    # Canonical JSON makes dict comparison key-order independent.
                    if json.dumps(pred[field], sort_keys=True) == json.dumps(truth[field], sort_keys=True):
                        field_correct[field] += 1
                elif str(pred[field]).lower() == str(truth[field]).lower():
                    field_correct[field] += 1
        # Per-field accuracy, only for fields that were actually scored.
        for field in fields:
            if field_counts[field] > 0:
                metrics["field_accuracies"][field] = field_correct[field] / field_counts[field]
        # Micro-averaged overall accuracy across every scored field instance.
        total_fields = sum(field_counts.values())
        if total_fields > 0:
            metrics["overall_accuracy"] = sum(field_correct.values()) / total_fields
        return metrics

    @staticmethod
    def evaluate_matching(predictions: List[Dict], ground_truth: List[Dict]) -> Dict:
        """Evaluate the resume/job matching task.

        Computes RMSE/MAE over "match_score", exact-match accuracy over
        "recommendation" (case-insensitive), and set-based precision/recall
        over "matching_skills" pooled across all samples.

        Returns:
            Dict with "score_rmse", "score_mae", "skill_matching_precision",
            "skill_matching_recall", "recommendation_accuracy",
            and "total_samples".
        """
        metrics = {
            "score_rmse": 0,
            "score_mae": 0,
            "skill_matching_precision": 0,
            "skill_matching_recall": 0,
            "recommendation_accuracy": 0,
            "total_samples": len(predictions),
        }
        score_errors = []
        correct_recommendations = 0
        all_matching_skills = []
        all_pred_matching_skills = []
        for pred, truth in zip(predictions, ground_truth):
            # Score error (only when both sides provide a score).
            if "match_score" in pred and "match_score" in truth:
                score_errors.append(abs(pred["match_score"] - truth["match_score"]))
            # Recommendation accuracy. str() guards against non-string
            # values (e.g. None or a number parsed from model JSON).
            if "recommendation" in pred and "recommendation" in truth:
                if str(pred["recommendation"]).lower() == str(truth["recommendation"]).lower():
                    correct_recommendations += 1
            # Pool skills across samples for corpus-level precision/recall.
            if "matching_skills" in pred and "matching_skills" in truth:
                all_pred_matching_skills.extend(pred.get("matching_skills", []))
                all_matching_skills.extend(truth.get("matching_skills", []))
        if score_errors:
            # float() keeps results JSON-native rather than numpy scalars.
            metrics["score_rmse"] = float(np.sqrt(np.mean(np.array(score_errors) ** 2)))
            metrics["score_mae"] = float(np.mean(score_errors))
        if len(predictions) > 0:
            metrics["recommendation_accuracy"] = correct_recommendations / len(predictions)
        # Set-based skill precision/recall (duplicates collapse).
        pred_skill_set = set(all_pred_matching_skills)
        truth_skill_set = set(all_matching_skills)
        correct_skills = len(pred_skill_set & truth_skill_set)
        if pred_skill_set:
            metrics["skill_matching_precision"] = correct_skills / len(pred_skill_set)
        if truth_skill_set:
            metrics["skill_matching_recall"] = correct_skills / len(truth_skill_set)
        return metrics

    @staticmethod
    def print_metrics(metrics: Dict, task: str):
        """Pretty-print a metrics dict (floats to 4 decimal places)."""
        print(f"\n{'='*50}")
        print(f"EVALUATION RESULTS - {task.upper()}")
        print(f"{'='*50}")
        for key, value in metrics.items():
            if isinstance(value, float):
                print(f"{key}: {value:.4f}")
            elif isinstance(value, dict):
                # Nested dicts (e.g. field_accuracies) get an indented section.
                print(f"\n{key}:")
                for sub_key, sub_value in value.items():
                    if isinstance(sub_value, float):
                        print(f"  {sub_key}: {sub_value:.4f}")
                    else:
                        print(f"  {sub_key}: {sub_value}")
            else:
                print(f"{key}: {value}")
def evaluate_on_test_set(test_path: str = "data/processed/test.json",
                         model_path: str = "models/checkpoints/final"):
    """Run the trained model over a test set and report metrics for both tasks.

    Args:
        test_path: Path to the test data (JSON array or JSONL).
        model_path: Checkpoint path handed to ResumeInferenceEngine.

    Returns:
        Tuple of (extraction_metrics, matching_metrics). Results are also
        written to results/evaluation_results.json.
    """
    # Prefer package-relative import; fall back to absolute when executed as a script.
    try:
        from .inference import ResumeInferenceEngine
    except ImportError as e:
        if "attempted relative import" in str(e).lower():
            from src.inference import ResumeInferenceEngine
        else:
            raise

    def _load_json_or_jsonl(path: str):
        """Load a JSON-array file or a JSONL file into a list of objects."""
        with open(path, "r", encoding="utf-8") as f:
            content = f.read().strip()
        if not content:
            return []
        if content[0] == "[":  # JSON array
            return json.loads(content)
        # Otherwise treat as JSONL: one JSON object per non-empty line.
        return [json.loads(line) for line in content.splitlines() if line.strip()]

    def _safe_json_loads(text: str):
        """Parse JSON, returning None instead of raising on failure."""
        try:
            return json.loads(text)
        except Exception:
            return None

    def _parse_match_score(text: str):
        """Pull a numeric score from plain text like "Match Score: 0.82".

        Values that look like a 0-1 fraction are scaled to 0-100.
        """
        if not isinstance(text, str):
            return None
        match = re.search(r"match\s*score\s*[:=]\s*([0-9]*\.?[0-9]+)", text, flags=re.IGNORECASE)
        if not match:
            return None
        value = float(match.group(1))
        if value <= 1.0:
            value *= 100.0
        return value

    # Load test data (supports JSON array or JSONL).
    test_data = _load_json_or_jsonl(test_path)
    # Initialize engine.
    engine = ResumeInferenceEngine(model_path)
    # Separate by task (fallback: treat everything as matching).
    extraction_samples = [s for s in test_data if s.get("task") == "extraction"]
    matching_samples = [s for s in test_data if s.get("task") == "matching"]
    if not extraction_samples and not matching_samples:
        matching_samples = list(test_data)
    print(f"Evaluating on {len(extraction_samples)} extraction samples...")
    print(f"Evaluating on {len(matching_samples)} matching samples...")

    # Evaluate extraction.
    extraction_preds = []
    extraction_truth = []
    for sample in extraction_samples:
        try:
            pred = engine.extract_resume(sample["input"])
            truth = _safe_json_loads(sample.get("output", ""))
        except Exception as e:
            # BUG FIX: on failure, record a placeholder for BOTH lists so
            # predictions and truths stay aligned for zip() in the metrics.
            print(f"Error on extraction sample: {e}")
            pred, truth = {}, None
        extraction_preds.append(pred)
        extraction_truth.append(truth if isinstance(truth, dict) else {})
    extraction_metrics = EvaluationMetrics.evaluate_extraction(extraction_preds, extraction_truth)
    EvaluationMetrics.print_metrics(extraction_metrics, "extraction")

    # Evaluate matching.
    matching_preds = []
    matching_truth = []
    for sample in matching_samples:
        try:
            input_text = sample.get("input", "")
            # Try to parse the expected delimiter; otherwise treat entire input as resume text.
            parts = input_text.split("\n\nJob Description:\n")
            if len(parts) == 2:
                resume = parts[0].replace("Resume:\n", "").strip()
                job = parts[1].strip()
            else:
                resume = input_text.strip()
                job = ""
            pred = engine.match_resume_to_job(resume, job) if job else engine.extract_resume(resume)
            truth_obj = _safe_json_loads(sample.get("output", ""))
            if isinstance(truth_obj, dict):
                if "match_score" in truth_obj and isinstance(truth_obj["match_score"], (int, float)):
                    # Normalize 0-1 fractions to the 0-100 scale used elsewhere.
                    if truth_obj["match_score"] <= 1.0:
                        truth_obj["match_score"] *= 100.0
                truth_entry = truth_obj
            else:
                # Fallback: parse numeric score from plain text outputs like "Match Score: 0.82"
                score = _parse_match_score(sample.get("output", ""))
                truth_entry = {"match_score": score} if score is not None else {}
        except Exception as e:
            # BUG FIX: keep both lists the same length on failure (the
            # original appended only to matching_preds, misaligning pairs).
            print(f"Error on matching sample: {e}")
            pred, truth_entry = {}, {}
        matching_preds.append(pred)
        matching_truth.append(truth_entry)
    matching_metrics = EvaluationMetrics.evaluate_matching(matching_preds, matching_truth)
    EvaluationMetrics.print_metrics(matching_metrics, "matching")

    # Persist results alongside printing them.
    results = {
        "extraction": extraction_metrics,
        "matching": matching_metrics,
    }
    os.makedirs("results", exist_ok=True)
    with open("results/evaluation_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print("\n✅ Results saved to results/evaluation_results.json")
    return extraction_metrics, matching_metrics
if __name__ == "__main__":
    import argparse

    # NOTE: the redundant `import os` was removed (os is imported at module
    # top), and the results/ directory is created inside evaluate_on_test_set
    # right before writing, so it is not duplicated here.
    parser = argparse.ArgumentParser(description="Evaluate the resume model on a test set.")
    parser.add_argument("--test-path", default="data/processed/test.json")
    parser.add_argument("--model-path", default="models/checkpoints/final")
    args = parser.parse_args()
    evaluate_on_test_set(args.test_path, args.model_path)