minhan6559's picture
Upload 102 files
9e3d618 verified
#!/usr/bin/env python3
"""
Compare performance metrics across different models.
Reads tactic_counts_summary.json and generates a comparison report
showing detection rates, coverage, accuracy, and effectiveness for each model.
Usage:
python compare_models.py [--input INPUT_PATH] [--output OUTPUT_PATH]
"""
import argparse
import json
from pathlib import Path
from typing import Dict, List, Any
from datetime import datetime
import statistics
class ModelComparator:
    """Compare detection performance metrics across different models.

    The comparator reads a tactic-counts summary JSON file whose top-level
    object holds a ``"results"`` list. Every entry of that list is read
    through the keys ``"model"``, ``"tactic"``, ``"tactic_detected"`` and
    ``"total_abnormal_events_detected"``; one entry is counted as one file.
    """

    # Lower bounds (inclusive) of the effectiveness score for each grade,
    # checked from best to worst. Anything below the last bound is "CRITICAL".
    _GRADE_THRESHOLDS = (
        (80, "EXCELLENT"),
        (60, "GOOD"),
        (40, "FAIR"),
        (20, "POOR"),
    )

    def __init__(self, tactic_counts_file: Path):
        """Remember the input path and load its results immediately.

        Args:
            tactic_counts_file: Path to the tactic-counts summary JSON file.

        Raises:
            FileNotFoundError: If ``tactic_counts_file`` does not exist.
        """
        self.tactic_counts_file = tactic_counts_file
        self.tactic_data = []
        self.load_tactic_counts()

    def load_tactic_counts(self):
        """Load the ``"results"`` list of the summary file into ``tactic_data``.

        Raises:
            FileNotFoundError: If the summary file does not exist.
        """
        if not self.tactic_counts_file.exists():
            raise FileNotFoundError(
                f"Tactic counts file not found: {self.tactic_counts_file}"
            )
        data = json.loads(self.tactic_counts_file.read_text(encoding="utf-8"))
        # A missing *or null* "results" entry both mean "nothing to compare";
        # without the `or []` a JSON null would crash on len(None) below.
        self.tactic_data = data.get("results") or []
        print(f"[INFO] Loaded {len(self.tactic_data)} tactic analysis results")

    def group_by_model(self) -> Dict[str, List[Dict]]:
        """Group the loaded entries by their ``"model"`` value.

        Returns:
            Mapping of model name to the entries of that model, with models
            and entries kept in the order they appear in ``tactic_data``.
        """
        models: Dict[str, List[Dict]] = {}
        for item in self.tactic_data:
            models.setdefault(item["model"], []).append(item)
        return models

    def calculate_model_metrics(self, model_data: List[Dict]) -> Dict[str, Any]:
        """Calculate detection, coverage, accuracy and effectiveness metrics.

        Args:
            model_data: Entries belonging to a single model. The model name
                is taken from the first entry.

        Returns:
            A metrics dictionary with the same keys as ``_empty_metrics()``;
            for empty ``model_data`` that all-zero structure is returned.
        """
        if not model_data:
            return self._empty_metrics()

        # Aggregate by tactic for this model.
        tactic_aggregates: Dict[str, Dict[str, Any]] = {}
        for item in model_data:
            agg = tactic_aggregates.setdefault(
                item["tactic"],
                {"total_files": 0, "files_detected": 0, "total_events": 0},
            )
            agg["total_files"] += 1
            # NOTE(review): summed as-is, so this presumably is a 0/1 (or
            # bool) flag per file - confirm against the producer of the JSON.
            agg["files_detected"] += item["tactic_detected"]
            agg["total_events"] += item["total_abnormal_events_detected"]

        aggregates = list(tactic_aggregates.values())

        # model_data is non-empty here, so there is at least one tactic and
        # every tactic has at least one file: the divisions below are safe.
        total_files = sum(agg["total_files"] for agg in aggregates)
        total_detected = sum(agg["files_detected"] for agg in aggregates)
        total_events = sum(agg["total_events"] for agg in aggregates)
        detection_rate = total_detected / total_files * 100

        # Coverage: share of tactics with at least one detected file.
        total_tactics = len(aggregates)
        tactics_with_detection = sum(
            1 for agg in aggregates if agg["files_detected"] > 0
        )
        coverage_percent = tactics_with_detection / total_tactics * 100

        # Accuracy: unweighted mean of the per-tactic detection ratios (0..1).
        avg_accuracy = statistics.mean(
            [agg["files_detected"] / agg["total_files"] for agg in aggregates]
        )

        # Effectiveness: weighted blend of the three metrics on a 0-100 scale.
        effectiveness_score = (
            detection_rate * 0.4 + coverage_percent * 0.3 + avg_accuracy * 100 * 0.3
        )

        # Grade the model: first threshold the score reaches wins.
        grade = next(
            (
                label
                for floor, label in self._GRADE_THRESHOLDS
                if effectiveness_score >= floor
            ),
            "CRITICAL",
        )

        # Per-tactic breakdown, sorted by tactic name.
        per_tactic_detection = []
        for tactic, agg in sorted(tactic_aggregates.items()):
            files = agg["total_files"]
            detected = agg["files_detected"]
            tactic_detection_rate = detected / files * 100
            if tactic_detection_rate >= 50:
                status = "GOOD"
            elif tactic_detection_rate > 0:
                status = "POOR"
            else:
                status = "NONE"
            per_tactic_detection.append(
                {
                    "tactic": tactic,
                    "total_files": files,
                    "files_detected": detected,
                    "files_missed": files - detected,
                    "total_abnormal_events_detected": agg["total_events"],
                    "detection_rate_percent": tactic_detection_rate,
                    "status": status,
                }
            )

        return {
            "model_name": model_data[0]["model"],
            "total_files_analyzed": total_files,
            "total_files_detected": total_detected,
            "total_files_missed": total_files - total_detected,
            "total_abnormal_events_detected": total_events,
            "total_tactics_tested": total_tactics,
            "detection_rate_percent": detection_rate,
            "coverage_percent": coverage_percent,
            "average_accuracy_score": avg_accuracy,
            "effectiveness_score": effectiveness_score,
            "grade": grade,
            "per_tactic_detection": per_tactic_detection,
            "tactics_with_detection": tactics_with_detection,
            "tactics_with_zero_detection": total_tactics - tactics_with_detection,
        }

    def _empty_metrics(self) -> Dict[str, Any]:
        """Return the all-zero metrics structure used when there is no data."""
        return {
            "model_name": "unknown",
            "total_files_analyzed": 0,
            "total_files_detected": 0,
            "total_files_missed": 0,
            "total_abnormal_events_detected": 0,
            "total_tactics_tested": 0,
            "detection_rate_percent": 0.0,
            "coverage_percent": 0.0,
            "average_accuracy_score": 0.0,
            "effectiveness_score": 0.0,
            "grade": "CRITICAL",
            "per_tactic_detection": [],
            "tactics_with_detection": 0,
            "tactics_with_zero_detection": 0,
        }

    def generate_comparison(self) -> Dict[str, Any]:
        """Generate the full model comparison report.

        Returns:
            The report dictionary (timestamp, summary, ranking, per-model
            metrics and tactic-by-tactic comparison), or
            ``{"error": "No model data found"}`` when nothing was loaded.
        """
        print("\n" + "=" * 80)
        print("GENERATING MODEL COMPARISON")
        print("=" * 80 + "\n")

        # Group data by model.
        models_data = self.group_by_model()
        if not models_data:
            print("[WARNING] No model data found")
            return {"error": "No model data found"}
        print(f"Found {len(models_data)} models: {', '.join(models_data.keys())}")

        # Calculate metrics for each model.
        model_metrics = {}
        for model_name, model_data in models_data.items():
            print(
                f"\nCalculating metrics for {model_name} ({len(model_data)} files)..."
            )
            model_metrics[model_name] = self.calculate_model_metrics(model_data)

        comparison_summary = self._generate_comparison_summary(model_metrics)
        ranking = self._generate_ranking(model_metrics)
        detailed_comparison = self._generate_detailed_comparison(model_metrics)

        return {
            "timestamp": datetime.now().isoformat(),
            "total_models_compared": len(model_metrics),
            "models_analyzed": list(model_metrics.keys()),
            "comparison_summary": comparison_summary,
            "model_ranking": ranking,
            "detailed_model_metrics": model_metrics,
            "detailed_comparison": detailed_comparison,
        }

    def _generate_comparison_summary(
        self, model_metrics: Dict[str, Dict]
    ) -> Dict[str, Any]:
        """Generate the high-level summary: averages plus best/worst models.

        Args:
            model_metrics: Mapping of model name to its metrics dictionary.

        Returns:
            Summary dictionary, or ``{}`` when ``model_metrics`` is empty.
            On ties the model that appears first in ``model_metrics`` wins.
        """
        if not model_metrics:
            return {}

        def extreme(chooser, field):
            # chooser is max or min; returns the (model_name, metrics) pair.
            return chooser(model_metrics.items(), key=lambda pair: pair[1][field])

        def average(field):
            return statistics.mean([m[field] for m in model_metrics.values()])

        # Find best and worst performers.
        best_detection = extreme(max, "detection_rate_percent")
        worst_detection = extreme(min, "detection_rate_percent")
        best_coverage = extreme(max, "coverage_percent")
        worst_coverage = extreme(min, "coverage_percent")
        best_effectiveness = extreme(max, "effectiveness_score")
        worst_effectiveness = extreme(min, "effectiveness_score")

        return {
            "average_detection_rate_percent": average("detection_rate_percent"),
            "average_coverage_percent": average("coverage_percent"),
            "average_effectiveness_score": average("effectiveness_score"),
            "best_detection": {
                "model": best_detection[0],
                "score": best_detection[1]["detection_rate_percent"],
            },
            "worst_detection": {
                "model": worst_detection[0],
                "score": worst_detection[1]["detection_rate_percent"],
            },
            "best_coverage": {
                "model": best_coverage[0],
                "score": best_coverage[1]["coverage_percent"],
            },
            "worst_coverage": {
                "model": worst_coverage[0],
                "score": worst_coverage[1]["coverage_percent"],
            },
            "best_overall": {
                "model": best_effectiveness[0],
                "score": best_effectiveness[1]["effectiveness_score"],
                "grade": best_effectiveness[1]["grade"],
            },
            "worst_overall": {
                "model": worst_effectiveness[0],
                "score": worst_effectiveness[1]["effectiveness_score"],
                "grade": worst_effectiveness[1]["grade"],
            },
        }

    def _generate_ranking(self, model_metrics: Dict[str, Dict]) -> List[Dict[str, Any]]:
        """Generate the list of models ranked by effectiveness, best first.

        Args:
            model_metrics: Mapping of model name to its metrics dictionary.

        Returns:
            One entry per model with a 1-based ``"rank"``; models with equal
            scores keep their order from ``model_metrics``.
        """
        ranked_models = sorted(
            model_metrics.items(),
            key=lambda pair: pair[1]["effectiveness_score"],
            reverse=True,
        )
        return [
            {
                "rank": rank,
                "model_name": model_name,
                "effectiveness_score": metrics["effectiveness_score"],
                "grade": metrics["grade"],
                "detection_rate_percent": metrics["detection_rate_percent"],
                "coverage_percent": metrics["coverage_percent"],
                "average_accuracy_score": metrics["average_accuracy_score"],
                "total_files_analyzed": metrics["total_files_analyzed"],
            }
            for rank, (model_name, metrics) in enumerate(ranked_models, 1)
        ]

    def _generate_detailed_comparison(
        self, model_metrics: Dict[str, Dict]
    ) -> Dict[str, Any]:
        """Generate the side-by-side, tactic-by-tactic comparison.

        Args:
            model_metrics: Mapping of model name to its metrics dictionary.

        Returns:
            Dictionary with the per-tactic/per-model table and the sorted list
            of all tactics, or ``{}`` when ``model_metrics`` is empty. A model
            without an entry for a tactic gets a zeroed ``"NOT_TESTED"`` cell.
        """
        if not model_metrics:
            return {}

        # Index each model's breakdown by tactic once, instead of scanning the
        # list again for every tactic. setdefault keeps the first entry for a
        # tactic, matching a first-match linear search.
        per_model_index: Dict[str, Dict[str, Dict]] = {}
        for model_name, metrics in model_metrics.items():
            index: Dict[str, Dict] = {}
            for entry in metrics["per_tactic_detection"]:
                index.setdefault(entry["tactic"], entry)
            per_model_index[model_name] = index

        # All tactics seen by any model, in sorted order.
        all_tactics = sorted(
            {tactic for index in per_model_index.values() for tactic in index}
        )

        tactic_comparison: Dict[str, Dict[str, Dict[str, Any]]] = {}
        for tactic in all_tactics:
            row = {}
            for model_name, index in per_model_index.items():
                entry = index.get(tactic)
                if entry is not None:
                    row[model_name] = {
                        "detection_rate_percent": entry["detection_rate_percent"],
                        "files_detected": entry["files_detected"],
                        "total_files": entry["total_files"],
                        "status": entry["status"],
                    }
                else:
                    row[model_name] = {
                        "detection_rate_percent": 0.0,
                        "files_detected": 0,
                        "total_files": 0,
                        "status": "NOT_TESTED",
                    }
            tactic_comparison[tactic] = row

        return {
            "tactic_by_tactic_comparison": tactic_comparison,
            "all_tactics_tested": all_tactics,
        }
def main():
    """Run the model comparison from the command line.

    Parses ``--input`` / ``--output``, builds the comparison report from the
    input summary file, writes it as indented JSON to the output path
    (creating parent directories as needed) and prints a short summary.

    Returns:
        ``0`` on success; ``1`` when the input file is missing or the report
        contains an ``"error"`` entry. The report is written to the output
        path before the error check, so an error report is saved as well.
    """
    cli = argparse.ArgumentParser(
        description="Compare performance metrics across different models"
    )
    cli.add_argument(
        "--input",
        default="evaluation/full_pipeline/results/tactic_counts_summary.json",
        help="Path to tactic_counts_summary.json",
    )
    cli.add_argument(
        "--output",
        default="evaluation/full_pipeline/results/model_comparison.json",
        help="Output file for model comparison report",
    )
    options = cli.parse_args()
    source_file = Path(options.input)
    report_file = Path(options.output)

    # Bail out early with a hint instead of letting the loader raise.
    if not source_file.exists():
        print(f"[ERROR] Input file not found: {source_file}")
        print("Run count_tactics.py first to generate tactic counts")
        return 1

    # Build the report.
    report = ModelComparator(source_file).generate_comparison()

    # Persist it before printing anything about it.
    report_file.parent.mkdir(parents=True, exist_ok=True)
    report_file.write_text(json.dumps(report, indent=2), encoding="utf-8")

    # Console summary.
    banner = "=" * 80
    print("\n" + banner)
    print("MODEL COMPARISON COMPLETE")
    print(banner)

    if "error" in report:
        print(f"Error: {report['error']}")
        return 1

    print(f"Models compared: {report['total_models_compared']}")
    print(f"Models: {', '.join(report['models_analyzed'])}")

    if report["model_ranking"]:
        leader = report["model_ranking"][0]
        print(
            f"\nTop performer: {leader['model_name']} "
            f"(Score: {leader['effectiveness_score']:.1f}, "
            f"Grade: {leader['grade']})"
        )

    summary = report["comparison_summary"]
    if summary:
        best_detection = summary["best_detection"]
        best_coverage = summary["best_coverage"]
        print(f"\nAverage effectiveness: {summary['average_effectiveness_score']:.1f}")
        print(
            f"Best detection: {best_detection['model']} ({best_detection['score']:.1f}%)"
        )
        print(
            f"Best coverage: {best_coverage['model']} ({best_coverage['score']:.1f}%)"
        )

    print(f"\nReport saved to: {report_file}")
    print(banner + "\n")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is raised directly because the bare exit() helper is installed by the
    # site module for interactive use and is absent under `python -S`.
    raise SystemExit(main())