|
|
| """
|
| Evaluate Scam Detector Accuracy.
|
|
|
| Tests the current detector (keyword-based or fine-tuned) against the dataset.
|
| Used to determine if fine-tuning is needed (Task 4.2 prerequisite).
|
|
|
| Note: Task 4.2 states "Only if time permits and pre-trained model accuracy <85%"
|
| """
|
|
|
| import json
|
| import os
|
| import sys
|
| import time
|
|
|
|
|
# Put the directory two levels above this file at the front of the import
# search path so the function-local `app.*` imports below can be resolved
# when this file is run directly as a script.
sys.path[:0] = [os.path.dirname(os.path.dirname(os.path.abspath(__file__)))]
|
|
|
| from typing import Dict, List, Tuple
|
|
|
|
|
# Switch stdout to UTF-8 when it currently uses a different encoding.
# The checks are defensive: sys.stdout can be None or a replacement stream
# (StringIO, capture wrappers) that has no `reconfigure`, and the reported
# encoding name may differ in case ("UTF-8") or be missing altogether.
if hasattr(sys.stdout, "reconfigure") and (sys.stdout.encoding or "").lower() != "utf-8":
    sys.stdout.reconfigure(encoding="utf-8")
|
|
|
|
|
# Location of the JSONL dataset: the "data" directory that sits two
# directory levels above this file.
DATASET_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    *("data", "scam_detection_train.jsonl"),
)
|
|
|
|
|
def load_dataset(filepath: str) -> List[Dict]:
    """Load a JSONL dataset: one JSON document per non-blank line.

    Args:
        filepath: Path of the JSONL file to read.

    Returns:
        The decoded JSON values in file order. Blank and whitespace-only
        lines are skipped.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a line is not valid JSON. The message is
            prefixed with ``<filepath>:<line number>:`` to locate the record.
    """
    samples = []
    # "utf-8-sig" reads plain UTF-8 and also drops a leading byte-order mark;
    # a BOM is not whitespace, so strip() would leave it and json.loads
    # would reject the first record.
    with open(filepath, "r", encoding="utf-8-sig") as f:
        for line_number, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                samples.append(json.loads(line))
            except json.JSONDecodeError as exc:
                # Same exception type as before, with file/line context added.
                raise json.JSONDecodeError(
                    f"{filepath}:{line_number}: {exc.msg}", exc.doc, exc.pos
                ) from exc
    return samples
|
|
|
|
|
def evaluate_detector(samples: List[Dict], detector=None) -> Dict[str, float]:
    """Run the detector over every sample and compute overall metrics.

    Args:
        samples: Records providing the keys "message", "label" and
            "language". A label equal to "scam" is the positive class; any
            other label is treated as negative.
        detector: Optional object exposing ``detect(message, language)``
            whose result can be indexed with "scam_detected". When omitted,
            ``ScamDetector(load_model=True)`` is imported and constructed,
            which is the original behaviour. Passing one lets callers reuse
            an already constructed detector.

    Returns:
        A dict with accuracy, precision, recall, f1, false_positive_rate,
        the four confusion-matrix counts, total, correct,
        avg_inference_time_ms, and model_loaded (the detector's
        ``_model_loaded`` attribute, or False when it has none). Every ratio
        falls back to 0.0 when its denominator is zero, so an empty sample
        list is safe.

    Raises:
        KeyError: If a sample lacks a required key or the detector result
            has no "scam_detected" entry.
    """
    if detector is None:
        # Imported lazily so the module can be imported without the app package.
        from app.models.detector import ScamDetector

        detector = ScamDetector(load_model=True)

    def ratio(numerator, denominator):
        return numerator / denominator if denominator > 0 else 0.0

    true_positives = 0
    false_positives = 0
    true_negatives = 0
    false_negatives = 0
    total_time = 0.0

    for sample in samples:
        message = sample["message"]
        actual_scam = sample["label"] == "scam"
        language = sample["language"]

        # Time only the detect() call itself.
        start_time = time.perf_counter()
        result = detector.detect(message, language)
        total_time += time.perf_counter() - start_time

        # Coerce to bool so a falsy non-bool value (e.g. None) on a
        # legitimate sample is a true negative, not a false negative.
        predicted_scam = bool(result["scam_detected"])

        if predicted_scam and actual_scam:
            true_positives += 1
        elif predicted_scam:
            false_positives += 1
        elif actual_scam:
            false_negatives += 1
        else:
            true_negatives += 1

    correct = true_positives + true_negatives
    total = correct + false_positives + false_negatives

    precision = ratio(true_positives, true_positives + false_positives)
    recall = ratio(true_positives, true_positives + false_negatives)

    return {
        "accuracy": ratio(correct, total),
        "precision": precision,
        "recall": recall,
        "f1": ratio(2 * precision * recall, precision + recall),
        "false_positive_rate": ratio(false_positives, false_positives + true_negatives),
        "true_positives": true_positives,
        "false_positives": false_positives,
        "true_negatives": true_negatives,
        "false_negatives": false_negatives,
        "total": total,
        "correct": correct,
        "avg_inference_time_ms": ratio(total_time, total) * 1000,
        # Private attribute of the detector; tolerate detectors without it.
        "model_loaded": getattr(detector, "_model_loaded", False),
    }
|
|
|
|
|
def evaluate_by_language(
    samples: List[Dict],
    detector=None,
    languages=("en", "hi", "hinglish"),
) -> Dict[str, Dict[str, float]]:
    """Compute detector accuracy separately for each requested language.

    Args:
        samples: Records providing the keys "message", "label" and
            "language". A label equal to "scam" is the positive class.
        detector: Optional object exposing ``detect(message, language)``
            whose result can be indexed with "scam_detected". When omitted,
            ``ScamDetector(load_model=True)`` is imported and constructed,
            which is the original behaviour.
        languages: Language codes to report on. Samples in any other
            language are skipped without calling the detector.

    Returns:
        A dict keyed by language code, in the order given by ``languages``.
        Each value holds "accuracy", "total" and "correct"; accuracy is 0.0
        for a language with no samples.

    Raises:
        KeyError: If a sample lacks a required key or the detector result
            has no "scam_detected" entry.
    """
    if detector is None:
        # Imported lazily so the module can be imported without the app package.
        from app.models.detector import ScamDetector

        detector = ScamDetector(load_model=True)

    # language -> [correct, total]; filled in one pass over the samples
    # instead of rescanning the whole list once per language.
    tallies = {lang: [0, 0] for lang in languages}

    for sample in samples:
        tally = tallies.get(sample["language"])
        if tally is None:
            continue
        result = detector.detect(sample["message"], sample["language"])
        # bool() keeps a falsy non-bool prediction (e.g. None) from being
        # counted as a mismatch on a legitimate sample.
        predicted_scam = bool(result["scam_detected"])
        actual_scam = sample["label"] == "scam"
        tally[1] += 1
        if predicted_scam == actual_scam:
            tally[0] += 1

    return {
        lang: {
            "accuracy": correct / total if total > 0 else 0.0,
            "total": total,
            "correct": correct,
        }
        for lang, (correct, total) in tallies.items()
    }
|
|
|
|
|
def evaluate_by_scam_type(samples: List[Dict], detector=None) -> Dict[str, Dict[str, float]]:
    """Compute the detection rate for each scam type found in the samples.

    Every sample with a truthy "scam_type" is counted towards that type, and
    it counts as detected when the detector flags it. The sample's "label"
    is not consulted here.

    Args:
        samples: Records providing "message" and "language", and optionally
            "scam_type". Samples whose scam type is missing or falsy are
            skipped without calling the detector.
        detector: Optional object exposing ``detect(message, language)``
            whose result can be indexed with "scam_detected". When omitted,
            ``ScamDetector(load_model=True)`` is imported and constructed,
            which is the original behaviour.

    Returns:
        A dict keyed by scam type, in first-seen order. Each value holds
        "recall" (detected / total), "total" and "detected".

    Raises:
        KeyError: If a counted sample lacks "message" or "language", or the
            detector result has no "scam_detected" entry.
    """
    if detector is None:
        # Imported lazily so the module can be imported without the app package.
        from app.models.detector import ScamDetector

        detector = ScamDetector(load_model=True)

    # scam type -> [detected, total]; one pass instead of one scan per type.
    tallies: Dict[str, List[int]] = {}

    for sample in samples:
        # .get(): samples without a "scam_type" key must not raise KeyError.
        scam_type = sample.get("scam_type")
        if not scam_type:
            continue
        tally = tallies.setdefault(scam_type, [0, 0])
        tally[1] += 1
        result = detector.detect(sample["message"], sample["language"])
        if result["scam_detected"]:
            tally[0] += 1

    # total is at least 1 for every key, so the division is always defined.
    return {
        scam_type: {
            "recall": detected / total,
            "total": total,
            "detected": detected,
        }
        for scam_type, (detected, total) in tallies.items()
    }
|
|
|
|
|
def main():
    """Run the full evaluation and print a report to stdout.

    Loads the dataset at DATASET_PATH, prints the overall metrics, the
    per-language accuracy and the per-scam-type recall, then reports the
    accuracy against the 85% fine-tuning threshold and the 90% acceptance
    threshold.

    Returns:
        1 when the dataset file does not exist, otherwise 0.
    """
    rule = "=" * 60

    def heading(title, lead="\n"):
        # Three-line section banner: rule, title, rule.
        print(f"{lead}{rule}")
        print(title)
        print(rule)

    heading("Scam Detector Evaluation", lead="")

    print(f"\nLoading dataset: {DATASET_PATH}")
    if not os.path.exists(DATASET_PATH):
        print("[ERROR] Dataset not found. Run scripts/generate_dataset.py first.")
        return 1

    samples = load_dataset(DATASET_PATH)
    print(f"Loaded {len(samples)} samples")

    # Overall metrics.
    heading("Overall Evaluation")
    metrics = evaluate_detector(samples)
    accuracy = metrics['accuracy']
    accuracy_pct = accuracy * 100

    detector_mode = 'BERT + Keyword' if metrics['model_loaded'] else 'Keyword-only'
    print(f"\nDetector Mode: {detector_mode}")
    print("\nResults:")
    print(f" Accuracy: {accuracy:.4f} ({accuracy_pct:.1f}%)")
    for label, key in (
        ("Precision", "precision"),
        ("Recall", "recall"),
        ("F1 Score", "f1"),
        ("False Positive Rate", "false_positive_rate"),
    ):
        print(f" {label}: {metrics[key]:.4f}")
    print(f" Avg Inference Time: {metrics['avg_inference_time_ms']:.2f}ms")

    print("\nConfusion Matrix:")
    for label, key in (
        ("True Positives", "true_positives"),
        ("False Positives", "false_positives"),
        ("True Negatives", "true_negatives"),
        ("False Negatives", "false_negatives"),
    ):
        print(f" {label}: {metrics[key]}")

    # Per-language breakdown.
    heading("Accuracy by Language")
    for lang, row in evaluate_by_language(samples).items():
        print(f" {lang}: {row['accuracy']:.1%} ({row['correct']}/{row['total']})")

    # Per-scam-type breakdown, sorted by type name.
    heading("Recall by Scam Type")
    for scam_type, row in sorted(evaluate_by_scam_type(samples).items()):
        print(f" {scam_type}: {row['recall']:.1%} ({row['detected']}/{row['total']})")

    # Fine-tuning decision (85% threshold).
    heading("Task 4.2 Prerequisite Check")
    print("\nNote: Task 4.2 states 'Only if pre-trained model accuracy <85%'")
    print(f"Current Accuracy: {accuracy_pct:.1f}%")

    if accuracy < 0.85:
        advice = (
            "\n[RECOMMENDED] Fine-tuning is recommended (accuracy <85%)",
            "Run: python scripts/fine_tune_indicbert.py",
        )
    else:
        advice = (
            "\n[OK] Current accuracy is sufficient (>=85%)",
            "Fine-tuning is optional but may still improve results.",
        )
    for text in advice:
        print(text)

    # Acceptance criterion (passes at 0.90 or above).
    heading("Acceptance Criteria Status")
    verdict = 'PASS' if accuracy >= 0.90 else 'NEEDS IMPROVEMENT'
    print(f"\nAC (Accuracy >90%): {accuracy_pct:.1f}% - {verdict}")

    return 0
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|