Spaces:
Sleeping
Sleeping
Prepare project for Hugging Face Space deployment - Add app.py with Gradio interface - Update requirements.txt with torch dependencies - Configure LFS for large files (models, data) - Update README with comprehensive documentation
d2173d1
| """ | |
| Anomaly Detection Agent - Detects unusual patterns in sensor data | |
| """ | |
| import numpy as np | |
| import sys | |
| from pathlib import Path | |
| # Add parent directory to path | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| from models.anomaly_detector import AnomalyDetectionModel | |
| from typing import Dict, List, Tuple | |
| class AnomalyDetectionAgent: | |
| """ | |
| Agent responsible for detecting anomalies in vehicle sensor data | |
| """ | |
| def __init__(self, model_path='src/models/best_anomaly_detector.pth', threshold=0.5): | |
| self.model_path = Path(model_path) | |
| self.threshold = threshold | |
| self.model = None | |
| self._load_model() | |
| def _load_model(self): | |
| """Load the trained anomaly detection model""" | |
| if self.model_path.exists(): | |
| # Get input size from model file | |
| import torch | |
| checkpoint = torch.load(self.model_path, map_location='cpu') | |
| input_size = checkpoint['input_size'] | |
| sequence_length = checkpoint['sequence_length'] | |
| self.model = AnomalyDetectionModel(input_size, sequence_length) | |
| self.model.load(self.model_path) | |
| print(f"✓ Loaded anomaly detection model from {self.model_path}") | |
| else: | |
| print(f"⚠ Model not found at {self.model_path}. Using rule-based detection.") | |
| self.model = None | |
| def detect_anomalies_ml(self, features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| Detect anomalies using ML model | |
| Args: | |
| features: Feature array of shape (n_samples, n_features) | |
| Returns: | |
| Tuple of (anomaly_scores, anomaly_predictions) | |
| """ | |
| if self.model is None: | |
| raise ValueError("ML model not loaded") | |
| scores, predictions = self.model.predict(features) | |
| return scores, predictions | |
| def detect_anomalies_rules(self, raw_data) -> np.ndarray: | |
| """ | |
| Detect anomalies using rule-based approach (fallback) | |
| Args: | |
| raw_data: DataFrame with raw sensor data | |
| Returns: | |
| Array of anomaly predictions | |
| """ | |
| anomalies = np.zeros(len(raw_data), dtype=int) | |
| # Rule 1: Engine overheating | |
| if 'engine_temp' in raw_data.columns: | |
| anomalies |= (raw_data['engine_temp'] > 2.0).astype(int) # Normalized threshold | |
| # Rule 2: Low oil pressure | |
| if 'oil_pressure' in raw_data.columns: | |
| anomalies |= (raw_data['oil_pressure'] < -1.5).astype(int) | |
| # Rule 3: Battery issues | |
| if 'battery_voltage' in raw_data.columns: | |
| anomalies |= (raw_data['battery_voltage'] < -1.0).astype(int) | |
| # Rule 4: High vibration | |
| if 'vibration_level' in raw_data.columns: | |
| anomalies |= (raw_data['vibration_level'] > 2.0).astype(int) | |
| # Rule 5: Tire pressure issues | |
| tire_cols = [col for col in raw_data.columns if 'tire_pressure' in col] | |
| if tire_cols: | |
| for col in tire_cols: | |
| anomalies |= (raw_data[col] < -1.5).astype(int) | |
| return anomalies | |
| def identify_anomalous_sensors(self, raw_data, anomaly_indices: List[int]) -> Dict: | |
| """ | |
| Identify which sensors are showing anomalous behavior | |
| Args: | |
| raw_data: DataFrame with raw sensor data | |
| anomaly_indices: Indices where anomalies were detected | |
| Returns: | |
| Dictionary mapping sensor names to anomaly information | |
| """ | |
| if len(anomaly_indices) == 0: | |
| return {} | |
| anomalous_data = raw_data.iloc[anomaly_indices] | |
| sensor_cols = [col for col in raw_data.columns | |
| if col not in ['vehicle_id', 'timestamp', 'anomaly']] | |
| anomalous_sensors = {} | |
| for col in sensor_cols: | |
| # Check if this sensor shows unusual values | |
| overall_mean = raw_data[col].mean() | |
| overall_std = raw_data[col].std() | |
| anomaly_mean = anomalous_data[col].mean() | |
| # If anomaly mean is more than 2 std away from overall mean | |
| if abs(anomaly_mean - overall_mean) > 2 * overall_std: | |
| anomalous_sensors[col] = { | |
| 'overall_mean': float(overall_mean), | |
| 'anomaly_mean': float(anomaly_mean), | |
| 'deviation': float(abs(anomaly_mean - overall_mean) / overall_std), | |
| 'severity': 'high' if abs(anomaly_mean - overall_mean) > 3 * overall_std else 'medium' | |
| } | |
| return anomalous_sensors | |
| def calculate_anomaly_score(self, predictions: np.ndarray, scores: np.ndarray = None) -> float: | |
| """ | |
| Calculate overall anomaly score for the vehicle | |
| Args: | |
| predictions: Binary anomaly predictions | |
| scores: Optional continuous anomaly scores | |
| Returns: | |
| Overall anomaly score (0-1) | |
| """ | |
| if scores is not None: | |
| return float(np.mean(scores)) | |
| else: | |
| return float(np.mean(predictions)) | |
| def run(self, prepared_data: Dict) -> Dict: | |
| """ | |
| Main execution method for the Anomaly Detection Agent | |
| Args: | |
| prepared_data: Data prepared by Data Ingestion Agent | |
| Returns: | |
| Dictionary containing anomaly detection results | |
| """ | |
| print(f"\n{'='*60}") | |
| print(f"ANOMALY DETECTION AGENT - Vehicle {prepared_data['vehicle_id']}") | |
| print(f"{'='*60}") | |
| features = prepared_data['features'] | |
| raw_data = prepared_data['raw_data'] | |
| # Detect anomalies | |
| if self.model is not None: | |
| print("Using ML-based anomaly detection...") | |
| scores, predictions = self.detect_anomalies_ml(features) | |
| # Pad predictions to match original length | |
| padded_predictions = np.zeros(len(raw_data), dtype=int) | |
| padded_predictions[-len(predictions):] = predictions | |
| padded_scores = np.zeros(len(raw_data)) | |
| padded_scores[-len(scores):] = scores | |
| else: | |
| print("Using rule-based anomaly detection...") | |
| padded_predictions = self.detect_anomalies_rules(raw_data) | |
| padded_scores = padded_predictions.astype(float) | |
| # Find anomaly indices | |
| anomaly_indices = np.where(padded_predictions == 1)[0].tolist() | |
| num_anomalies = len(anomaly_indices) | |
| print(f"✓ Detected {num_anomalies} anomalous readings out of {len(raw_data)}") | |
| print(f" Anomaly rate: {num_anomalies/len(raw_data):.2%}") | |
| # Calculate overall anomaly score | |
| overall_score = self.calculate_anomaly_score(padded_predictions, padded_scores) | |
| print(f" Overall anomaly score: {overall_score:.3f}") | |
| # Identify anomalous sensors | |
| anomalous_sensors = {} | |
| if num_anomalies > 0: | |
| anomalous_sensors = self.identify_anomalous_sensors(raw_data, anomaly_indices) | |
| print(f"✓ Identified {len(anomalous_sensors)} sensors with anomalous behavior") | |
| if anomalous_sensors: | |
| print(" Top anomalous sensors:") | |
| sorted_sensors = sorted(anomalous_sensors.items(), | |
| key=lambda x: x[1]['deviation'], | |
| reverse=True) | |
| for sensor, info in sorted_sensors[:3]: | |
| print(f" - {sensor}: {info['severity']} severity (deviation: {info['deviation']:.2f}σ)") | |
| # Compare with ground truth if available | |
| if prepared_data['ground_truth'] is not None: | |
| ground_truth = prepared_data['ground_truth'] | |
| accuracy = (padded_predictions == ground_truth).mean() | |
| print(f" Accuracy vs ground truth: {accuracy:.2%}") | |
| print(f"{'='*60}\n") | |
| result = { | |
| 'vehicle_id': prepared_data['vehicle_id'], | |
| 'anomaly_detected': num_anomalies > 0, | |
| 'num_anomalies': num_anomalies, | |
| 'anomaly_rate': num_anomalies / len(raw_data), | |
| 'overall_score': overall_score, | |
| 'anomaly_indices': anomaly_indices, | |
| 'anomaly_predictions': padded_predictions, | |
| 'anomaly_scores': padded_scores, | |
| 'anomalous_sensors': anomalous_sensors, | |
| 'timestamps': prepared_data['timestamps'], | |
| 'raw_data': raw_data | |
| } | |
| return result | |
| if __name__ == '__main__': | |
| # Test the Anomaly Detection Agent | |
| from data_ingestion_agent import DataIngestionAgent | |
| # Load data | |
| ingestion_agent = DataIngestionAgent() | |
| test_df = ingestion_agent.load_test_data() | |
| test_vehicle_id = test_df['vehicle_id'].iloc[0] | |
| # Prepare data | |
| prepared_data = ingestion_agent.run(test_vehicle_id, n_readings=200) | |
| # Detect anomalies | |
| detection_agent = AnomalyDetectionAgent() | |
| result = detection_agent.run(prepared_data) | |
| print(f"\nAnomaly Detection Summary:") | |
| print(f" Anomalies detected: {result['anomaly_detected']}") | |
| print(f" Overall score: {result['overall_score']:.3f}") | |
| print(f" Anomalous sensors: {len(result['anomalous_sensors'])}") | |