#!/usr/bin/env python3
"""
Evaluate system performance metrics.
Calculates detection rates, coverage, accuracy, and overall effectiveness
based on tactic occurrence counts. Generates separate reports for each model.
Usage:
python evaluate_metrics.py [--input INPUT_PATH] [--output OUTPUT_PATH]
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import statistics
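
# Expected input: the tactic_counts_summary.json produced by count_tactics.py.
# Based on the fields read below, each entry under "results" is assumed to look
# roughly like this (schema inferred from usage here, not from a written spec):
#   {
#     "model": "model-name",
#     "tactic": "tactic-name",
#     "tactic_detected": 1,                      # 1 if the tactic was found, else 0
#     "total_abnormal_events_detected": 12
#   }
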
class SystemEvaluator:
"""Evaluates multi-agent system performance"""
def __init__(self, tactic_counts_file: Path):
self.tactic_counts_file = tactic_counts_file
self.tactic_data = []
self.load_tactic_counts()
def load_tactic_counts(self):
"""Load tactic counts summary data"""
if not self.tactic_counts_file.exists():
raise FileNotFoundError(f"Tactic counts file not found: {self.tactic_counts_file}")
data = json.loads(self.tactic_counts_file.read_text(encoding='utf-8'))
self.tactic_data = data.get('results', [])
print(f"[INFO] Loaded {len(self.tactic_data)} tactic analysis results")
def group_by_model(self) -> Dict[str, List[Dict]]:
"""Group tactic data by model"""
models = {}
        for item in self.tactic_data:
            models.setdefault(item['model'], []).append(item)
return models
    def calculate_detection_rate(self, model_data: Optional[List[Dict]] = None) -> Dict[str, Any]:
        """Calculate detection rate: percentage of files in which the expected tactic was detected"""
data_to_use = model_data if model_data is not None else self.tactic_data
# Aggregate by tactic
tactic_aggregates = {}
for item in data_to_use:
tactic = item['tactic']
if tactic not in tactic_aggregates:
tactic_aggregates[tactic] = {
'total_files': 0,
'files_detected': 0,
'total_events': 0
}
tactic_aggregates[tactic]['total_files'] += 1
tactic_aggregates[tactic]['files_detected'] += item['tactic_detected']
tactic_aggregates[tactic]['total_events'] += item['total_abnormal_events_detected']
total_files = sum(agg['total_files'] for agg in tactic_aggregates.values())
total_detected = sum(agg['files_detected'] for agg in tactic_aggregates.values())
total_events = sum(agg['total_events'] for agg in tactic_aggregates.values())
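        # Per-tactic status labels (assumed convention): GOOD = tactic detected in
        # >= 50% of its files, POOR = detected in some but < 50%, NONE = never detected.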
per_tactic_detection = []
for tactic, agg in sorted(tactic_aggregates.items()):
files = agg['total_files']
detected = agg['files_detected']
events = agg['total_events']
detection_rate = (detected / files * 100) if files > 0 else 0.0
per_tactic_detection.append({
'tactic': tactic,
'total_files': files,
'files_detected': detected,
'files_missed': files - detected,
'total_abnormal_events_detected': events,
'detection_rate_percent': detection_rate,
'status': 'GOOD' if detection_rate >= 50 else ('POOR' if detection_rate > 0 else 'NONE')
})
overall_detection_rate = (total_detected / total_files * 100) if total_files > 0 else 0.0
return {
'overall_detection_rate_percent': overall_detection_rate,
'total_files': total_files,
'total_files_detected': total_detected,
'total_files_missed': total_files - total_detected,
'total_abnormal_events_detected': total_events,
'total_tactics': len(tactic_aggregates),
'per_tactic_detection': per_tactic_detection
}
    def calculate_coverage(self, model_data: Optional[List[Dict]] = None) -> Dict[str, Any]:
"""Calculate coverage: how many tactics have at least one successful detection"""
data_to_use = model_data if model_data is not None else self.tactic_data
# Aggregate by tactic
tactic_aggregates = {}
for item in data_to_use:
tactic = item['tactic']
if tactic not in tactic_aggregates:
tactic_aggregates[tactic] = 0
tactic_aggregates[tactic] += item['tactic_detected']
total_tactics = len(tactic_aggregates)
tactics_with_detection = sum(1 for count in tactic_aggregates.values() if count > 0)
tactics_with_zero_detection = total_tactics - tactics_with_detection
coverage_percent = (tactics_with_detection / total_tactics * 100) if total_tactics > 0 else 0.0
detected_tactics = sorted([tactic for tactic, count in tactic_aggregates.items() if count > 0])
missed_tactics = sorted([tactic for tactic, count in tactic_aggregates.items() if count == 0])
return {
'coverage_percent': coverage_percent,
'total_tactics_tested': total_tactics,
'tactics_with_detection': tactics_with_detection,
'tactics_with_zero_detection': tactics_with_zero_detection,
'detected_tactics': detected_tactics,
'missed_tactics': missed_tactics
}
    def calculate_accuracy_proxy(self, model_data: Optional[List[Dict]] = None) -> Dict[str, Any]:
"""Calculate accuracy proxy: detection success rate per tactic"""
data_to_use = model_data if model_data is not None else self.tactic_data
# Aggregate by tactic
tactic_aggregates = {}
for item in data_to_use:
tactic = item['tactic']
if tactic not in tactic_aggregates:
tactic_aggregates[tactic] = {
'total_files': 0,
'files_detected': 0
}
tactic_aggregates[tactic]['total_files'] += 1
tactic_aggregates[tactic]['files_detected'] += item['tactic_detected']
accuracy_scores = []
for tactic, agg in sorted(tactic_aggregates.items()):
if agg['total_files'] > 0:
accuracy = agg['files_detected'] / agg['total_files']
accuracy_scores.append({
'tactic': tactic,
'accuracy_score': accuracy,
'interpretation': 'Perfect' if accuracy == 1.0 else ('Partial' if accuracy > 0 else 'Failed')
})
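        # Note: this is a macro-average (each tactic weighs equally regardless of
        # its file count), unlike the file-weighted overall_detection_rate_percent
        # in calculate_detection_rate. For example, per-tactic rates of 1.0 and 0.0
        # macro-average to 0.5 even if one of the tactics covers far more files.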
avg_accuracy = statistics.mean([s['accuracy_score'] for s in accuracy_scores]) if accuracy_scores else 0.0
return {
'average_accuracy_score': avg_accuracy,
'per_tactic_accuracy': accuracy_scores,
'perfect_matches': sum(1 for s in accuracy_scores if s['accuracy_score'] == 1.0),
'partial_matches': sum(1 for s in accuracy_scores if 0 < s['accuracy_score'] < 1.0),
'failed_matches': sum(1 for s in accuracy_scores if s['accuracy_score'] == 0.0)
}
    def calculate_effectiveness(self, model_data: Optional[List[Dict]] = None) -> Dict[str, Any]:
"""Calculate overall system effectiveness score (0-100)"""
detection = self.calculate_detection_rate(model_data)
coverage = self.calculate_coverage(model_data)
accuracy = self.calculate_accuracy_proxy(model_data)
# Weighted effectiveness score
# 40% detection rate, 30% coverage, 30% accuracy
effectiveness_score = (
detection['overall_detection_rate_percent'] * 0.4 +
coverage['coverage_percent'] * 0.3 +
accuracy['average_accuracy_score'] * 100 * 0.3
)
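        # Worked example (illustrative numbers only): detection 50%, coverage 60%,
        # accuracy 0.70 -> 50*0.4 + 60*0.3 + 0.70*100*0.3 = 20 + 18 + 21 = 59.0 (FAIR)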
# Grade the system
if effectiveness_score >= 80:
grade = 'EXCELLENT'
elif effectiveness_score >= 60:
grade = 'GOOD'
elif effectiveness_score >= 40:
grade = 'FAIR'
elif effectiveness_score >= 20:
grade = 'POOR'
else:
grade = 'CRITICAL'
return {
'effectiveness_score': effectiveness_score,
'grade': grade,
'component_scores': {
'detection_rate': detection['overall_detection_rate_percent'],
'coverage_rate': coverage['coverage_percent'],
'accuracy_score': accuracy['average_accuracy_score'] * 100
}
}
    def identify_issues(self, model_data: Optional[List[Dict]] = None) -> List[str]:
"""Identify specific issues and gaps"""
issues = []
detection = self.calculate_detection_rate(model_data)
coverage = self.calculate_coverage(model_data)
# Check overall detection
if detection['overall_detection_rate_percent'] < 20:
issues.append(
f"CRITICAL: Overall detection rate is only {detection['overall_detection_rate_percent']:.1f}%. "
f"System is failing to detect most attacks ({detection['total_files_missed']}/{detection['total_files']} files missed)."
)
elif detection['overall_detection_rate_percent'] < 50:
issues.append(
f"WARNING: Detection rate is {detection['overall_detection_rate_percent']:.1f}%, "
f"below acceptable threshold of 50% ({detection['total_files_missed']}/{detection['total_files']} files missed)."
)
# Check coverage
if coverage['tactics_with_zero_detection'] > 0:
missed = ', '.join(coverage['missed_tactics'])
issues.append(
f"COVERAGE GAP: {coverage['tactics_with_zero_detection']} tactics have zero detection: {missed}"
)
# Check for specific problematic tactics
for item in detection['per_tactic_detection']:
if item['total_files'] > 0 and item['detection_rate_percent'] == 0:
issues.append(
f"TACTIC FAILURE: '{item['tactic']}' - "
f"{item['total_files']} files analyzed, 0 detected"
)
# Check for data quality issues
data_to_use = model_data if model_data is not None else self.tactic_data
zero_event_tactics = [item['tactic'] for item in data_to_use if item['total_abnormal_events_detected'] == 0]
if zero_event_tactics:
            unique_zero = sorted(set(zero_event_tactics))
            issues.append(f"DATA ISSUE: Zero abnormal events were detected for tactics: {', '.join(unique_zero)}")
if not issues:
issues.append("No critical issues detected. System is performing within acceptable parameters.")
return issues
def run_evaluation_for_model(self, model_name: str, model_data: List[Dict]) -> Dict[str, Any]:
"""Run full evaluation for a specific model"""
print(f"\nEvaluating model: {model_name} ({len(model_data)} files)")
detection = self.calculate_detection_rate(model_data)
coverage = self.calculate_coverage(model_data)
accuracy = self.calculate_accuracy_proxy(model_data)
effectiveness = self.calculate_effectiveness(model_data)
issues = self.identify_issues(model_data)
report = {
'timestamp': datetime.now().isoformat(),
'model_name': model_name,
'evaluation_metrics': {
'detection_rate': detection,
'coverage': coverage,
'accuracy_proxy': accuracy,
'effectiveness': effectiveness
},
'issues_identified': issues,
}
return report
def run_evaluation(self) -> Dict[str, Any]:
"""Run full evaluation and compile report for all models"""
print("\n" + "="*80)
print("RUNNING SYSTEM EVALUATION")
print("="*80 + "\n")
# Group data by model
models_data = self.group_by_model()
if not models_data:
print("[WARNING] No model data found")
return {'error': 'No model data found'}
print(f"Found {len(models_data)} models: {', '.join(models_data.keys())}")
# Generate reports for each model
model_reports = {}
for model_name, model_data in models_data.items():
print(f"\nProcessing model: {model_name}")
model_reports[model_name] = self.run_evaluation_for_model(model_name, model_data)
# Create summary report
summary_report = {
'timestamp': datetime.now().isoformat(),
'total_models_evaluated': len(model_reports),
'models': list(model_reports.keys()),
'model_reports': model_reports
}
return summary_report
def main():
parser = argparse.ArgumentParser(
description="Evaluate multi-agent system performance"
)
parser.add_argument(
"--input",
default="full_pipeline_evaluation/results/tactic_counts_summary.json",
help="Path to tactic_counts_summary.json"
)
parser.add_argument(
"--output",
default="full_pipeline_evaluation/results/evaluation_report.json",
help="Output file for evaluation report"
)
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
if not input_path.exists():
print(f"[ERROR] Input file not found: {input_path}")
print("Run count_tactics.py first to generate tactic counts")
return 1
# Run evaluation
evaluator = SystemEvaluator(input_path)
report = evaluator.run_evaluation()
if 'error' in report:
print(f"[ERROR] {report['error']}")
return 1
# Save main report
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(report, indent=2), encoding='utf-8')
# Save individual model reports
for model_name, model_report in report['model_reports'].items():
model_output_path = output_path.parent / f"evaluation_report_{model_name.replace(':', '_').replace('/', '_')}.json"
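        # e.g. a model id like "org/model:tag" becomes evaluation_report_org_model_tag.json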
model_output_path.write_text(json.dumps(model_report, indent=2), encoding='utf-8')
print(f"Model report saved: {model_output_path}")
# Display summary
print("\n" + "="*80)
print("EVALUATION COMPLETE")
print("="*80)
print(f"Models evaluated: {report['total_models_evaluated']}")
print(f"Models: {', '.join(report['models'])}")
# Show summary for each model
for model_name, model_report in report['model_reports'].items():
effectiveness = model_report['evaluation_metrics']['effectiveness']
print(f"\n{model_name}:")
print(f" Effectiveness Score: {effectiveness['effectiveness_score']:.1f}/100")
print(f" Grade: {effectiveness['grade']}")
print(f" Detection Rate: {effectiveness['component_scores']['detection_rate']:.1f}%")
print(f" Coverage: {effectiveness['component_scores']['coverage_rate']:.1f}%")
print(f" Accuracy: {effectiveness['component_scores']['accuracy_score']:.1f}%")
print(f"\nMain report saved to: {output_path}")
print("="*80 + "\n")
return 0
if __name__ == "__main__":
    sys.exit(main())