"""
Evaluate system performance metrics.

Calculates detection rates, coverage, accuracy, and overall effectiveness
based on tactic occurrence counts. Generates separate reports for each model.

Usage:
    python evaluate_metrics.py [--input INPUT_PATH] [--output OUTPUT_PATH]
"""

import argparse
import json
import statistics
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional


class SystemEvaluator:
    """Evaluates multi-agent system performance"""

    def __init__(self, tactic_counts_file: Path):
        self.tactic_counts_file = tactic_counts_file
        self.tactic_data = []
        self.load_tactic_counts()

    def load_tactic_counts(self):
        """Load tactic counts summary data"""
        if not self.tactic_counts_file.exists():
            raise FileNotFoundError(
                f"Tactic counts file not found: {self.tactic_counts_file}"
            )

        data = json.loads(self.tactic_counts_file.read_text(encoding="utf-8"))
        self.tactic_data = data.get("results", [])
        print(f"[INFO] Loaded {len(self.tactic_data)} tactic analysis results")

    def group_by_model(self) -> Dict[str, List[Dict]]:
        """Group tactic data by model"""
        models: Dict[str, List[Dict]] = {}
        for item in self.tactic_data:
            model = item["model"]
            if model not in models:
                models[model] = []
            models[model].append(item)
        return models
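
    # --- Metric calculations ---
    # Each metric method below takes an optional per-model slice of the data;
    # when called with no argument it evaluates the full dataset.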

    def calculate_detection_rate(
        self, model_data: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Calculate detection rate: % of files where tactic was correctly detected"""
        data_to_use = model_data if model_data is not None else self.tactic_data

        # "tactic_detected" is treated as a 0/1 per-file flag, so summing it
        # yields the number of files in which the tactic was detected.
        tactic_aggregates = {}
        for item in data_to_use:
            tactic = item["tactic"]
            if tactic not in tactic_aggregates:
                tactic_aggregates[tactic] = {
                    "total_files": 0,
                    "files_detected": 0,
                    "total_events": 0,
                }
            tactic_aggregates[tactic]["total_files"] += 1
            tactic_aggregates[tactic]["files_detected"] += item["tactic_detected"]
            tactic_aggregates[tactic]["total_events"] += item[
                "total_abnormal_events_detected"
            ]

        total_files = sum(agg["total_files"] for agg in tactic_aggregates.values())
        total_detected = sum(
            agg["files_detected"] for agg in tactic_aggregates.values()
        )
        total_events = sum(agg["total_events"] for agg in tactic_aggregates.values())

        per_tactic_detection = []
        for tactic, agg in sorted(tactic_aggregates.items()):
            files = agg["total_files"]
            detected = agg["files_detected"]
            events = agg["total_events"]

            detection_rate = (detected / files * 100) if files > 0 else 0.0

            per_tactic_detection.append(
                {
                    "tactic": tactic,
                    "total_files": files,
                    "files_detected": detected,
                    "files_missed": files - detected,
                    "total_abnormal_events_detected": events,
                    "detection_rate_percent": detection_rate,
                    "status": (
                        "GOOD"
                        if detection_rate >= 50
                        else ("POOR" if detection_rate > 0 else "NONE")
                    ),
                }
            )

        overall_detection_rate = (
            (total_detected / total_files * 100) if total_files > 0 else 0.0
        )

        return {
            "overall_detection_rate_percent": overall_detection_rate,
            "total_files": total_files,
            "total_files_detected": total_detected,
            "total_files_missed": total_files - total_detected,
            "total_abnormal_events_detected": total_events,
            "total_tactics": len(tactic_aggregates),
            "per_tactic_detection": per_tactic_detection,
        }
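
    # Coverage counts tactics rather than files: e.g. if 8 of 10 tactics are
    # detected in at least one file, coverage is 80% (illustrative numbers).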

    def calculate_coverage(
        self, model_data: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Calculate coverage: how many tactics have at least one successful detection"""
        data_to_use = model_data if model_data is not None else self.tactic_data

        # Sum the per-file 0/1 detection flags for each tactic.
        tactic_aggregates = {}
        for item in data_to_use:
            tactic = item["tactic"]
            if tactic not in tactic_aggregates:
                tactic_aggregates[tactic] = 0
            tactic_aggregates[tactic] += item["tactic_detected"]

        total_tactics = len(tactic_aggregates)
        tactics_with_detection = sum(
            1 for count in tactic_aggregates.values() if count > 0
        )
        tactics_with_zero_detection = total_tactics - tactics_with_detection

        coverage_percent = (
            (tactics_with_detection / total_tactics * 100) if total_tactics > 0 else 0.0
        )

        detected_tactics = sorted(
            tactic for tactic, count in tactic_aggregates.items() if count > 0
        )
        missed_tactics = sorted(
            tactic for tactic, count in tactic_aggregates.items() if count == 0
        )

        return {
            "coverage_percent": coverage_percent,
            "total_tactics_tested": total_tactics,
            "tactics_with_detection": tactics_with_detection,
            "tactics_with_zero_detection": tactics_with_zero_detection,
            "detected_tactics": detected_tactics,
            "missed_tactics": missed_tactics,
        }
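
    # The accuracy proxy is the per-tactic fraction of files detected, e.g.
    # 3 files detected out of 4 analyzed gives 0.75 for that tactic
    # (illustrative numbers).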

    def calculate_accuracy_proxy(
        self, model_data: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Calculate accuracy proxy: detection success rate per tactic"""
        data_to_use = model_data if model_data is not None else self.tactic_data

        tactic_aggregates = {}
        for item in data_to_use:
            tactic = item["tactic"]
            if tactic not in tactic_aggregates:
                tactic_aggregates[tactic] = {"total_files": 0, "files_detected": 0}
            tactic_aggregates[tactic]["total_files"] += 1
            tactic_aggregates[tactic]["files_detected"] += item["tactic_detected"]

        accuracy_scores = []
        for tactic, agg in sorted(tactic_aggregates.items()):
            if agg["total_files"] > 0:
                accuracy = agg["files_detected"] / agg["total_files"]
                accuracy_scores.append(
                    {
                        "tactic": tactic,
                        "accuracy_score": accuracy,
                        "interpretation": (
                            "Perfect"
                            if accuracy == 1.0
                            else ("Partial" if accuracy > 0 else "Failed")
                        ),
                    }
                )

        avg_accuracy = (
            statistics.mean(s["accuracy_score"] for s in accuracy_scores)
            if accuracy_scores
            else 0.0
        )

        return {
            "average_accuracy_score": avg_accuracy,
            "per_tactic_accuracy": accuracy_scores,
            "perfect_matches": sum(
                1 for s in accuracy_scores if s["accuracy_score"] == 1.0
            ),
            "partial_matches": sum(
                1 for s in accuracy_scores if 0 < s["accuracy_score"] < 1.0
            ),
            "failed_matches": sum(
                1 for s in accuracy_scores if s["accuracy_score"] == 0.0
            ),
        }

    def calculate_effectiveness(
        self, model_data: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Calculate overall system effectiveness score (0-100)"""
        detection = self.calculate_detection_rate(model_data)
        coverage = self.calculate_coverage(model_data)
        accuracy = self.calculate_accuracy_proxy(model_data)

        # Weighted blend of the three components; each is on a 0-100 scale and
        # the weights sum to 1.0, so the result also lands in 0-100.
        effectiveness_score = (
            detection["overall_detection_rate_percent"] * 0.4
            + coverage["coverage_percent"] * 0.3
            + accuracy["average_accuracy_score"] * 100 * 0.3
        )

        if effectiveness_score >= 80:
            grade = "EXCELLENT"
        elif effectiveness_score >= 60:
            grade = "GOOD"
        elif effectiveness_score >= 40:
            grade = "FAIR"
        elif effectiveness_score >= 20:
            grade = "POOR"
        else:
            grade = "CRITICAL"

        return {
            "effectiveness_score": effectiveness_score,
            "grade": grade,
            "component_scores": {
                "detection_rate": detection["overall_detection_rate_percent"],
                "coverage_rate": coverage["coverage_percent"],
                "accuracy_score": accuracy["average_accuracy_score"] * 100,
            },
        }
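
    # Worked example of the weighting above (illustrative numbers only):
    # detection 60%, coverage 80%, accuracy 0.7 gives
    # 60 * 0.4 + 80 * 0.3 + (0.7 * 100) * 0.3 = 24 + 24 + 21 = 69.0 -> "GOOD".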

    def identify_issues(
        self, model_data: Optional[List[Dict]] = None
    ) -> List[str]:
        """Identify specific issues and gaps"""
        issues = []

        detection = self.calculate_detection_rate(model_data)
        coverage = self.calculate_coverage(model_data)

        if detection["overall_detection_rate_percent"] < 20:
            issues.append(
                f"CRITICAL: Overall detection rate is only {detection['overall_detection_rate_percent']:.1f}%. "
                f"System is failing to detect most attacks ({detection['total_files_missed']}/{detection['total_files']} files missed)."
            )
        elif detection["overall_detection_rate_percent"] < 50:
            issues.append(
                f"WARNING: Detection rate is {detection['overall_detection_rate_percent']:.1f}%, "
                f"below the acceptable threshold of 50% ({detection['total_files_missed']}/{detection['total_files']} files missed)."
            )

        if coverage["tactics_with_zero_detection"] > 0:
            missed = ", ".join(coverage["missed_tactics"])
            issues.append(
                f"COVERAGE GAP: {coverage['tactics_with_zero_detection']} tactics have zero detection: {missed}"
            )

        for item in detection["per_tactic_detection"]:
            if item["total_files"] > 0 and item["detection_rate_percent"] == 0:
                issues.append(
                    f"TACTIC FAILURE: '{item['tactic']}' - "
                    f"{item['total_files']} files analyzed, 0 detected"
                )

        data_to_use = model_data if model_data is not None else self.tactic_data
        zero_event_tactics = [
            item["tactic"]
            for item in data_to_use
            if item["total_abnormal_events_detected"] == 0
        ]
        if zero_event_tactics:
            # Sort for deterministic report output (set order is arbitrary).
            unique_zero = sorted(set(zero_event_tactics))
            issues.append(
                f"DATA ISSUE: No events to analyze for tactics: {', '.join(unique_zero)}"
            )

        if not issues:
            issues.append(
                "No critical issues detected. System is performing within acceptable parameters."
            )

        return issues

    def run_evaluation_for_model(
        self, model_name: str, model_data: List[Dict]
    ) -> Dict[str, Any]:
        """Run full evaluation for a specific model"""
        print(f"\nEvaluating model: {model_name} ({len(model_data)} files)")

        detection = self.calculate_detection_rate(model_data)
        coverage = self.calculate_coverage(model_data)
        accuracy = self.calculate_accuracy_proxy(model_data)
        effectiveness = self.calculate_effectiveness(model_data)
        issues = self.identify_issues(model_data)

        report = {
            "timestamp": datetime.now().isoformat(),
            "model_name": model_name,
            "evaluation_metrics": {
                "detection_rate": detection,
                "coverage": coverage,
                "accuracy_proxy": accuracy,
                "effectiveness": effectiveness,
            },
            "issues_identified": issues,
        }

        return report

    def run_evaluation(self) -> Dict[str, Any]:
        """Run full evaluation and compile report for all models"""
        print("\n" + "=" * 80)
        print("RUNNING SYSTEM EVALUATION")
        print("=" * 80 + "\n")

        models_data = self.group_by_model()

        if not models_data:
            print("[WARNING] No model data found")
            return {"error": "No model data found"}

        print(f"Found {len(models_data)} models: {', '.join(models_data.keys())}")

        model_reports = {}
        for model_name, model_data in models_data.items():
            print(f"\nProcessing model: {model_name}")
            model_reports[model_name] = self.run_evaluation_for_model(
                model_name, model_data
            )

        summary_report = {
            "timestamp": datetime.now().isoformat(),
            "total_models_evaluated": len(model_reports),
            "models": list(model_reports.keys()),
            "model_reports": model_reports,
        }

        return summary_report
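
    # Shape of the summary report written by main() (values illustrative,
    # keys as built above):
    #   {
    #       "timestamp": "...",
    #       "total_models_evaluated": 2,
    #       "models": ["model-a", "model-b"],
    #       "model_reports": {"model-a": {...}, "model-b": {...}},
    #   }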


def main():
    parser = argparse.ArgumentParser(
        description="Evaluate multi-agent system performance"
    )
    parser.add_argument(
        "--input",
        default="evaluation/full_pipeline/results/tactic_counts_summary.json",
        help="Path to tactic_counts_summary.json",
    )
    parser.add_argument(
        "--output",
        default="evaluation/full_pipeline/results/evaluation_report.json",
        help="Output file for evaluation report",
    )
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    if not input_path.exists():
        print(f"[ERROR] Input file not found: {input_path}")
        print("Run count_tactics.py first to generate tactic counts")
        return 1

    evaluator = SystemEvaluator(input_path)
    report = evaluator.run_evaluation()

    if "error" in report:
        print(f"[ERROR] {report['error']}")
        return 1

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
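
    # Write one report per model next to the main report; ":" and "/" in the
    # model name are sanitized for use in a filename, so a hypothetical model
    # "vendor/model:tag" would land in evaluation_report_vendor_model_tag.json.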
    for model_name, model_report in report["model_reports"].items():
        model_output_path = (
            output_path.parent
            / f"evaluation_report_{model_name.replace(':', '_').replace('/', '_')}.json"
        )
        model_output_path.write_text(
            json.dumps(model_report, indent=2), encoding="utf-8"
        )
        print(f"Model report saved: {model_output_path}")

    print("\n" + "=" * 80)
    print("EVALUATION COMPLETE")
    print("=" * 80)
    print(f"Models evaluated: {report['total_models_evaluated']}")
    print(f"Models: {', '.join(report['models'])}")

    for model_name, model_report in report["model_reports"].items():
        effectiveness = model_report["evaluation_metrics"]["effectiveness"]
        print(f"\n{model_name}:")
        print(f"  Effectiveness Score: {effectiveness['effectiveness_score']:.1f}/100")
        print(f"  Grade: {effectiveness['grade']}")
        print(
            f"  Detection Rate: {effectiveness['component_scores']['detection_rate']:.1f}%"
        )
        print(f"  Coverage: {effectiveness['component_scores']['coverage_rate']:.1f}%")
        print(f"  Accuracy: {effectiveness['component_scores']['accuracy_score']:.1f}%")

    print(f"\nMain report saved to: {output_path}")
    print("=" * 80 + "\n")

    return 0
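

# Example invocation (matches the argparse defaults above):
#   python evaluate_metrics.py \
#       --input evaluation/full_pipeline/results/tactic_counts_summary.json \
#       --output evaluation/full_pipeline/results/evaluation_report.json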
if __name__ == "__main__":
    sys.exit(main())