File size: 8,567 Bytes
31f0e50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#!/usr/bin/env python
"""

Evaluate Scam Detector Accuracy.



Tests the current detector (keyword-based or fine-tuned) against the dataset.

Used to determine if fine-tuning is needed (Task 4.2 prerequisite).



Note: Task 4.2 states "Only if time permits and pre-trained model accuracy <85%"

"""

import json
import os
import sys
import time

# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from typing import Dict, List, Tuple

# Ensure UTF-8 output on Windows.
# The encoding name is normalised before comparing ("UTF-8", "utf8" and
# "utf-8" all count as UTF-8), and the call is skipped when stdout is missing
# or has been replaced by an object without reconfigure() (for example an
# in-memory buffer), so importing this module does not raise there.
_stdout_encoding = (getattr(sys.stdout, "encoding", None) or "").lower()
if (
    _stdout_encoding.replace("-", "").replace("_", "") != "utf8"
    and hasattr(sys.stdout, "reconfigure")
):
    sys.stdout.reconfigure(encoding="utf-8")

# Dataset path: <parent of this script's directory>/data/scam_detection_train.jsonl
DATASET_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    "data",
    "scam_detection_train.jsonl",
)


def load_dataset(filepath: str) -> List[Dict]:
    """Read a JSON Lines file and return its records as a list.

    Each line is stripped of surrounding whitespace; lines that are empty
    after stripping are skipped, and every remaining line is parsed with
    ``json.loads``. The file is read as UTF-8.
    """
    with open(filepath, "r", encoding="utf-8") as handle:
        stripped_lines = (raw.strip() for raw in handle)
        # The comprehension is fully evaluated before the file is closed.
        return [json.loads(text) for text in stripped_lines if text]


def evaluate_detector(samples: List[Dict], detector=None) -> Dict[str, float]:
    """Evaluate a scam detector on labelled samples and return summary metrics.

    Args:
        samples: Records providing "message", "label" ('scam' or
            'legitimate') and "language" keys.
        detector: Optional object exposing ``detect(message, language)`` that
            returns a mapping with a "scam_detected" entry. When omitted, a
            ``ScamDetector(load_model=True)`` is created. Passing one lets a
            caller reuse a single detector across several evaluations.

    Returns:
        A dict with "accuracy", "precision", "recall", "f1" and
        "false_positive_rate" (each 0.0 when its denominator is zero), the
        four confusion-matrix counts, "total", "correct",
        "avg_inference_time_ms" (mean wall-clock time of ``detect`` per
        sample, in milliseconds) and "model_loaded".

    Raises:
        KeyError: If a sample lacks "message", "label" or "language", or a
            detection result lacks "scam_detected".
    """
    if detector is None:
        # Imported lazily so the project package is only needed when no
        # detector is supplied by the caller.
        from app.models.detector import ScamDetector

        # Initialize detector (may use BERT if available, fallback to keyword)
        detector = ScamDetector(load_model=True)

    def ratio(numerator, denominator):
        # A metric with an empty denominator is reported as 0.0.
        return numerator / denominator if denominator > 0 else 0.0

    true_positives = 0
    false_positives = 0
    true_negatives = 0
    false_negatives = 0
    total_time = 0.0

    for sample in samples:
        message = sample["message"]
        actual_scam = sample["label"] == "scam"
        language = sample["language"]

        # Only the detect() call itself is timed.
        start_time = time.perf_counter()
        result = detector.detect(message, language)
        total_time += time.perf_counter() - start_time

        # Coerce to bool so a truthy non-bool prediction (e.g. a score) is
        # classified by truthiness, matching evaluate_by_scam_type, instead
        # of failing an `== True` comparison.
        predicted_scam = bool(result["scam_detected"])

        if predicted_scam and actual_scam:
            true_positives += 1
        elif predicted_scam:
            false_positives += 1
        elif actual_scam:
            false_negatives += 1
        else:
            true_negatives += 1

    correct = true_positives + true_negatives
    total = correct + false_positives + false_negatives

    # Calculate metrics
    precision = ratio(true_positives, true_positives + false_positives)
    recall = ratio(true_positives, true_positives + false_negatives)

    return {
        "accuracy": ratio(correct, total),
        "precision": precision,
        "recall": recall,
        "f1": ratio(2 * precision * recall, precision + recall),
        "false_positive_rate": ratio(false_positives, false_positives + true_negatives),
        "true_positives": true_positives,
        "false_positives": false_positives,
        "true_negatives": true_negatives,
        "false_negatives": false_negatives,
        "total": total,
        "correct": correct,
        "avg_inference_time_ms": ratio(total_time, total) * 1000,
        # `_model_loaded` is a private attribute of the detector; an injected
        # detector that does not define it is reported as not loaded.
        "model_loaded": getattr(detector, "_model_loaded", False),
    }


def evaluate_by_language(
    samples: List[Dict],
    detector=None,
    languages: Tuple[str, ...] = ("en", "hi", "hinglish"),
) -> Dict[str, Dict[str, float]]:
    """Evaluate detector accuracy separately for each language.

    Args:
        samples: Records providing "message", "label" ('scam' or
            'legitimate') and "language" keys.
        detector: Optional object exposing ``detect(message, language)`` that
            returns a mapping with a "scam_detected" entry. When omitted, a
            ``ScamDetector(load_model=True)`` is created.
        languages: Language codes to report on, in output order. Samples
            whose "language" is not listed are ignored.

    Returns:
        A dict mapping each requested language to
        ``{"accuracy", "total", "correct"}``. A language with no samples gets
        an accuracy of 0.0 and zero counts.
    """
    if detector is None:
        # Imported lazily so the project package is only needed when no
        # detector is supplied by the caller.
        from app.models.detector import ScamDetector

        detector = ScamDetector(load_model=True)

    language_results = {}

    for lang in languages:
        lang_samples = [s for s in samples if s["language"] == lang]
        total = len(lang_samples)

        correct = 0
        for sample in lang_samples:
            result = detector.detect(sample["message"], sample["language"])
            # Classify by truthiness so a non-bool "scam_detected" value is
            # handled the same way as in evaluate_by_scam_type.
            predicted_scam = bool(result["scam_detected"])
            if predicted_scam == (sample["label"] == "scam"):
                correct += 1

        language_results[lang] = {
            "accuracy": correct / total if total > 0 else 0.0,
            "total": total,
            "correct": correct,
        }

    return language_results


def evaluate_by_scam_type(samples: List[Dict], detector=None) -> Dict[str, Dict[str, float]]:
    """Evaluate detector recall separately for each scam type.

    Args:
        samples: Records providing "message" and "language" keys and,
            optionally, a "scam_type". Samples whose "scam_type" is missing
            or falsy (e.g. None or "") are ignored.
        detector: Optional object exposing ``detect(message, language)`` that
            returns a mapping with a "scam_detected" entry. When omitted, a
            ``ScamDetector(load_model=True)`` is created.

    Returns:
        A dict mapping each scam type to ``{"recall", "total", "detected"}``,
        where "recall" is detected / total for that type.
    """
    if detector is None:
        # Imported lazily so the project package is only needed when no
        # detector is supplied by the caller.
        from app.models.detector import ScamDetector

        detector = ScamDetector(load_model=True)

    # Group samples by scam type in a single pass. `.get` tolerates records
    # (presumably the legitimate ones) that omit the "scam_type" field.
    samples_by_type: Dict[str, List[Dict]] = {}
    for sample in samples:
        scam_type = sample.get("scam_type")
        if scam_type:
            samples_by_type.setdefault(scam_type, []).append(sample)

    type_results = {}

    for scam_type, type_samples in samples_by_type.items():
        total = len(type_samples)

        # NOTE(review): every sample carrying a scam_type is treated as a
        # scam here; the "label" field is not consulted -- confirm against
        # the dataset generator.
        detected = 0
        for sample in type_samples:
            result = detector.detect(sample["message"], sample["language"])
            if result["scam_detected"]:
                detected += 1

        type_results[scam_type] = {
            # Each group holds at least one sample, so total is never zero.
            "recall": detected / total,  # Recall for this scam type
            "total": total,
            "detected": detected,
        }

    return type_results


def main():
    """Run every evaluation and print a plain-text report to stdout.

    The report covers overall metrics, the confusion matrix, accuracy per
    language, recall per scam type, the Task 4.2 fine-tuning check (85%
    accuracy threshold) and the acceptance-criteria status (90% threshold).

    Returns:
        1 if the dataset file does not exist, otherwise 0.
    """
    rule = "=" * 60

    def banner(title, leading_blank=True):
        # Section header: the title framed by two 60-character rules.
        print(f"\n{rule}" if leading_blank else rule)
        print(title)
        print(rule)

    banner("Scam Detector Evaluation", leading_blank=False)

    # Dataset
    print(f"\nLoading dataset: {DATASET_PATH}")
    if not os.path.exists(DATASET_PATH):
        print("[ERROR] Dataset not found. Run scripts/generate_dataset.py first.")
        return 1

    samples = load_dataset(DATASET_PATH)
    print(f"Loaded {len(samples)} samples")

    # Overall metrics (header first, so it precedes any detector output)
    banner("Overall Evaluation")
    metrics = evaluate_detector(samples)

    accuracy = metrics["accuracy"]
    accuracy_pct = f"{accuracy * 100:.1f}%"
    mode = "BERT + Keyword" if metrics["model_loaded"] else "Keyword-only"

    print(f"\nDetector Mode: {mode}")
    print("\nResults:")
    print(f"  Accuracy: {accuracy:.4f} ({accuracy_pct})")
    for label, key in (
        ("Precision", "precision"),
        ("Recall", "recall"),
        ("F1 Score", "f1"),
        ("False Positive Rate", "false_positive_rate"),
    ):
        print(f"  {label}: {metrics[key]:.4f}")
    print(f"  Avg Inference Time: {metrics['avg_inference_time_ms']:.2f}ms")

    print("\nConfusion Matrix:")
    for label, key in (
        ("True Positives", "true_positives"),
        ("False Positives", "false_positives"),
        ("True Negatives", "true_negatives"),
        ("False Negatives", "false_negatives"),
    ):
        print(f"  {label}: {metrics[key]}")

    # Per-language accuracy
    banner("Accuracy by Language")
    for lang, stats in evaluate_by_language(samples).items():
        print(f"  {lang}: {stats['accuracy']:.1%} ({stats['correct']}/{stats['total']})")

    # Per-scam-type recall, listed alphabetically by type
    banner("Recall by Scam Type")
    type_results = evaluate_by_scam_type(samples)
    for scam_type in sorted(type_results):
        stats = type_results[scam_type]
        print(f"  {scam_type}: {stats['recall']:.1%} ({stats['detected']}/{stats['total']})")

    # Task 4.2 prerequisite: fine-tuning is only called for below 85% accuracy
    banner("Task 4.2 Prerequisite Check")
    print("\nNote: Task 4.2 states 'Only if pre-trained model accuracy <85%'")
    print(f"Current Accuracy: {accuracy_pct}")

    if accuracy < 0.85:
        advice = (
            "\n[RECOMMENDED] Fine-tuning is recommended (accuracy <85%)",
            "Run: python scripts/fine_tune_indicbert.py",
        )
    else:
        advice = (
            "\n[OK] Current accuracy is sufficient (>=85%)",
            "Fine-tuning is optional but may still improve results.",
        )
    for line in advice:
        print(line)

    # Acceptance criteria
    banner("Acceptance Criteria Status")
    verdict = "PASS" if accuracy >= 0.90 else "NEEDS IMPROVEMENT"
    print(f"\nAC (Accuracy >90%): {accuracy_pct} - {verdict}")

    return 0


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())