"""Batch GPS anomaly detection on top of ProductionAnomalyDetector.

Accepts raw GPS data as list-of-lists rows ([id, lat, lng, azm, spd, alt])
and offers three processing modes: whole-batch scoring, per-vehicle
sequential processing, and a simulated real-time stream.
"""

import numpy as np
import pandas as pd
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime, timedelta
import logging
from production_predictor import ProductionAnomalyDetector, AnomalyResult, GPSPoint
import torch

logger = logging.getLogger(__name__)

# Run LSTM inference on GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class BatchAnomalyDetector(ProductionAnomalyDetector):
    """
    Extended ProductionAnomalyDetector with batch processing capabilities

    Processes data as list of lists: [[id, lat, lng, azm, spd, alt], ...]
    """

    def __init__(self, model_dir: str, config: Optional[Dict] = None):
        super().__init__(model_dir, config)
        # Accumulator for results across calls (currently informational only).
        self.batch_results: List[Any] = []

    def process_batch_list_of_lists(self,
                                    data: List[List],
                                    column_order: Optional[List[str]] = None,
                                    sort_by_vehicle: bool = True,
                                    generate_timestamps: bool = True) -> Dict[str, Any]:
        """
        Process batch data as list of lists.

        Args:
            data: List of lists in format [[id, lat, lng, azm, spd, alt], ...]
            column_order: Order of columns if different from default
            sort_by_vehicle: Whether to sort by vehicle_id for proper sequence
            generate_timestamps: Whether to generate timestamps automatically

        Returns:
            Dictionary with batch processing results.

        Raises:
            ValueError: If required columns are missing after renaming.
        """
        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']

        print(f"šŸ”„ Processing batch of {len(data)} GPS points...")

        # Convert list of lists to DataFrame
        df = pd.DataFrame(data, columns=column_order)

        # Rename to match the training format.  Only 'vehicle_id' actually
        # changes; the identity entries document the expected input schema.
        column_mapping = {
            'vehicle_id': 'randomized_id',
            'azm': 'azm',
            'spd': 'spd',
            'alt': 'alt',
            'lat': 'lat',
            'lng': 'lng'
        }
        for old_col, new_col in column_mapping.items():
            if old_col in df.columns and old_col != new_col:
                df = df.rename(columns={old_col: new_col})

        # Ensure we have the right columns
        required_columns = ['randomized_id', 'lat', 'lng', 'alt', 'spd', 'azm']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

        # Sort by vehicle and add sequence if requested.
        # NOTE(review): within a vehicle, lat/lng is used as a proxy for
        # travel order — there is no true sequence column in the input.
        if sort_by_vehicle:
            df = df.sort_values(['randomized_id', 'lat', 'lng']).reset_index(drop=True)

        # Generate timestamps if requested
        if generate_timestamps:
            df['timestamp'] = self._generate_timestamps(df)

        # Process batch through the shared feature/scoring pipeline
        return self._process_dataframe_batch(df)

    def process_batch_by_vehicle(self,
                                 data: List[List],
                                 column_order: Optional[List[str]] = None,
                                 time_interval_seconds: int = 2) -> Dict[str, List[AnomalyResult]]:
        """
        Process batch data vehicle by vehicle to maintain proper sequence.

        Args:
            data: List of lists format
            column_order: Column order specification
            time_interval_seconds: Time interval between GPS points

        Returns:
            Dictionary with vehicle_id as key and list of results as value.
        """
        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']

        # Convert to DataFrame
        df = pd.DataFrame(data, columns=column_order)

        vehicle_results: Dict[str, List[AnomalyResult]] = {}
        total_anomalies = 0

        print(f"šŸš› Processing {df['vehicle_id'].nunique()} vehicles with {len(df)} total points...")

        for vehicle_id in df['vehicle_id'].unique():
            vehicle_data = df[df['vehicle_id'] == vehicle_id].copy()
            # lat/lng ordering approximates travel order (no sequence column).
            vehicle_data = vehicle_data.sort_values(['lat', 'lng']).reset_index(drop=True)

            print(f"\nšŸ“ Processing vehicle: {vehicle_id} ({len(vehicle_data)} points)")

            # Clear this vehicle's buffer so each batch starts fresh.
            if vehicle_id in self.vehicle_buffers:
                del self.vehicle_buffers[vehicle_id]

            vehicle_results[vehicle_id] = []
            vehicle_anomalies = 0

            # Process points sequentially for this vehicle; idx is positional
            # after reset_index, so synthetic timestamps advance uniformly.
            for idx, row in vehicle_data.iterrows():
                timestamp = datetime.now() + timedelta(seconds=idx * time_interval_seconds)

                gps_point = GPSPoint(
                    vehicle_id=vehicle_id,
                    lat=row['lat'],
                    lng=row['lng'],
                    alt=row.get('alt', 0.0),
                    spd=row.get('spd', 0.0),
                    azm=row.get('azm', 0.0),
                    timestamp=timestamp.isoformat()
                )

                result = self.process_gps_point(gps_point)
                if result:
                    vehicle_results[vehicle_id].append(result)
                    if result.anomaly_detected:
                        vehicle_anomalies += 1
                        total_anomalies += 1
                        # Print anomaly details
                        print(f"   🚨 Point {idx+1}: {result.alert_level} "
                              f"(Speed: {result.driving_metrics['speed']:.1f} km/h, "
                              f"Conf: {result.confidence:.3f})")
                        print(f"      Risk factors: {result.risk_factors}")

            detection_rate = vehicle_anomalies / len(vehicle_results[vehicle_id]) if vehicle_results[vehicle_id] else 0
            print(f"   šŸ“Š Vehicle summary: {vehicle_anomalies} anomalies out of {len(vehicle_results[vehicle_id])} detections ({detection_rate:.1%})")

        print(f"\nšŸŽÆ Batch Summary:")
        print(f"   Total vehicles: {len(vehicle_results)}")
        print(f"   Total points processed: {len(df)}")
        print(f"   Total anomalies detected: {total_anomalies}")
        # Guard the rate against an empty batch (would raise ZeroDivisionError).
        overall_rate = total_anomalies / len(df) if len(df) else 0
        print(f"   Overall anomaly rate: {overall_rate:.1%}")

        return vehicle_results

    def process_realtime_stream(self,
                                data_stream: List[List],
                                column_order: Optional[List[str]] = None,
                                delay_seconds: float = 2.0,
                                callback_function=None) -> List[AnomalyResult]:
        """
        Simulate real-time processing of list-of-lists data.

        Args:
            data_stream: List of lists to process as real-time stream
            column_order: Column order
            delay_seconds: Delay between processing points (simulate real-time)
            callback_function: Function to call when anomaly is detected;
                invoked as callback_function(result, gps_point)

        Returns:
            List of all detection results.
        """
        import time

        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']

        print(f"šŸ”“ Starting real-time stream simulation with {len(data_stream)} points...")
        print(f"ā±ļø  Processing delay: {delay_seconds} seconds between points")

        all_results: List[AnomalyResult] = []
        anomaly_count = 0

        for i, point_data in enumerate(data_stream):
            # Convert list to GPSPoint via the declared column order
            point_dict = dict(zip(column_order, point_data))

            gps_point = GPSPoint(
                vehicle_id=point_dict['vehicle_id'],
                lat=point_dict['lat'],
                lng=point_dict['lng'],
                alt=point_dict.get('alt', 0.0),
                spd=point_dict.get('spd', 0.0),
                azm=point_dict.get('azm', 0.0),
                timestamp=datetime.now().isoformat()
            )

            # Process point; None means the vehicle buffer is still filling.
            result = self.process_gps_point(gps_point)
            if result:
                all_results.append(result)

                # Print status
                status_icon = "🟢" if result.alert_level == "NORMAL" else "🟔" if result.alert_level in ["LOW", "MEDIUM"] else "šŸ”“"
                print(f"{status_icon} Point {i+1:3d}: {result.vehicle_id:12s} | "
                      f"{result.alert_level:8s} | Speed: {result.driving_metrics['speed']:5.1f} km/h | "
                      f"Conf: {result.confidence:.3f}")

                if result.anomaly_detected:
                    anomaly_count += 1
                    print(f"    🚨 ANOMALY DETECTED! {result.risk_factors}")

                    # Call callback function if provided
                    if callback_function:
                        callback_function(result, gps_point)
            else:
                print(f"ā³ Point {i+1:3d}: {point_dict['vehicle_id']:12s} | Building buffer...")

            # Simulate real-time delay (skip after the last point)
            if i < len(data_stream) - 1:
                time.sleep(delay_seconds)

        print(f"\nšŸ“Š Stream Complete:")
        print(f"   Points processed: {len(data_stream)}")
        print(f"   Detections made: {len(all_results)}")
        print(f"   Anomalies found: {anomaly_count}")
        print(f"   Anomaly rate: {anomaly_count/len(all_results)*100:.1f}%" if all_results else "   No detections made")

        return all_results

    def _generate_timestamps(self, df: pd.DataFrame) -> List[str]:
        """Generate realistic timestamps for GPS data (2-second intervals).

        The offset is computed per row via groupby/cumcount so timestamps
        stay aligned with the DataFrame's row order even when rows are not
        grouped by vehicle (the previous per-vehicle concatenation assumed
        vehicle-sorted input and silently misaligned otherwise).
        """
        base_time = datetime.now()
        # Position of each row within its own vehicle's trajectory.
        offsets = df.groupby('randomized_id').cumcount()
        return [(base_time + timedelta(seconds=int(i) * 2)).isoformat() for i in offsets]

    def _process_dataframe_batch(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Process a prepared DataFrame using the existing feature pipeline.

        Returns a summary dict with per-point results, or an error status if
        no features could be calculated.
        """
        # Use the exact training-time feature engineering pipeline
        features_df = self._calculate_features_exact_pipeline(df)

        if len(features_df) == 0:
            return {
                "status": "error",
                "message": "No features could be calculated",
                "processed": 0,
                "anomalies": 0
            }

        # Scale features with the training-time scaler
        features_scaled = self.scaler.transform(features_df)

        # Get anomaly scores for all points
        anomaly_results = []
        print("šŸ” Running anomaly detection on all points...")

        for i in range(len(features_scaled)):
            point_scaled = features_scaled[i:i+1]

            # Get scores from all models
            scores: Dict[str, float] = {}

            # Isolation Forest
            if self.isolation_forest:
                scores['isolation_forest'] = float(self.isolation_forest.decision_function(point_scaled)[0])

            # One-Class SVM
            if self.one_class_svm:
                scores['one_class_svm'] = float(self.one_class_svm.decision_function(point_scaled)[0])

            # LSTM (only once enough preceding points exist for a sequence)
            if self.lstm_autoencoder and i >= self.config['lstm_sequence_length'] - 1:
                try:
                    sequence_start = max(0, i - self.config['lstm_sequence_length'] + 1)
                    sequence_features = features_scaled[sequence_start:i+1]

                    if len(sequence_features) == self.config['lstm_sequence_length']:
                        sequence_tensor = torch.FloatTensor(sequence_features).unsqueeze(0).to(device)
                        with torch.no_grad():
                            reconstructed = self.lstm_autoencoder(sequence_tensor)
                            reconstruction_error = torch.mean((sequence_tensor - reconstructed) ** 2).item()
                        scores['lstm'] = float(reconstruction_error)
                except Exception:
                    # Best-effort: a failed LSTM pass contributes a neutral
                    # score rather than aborting the batch — but log it
                    # instead of swallowing silently.
                    logger.debug("LSTM scoring failed at point %d", i, exc_info=True)
                    scores['lstm'] = 0.0

            # Calculate ensemble score and map it to an alert level
            ensemble_score = self._calculate_ensemble_score(scores)
            alert_level = self._get_alert_level(ensemble_score)

            # Extract metrics from the engineered features
            # NOTE(review): assumes features_df rows align positionally with
            # df rows — confirm against _calculate_features_exact_pipeline.
            feature_row = features_df.iloc[i]
            driving_metrics = self._extract_driving_metrics_from_features(feature_row)
            risk_factors = self._extract_risk_factors_from_features(feature_row)

            anomaly_results.append({
                'index': i,
                'vehicle_id': df.iloc[i]['randomized_id'],
                'anomaly_detected': ensemble_score > self.config['alert_threshold'],
                'confidence': ensemble_score,
                'alert_level': alert_level,
                'raw_scores': scores,
                'driving_metrics': driving_metrics,
                'risk_factors': risk_factors
            })

        # Generate summary
        total_anomalies = sum(1 for r in anomaly_results if r['anomaly_detected'])

        return {
            "status": "completed",
            "processed": len(anomaly_results),
            "anomalies": total_anomalies,
            "anomaly_rate": total_anomalies / len(anomaly_results) if anomaly_results else 0,
            "results": anomaly_results,
            "summary": {
                "total_vehicles": df['randomized_id'].nunique(),
                "total_points": len(df),
                "detection_ready_points": len(anomaly_results),
                "anomalies_by_level": {
                    level: sum(1 for r in anomaly_results if r['alert_level'] == level)
                    for level in ['NORMAL', 'LOW', 'MEDIUM', 'HIGH', 'CRITICAL']
                }
            }
        }


# Example usage functions

def example_list_of_lists_usage():
    """Example of how to use the batch processor with list of lists."""
    print("šŸ”„ Example: Processing List of Lists Data")
    print("=" * 50)

    # Initialize batch detector
    detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")

    # Sample data as list of lists: [vehicle_id, lat, lng, azm, spd, alt]
    sample_data = [
        # Normal driving for vehicle_001
        ["vehicle_001", 55.7558, 37.6176, 90.0, 45.0, 156.0],
        ["vehicle_001", 55.7559, 37.6177, 92.0, 47.0, 157.0],
        ["vehicle_001", 55.7560, 37.6178, 94.0, 46.0, 158.0],
        ["vehicle_001", 55.7561, 37.6179, 96.0, 48.0, 159.0],
        ["vehicle_001", 55.7562, 37.6180, 98.0, 49.0, 160.0],

        # Aggressive driving for vehicle_002
        ["vehicle_002", 55.7600, 37.6200, 180.0, 70.0, 150.0],
        ["vehicle_002", 55.7601, 37.6201, 182.0, 125.0, 151.0],  # Speeding
        ["vehicle_002", 55.7602, 37.6202, 184.0, 15.0, 152.0],   # Hard braking
        ["vehicle_002", 55.7603, 37.6203, 250.0, 55.0, 153.0],   # Sharp turn

        # Mixed behavior for vehicle_003
        ["vehicle_003", 55.7700, 37.6300, 45.0, 40.0, 145.0],
        ["vehicle_003", 55.7701, 37.6301, 47.0, 42.0, 146.0],
        ["vehicle_003", 55.7702, 37.6302, 49.0, 110.0, 147.0],   # Speed violation
        ["vehicle_003", 55.7703, 37.6303, 51.0, 43.0, 148.0],
    ]

    print(f"Processing {len(sample_data)} GPS points from {len(set(row[0] for row in sample_data))} vehicles...")

    # Method 1: Process as batch
    print("\nšŸ“Š Method 1: Batch Processing")
    batch_results = detector.process_batch_list_of_lists(sample_data)

    print(f"Batch Results:")
    print(f"   Status: {batch_results['status']}")
    print(f"   Points processed: {batch_results['processed']}")
    print(f"   Anomalies detected: {batch_results['anomalies']}")
    print(f"   Anomaly rate: {batch_results['anomaly_rate']:.1%}")

    # Method 2: Process by vehicle
    print("\nšŸš› Method 2: Vehicle-by-Vehicle Processing")
    vehicle_results = detector.process_batch_by_vehicle(sample_data)

    for vehicle_id, results in vehicle_results.items():
        anomaly_count = sum(1 for r in results if r.anomaly_detected)
        print(f"   {vehicle_id}: {anomaly_count} anomalies out of {len(results)} detections")

    # Method 3: Real-time simulation
    print("\nšŸ”“ Method 3: Real-time Stream Simulation (first 8 points)")

    def anomaly_callback(result, gps_point):
        """Callback function for when anomaly is detected."""
        print(f"   šŸ“§ ALERT SENT: {result.vehicle_id} - {result.alert_level}")

    stream_results = detector.process_realtime_stream(
        sample_data[:8],          # First 8 points
        delay_seconds=0.5,        # Faster for demo
        callback_function=anomaly_callback
    )


def load_from_csv_example():
    """Example of loading data from CSV and converting to list of lists."""
    print("\nšŸ“ Example: Loading from CSV")
    print("=" * 50)

    # Simulate CSV loading (you would use pd.read_csv('your_file.csv'))
    csv_data = """vehicle_id,lat,lng,azm,spd,alt
vehicle_001,55.7558,37.6176,90.0,45.0,156.0
vehicle_001,55.7559,37.6177,92.0,47.0,157.0
vehicle_002,55.7600,37.6200,180.0,125.0,150.0
vehicle_002,55.7601,37.6201,182.0,15.0,151.0"""

    # Convert CSV to list of lists
    from io import StringIO
    df = pd.read_csv(StringIO(csv_data))

    # Convert DataFrame to list of lists
    data_as_lists = df.values.tolist()

    print(f"Loaded {len(data_as_lists)} rows from CSV")
    print(f"Column order: {df.columns.tolist()}")
    print(f"Sample data: {data_as_lists[0]}")

    # Process with detector
    detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")
    results = detector.process_batch_list_of_lists(
        data_as_lists,
        column_order=df.columns.tolist()
    )

    print(f"Processing complete: {results['anomalies']} anomalies detected")


def large_dataset_example():
    """Example for processing large datasets efficiently."""
    print("\nšŸ”¢ Example: Large Dataset Processing")
    print("=" * 50)

    # Simulate large dataset (seeded for reproducibility)
    np.random.seed(42)
    large_data = []

    vehicles = [f"vehicle_{i:03d}" for i in range(1, 11)]  # 10 vehicles

    for vehicle in vehicles:
        for point in range(100):  # 100 points per vehicle
            lat = 55.7500 + np.random.uniform(-0.01, 0.01)
            lng = 37.6000 + np.random.uniform(-0.01, 0.01)
            azm = np.random.uniform(0, 360)
            # 10% of points simulate aggressive (90-140 km/h) driving
            spd = np.random.uniform(20, 80) if np.random.random() > 0.1 else np.random.uniform(90, 140)
            alt = 150 + np.random.uniform(-20, 20)
            large_data.append([vehicle, lat, lng, azm, spd, alt])

    print(f"Generated large dataset: {len(large_data)} points from {len(vehicles)} vehicles")

    # Process efficiently
    detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")

    # Process in chunks for memory efficiency
    chunk_size = 500
    total_anomalies = 0

    for i in range(0, len(large_data), chunk_size):
        chunk = large_data[i:i + chunk_size]
        print(f"Processing chunk {i//chunk_size + 1}: points {i+1}-{i+len(chunk)}")

        results = detector.process_batch_list_of_lists(chunk)
        total_anomalies += results['anomalies']

        print(f"   Chunk anomalies: {results['anomalies']}")

    print(f"\nLarge dataset complete:")
    print(f"   Total points: {len(large_data)}")
    print(f"   Total anomalies: {total_anomalies}")
    print(f"   Overall anomaly rate: {total_anomalies/len(large_data):.1%}")