# Ariyan-Pro — Enterprise Adversarial ML Governance Engine v5.0 LTS (rev f4bee9e)
"""
🔍 ADVERSARIAL INTELLIGENCE CORE - REAL IMPLEMENTATION
Attacks are signals, not failures.
"""
import json
import pickle
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from collections import defaultdict
import numpy as np
from sklearn.cluster import DBSCAN
import hashlib
class AttackTelemetry:
    """Real attack pattern analysis and threat scoring.

    Records per-inference telemetry and confirmed adversarial attacks,
    maintains a threat-signature database keyed by input signature hash,
    and produces threat reports, statistics and high-severity alerts.
    State is mirrored to JSON files under ``telemetry_dir``.
    """

    def __init__(self, telemetry_dir: str = "intelligence/telemetry"):
        """Initialise in-memory stores and reload any persisted state.

        Args:
            telemetry_dir: Directory holding the telemetry, alert and
                flagged-attack files; created (with parents) if missing.
        """
        self.telemetry_dir = Path(telemetry_dir)
        self.telemetry_dir.mkdir(parents=True, exist_ok=True)
        # In-memory telemetry storage (persisted via _save_telemetry).
        self.inference_log: List[Dict] = []
        self.attack_patterns: List[Dict] = []
        self.threat_signatures: Dict[str, Dict] = {}
        # Load existing telemetry from disk, if any.
        self._load_telemetry()

    def record_inference(self, request_id: str, request: Dict, prediction: Dict) -> None:
        """Record one inference with its adversarial indicators.

        Appends a telemetry row, runs the lightweight attack heuristic on
        it, and persists state.

        NOTE(review): this rewrites the whole telemetry file on every
        call — acceptable at moderate volume, but batching may be needed
        at scale.

        Args:
            request_id: Caller-supplied identifier for this request.
            request: Request payload; ``data`` is hashed for dedup.
            prediction: Model output; ``confidence``/``class``/
                ``inference_time_ms`` are read with safe defaults.
        """
        telemetry = {
            "timestamp": datetime.now().isoformat(),
            "request_id": request_id,
            "input_hash": self._hash_input(request.get("data", {})),
            "prediction_confidence": prediction.get("confidence", 0.0),
            "prediction_class": prediction.get("class", -1),
            "inference_time_ms": prediction.get("inference_time_ms", 0),
            "adversarial_indicators": self._extract_indicators(request, prediction)
        }
        self.inference_log.append(telemetry)
        # Check for attack patterns on the fresh row.
        self._analyze_for_attacks(telemetry)
        # Save to disk.
        self._save_telemetry()

    def record_attack(self, attack_type: str, success: bool,
                      request: Dict, original_pred: Dict, adversarial_pred: Dict) -> None:
        """Record a confirmed attack attempt.

        Updates the threat-signature database and emits an alert when the
        attack caused a large confidence drop.

        Args:
            attack_type: Label such as "FGSM", "PGD", "C&W".
            success: Whether the attack flipped/defeated the model.
            request: The (adversarial) request that was submitted.
            original_pred: Prediction on the clean input.
            adversarial_pred: Prediction on the perturbed input.
        """
        attack_record = {
            "timestamp": datetime.now().isoformat(),
            "attack_type": attack_type,
            "success": success,
            "original_confidence": original_pred.get("confidence", 0.0),
            "adversarial_confidence": adversarial_pred.get("confidence", 0.0),
            "confidence_drop": original_pred.get("confidence", 0.0) - adversarial_pred.get("confidence", 0.0),
            "input_signature": self._extract_attack_signature(request),
            "metadata": {
                "model": request.get("model", "unknown"),
                "domain": request.get("domain", "unknown")
            }
        }
        self.attack_patterns.append(attack_record)
        # Update threat signatures.
        self._update_threat_signatures(attack_record)
        # Generate immediate alert for high-confidence attacks.
        if attack_record["confidence_drop"] > 0.5:  # 50% confidence drop
            self._generate_attack_alert(attack_record)

    def generate_threat_report(self, timeframe_hours: int = 24) -> Dict:
        """Generate a threat intelligence report over a recent window.

        Args:
            timeframe_hours: Look-back window in hours (default 24).

        Returns:
            Dict with ``summary`` metrics, ``detailed_analysis`` (attack
            clusters, confidence trends, temporal patterns) and
            ``recommendations``.
        """
        cutoff_time = datetime.now() - timedelta(hours=timeframe_hours)
        # Filter recent data by parsed ISO timestamp.
        recent_attacks = [
            a for a in self.attack_patterns
            if datetime.fromisoformat(a["timestamp"]) > cutoff_time
        ]
        recent_inferences = [
            i for i in self.inference_log
            if datetime.fromisoformat(i["timestamp"]) > cutoff_time
        ]
        # Calculate threat metrics.
        attack_success_rate = self._calculate_attack_success_rate(recent_attacks)
        threat_score = self._calculate_threat_score(recent_attacks, recent_inferences)
        top_attack_types = self._get_top_attack_types(recent_attacks)
        # Cluster similar attacks by statistical signature.
        attack_clusters = self._cluster_attacks(recent_attacks)
        return {
            "report_time": datetime.now().isoformat(),
            "timeframe_hours": timeframe_hours,
            "summary": {
                "total_inferences": len(recent_inferences),
                "total_attacks_detected": len(recent_attacks),
                "attack_success_rate": attack_success_rate,
                "overall_threat_score": threat_score,
                "top_attack_types": top_attack_types
            },
            "detailed_analysis": {
                "attack_clusters": attack_clusters,
                "confidence_trends": self._analyze_confidence_trends(recent_inferences),
                "temporal_patterns": self._analyze_temporal_patterns(recent_attacks)
            },
            "recommendations": self._generate_recommendations(recent_attacks)
        }

    def get_attack_statistics(self) -> Dict:
        """Return real-time attack statistics over the full history.

        Returns:
            Counts, success rate, per-type breakdown, average confidence
            drop, and number of distinct threat signatures.
        """
        total_attacks = len(self.attack_patterns)
        successful_attacks = sum(1 for a in self.attack_patterns if a["success"])
        attacks_by_type = defaultdict(int)
        for attack in self.attack_patterns:
            attacks_by_type[attack["attack_type"]] += 1
        # Average confidence drop (0.0 when no attacks recorded).
        if self.attack_patterns:
            avg_drop = sum(a["confidence_drop"] for a in self.attack_patterns) / len(self.attack_patterns)
        else:
            avg_drop = 0.0
        return {
            "total_attacks": total_attacks,
            "successful_attacks": successful_attacks,
            "success_rate": successful_attacks / total_attacks if total_attacks > 0 else 0,
            "attacks_by_type": dict(attacks_by_type),
            "average_confidence_drop": avg_drop,
            "threat_signatures_count": len(self.threat_signatures)
        }

    def _extract_indicators(self, request: Dict, prediction: Dict) -> Dict:
        """Extract adversarial indicators from a request/prediction pair.

        Returns a dict that may contain ``low_confidence`` (confidence
        below 0.3), ``input_stats`` (basic statistics of a numeric list
        input), and ``suspicious_metadata``.
        """
        indicators = {}
        # Confidence anomaly: flag anything under 0.3.
        confidence = prediction.get("confidence", 0.0)
        if confidence < 0.3:
            indicators["low_confidence"] = confidence
        # Input statistics (only for non-empty list inputs).
        data = request.get("data", {}).get("input", [])
        if isinstance(data, list) and len(data) > 0:
            data_array = np.array(data)
            indicators["input_stats"] = {
                "mean": float(np.mean(data_array)),
                "std": float(np.std(data_array)),
                "max": float(np.max(data_array)),
                "min": float(np.min(data_array))
            }
        # Check for suspicious patterns (placeholder for real pattern detection).
        if "metadata" in request and "suspicious" in request["metadata"]:
            indicators["suspicious_metadata"] = True
        return indicators

    def _analyze_for_attacks(self, telemetry: Dict) -> None:
        """Flag telemetry rows matching a simple attack heuristic.

        Heuristic: very low confidence (< 0.2) combined with high input
        variance (std > 0.5).
        """
        indicators = telemetry["adversarial_indicators"]
        if indicators.get("low_confidence", 1.0) < 0.2:
            stats = indicators.get("input_stats", {})
            if stats.get("std", 0) > 0.5:  # High variance
                self._flag_potential_attack(telemetry)

    def _flag_potential_attack(self, telemetry: Dict) -> None:
        """Append a flagged telemetry row to the review queue file."""
        flag_record = {
            **telemetry,
            "flagged_as": "potential_attack",
            "review_status": "pending"
        }
        # Append-only JSONL so concurrent readers see complete lines.
        flag_file = self.telemetry_dir / "flagged_attacks.jsonl"
        with open(flag_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(flag_record) + "\n")

    def _hash_input(self, data: Any) -> str:
        """Return a short (16 hex char) content hash for deduplication.

        Canonicalises via sorted-key JSON; ``default=str`` makes
        non-JSON-native values hashable rather than raising.
        """
        data_str = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(data_str.encode()).hexdigest()[:16]

    def _extract_attack_signature(self, request: Dict) -> Dict:
        """Extract structural/statistical signature features of a request."""
        data = request.get("data", {})
        signature = {
            "input_shape": self._get_shape(data.get("input", [])),
            "feature_stats": self._calculate_feature_stats(data.get("input", [])),
            "request_pattern": {
                "has_metadata": "metadata" in request,
                "has_multiple_inputs": isinstance(data.get("input"), list) and len(data.get("input", [])) > 1
            }
        }
        return signature

    def _get_shape(self, data: Any) -> List[int]:
        """Return the (up to 2-D) shape of a nested list, else []."""
        if isinstance(data, list):
            # Assumes rectangular nesting; only the first row is inspected.
            if len(data) > 0 and isinstance(data[0], list):
                return [len(data), len(data[0])]
            return [len(data)]
        return []

    def _calculate_feature_stats(self, data: Any) -> Dict:
        """Return mean/std/skew/kurtosis of flattened numeric data.

        Returns {} for non-list input or anything numpy cannot coerce
        (e.g. ragged nesting) — best effort by design.
        """
        if not data or not isinstance(data, list):
            return {}
        try:
            flat_data = np.array(data).flatten()
            return {
                "mean": float(np.mean(flat_data)),
                "std": float(np.std(flat_data)),
                "skew": float(self._safe_skew(flat_data)),
                "kurtosis": float(self._safe_kurtosis(flat_data))
            }
        except Exception:
            return {}

    def _safe_skew(self, data) -> float:
        """Return scipy skewness, or 0.0 on any failure/empty input."""
        from scipy.stats import skew
        try:
            return skew(data) if len(data) > 0 else 0.0
        except Exception:
            return 0.0

    def _safe_kurtosis(self, data) -> float:
        """Return scipy kurtosis, or 0.0 on any failure/empty input."""
        from scipy.stats import kurtosis
        try:
            return kurtosis(data) if len(data) > 0 else 0.0
        except Exception:
            return 0.0

    def _update_threat_signatures(self, attack_record: Dict) -> None:
        """Insert or refresh the signature entry for an attack record.

        Signatures are keyed by a 12-hex-char hash of the canonicalised
        input signature; repeats bump ``occurrences`` and ``last_seen``.
        """
        signature_hash = hashlib.sha256(
            json.dumps(attack_record["input_signature"], sort_keys=True).encode()
        ).hexdigest()[:12]
        if signature_hash not in self.threat_signatures:
            self.threat_signatures[signature_hash] = {
                "first_seen": attack_record["timestamp"],
                "last_seen": attack_record["timestamp"],
                "attack_type": attack_record["attack_type"],
                "occurrences": 1,
                "signature": attack_record["input_signature"]
            }
        else:
            self.threat_signatures[signature_hash]["last_seen"] = attack_record["timestamp"]
            self.threat_signatures[signature_hash]["occurrences"] += 1

    def _generate_attack_alert(self, attack_record: Dict) -> None:
        """Persist and print a HIGH-severity alert for a serious attack."""
        alert = {
            "alert_type": "HIGH_CONFIDENCE_ATTACK",
            "timestamp": datetime.now().isoformat(),
            "severity": "HIGH",
            "attack_details": attack_record,
            "recommended_action": "Review firewall thresholds and consider model retraining"
        }
        # Append to the alert log.
        alert_file = self.telemetry_dir / "alerts.jsonl"
        with open(alert_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(alert) + "\n")
        # Log to console.
        print(f"🚨 HIGH SEVERITY ATTACK ALERT: {attack_record['attack_type']} "
              f"caused {attack_record['confidence_drop']:.1%} confidence drop")

    def _calculate_attack_success_rate(self, attacks: List[Dict]) -> float:
        """Return the fraction of attacks marked successful (0.0 if none)."""
        if not attacks:
            return 0.0
        successful = sum(1 for a in attacks if a["success"])
        return successful / len(attacks)

    def _calculate_threat_score(self, attacks: List[Dict], inferences: List[Dict]) -> float:
        """Return a composite threat score in [0, 100].

        Weighted blend: attack frequency (40%), attack success rate
        (30%), average confidence drop (30%), scaled and capped at 100.
        """
        if not inferences:
            return 0.0
        # Attack frequency component.
        attack_rate = len(attacks) / len(inferences) if inferences else 0
        # Attack success component.
        success_rate = self._calculate_attack_success_rate(attacks)
        # Confidence drop component.
        avg_drop = sum(a["confidence_drop"] for a in attacks) / len(attacks) if attacks else 0
        # Composite score, capped at 100.
        threat_score = (attack_rate * 40 + success_rate * 30 + avg_drop * 30)
        return min(100.0, threat_score * 100)

    def _get_top_attack_types(self, attacks: List[Dict]) -> List[Dict]:
        """Return up to five most frequent attack types with counts."""
        from collections import Counter
        attack_types = Counter(a["attack_type"] for a in attacks)
        return [
            {"type": atype, "count": count}
            for atype, count in attack_types.most_common(5)
        ]

    def _cluster_attacks(self, attacks: List[Dict]) -> List[Dict]:
        """Cluster attacks by statistical signature using DBSCAN.

        Returns [] for fewer than two attacks or on clustering failure.
        Noise points (DBSCAN label -1) are dropped.
        """
        if len(attacks) < 2:
            return []
        # Build a 4-feature vector per attack from its signature stats.
        features = []
        for attack in attacks:
            sig = attack["input_signature"]
            stats = sig.get("feature_stats", {})
            feat = [
                stats.get("mean", 0),
                stats.get("std", 0),
                stats.get("skew", 0),
                stats.get("kurtosis", 0)
            ]
            features.append(feat)
        try:
            features_array = np.array(features)
            clustering = DBSCAN(eps=0.5, min_samples=2).fit(features_array)
            clusters = defaultdict(list)
            for idx, label in enumerate(clustering.labels_):
                if label != -1:  # -1 means noise in DBSCAN
                    clusters[label].append(attacks[idx]["attack_type"])
            # int() cast: DBSCAN labels are numpy ints, which would break
            # json serialization of the report downstream.
            return [
                {"cluster_id": int(cid), "attack_types": list(set(types)), "size": len(types)}
                for cid, types in clusters.items()
            ]
        except Exception:
            return []

    def _analyze_confidence_trends(self, inferences: List[Dict]) -> Dict:
        """Summarise prediction-confidence trends over time.

        Returns hourly averages, the overall average, and volatility
        (std of confidences); {} when there are no inferences.
        """
        if not inferences:
            return {}
        # Group confidences into hourly buckets.
        hourly_confidences = defaultdict(list)
        for inf in inferences:
            dt = datetime.fromisoformat(inf["timestamp"])
            hour_key = dt.strftime("%Y-%m-%d %H:00")
            hourly_confidences[hour_key].append(inf["prediction_confidence"])
        hourly_avg = {
            hour: sum(confs) / len(confs)
            for hour, confs in hourly_confidences.items()
        }
        return {
            "hourly_averages": hourly_avg,
            "overall_average": sum(inf["prediction_confidence"] for inf in inferences) / len(inferences),
            # float() cast: np.std returns a numpy scalar that is not
            # JSON-serializable.
            "confidence_volatility": float(np.std([inf["prediction_confidence"] for inf in inferences]))
        }

    def _analyze_temporal_patterns(self, attacks: List[Dict]) -> Dict:
        """Summarise when attacks occur (by hour); {} when none given."""
        if not attacks:
            return {}
        # Count attacks per hour of day (0-23).
        hourly_counts = defaultdict(int)
        for attack in attacks:
            dt = datetime.fromisoformat(attack["timestamp"])
            hour = dt.hour
            hourly_counts[hour] += 1
        return {
            "attacks_by_hour": dict(hourly_counts),
            "peak_attack_hour": max(hourly_counts.items(), key=lambda x: x[1])[0] if hourly_counts else None,
            "total_attack_period_hours": len(set(
                datetime.fromisoformat(a["timestamp"]).strftime("%Y-%m-%d %H")
                for a in attacks
            ))
        }

    def _generate_recommendations(self, attacks: List[Dict]) -> List[str]:
        """Generate actionable recommendations from observed attacks."""
        recommendations = []
        if not attacks:
            recommendations.append("No attacks detected in timeframe. Maintain current security posture.")
            return recommendations
        # Per-attack-type guidance.
        fgsm_count = sum(1 for a in attacks if a["attack_type"] == "FGSM")
        pgd_count = sum(1 for a in attacks if a["attack_type"] == "PGD")
        cw_count = sum(1 for a in attacks if a["attack_type"] == "C&W")
        if fgsm_count > 0:
            recommendations.append(
                f"FGSM attacks detected ({fgsm_count} occurrences). "
                "Consider implementing gradient masking or input preprocessing."
            )
        if pgd_count > 0:
            recommendations.append(
                f"PGD attacks detected ({pgd_count} occurrences). "
                "Consider adversarial training or certified defenses."
            )
        if cw_count > 0:
            recommendations.append(
                f"C&W attacks detected ({cw_count} occurrences). "
                "High sophistication attack. Consider ensemble defenses or detection-based approaches."
            )
        # Escalate on high success rate.
        success_rate = self._calculate_attack_success_rate(attacks)
        if success_rate > 0.3:
            recommendations.append(
                f"High attack success rate ({success_rate:.1%}). "
                "Immediate model retraining with adversarial examples recommended."
            )
        # Escalate on large average confidence drops.
        avg_drop = sum(a["confidence_drop"] for a in attacks) / len(attacks)
        if avg_drop > 0.4:
            recommendations.append(
                f"Large confidence drops detected (average {avg_drop:.1%}). "
                "Review model calibration and consider confidence threshold adjustments."
            )
        return recommendations

    def _load_telemetry(self) -> None:
        """Load persisted telemetry from disk, if present.

        Best effort: a missing, unreadable or corrupt file leaves the
        in-memory state empty rather than raising at construction time.
        """
        telemetry_file = self.telemetry_dir / "telemetry.json"
        if telemetry_file.exists():
            try:
                with open(telemetry_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                self.inference_log = data.get("inference_log", [])
                self.attack_patterns = data.get("attack_patterns", [])
                self.threat_signatures = data.get("threat_signatures", {})
            except (OSError, ValueError):
                # Corrupt or unreadable state file: start fresh.
                pass

    def _save_telemetry(self) -> None:
        """Persist current state to telemetry.json (inference log capped)."""
        telemetry_file = self.telemetry_dir / "telemetry.json"
        data = {
            "inference_log": self.inference_log[-10000:],  # Keep last 10k
            "attack_patterns": self.attack_patterns,
            "threat_signatures": self.threat_signatures,
            "last_updated": datetime.now().isoformat()
        }
        with open(telemetry_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)