# NOTE: Hugging Face Spaces status banner ("Spaces: Sleeping") removed from the
# extracted source; the code below is the actual module content.
import numpy as np
import pandas as pd
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime, timedelta
import logging
# Project-local base detector and data types (not part of the stdlib).
from production_predictor import ProductionAnomalyDetector, AnomalyResult, GPSPoint
import torch

# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)

# Run model inference on GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class BatchAnomalyDetector(ProductionAnomalyDetector):
    """
    Extended ProductionAnomalyDetector with batch processing capabilities.

    Accepts GPS data as a list of lists: [[id, lat, lng, azm, spd, alt], ...]
    and offers three processing modes: whole-batch, per-vehicle sequential,
    and a simulated real-time stream.
    """

    def __init__(self, model_dir: str, config: Dict = None):
        super().__init__(model_dir, config)
        # Kept for callers that inspect results after a batch run.
        self.batch_results = []

    def process_batch_list_of_lists(self,
                                    data: List[List],
                                    column_order: List[str] = None,
                                    sort_by_vehicle: bool = True,
                                    generate_timestamps: bool = True) -> Dict[str, Any]:
        """
        Process batch data given as a list of lists.

        Args:
            data: List of lists in format [[id, lat, lng, azm, spd, alt], ...]
            column_order: Order of columns if different from the default
            sort_by_vehicle: Whether to sort by vehicle_id so each vehicle's
                points form a contiguous, ordered sequence
            generate_timestamps: Whether to generate timestamps automatically

        Returns:
            Dictionary with batch processing results (status, counts,
            per-point results and a summary).

        Raises:
            ValueError: If a required column is missing after renaming.
        """
        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']
        print(f"π Processing batch of {len(data)} GPS points...")
        # Convert list of lists to DataFrame
        df = pd.DataFrame(data, columns=column_order)
        # Rename to the training-time schema; only 'vehicle_id' actually changes,
        # the identity entries are kept to document the expected column set.
        column_mapping = {
            'vehicle_id': 'randomized_id',
            'azm': 'azm',
            'spd': 'spd',
            'alt': 'alt',
            'lat': 'lat',
            'lng': 'lng'
        }
        rename_map = {old: new for old, new in column_mapping.items()
                      if old in df.columns and old != new}
        if rename_map:
            df = df.rename(columns=rename_map)
        # Ensure we have the right columns
        required_columns = ['randomized_id', 'lat', 'lng', 'alt', 'spd', 'azm']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")
        # Sort so each vehicle's rows are contiguous; timestamp generation and
        # LSTM sequence windows both benefit from this ordering.
        if sort_by_vehicle:
            df = df.sort_values(['randomized_id', 'lat', 'lng']).reset_index(drop=True)
        # Generate timestamps if requested
        if generate_timestamps:
            df['timestamp'] = self._generate_timestamps(df)
        # Process batch
        return self._process_dataframe_batch(df)

    def process_batch_by_vehicle(self,
                                 data: List[List],
                                 column_order: List[str] = None,
                                 time_interval_seconds: int = 2) -> Dict[str, List[AnomalyResult]]:
        """
        Process batch data vehicle by vehicle to maintain proper sequence.

        Args:
            data: List of lists format
            column_order: Column order specification
            time_interval_seconds: Time interval between consecutive GPS points

        Returns:
            Dictionary with vehicle_id as key and list of AnomalyResult as value
        """
        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']
        # Convert to DataFrame
        df = pd.DataFrame(data, columns=column_order)
        # Group by vehicle
        vehicle_results = {}
        total_anomalies = 0
        print(f"π Processing {df['vehicle_id'].nunique()} vehicles with {len(df)} total points...")
        # Fix the base time once so synthesized timestamps are exactly
        # time_interval_seconds apart (calling datetime.now() per point, as the
        # original did, drifts by each iteration's processing time).
        base_time = datetime.now()
        for vehicle_id in df['vehicle_id'].unique():
            vehicle_data = df[df['vehicle_id'] == vehicle_id].copy()
            vehicle_data = vehicle_data.sort_values(['lat', 'lng']).reset_index(drop=True)
            print(f"\nπ Processing vehicle: {vehicle_id} ({len(vehicle_data)} points)")
            # Clear vehicle buffer to start fresh
            if vehicle_id in self.vehicle_buffers:
                del self.vehicle_buffers[vehicle_id]
            vehicle_results[vehicle_id] = []
            vehicle_anomalies = 0
            # Process points sequentially for this vehicle
            for idx, row in vehicle_data.iterrows():
                timestamp = base_time + timedelta(seconds=idx * time_interval_seconds)
                gps_point = GPSPoint(
                    vehicle_id=vehicle_id,
                    lat=row['lat'],
                    lng=row['lng'],
                    alt=row.get('alt', 0.0),
                    spd=row.get('spd', 0.0),
                    azm=row.get('azm', 0.0),
                    timestamp=timestamp.isoformat()
                )
                result = self.process_gps_point(gps_point)
                if result:
                    vehicle_results[vehicle_id].append(result)
                    if result.anomaly_detected:
                        vehicle_anomalies += 1
                        total_anomalies += 1
                        # Print anomaly details
                        print(f"   π¨ Point {idx+1}: {result.alert_level} "
                              f"(Speed: {result.driving_metrics['speed']:.1f} km/h, "
                              f"Conf: {result.confidence:.3f})")
                        print(f"      Risk factors: {result.risk_factors}")
            detection_rate = vehicle_anomalies / len(vehicle_results[vehicle_id]) if vehicle_results[vehicle_id] else 0
            print(f"   π Vehicle summary: {vehicle_anomalies} anomalies out of {len(vehicle_results[vehicle_id])} detections ({detection_rate:.1%})")
        print(f"\nπ― Batch Summary:")
        print(f"   Total vehicles: {len(vehicle_results)}")
        print(f"   Total points processed: {len(df)}")
        print(f"   Total anomalies detected: {total_anomalies}")
        # Guard the rate against an empty batch (original raised ZeroDivisionError).
        overall_rate = total_anomalies / len(df) if len(df) else 0.0
        print(f"   Overall anomaly rate: {overall_rate:.1%}")
        return vehicle_results

    def process_realtime_stream(self, data_stream: List[List],
                                column_order: List[str] = None,
                                delay_seconds: float = 2.0,
                                callback_function=None) -> List[AnomalyResult]:
        """
        Simulate real-time processing of list-of-lists data.

        Args:
            data_stream: List of lists to process as a real-time stream
            column_order: Column order
            delay_seconds: Delay between processing points (simulate real-time)
            callback_function: Called as callback_function(result, gps_point)
                whenever an anomaly is detected

        Returns:
            List of all detection results
        """
        import time
        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']
        print(f"π΄ Starting real-time stream simulation with {len(data_stream)} points...")
        print(f"β±οΈ Processing delay: {delay_seconds} seconds between points")
        all_results = []
        anomaly_count = 0
        for i, point_data in enumerate(data_stream):
            # Convert list to GPSPoint
            point_dict = dict(zip(column_order, point_data))
            gps_point = GPSPoint(
                vehicle_id=point_dict['vehicle_id'],
                lat=point_dict['lat'],
                lng=point_dict['lng'],
                alt=point_dict.get('alt', 0.0),
                spd=point_dict.get('spd', 0.0),
                azm=point_dict.get('azm', 0.0),
                timestamp=datetime.now().isoformat()
            )
            # Process point
            result = self.process_gps_point(gps_point)
            if result:
                all_results.append(result)
                # Print status
                status_icon = "π’" if result.alert_level == "NORMAL" else "π‘" if result.alert_level in ["LOW", "MEDIUM"] else "π΄"
                print(f"{status_icon} Point {i+1:3d}: {result.vehicle_id:12s} | "
                      f"{result.alert_level:8s} | Speed: {result.driving_metrics['speed']:5.1f} km/h | "
                      f"Conf: {result.confidence:.3f}")
                if result.anomaly_detected:
                    anomaly_count += 1
                    print(f"   π¨ ANOMALY DETECTED! {result.risk_factors}")
                    # Call callback function if provided
                    if callback_function:
                        callback_function(result, gps_point)
            else:
                # Detector still filling its per-vehicle buffer; no score yet.
                print(f"β³ Point {i+1:3d}: {point_dict['vehicle_id']:12s} | Building buffer...")
            # Simulate real-time delay
            if i < len(data_stream) - 1:  # Don't delay after last point
                time.sleep(delay_seconds)
        print(f"\nπ Stream Complete:")
        print(f"   Points processed: {len(data_stream)}")
        print(f"   Detections made: {len(all_results)}")
        print(f"   Anomalies found: {anomaly_count}")
        print(f"   Anomaly rate: {anomaly_count/len(all_results)*100:.1f}%" if all_results else "   No detections made")
        return all_results

    def _generate_timestamps(self, df: pd.DataFrame) -> List[str]:
        """Generate per-row timestamps at 2-second intervals within each vehicle.

        Uses groupby/cumcount so every timestamp aligns with its own row even
        when rows of the same vehicle are not contiguous. (The original
        implementation appended timestamps vehicle-by-vehicle into a flat list
        and silently misaligned them unless the frame was pre-grouped.)
        """
        base_time = datetime.now()
        # Position of each row within its vehicle's sequence, in row order.
        offsets = df.groupby('randomized_id').cumcount()
        return [(base_time + timedelta(seconds=int(k) * 2)).isoformat() for k in offsets]

    def _process_dataframe_batch(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Score every row of df with the ensemble and return a result bundle.

        Runs the exact training-time feature pipeline, scales features, scores
        each point with Isolation Forest / One-Class SVM / LSTM autoencoder
        (when enough sequence context exists), and aggregates a summary.
        """
        # Use the exact feature engineering pipeline from training.
        features_df = self._calculate_features_exact_pipeline(df)
        if len(features_df) == 0:
            return {
                "status": "error",
                "message": "No features could be calculated",
                "processed": 0,
                "anomalies": 0
            }
        # Scale features
        features_scaled = self.scaler.transform(features_df)
        # Get anomaly scores for all points
        anomaly_results = []
        print("π Running anomaly detection on all points...")
        for i in range(len(features_scaled)):
            point_scaled = features_scaled[i:i+1]
            # Get scores from all models
            scores = {}
            # Isolation Forest
            if self.isolation_forest:
                scores['isolation_forest'] = float(self.isolation_forest.decision_function(point_scaled)[0])
            # One-Class SVM
            if self.one_class_svm:
                scores['one_class_svm'] = float(self.one_class_svm.decision_function(point_scaled)[0])
            # LSTM (only once a full sequence window is available)
            if self.lstm_autoencoder and i >= self.config['lstm_sequence_length'] - 1:
                try:
                    sequence_start = max(0, i - self.config['lstm_sequence_length'] + 1)
                    sequence_features = features_scaled[sequence_start:i+1]
                    if len(sequence_features) == self.config['lstm_sequence_length']:
                        sequence_tensor = torch.FloatTensor(sequence_features).unsqueeze(0).to(device)
                        with torch.no_grad():
                            reconstructed = self.lstm_autoencoder(sequence_tensor)
                            reconstruction_error = torch.mean((sequence_tensor - reconstructed) ** 2).item()
                        scores['lstm'] = float(reconstruction_error)
                except Exception as exc:
                    # LSTM scoring is best-effort; fall back to a neutral score
                    # but log instead of silently swallowing (original bare except).
                    logger.warning("LSTM scoring failed at index %d: %s", i, exc)
                    scores['lstm'] = 0.0
            # Calculate ensemble score
            ensemble_score = self._calculate_ensemble_score(scores)
            alert_level = self._get_alert_level(ensemble_score)
            # Extract metrics
            feature_row = features_df.iloc[i]
            driving_metrics = self._extract_driving_metrics_from_features(feature_row)
            risk_factors = self._extract_risk_factors_from_features(feature_row)
            anomaly_results.append({
                'index': i,
                'vehicle_id': df.iloc[i]['randomized_id'],
                'anomaly_detected': ensemble_score > self.config['alert_threshold'],
                'confidence': ensemble_score,
                'alert_level': alert_level,
                'raw_scores': scores,
                'driving_metrics': driving_metrics,
                'risk_factors': risk_factors
            })
        # Generate summary
        total_anomalies = sum(1 for r in anomaly_results if r['anomaly_detected'])
        return {
            "status": "completed",
            "processed": len(anomaly_results),
            "anomalies": total_anomalies,
            "anomaly_rate": total_anomalies / len(anomaly_results) if anomaly_results else 0,
            "results": anomaly_results,
            "summary": {
                "total_vehicles": df['randomized_id'].nunique(),
                "total_points": len(df),
                "detection_ready_points": len(anomaly_results),
                "anomalies_by_level": {
                    level: sum(1 for r in anomaly_results if r['alert_level'] == level)
                    for level in ['NORMAL', 'LOW', 'MEDIUM', 'HIGH', 'CRITICAL']
                }
            }
        }
| # Example usage functions | |
def example_list_of_lists_usage():
    """Walk through the three batch-processing entry points on a small sample."""
    print("π Example: Processing List of Lists Data")
    print("=" * 50)
    # Set up the detector against the trained model directory.
    detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")
    # Sample rows in the default column order: [vehicle_id, lat, lng, azm, spd, alt]
    sample_data = [
        # Normal driving for vehicle_001
        ["vehicle_001", 55.7558, 37.6176, 90.0, 45.0, 156.0],
        ["vehicle_001", 55.7559, 37.6177, 92.0, 47.0, 157.0],
        ["vehicle_001", 55.7560, 37.6178, 94.0, 46.0, 158.0],
        ["vehicle_001", 55.7561, 37.6179, 96.0, 48.0, 159.0],
        ["vehicle_001", 55.7562, 37.6180, 98.0, 49.0, 160.0],
        # Aggressive driving for vehicle_002
        ["vehicle_002", 55.7600, 37.6200, 180.0, 70.0, 150.0],
        ["vehicle_002", 55.7601, 37.6201, 182.0, 125.0, 151.0],  # Speeding
        ["vehicle_002", 55.7602, 37.6202, 184.0, 15.0, 152.0],   # Hard braking
        ["vehicle_002", 55.7603, 37.6203, 250.0, 55.0, 153.0],   # Sharp turn
        # Mixed behavior for vehicle_003
        ["vehicle_003", 55.7700, 37.6300, 45.0, 40.0, 145.0],
        ["vehicle_003", 55.7701, 37.6301, 47.0, 42.0, 146.0],
        ["vehicle_003", 55.7702, 37.6302, 49.0, 110.0, 147.0],  # Speed violation
        ["vehicle_003", 55.7703, 37.6303, 51.0, 43.0, 148.0],
    ]
    distinct_vehicles = {row[0] for row in sample_data}
    print(f"Processing {len(sample_data)} GPS points from {len(distinct_vehicles)} vehicles...")

    # --- Method 1: one-shot batch processing -------------------------------
    print("\nπ Method 1: Batch Processing")
    batch_results = detector.process_batch_list_of_lists(sample_data)
    print(f"Batch Results:")
    print(f"   Status: {batch_results['status']}")
    print(f"   Points processed: {batch_results['processed']}")
    print(f"   Anomalies detected: {batch_results['anomalies']}")
    print(f"   Anomaly rate: {batch_results['anomaly_rate']:.1%}")

    # --- Method 2: sequential per-vehicle processing -----------------------
    print("\nπ Method 2: Vehicle-by-Vehicle Processing")
    vehicle_results = detector.process_batch_by_vehicle(sample_data)
    for vehicle_id, results in vehicle_results.items():
        flagged = sum(1 for r in results if r.anomaly_detected)
        print(f"   {vehicle_id}: {flagged} anomalies out of {len(results)} detections")

    # --- Method 3: simulated real-time stream ------------------------------
    print("\nπ΄ Method 3: Real-time Stream Simulation (first 8 points)")

    def anomaly_callback(result, gps_point):
        """Invoked by the detector whenever an anomaly fires."""
        print(f"   π§ ALERT SENT: {result.vehicle_id} - {result.alert_level}")

    stream_results = detector.process_realtime_stream(
        sample_data[:8],          # First 8 points
        delay_seconds=0.5,        # Faster for demo
        callback_function=anomaly_callback
    )
def load_from_csv_example():
    """Show how CSV content is converted into list-of-lists detector input."""
    from io import StringIO

    print("\nπ Example: Loading from CSV")
    print("=" * 50)
    # Inline CSV stands in for pd.read_csv('your_file.csv').
    csv_data = """vehicle_id,lat,lng,azm,spd,alt
vehicle_001,55.7558,37.6176,90.0,45.0,156.0
vehicle_001,55.7559,37.6177,92.0,47.0,157.0
vehicle_002,55.7600,37.6200,180.0,125.0,150.0
vehicle_002,55.7601,37.6201,182.0,15.0,151.0"""
    frame = pd.read_csv(StringIO(csv_data))
    # DataFrame rows -> plain Python lists, preserving column order.
    rows = frame.values.tolist()
    print(f"Loaded {len(rows)} rows from CSV")
    print(f"Column order: {frame.columns.tolist()}")
    print(f"Sample data: {rows[0]}")
    # Feed the converted rows to the detector.
    detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")
    results = detector.process_batch_list_of_lists(
        rows,
        column_order=frame.columns.tolist()
    )
    print(f"Processing complete: {results['anomalies']} anomalies detected")
def large_dataset_example():
    """Demonstrate chunked processing of a synthetic 1,000-point dataset."""
    print("\nπ’ Example: Large Dataset Processing")
    print("=" * 50)
    # Deterministic synthetic fleet: 10 vehicles x 100 points each.
    np.random.seed(42)
    fleet = [f"vehicle_{i:03d}" for i in range(1, 11)]
    large_data = []
    for vid in fleet:
        for _ in range(100):
            lat = 55.7500 + np.random.uniform(-0.01, 0.01)
            lng = 37.6000 + np.random.uniform(-0.01, 0.01)
            azm = np.random.uniform(0, 360)
            # ~10% of points get an aggressive (high) speed.
            if np.random.random() > 0.1:
                spd = np.random.uniform(20, 80)
            else:
                spd = np.random.uniform(90, 140)
            alt = 150 + np.random.uniform(-20, 20)
            large_data.append([vid, lat, lng, azm, spd, alt])
    print(f"Generated large dataset: {len(large_data)} points from {len(fleet)} vehicles")
    # Process efficiently
    detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")
    # Walk the data in fixed-size chunks to bound memory use.
    chunk_size = 500
    total_anomalies = 0
    start = 0
    chunk_no = 0
    while start < len(large_data):
        chunk = large_data[start:start + chunk_size]
        chunk_no += 1
        print(f"Processing chunk {chunk_no}: points {start+1}-{start+len(chunk)}")
        results = detector.process_batch_list_of_lists(chunk)
        total_anomalies += results['anomalies']
        print(f"   Chunk anomalies: {results['anomalies']}")
        start += chunk_size
    print(f"\nLarge dataset complete:")
    print(f"   Total points: {len(large_data)}")
    print(f"   Total anomalies: {total_anomalies}")
    print(f"   Overall anomaly rate: {total_anomalies/len(large_data):.1%}")