Spaces:
Sleeping
Sleeping
File size: 9,530 Bytes
d2173d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
"""
Anomaly Detection Agent - Detects unusual patterns in sensor data
"""
import numpy as np
import sys
from pathlib import Path
# Add parent directory to path
sys.path.append(str(Path(__file__).parent.parent))
from models.anomaly_detector import AnomalyDetectionModel
from typing import Dict, List, Tuple
class AnomalyDetectionAgent:
"""
Agent responsible for detecting anomalies in vehicle sensor data
"""
def __init__(self, model_path='src/models/best_anomaly_detector.pth', threshold=0.5):
self.model_path = Path(model_path)
self.threshold = threshold
self.model = None
self._load_model()
def _load_model(self):
"""Load the trained anomaly detection model"""
if self.model_path.exists():
# Get input size from model file
import torch
checkpoint = torch.load(self.model_path, map_location='cpu')
input_size = checkpoint['input_size']
sequence_length = checkpoint['sequence_length']
self.model = AnomalyDetectionModel(input_size, sequence_length)
self.model.load(self.model_path)
print(f"✓ Loaded anomaly detection model from {self.model_path}")
else:
print(f"⚠ Model not found at {self.model_path}. Using rule-based detection.")
self.model = None
def detect_anomalies_ml(self, features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Detect anomalies using ML model
Args:
features: Feature array of shape (n_samples, n_features)
Returns:
Tuple of (anomaly_scores, anomaly_predictions)
"""
if self.model is None:
raise ValueError("ML model not loaded")
scores, predictions = self.model.predict(features)
return scores, predictions
def detect_anomalies_rules(self, raw_data) -> np.ndarray:
"""
Detect anomalies using rule-based approach (fallback)
Args:
raw_data: DataFrame with raw sensor data
Returns:
Array of anomaly predictions
"""
anomalies = np.zeros(len(raw_data), dtype=int)
# Rule 1: Engine overheating
if 'engine_temp' in raw_data.columns:
anomalies |= (raw_data['engine_temp'] > 2.0).astype(int) # Normalized threshold
# Rule 2: Low oil pressure
if 'oil_pressure' in raw_data.columns:
anomalies |= (raw_data['oil_pressure'] < -1.5).astype(int)
# Rule 3: Battery issues
if 'battery_voltage' in raw_data.columns:
anomalies |= (raw_data['battery_voltage'] < -1.0).astype(int)
# Rule 4: High vibration
if 'vibration_level' in raw_data.columns:
anomalies |= (raw_data['vibration_level'] > 2.0).astype(int)
# Rule 5: Tire pressure issues
tire_cols = [col for col in raw_data.columns if 'tire_pressure' in col]
if tire_cols:
for col in tire_cols:
anomalies |= (raw_data[col] < -1.5).astype(int)
return anomalies
def identify_anomalous_sensors(self, raw_data, anomaly_indices: List[int]) -> Dict:
"""
Identify which sensors are showing anomalous behavior
Args:
raw_data: DataFrame with raw sensor data
anomaly_indices: Indices where anomalies were detected
Returns:
Dictionary mapping sensor names to anomaly information
"""
if len(anomaly_indices) == 0:
return {}
anomalous_data = raw_data.iloc[anomaly_indices]
sensor_cols = [col for col in raw_data.columns
if col not in ['vehicle_id', 'timestamp', 'anomaly']]
anomalous_sensors = {}
for col in sensor_cols:
# Check if this sensor shows unusual values
overall_mean = raw_data[col].mean()
overall_std = raw_data[col].std()
anomaly_mean = anomalous_data[col].mean()
# If anomaly mean is more than 2 std away from overall mean
if abs(anomaly_mean - overall_mean) > 2 * overall_std:
anomalous_sensors[col] = {
'overall_mean': float(overall_mean),
'anomaly_mean': float(anomaly_mean),
'deviation': float(abs(anomaly_mean - overall_mean) / overall_std),
'severity': 'high' if abs(anomaly_mean - overall_mean) > 3 * overall_std else 'medium'
}
return anomalous_sensors
def calculate_anomaly_score(self, predictions: np.ndarray, scores: np.ndarray = None) -> float:
"""
Calculate overall anomaly score for the vehicle
Args:
predictions: Binary anomaly predictions
scores: Optional continuous anomaly scores
Returns:
Overall anomaly score (0-1)
"""
if scores is not None:
return float(np.mean(scores))
else:
return float(np.mean(predictions))
def run(self, prepared_data: Dict) -> Dict:
"""
Main execution method for the Anomaly Detection Agent
Args:
prepared_data: Data prepared by Data Ingestion Agent
Returns:
Dictionary containing anomaly detection results
"""
print(f"\n{'='*60}")
print(f"ANOMALY DETECTION AGENT - Vehicle {prepared_data['vehicle_id']}")
print(f"{'='*60}")
features = prepared_data['features']
raw_data = prepared_data['raw_data']
# Detect anomalies
if self.model is not None:
print("Using ML-based anomaly detection...")
scores, predictions = self.detect_anomalies_ml(features)
# Pad predictions to match original length
padded_predictions = np.zeros(len(raw_data), dtype=int)
padded_predictions[-len(predictions):] = predictions
padded_scores = np.zeros(len(raw_data))
padded_scores[-len(scores):] = scores
else:
print("Using rule-based anomaly detection...")
padded_predictions = self.detect_anomalies_rules(raw_data)
padded_scores = padded_predictions.astype(float)
# Find anomaly indices
anomaly_indices = np.where(padded_predictions == 1)[0].tolist()
num_anomalies = len(anomaly_indices)
print(f"✓ Detected {num_anomalies} anomalous readings out of {len(raw_data)}")
print(f" Anomaly rate: {num_anomalies/len(raw_data):.2%}")
# Calculate overall anomaly score
overall_score = self.calculate_anomaly_score(padded_predictions, padded_scores)
print(f" Overall anomaly score: {overall_score:.3f}")
# Identify anomalous sensors
anomalous_sensors = {}
if num_anomalies > 0:
anomalous_sensors = self.identify_anomalous_sensors(raw_data, anomaly_indices)
print(f"✓ Identified {len(anomalous_sensors)} sensors with anomalous behavior")
if anomalous_sensors:
print(" Top anomalous sensors:")
sorted_sensors = sorted(anomalous_sensors.items(),
key=lambda x: x[1]['deviation'],
reverse=True)
for sensor, info in sorted_sensors[:3]:
print(f" - {sensor}: {info['severity']} severity (deviation: {info['deviation']:.2f}σ)")
# Compare with ground truth if available
if prepared_data['ground_truth'] is not None:
ground_truth = prepared_data['ground_truth']
accuracy = (padded_predictions == ground_truth).mean()
print(f" Accuracy vs ground truth: {accuracy:.2%}")
print(f"{'='*60}\n")
result = {
'vehicle_id': prepared_data['vehicle_id'],
'anomaly_detected': num_anomalies > 0,
'num_anomalies': num_anomalies,
'anomaly_rate': num_anomalies / len(raw_data),
'overall_score': overall_score,
'anomaly_indices': anomaly_indices,
'anomaly_predictions': padded_predictions,
'anomaly_scores': padded_scores,
'anomalous_sensors': anomalous_sensors,
'timestamps': prepared_data['timestamps'],
'raw_data': raw_data
}
return result
if __name__ == '__main__':
# Test the Anomaly Detection Agent
from data_ingestion_agent import DataIngestionAgent
# Load data
ingestion_agent = DataIngestionAgent()
test_df = ingestion_agent.load_test_data()
test_vehicle_id = test_df['vehicle_id'].iloc[0]
# Prepare data
prepared_data = ingestion_agent.run(test_vehicle_id, n_readings=200)
# Detect anomalies
detection_agent = AnomalyDetectionAgent()
result = detection_agent.run(prepared_data)
print(f"\nAnomaly Detection Summary:")
print(f" Anomalies detected: {result['anomaly_detected']}")
print(f" Overall score: {result['overall_score']:.3f}")
print(f" Anomalous sensors: {len(result['anomalous_sensors'])}")
|