"""
Generate a CSV file with simple metrics for each model.

Reads tactic_counts_summary.json and generates a CSV file containing
F1, accuracy, precision, recall, and other metrics for each model.

Usage:
    python generate_metrics_csv.py [--input INPUT_PATH] [--output OUTPUT_PATH]
"""

import argparse
import json
import csv
from pathlib import Path
from typing import Dict, List, Any
from datetime import datetime
import statistics


class MetricsCSVGenerator:
    """Generates a CSV file with simple metrics for each model."""

    def __init__(self, tactic_counts_file: Path):
        self.tactic_counts_file = tactic_counts_file
        self.tactic_data = []
        self.load_tactic_counts()

    def load_tactic_counts(self):
        """Load tactic counts summary data"""
        if not self.tactic_counts_file.exists():
            raise FileNotFoundError(f"Tactic counts file not found: {self.tactic_counts_file}")

        data = json.loads(self.tactic_counts_file.read_text(encoding='utf-8'))
        self.tactic_data = data.get('results', [])
        print(f"[INFO] Loaded {len(self.tactic_data)} tactic analysis results")

    def group_by_model(self) -> Dict[str, List[Dict]]:
        """Group tactic data by model"""
        models = {}
        for item in self.tactic_data:
            model = item['model']
            if model not in models:
                models[model] = []
            models[model].append(item)
        return models

    def calculate_model_metrics(self, model_data: List[Dict]) -> Dict[str, Any]:
        """Calculate comprehensive metrics for a single model"""
        if not model_data:
            return self._empty_metrics()

        # Aggregate per-tactic counts across all files analyzed for this model.
        tactic_aggregates = {}
        for item in model_data:
            tactic = item['tactic']
            if tactic not in tactic_aggregates:
                tactic_aggregates[tactic] = {
                    'total_files': 0,
                    'files_detected': 0,
                    'total_events': 0,
                    'true_positives': 0,
                    'false_positives': 0,
                    'false_negatives': 0
                }
            tactic_aggregates[tactic]['total_files'] += 1
            tactic_aggregates[tactic]['files_detected'] += item['tactic_detected']
            tactic_aggregates[tactic]['total_events'] += item['total_abnormal_events_detected']

            # A detection counts as a true positive. A miss is counted as a false
            # negative only when abnormal events were reported without the tactic
            # being identified; files with no reported events are left unclassified.
            if item['tactic_detected'] == 1:
                tactic_aggregates[tactic]['true_positives'] += 1
            else:
                if item['total_abnormal_events_detected'] > 0:
                    tactic_aggregates[tactic]['false_negatives'] += 1
                else:
                    pass

        # Overall totals across all tactics for this model.
        total_files = sum(agg['total_files'] for agg in tactic_aggregates.values())
        total_detected = sum(agg['files_detected'] for agg in tactic_aggregates.values())
        total_events = sum(agg['total_events'] for agg in tactic_aggregates.values())

        # Detection rate: percentage of analyzed files in which the tactic was detected.
        detection_rate = (total_detected / total_files * 100) if total_files > 0 else 0.0

        # Coverage: percentage of tested tactics with at least one detection.
        total_tactics = len(tactic_aggregates)
        tactics_with_detection = sum(1 for agg in tactic_aggregates.values() if agg['files_detected'] > 0)
        coverage_percent = (tactics_with_detection / total_tactics * 100) if total_tactics > 0 else 0.0

        # Accuracy: fraction of analyzed files in which the tactic was detected.
        accuracy = (total_detected / total_files) if total_files > 0 else 0.0

        # Per-tactic precision, recall, and F1, later averaged across tactics.
        precision_scores = []
        recall_scores = []
        f1_scores = []

        for tactic, agg in tactic_aggregates.items():
            tp = agg['true_positives']
            fp = agg['false_positives']
            fn = agg['false_negatives']

            # The aggregation above never records false positives, so precision
            # and recall are both computed as the per-tactic detection rate
            # (true positives over total files) rather than TP/(TP+FP) and TP/(TP+FN).
            precision = (tp / agg['total_files']) if agg['total_files'] > 0 else 0.0
            recall = (tp / agg['total_files']) if agg['total_files'] > 0 else 0.0

            if precision + recall > 0:
                f1 = 2 * (precision * recall) / (precision + recall)
            else:
                f1 = 0.0

            precision_scores.append(precision)
            recall_scores.append(recall)
            f1_scores.append(f1)

        avg_precision = statistics.mean(precision_scores) if precision_scores else 0.0
        avg_recall = statistics.mean(recall_scores) if recall_scores else 0.0
        avg_f1 = statistics.mean(f1_scores) if f1_scores else 0.0

        # Weighted effectiveness score on a 0-100 scale:
        # 40% detection rate, 30% tactic coverage, 30% average F1 (scaled to 0-100).
        effectiveness_score = (
            detection_rate * 0.4 +
            coverage_percent * 0.3 +
            avg_f1 * 100 * 0.3
        )

        if effectiveness_score >= 80:
            grade = 'EXCELLENT'
        elif effectiveness_score >= 60:
            grade = 'GOOD'
        elif effectiveness_score >= 40:
            grade = 'FAIR'
        elif effectiveness_score >= 20:
            grade = 'POOR'
        else:
            grade = 'CRITICAL'

        return {
            'model_name': model_data[0]['model'] if model_data else 'unknown',
            'total_files_analyzed': total_files,
            'total_files_detected': total_detected,
            'total_files_missed': total_files - total_detected,
            'total_abnormal_events_detected': total_events,
            'total_tactics_tested': total_tactics,
            'tactics_with_detection': tactics_with_detection,
            'tactics_with_zero_detection': total_tactics - tactics_with_detection,
            'detection_rate_percent': detection_rate,
            'coverage_percent': coverage_percent,
            'accuracy': accuracy,
            'precision': avg_precision,
            'recall': avg_recall,
            'f1_score': avg_f1,
            'effectiveness_score': effectiveness_score,
            'grade': grade
        }

    def _empty_metrics(self) -> Dict[str, Any]:
        """Return empty metrics structure"""
        return {
            'model_name': 'unknown',
            'total_files_analyzed': 0,
            'total_files_detected': 0,
            'total_files_missed': 0,
            'total_abnormal_events_detected': 0,
            'total_tactics_tested': 0,
            'tactics_with_detection': 0,
            'tactics_with_zero_detection': 0,
            'detection_rate_percent': 0.0,
            'coverage_percent': 0.0,
            'accuracy': 0.0,
            'precision': 0.0,
            'recall': 0.0,
            'f1_score': 0.0,
            'effectiveness_score': 0.0,
            'grade': 'CRITICAL'
        }

    def generate_csv(self, output_path: Path) -> bool:
        """Generate CSV file with metrics for all models"""
        print("\n" + "="*80)
        print("GENERATING METRICS CSV")
        print("="*80 + "\n")

        # Group the loaded results by model.
        models_data = self.group_by_model()

        if not models_data:
            print("[WARNING] No model data found")
            return False

        print(f"Found {len(models_data)} models: {', '.join(models_data.keys())}")

        # Calculate metrics for each model.
        all_metrics = []
        for model_name, model_data in models_data.items():
            print(f"Calculating metrics for {model_name} ({len(model_data)} files)...")
            metrics = self.calculate_model_metrics(model_data)
            all_metrics.append(metrics)

        fieldnames = [
            'model_name',
            'total_files_analyzed',
            'total_files_detected',
            'total_files_missed',
            'total_abnormal_events_detected',
            'total_tactics_tested',
            'tactics_with_detection',
            'tactics_with_zero_detection',
            'detection_rate_percent',
            'coverage_percent',
            'accuracy',
            'precision',
            'recall',
            'f1_score',
            'effectiveness_score',
            'grade'
        ]

        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for metrics in all_metrics:
                # Round float values for readability before writing the row.
                row = {}
                for field in fieldnames:
                    value = metrics.get(field, 0)
                    if isinstance(value, float):
                        row[field] = round(value, 4)
                    else:
                        row[field] = value
                writer.writerow(row)

        print(f"\nCSV file generated: {output_path}")
        print(f"Models included: {len(all_metrics)}")
print("\nSummary:")
|
|
|
for metrics in all_metrics:
|
|
|
print(f" {metrics['model_name']}: F1={metrics['f1_score']:.3f}, "
|
|
|
f"Accuracy={metrics['accuracy']:.3f}, "
|
|
|
f"Precision={metrics['precision']:.3f}, "
|
|
|
f"Recall={metrics['recall']:.3f}, "
|
|
|
f"Grade={metrics['grade']}")
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|


def main():
    parser = argparse.ArgumentParser(
        description="Generate a CSV file with simple metrics for each model"
    )
    parser.add_argument(
        "--input",
        default="full_pipeline_evaluation/results/tactic_counts_summary.json",
        help="Path to tactic_counts_summary.json"
    )
    parser.add_argument(
        "--output",
        default="full_pipeline_evaluation/results/model_metrics.csv",
        help="Output file for CSV metrics"
    )
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    if not input_path.exists():
        print(f"[ERROR] Input file not found: {input_path}")
        print("Run count_tactics.py first to generate tactic counts")
        return 1

    generator = MetricsCSVGenerator(input_path)
    success = generator.generate_csv(output_path)

    if not success:
        print("[ERROR] Failed to generate CSV file")
        return 1

    print("\n" + "="*80)
    print("CSV GENERATION COMPLETE")
    print("="*80 + "\n")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())