# AnomalDrive / batch_production_pred.py
# (initial commit 7058515 by baiganinn)
import numpy as np
import pandas as pd
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime, timedelta
import logging
from production_predictor import ProductionAnomalyDetector, AnomalyResult, GPSPoint
import torch
# Module-level logger for this file.
logger = logging.getLogger(__name__)
# Pick the inference device once at import time: GPU when available, else CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class BatchAnomalyDetector(ProductionAnomalyDetector):
    """
    Extended ProductionAnomalyDetector with batch processing capabilities.

    Accepts GPS data as a list of lists ([[id, lat, lng, azm, spd, alt], ...])
    and offers three processing modes: whole-batch, vehicle-by-vehicle
    sequential, and simulated real-time streaming.
    """

    def __init__(self, model_dir: str, config: Optional[Dict] = None):
        """
        Args:
            model_dir: Directory holding the trained model artifacts (passed to parent).
            config: Optional configuration overrides (passed to parent).
        """
        super().__init__(model_dir, config)
        self.batch_results = []  # reserved for accumulating batch outputs

    def process_batch_list_of_lists(self,
                                    data: List[List],
                                    column_order: Optional[List[str]] = None,
                                    sort_by_vehicle: bool = True,
                                    generate_timestamps: bool = True) -> Dict[str, Any]:
        """
        Process batch data supplied as a list of lists.

        Args:
            data: List of lists in format [[id, lat, lng, azm, spd, alt], ...]
            column_order: Order of columns if different from default.
            sort_by_vehicle: Whether to sort by vehicle id for proper sequencing.
            generate_timestamps: Whether to generate timestamps automatically.

        Returns:
            Dictionary with batch processing results.

        Raises:
            ValueError: If required columns are missing after renaming.
        """
        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']
        print(f"πŸ”„ Processing batch of {len(data)} GPS points...")
        # Convert list of lists to DataFrame
        df = pd.DataFrame(data, columns=column_order)
        # Map external column names onto the training schema. Identity
        # mappings are skipped below, so only 'vehicle_id' is renamed.
        column_mapping = {
            'vehicle_id': 'randomized_id',
            'azm': 'azm',
            'spd': 'spd',
            'alt': 'alt',
            'lat': 'lat',
            'lng': 'lng'
        }
        for old_col, new_col in column_mapping.items():
            if old_col in df.columns and old_col != new_col:
                df = df.rename(columns={old_col: new_col})
        # Ensure we have everything the feature pipeline needs.
        required_columns = ['randomized_id', 'lat', 'lng', 'alt', 'spd', 'azm']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")
        if sort_by_vehicle:
            # NOTE(review): ordering within a vehicle by (lat, lng) is a proxy
            # for travel order — confirm upstream data carries no real ordering.
            df = df.sort_values(['randomized_id', 'lat', 'lng']).reset_index(drop=True)
        if generate_timestamps:
            df['timestamp'] = self._generate_timestamps(df)
        return self._process_dataframe_batch(df)

    def process_batch_by_vehicle(self,
                                 data: List[List],
                                 column_order: Optional[List[str]] = None,
                                 time_interval_seconds: int = 2) -> Dict[str, List[AnomalyResult]]:
        """
        Process batch data vehicle by vehicle to maintain proper sequence.

        Args:
            data: List of lists format.
            column_order: Column order specification.
            time_interval_seconds: Time interval between GPS points.

        Returns:
            Dictionary with vehicle_id as key and list of results as value.
        """
        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']
        df = pd.DataFrame(data, columns=column_order)
        vehicle_results = {}
        total_anomalies = 0
        print(f"πŸš› Processing {df['vehicle_id'].nunique()} vehicles with {len(df)} total points...")
        for vehicle_id in df['vehicle_id'].unique():
            vehicle_data = df[df['vehicle_id'] == vehicle_id].copy()
            vehicle_data = vehicle_data.sort_values(['lat', 'lng']).reset_index(drop=True)
            print(f"\nπŸ“ Processing vehicle: {vehicle_id} ({len(vehicle_data)} points)")
            # Clear vehicle buffer so each vehicle starts from a fresh sequence.
            if vehicle_id in self.vehicle_buffers:
                del self.vehicle_buffers[vehicle_id]
            vehicle_results[vehicle_id] = []
            vehicle_anomalies = 0
            # Fixed base time per vehicle: calling datetime.now() per point
            # (as before) makes the intervals drift by the processing latency.
            base_time = datetime.now()
            for idx, row in vehicle_data.iterrows():
                timestamp = base_time + timedelta(seconds=idx * time_interval_seconds)
                gps_point = GPSPoint(
                    vehicle_id=vehicle_id,
                    lat=row['lat'],
                    lng=row['lng'],
                    alt=row.get('alt', 0.0),
                    spd=row.get('spd', 0.0),
                    azm=row.get('azm', 0.0),
                    timestamp=timestamp.isoformat()
                )
                result = self.process_gps_point(gps_point)
                if result:
                    vehicle_results[vehicle_id].append(result)
                    if result.anomaly_detected:
                        vehicle_anomalies += 1
                        total_anomalies += 1
                        print(f" 🚨 Point {idx+1}: {result.alert_level} "
                              f"(Speed: {result.driving_metrics['speed']:.1f} km/h, "
                              f"Conf: {result.confidence:.3f})")
                        print(f" Risk factors: {result.risk_factors}")
            detection_rate = vehicle_anomalies / len(vehicle_results[vehicle_id]) if vehicle_results[vehicle_id] else 0
            print(f" πŸ“Š Vehicle summary: {vehicle_anomalies} anomalies out of {len(vehicle_results[vehicle_id])} detections ({detection_rate:.1%})")
        print(f"\n🎯 Batch Summary:")
        print(f" Total vehicles: {len(vehicle_results)}")
        print(f" Total points processed: {len(df)}")
        print(f" Total anomalies detected: {total_anomalies}")
        # Guard against empty input (the original divided by zero here).
        print(f" Overall anomaly rate: {(total_anomalies / len(df) if len(df) else 0):.1%}")
        return vehicle_results

    def process_realtime_stream(self, data_stream: List[List],
                                column_order: Optional[List[str]] = None,
                                delay_seconds: float = 2.0,
                                callback_function=None) -> List[AnomalyResult]:
        """
        Simulate real-time processing of list-of-lists data.

        Args:
            data_stream: List of lists to process as real-time stream.
            column_order: Column order.
            delay_seconds: Delay between processing points (simulate real-time).
            callback_function: Called as callback_function(result, gps_point)
                whenever an anomaly is detected.

        Returns:
            List of all detection results.
        """
        import time
        if column_order is None:
            column_order = ['vehicle_id', 'lat', 'lng', 'azm', 'spd', 'alt']
        print(f"πŸ”΄ Starting real-time stream simulation with {len(data_stream)} points...")
        print(f"⏱️ Processing delay: {delay_seconds} seconds between points")
        all_results = []
        anomaly_count = 0
        for i, point_data in enumerate(data_stream):
            # Convert list to GPSPoint
            point_dict = dict(zip(column_order, point_data))
            gps_point = GPSPoint(
                vehicle_id=point_dict['vehicle_id'],
                lat=point_dict['lat'],
                lng=point_dict['lng'],
                alt=point_dict.get('alt', 0.0),
                spd=point_dict.get('spd', 0.0),
                azm=point_dict.get('azm', 0.0),
                timestamp=datetime.now().isoformat()
            )
            result = self.process_gps_point(gps_point)
            if result:
                all_results.append(result)
                status_icon = "🟒" if result.alert_level == "NORMAL" else "🟑" if result.alert_level in ["LOW", "MEDIUM"] else "πŸ”΄"
                print(f"{status_icon} Point {i+1:3d}: {result.vehicle_id:12s} | "
                      f"{result.alert_level:8s} | Speed: {result.driving_metrics['speed']:5.1f} km/h | "
                      f"Conf: {result.confidence:.3f}")
                if result.anomaly_detected:
                    anomaly_count += 1
                    print(f" 🚨 ANOMALY DETECTED! {result.risk_factors}")
                    if callback_function:
                        callback_function(result, gps_point)
            else:
                # Detector returned nothing: still filling its sequence buffer.
                print(f"⏳ Point {i+1:3d}: {point_dict['vehicle_id']:12s} | Building buffer...")
            # Simulate real-time delay (skip after the last point).
            if i < len(data_stream) - 1:
                time.sleep(delay_seconds)
        print(f"\nπŸ“Š Stream Complete:")
        print(f" Points processed: {len(data_stream)}")
        print(f" Detections made: {len(all_results)}")
        print(f" Anomalies found: {anomaly_count}")
        print(f" Anomaly rate: {anomaly_count/len(all_results)*100:.1f}%" if all_results else " No detections made")
        return all_results

    def _generate_timestamps(self, df: pd.DataFrame) -> List[str]:
        """Generate per-vehicle timestamps (2-second intervals) aligned to df's row order.

        Unlike the previous flat-append implementation, this stays correct even
        when rows from different vehicles are interleaved (sort_by_vehicle=False):
        each row's offset is its position within its own vehicle's rows.
        """
        base_time = datetime.now()
        offsets = df.groupby('randomized_id').cumcount() * 2
        return [(base_time + timedelta(seconds=int(s))).isoformat() for s in offsets]

    def _process_dataframe_batch(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Score every row of df with the existing feature pipeline and ensemble.

        Returns a status dict with per-point results and a summary section.
        """
        # Use the exact feature engineering pipeline from training.
        features_df = self._calculate_features_exact_pipeline(df)
        if len(features_df) == 0:
            return {
                "status": "error",
                "message": "No features could be calculated",
                "processed": 0,
                "anomalies": 0
            }
        features_scaled = self.scaler.transform(features_df)
        anomaly_results = []
        print("πŸ” Running anomaly detection on all points...")
        for i in range(len(features_scaled)):
            point_scaled = features_scaled[i:i+1]
            scores = {}
            # Isolation Forest
            if self.isolation_forest:
                scores['isolation_forest'] = float(self.isolation_forest.decision_function(point_scaled)[0])
            # One-Class SVM
            if self.one_class_svm:
                scores['one_class_svm'] = float(self.one_class_svm.decision_function(point_scaled)[0])
            # LSTM autoencoder — only once enough history exists for a full sequence.
            if self.lstm_autoencoder and i >= self.config['lstm_sequence_length'] - 1:
                try:
                    sequence_start = max(0, i - self.config['lstm_sequence_length'] + 1)
                    sequence_features = features_scaled[sequence_start:i+1]
                    if len(sequence_features) == self.config['lstm_sequence_length']:
                        sequence_tensor = torch.FloatTensor(sequence_features).unsqueeze(0).to(device)
                        with torch.no_grad():
                            reconstructed = self.lstm_autoencoder(sequence_tensor)
                            reconstruction_error = torch.mean((sequence_tensor - reconstructed) ** 2).item()
                        scores['lstm'] = float(reconstruction_error)
                except Exception:
                    # Best-effort: fall back to a neutral LSTM score instead of
                    # aborting the whole batch. (Was a bare `except:`, which
                    # also swallowed KeyboardInterrupt/SystemExit.)
                    scores['lstm'] = 0.0
            # Combine model scores into one ensemble decision.
            ensemble_score = self._calculate_ensemble_score(scores)
            alert_level = self._get_alert_level(ensemble_score)
            feature_row = features_df.iloc[i]
            driving_metrics = self._extract_driving_metrics_from_features(feature_row)
            risk_factors = self._extract_risk_factors_from_features(feature_row)
            anomaly_results.append({
                'index': i,
                'vehicle_id': df.iloc[i]['randomized_id'],
                'anomaly_detected': ensemble_score > self.config['alert_threshold'],
                'confidence': ensemble_score,
                'alert_level': alert_level,
                'raw_scores': scores,
                'driving_metrics': driving_metrics,
                'risk_factors': risk_factors
            })
        total_anomalies = sum(1 for r in anomaly_results if r['anomaly_detected'])
        return {
            "status": "completed",
            "processed": len(anomaly_results),
            "anomalies": total_anomalies,
            "anomaly_rate": total_anomalies / len(anomaly_results) if anomaly_results else 0,
            "results": anomaly_results,
            "summary": {
                "total_vehicles": df['randomized_id'].nunique(),
                "total_points": len(df),
                "detection_ready_points": len(anomaly_results),
                "anomalies_by_level": {
                    level: sum(1 for r in anomaly_results if r['alert_level'] == level)
                    for level in ['NORMAL', 'LOW', 'MEDIUM', 'HIGH', 'CRITICAL']
                }
            }
        }
# Example usage functions
def example_list_of_lists_usage():
    """Demonstrate the three batch-processing entry points on sample data."""
    print("πŸ”„ Example: Processing List of Lists Data")
    print("=" * 50)
    # Build the detector against the exported model directory.
    batch_detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")
    # Rows are [vehicle_id, lat, lng, azm, spd, alt].
    gps_rows = [
        # Normal driving for vehicle_001
        ["vehicle_001", 55.7558, 37.6176, 90.0, 45.0, 156.0],
        ["vehicle_001", 55.7559, 37.6177, 92.0, 47.0, 157.0],
        ["vehicle_001", 55.7560, 37.6178, 94.0, 46.0, 158.0],
        ["vehicle_001", 55.7561, 37.6179, 96.0, 48.0, 159.0],
        ["vehicle_001", 55.7562, 37.6180, 98.0, 49.0, 160.0],
        # Aggressive driving for vehicle_002
        ["vehicle_002", 55.7600, 37.6200, 180.0, 70.0, 150.0],
        ["vehicle_002", 55.7601, 37.6201, 182.0, 125.0, 151.0],  # Speeding
        ["vehicle_002", 55.7602, 37.6202, 184.0, 15.0, 152.0],   # Hard braking
        ["vehicle_002", 55.7603, 37.6203, 250.0, 55.0, 153.0],   # Sharp turn
        # Mixed behavior for vehicle_003
        ["vehicle_003", 55.7700, 37.6300, 45.0, 40.0, 145.0],
        ["vehicle_003", 55.7701, 37.6301, 47.0, 42.0, 146.0],
        ["vehicle_003", 55.7702, 37.6302, 49.0, 110.0, 147.0],  # Speed violation
        ["vehicle_003", 55.7703, 37.6303, 51.0, 43.0, 148.0],
    ]
    fleet = {row[0] for row in gps_rows}
    print(f"Processing {len(gps_rows)} GPS points from {len(fleet)} vehicles...")
    # --- Method 1: one-shot batch ---
    print("\nπŸ“Š Method 1: Batch Processing")
    batch_summary = batch_detector.process_batch_list_of_lists(gps_rows)
    print("Batch Results:")
    print(f" Status: {batch_summary['status']}")
    print(f" Points processed: {batch_summary['processed']}")
    print(f" Anomalies detected: {batch_summary['anomalies']}")
    print(f" Anomaly rate: {batch_summary['anomaly_rate']:.1%}")
    # --- Method 2: per-vehicle sequential ---
    print("\nπŸš› Method 2: Vehicle-by-Vehicle Processing")
    per_vehicle = batch_detector.process_batch_by_vehicle(gps_rows)
    for vid, detections in per_vehicle.items():
        flagged = sum(1 for d in detections if d.anomaly_detected)
        print(f" {vid}: {flagged} anomalies out of {len(detections)} detections")
    # --- Method 3: simulated real-time stream ---
    print("\nπŸ”΄ Method 3: Real-time Stream Simulation (first 8 points)")

    def on_anomaly(result, gps_point):
        """Fires whenever the stream flags an anomalous point."""
        print(f" πŸ“§ ALERT SENT: {result.vehicle_id} - {result.alert_level}")

    stream_results = batch_detector.process_realtime_stream(
        gps_rows[:8],  # First 8 points
        delay_seconds=0.5,  # Faster for demo
        callback_function=on_anomaly
    )
def load_from_csv_example():
    """Show how CSV content becomes list-of-lists input for the detector."""
    print("\nπŸ“ Example: Loading from CSV")
    print("=" * 50)
    # Inline CSV stands in for pd.read_csv('your_file.csv').
    raw_csv = """vehicle_id,lat,lng,azm,spd,alt
vehicle_001,55.7558,37.6176,90.0,45.0,156.0
vehicle_001,55.7559,37.6177,92.0,47.0,157.0
vehicle_002,55.7600,37.6200,180.0,125.0,150.0
vehicle_002,55.7601,37.6201,182.0,15.0,151.0"""
    from io import StringIO
    frame = pd.read_csv(StringIO(raw_csv))
    # The detector expects plain Python lists, not a DataFrame.
    rows = frame.values.tolist()
    print(f"Loaded {len(rows)} rows from CSV")
    print(f"Column order: {frame.columns.tolist()}")
    print(f"Sample data: {rows[0]}")
    detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")
    outcome = detector.process_batch_list_of_lists(
        rows,
        column_order=frame.columns.tolist()
    )
    print(f"Processing complete: {outcome['anomalies']} anomalies detected")
def large_dataset_example():
    """Chunked-processing demo over a synthetic 10-vehicle dataset."""
    print("\nπŸ”’ Example: Large Dataset Processing")
    print("=" * 50)
    # Deterministic synthetic data (same RNG call order as before).
    np.random.seed(42)
    fleet = [f"vehicle_{i:03d}" for i in range(1, 11)]  # 10 vehicles
    synthetic = []
    for vid in fleet:
        for _ in range(100):  # 100 points per vehicle
            lat = 55.7500 + np.random.uniform(-0.01, 0.01)
            lng = 37.6000 + np.random.uniform(-0.01, 0.01)
            azm = np.random.uniform(0, 360)
            # ~10% of points get an aggressive (90-140 km/h) speed.
            spd = np.random.uniform(20, 80) if np.random.random() > 0.1 else np.random.uniform(90, 140)
            alt = 150 + np.random.uniform(-20, 20)
            synthetic.append([vid, lat, lng, azm, spd, alt])
    print(f"Generated large dataset: {len(synthetic)} points from {len(fleet)} vehicles")
    detector = BatchAnomalyDetector("/kaggle/working/anomaly_analysis_pytorch_fixed/models")
    # Feed the detector fixed-size slices to bound memory use.
    chunk_size = 500
    total_anomalies = 0
    for start in range(0, len(synthetic), chunk_size):
        chunk = synthetic[start:start + chunk_size]
        print(f"Processing chunk {start//chunk_size + 1}: points {start+1}-{start+len(chunk)}")
        outcome = detector.process_batch_list_of_lists(chunk)
        total_anomalies += outcome['anomalies']
        print(f" Chunk anomalies: {outcome['anomalies']}")
    print("\nLarge dataset complete:")
    print(f" Total points: {len(synthetic)}")
    print(f" Total anomalies: {total_anomalies}")
    print(f" Overall anomaly rate: {total_anomalies/len(synthetic):.1%}")