#!/usr/bin/env python3
"""

Generate CSV file with simple metrics for each model.



Reads tactic_counts_summary.json and generates a CSV file containing

F1, accuracy, precision, recall, and other metrics for each model.



Usage:

    python generate_metrics_csv.py [--input INPUT_PATH] [--output OUTPUT_PATH]

"""
import argparse
import json
import csv
from pathlib import Path
from typing import Dict, List, Any
import statistics


class MetricsCSVGenerator:
    """Generates CSV file with simple metrics for each model"""

    def __init__(self, tactic_counts_file: Path):
        self.tactic_counts_file = tactic_counts_file
        self.tactic_data = []
        self.load_tactic_counts()

    def load_tactic_counts(self):
        """Load tactic counts summary data"""
        if not self.tactic_counts_file.exists():
            raise FileNotFoundError(
                f"Tactic counts file not found: {self.tactic_counts_file}"
            )

        data = json.loads(self.tactic_counts_file.read_text(encoding="utf-8"))
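        # Each entry in "results" is expected to carry at least the fields read
        # elsewhere in this class: "model", "tactic", "tactic_detected" (0 or 1),
        # and "total_abnormal_events_detected" (an integer event count).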
        self.tactic_data = data.get("results", [])
        print(f"[INFO] Loaded {len(self.tactic_data)} tactic analysis results")

    def group_by_model(self) -> Dict[str, List[Dict]]:
        """Group tactic data by model"""
        models = {}
        for item in self.tactic_data:
            model = item["model"]
            if model not in models:
                models[model] = []
            models[model].append(item)
        return models

    def calculate_model_metrics(self, model_data: List[Dict]) -> Dict[str, Any]:
        """Calculate comprehensive metrics for a single model"""
        if not model_data:
            return self._empty_metrics()

        # Aggregate by tactic for this model
        tactic_aggregates = {}
        for item in model_data:
            tactic = item["tactic"]
            if tactic not in tactic_aggregates:
                tactic_aggregates[tactic] = {
                    "total_files": 0,
                    "files_detected": 0,
                    "total_events": 0,
                    "true_positives": 0,
                    "false_positives": 0,
                    "false_negatives": 0,
                }
            tactic_aggregates[tactic]["total_files"] += 1
            tactic_aggregates[tactic]["files_detected"] += item["tactic_detected"]
            tactic_aggregates[tactic]["total_events"] += item[
                "total_abnormal_events_detected"
            ]

            # For binary classification metrics, only the following can be derived
            # from this data:
            # - True Positive: tactic_detected = 1 (tactic correctly detected)
            # - False Negative: tactic_detected = 0 but abnormal events were present
            #   (missed detection)
            # - True Negative: tactic_detected = 0 and no abnormal events
            #   (correctly identified as normal)
            # False positives cannot be distinguished here, so they remain at zero.
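            # Illustrative tally (hypothetical numbers): for one tactic across four
            # files, detections (1, 0, 0, 1) with event counts (3, 2, 0, 1) yield
            # TP=2, FN=1, and one implicit TN, so the per-tactic detection rate
            # computed below is 2/4 = 0.5.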

            if item["tactic_detected"] == 1:
                tactic_aggregates[tactic]["true_positives"] += 1
            else:
                if item["total_abnormal_events_detected"] > 0:
                    tactic_aggregates[tactic]["false_negatives"] += 1
                else:
                    # This is actually a true negative (correctly identified as normal)
                    pass

        # Calculate overall metrics
        total_files = sum(agg["total_files"] for agg in tactic_aggregates.values())
        total_detected = sum(
            agg["files_detected"] for agg in tactic_aggregates.values()
        )
        total_events = sum(agg["total_events"] for agg in tactic_aggregates.values())

        # Calculate detection rate (recall)
        detection_rate = (
            (total_detected / total_files * 100) if total_files > 0 else 0.0
        )

        # Calculate coverage
        total_tactics = len(tactic_aggregates)
        tactics_with_detection = sum(
            1 for agg in tactic_aggregates.values() if agg["files_detected"] > 0
        )
        coverage_percent = (
            (tactics_with_detection / total_tactics * 100) if total_tactics > 0 else 0.0
        )

        # Calculate accuracy; without true-negative counts this reduces to the
        # detection rate expressed as a fraction
        accuracy = (total_detected / total_files) if total_files > 0 else 0.0

        # Calculate precision, recall, and F1 for each tactic, then average
        precision_scores = []
        recall_scores = []
        f1_scores = []

        for tactic, agg in tactic_aggregates.items():
            tp = agg["true_positives"]
            fp = agg["false_positives"]
            fn = agg["false_negatives"]

            # Precision = TP / (TP + FP). False positives cannot be measured from
            # this data, so we fall back to the per-tactic detection rate.
            precision = (tp / agg["total_files"]) if agg["total_files"] > 0 else 0.0

            # Recall = TP / (TP + FN) = detection rate
            recall = (tp / agg["total_files"]) if agg["total_files"] > 0 else 0.0

            # F1 = 2 * (precision * recall) / (precision + recall)
            if precision + recall > 0:
                f1 = 2 * (precision * recall) / (precision + recall)
            else:
                f1 = 0.0

            precision_scores.append(precision)
            recall_scores.append(recall)
            f1_scores.append(f1)

        # Calculate averages
        avg_precision = statistics.mean(precision_scores) if precision_scores else 0.0
        avg_recall = statistics.mean(recall_scores) if recall_scores else 0.0
        avg_f1 = statistics.mean(f1_scores) if f1_scores else 0.0

        # Calculate effectiveness score (weighted combination)
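        # Illustrative arithmetic (hypothetical numbers): detection_rate=75.0,
        # coverage_percent=80.0, avg_f1=0.70 gives 0.4*75 + 0.3*80 + 0.3*70 = 75.0.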
        effectiveness_score = (
            detection_rate * 0.4 + coverage_percent * 0.3 + avg_f1 * 100 * 0.3
        )

        # Grade the model
        if effectiveness_score >= 80:
            grade = "EXCELLENT"
        elif effectiveness_score >= 60:
            grade = "GOOD"
        elif effectiveness_score >= 40:
            grade = "FAIR"
        elif effectiveness_score >= 20:
            grade = "POOR"
        else:
            grade = "CRITICAL"

        return {
            "model_name": model_data[0]["model"] if model_data else "unknown",
            "total_files_analyzed": total_files,
            "total_files_detected": total_detected,
            "total_files_missed": total_files - total_detected,
            "total_abnormal_events_detected": total_events,
            "total_tactics_tested": total_tactics,
            "tactics_with_detection": tactics_with_detection,
            "tactics_with_zero_detection": total_tactics - tactics_with_detection,
            "detection_rate_percent": detection_rate,
            "coverage_percent": coverage_percent,
            "accuracy": accuracy,
            "precision": avg_precision,
            "recall": avg_recall,
            "f1_score": avg_f1,
            "effectiveness_score": effectiveness_score,
            "grade": grade,
        }

    def _empty_metrics(self) -> Dict[str, Any]:
        """Return empty metrics structure"""
        return {
            "model_name": "unknown",
            "total_files_analyzed": 0,
            "total_files_detected": 0,
            "total_files_missed": 0,
            "total_abnormal_events_detected": 0,
            "total_tactics_tested": 0,
            "tactics_with_detection": 0,
            "tactics_with_zero_detection": 0,
            "detection_rate_percent": 0.0,
            "coverage_percent": 0.0,
            "accuracy": 0.0,
            "precision": 0.0,
            "recall": 0.0,
            "f1_score": 0.0,
            "effectiveness_score": 0.0,
            "grade": "CRITICAL",
        }

    def generate_csv(self, output_path: Path) -> bool:
        """Generate CSV file with metrics for all models"""
        print("\n" + "=" * 80)
        print("GENERATING METRICS CSV")
        print("=" * 80 + "\n")

        # Group data by model
        models_data = self.group_by_model()

        if not models_data:
            print("[WARNING] No model data found")
            return False

        print(f"Found {len(models_data)} models: {', '.join(models_data.keys())}")

        # Calculate metrics for each model
        all_metrics = []
        for model_name, model_data in models_data.items():
            print(f"Calculating metrics for {model_name} ({len(model_data)} files)...")
            metrics = self.calculate_model_metrics(model_data)
            all_metrics.append(metrics)

        # Define CSV columns
        fieldnames = [
            "model_name",
            "total_files_analyzed",
            "total_files_detected",
            "total_files_missed",
            "total_abnormal_events_detected",
            "total_tactics_tested",
            "tactics_with_detection",
            "tactics_with_zero_detection",
            "detection_rate_percent",
            "coverage_percent",
            "accuracy",
            "precision",
            "recall",
            "f1_score",
            "effectiveness_score",
            "grade",
        ]

        # Write CSV file
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for metrics in all_metrics:
                # Convert all values to appropriate types for CSV
                row = {}
                for field in fieldnames:
                    value = metrics.get(field, 0)
                    if isinstance(value, float):
                        row[field] = round(value, 4)
                    else:
                        row[field] = value
                writer.writerow(row)

        print(f"\nCSV file generated: {output_path}")
        print(f"Models included: {len(all_metrics)}")

        # Display summary
        print("\nSummary:")
        for metrics in all_metrics:
            print(
                f"  {metrics['model_name']}: F1={metrics['f1_score']:.3f}, "
                f"Accuracy={metrics['accuracy']:.3f}, "
                f"Precision={metrics['precision']:.3f}, "
                f"Recall={metrics['recall']:.3f}, "
                f"Grade={metrics['grade']}"
            )

        return True


def main():
    parser = argparse.ArgumentParser(
        description="Generate CSV file with simple metrics for each model"
    )
    parser.add_argument(
        "--input",
        default="evaluation/full_pipeline/results/tactic_counts_summary.json",
        help="Path to tactic_counts_summary.json",
    )
    parser.add_argument(
        "--output",
        default="evaluation/full_pipeline/results/model_metrics.csv",
        help="Output file for CSV metrics",
    )
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    if not input_path.exists():
        print(f"[ERROR] Input file not found: {input_path}")
        print("Run count_tactics.py first to generate tactic counts")
        return 1

    # Generate CSV
    generator = MetricsCSVGenerator(input_path)
    success = generator.generate_csv(output_path)

    if not success:
        print("[ERROR] Failed to generate CSV file")
        return 1

    print("\n" + "=" * 80)
    print("CSV GENERATION COMPLETE")
    print("=" * 80 + "\n")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())