# VehicleDiagnosticsAgent/src/utils/data_preprocessing.py
# Commit d2173d1 (saadmannan): Prepare project for Hugging Face Space
# deployment - add app.py with Gradio interface, update requirements.txt with
# torch dependencies, configure LFS for large files, update README.
"""
Data preprocessing and feature engineering for vehicle sensor data
"""
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from pathlib import Path
import pickle
class VehicleDataPreprocessor:
    """Preprocess and engineer features from vehicle sensor data.

    Pipeline stages: load -> clean -> moving-average smoothing ->
    feature engineering -> normalization -> vehicle-level train/val/test
    split -> save artifacts (CSVs, fitted scaler, feature-column list).
    """

    # Metadata/target columns that are never treated as sensor features.
    _NON_FEATURE_COLS = ('vehicle_id', 'timestamp', 'anomaly')

    def __init__(self, data_path: str = 'data/raw/vehicle_sensor_data.csv'):
        self.data_path = Path(data_path)
        self.scaler = StandardScaler()
        # Recorded by normalize_features(fit=True); reused for fit=False
        # so transforms stay aligned with how the scaler was fitted.
        self.feature_columns = None
        self.target_column = 'anomaly'

    def _sensor_columns(self, df: pd.DataFrame, exclude_derived: bool = False) -> list:
        """Return sensor column names (metadata/target excluded).

        With exclude_derived=True, moving-average ('_ma') columns are
        dropped too, leaving only the raw sensor readings.
        """
        cols = [c for c in df.columns if c not in self._NON_FEATURE_COLS]
        if exclude_derived:
            cols = [c for c in cols if not c.endswith('_ma')]
        return cols

    def load_data(self) -> pd.DataFrame:
        """Load raw sensor data from `self.data_path` (CSV)."""
        print(f"Loading data from {self.data_path}...")
        df = pd.read_csv(self.data_path)
        print(f"βœ“ Loaded {len(df)} records for {df['vehicle_id'].nunique()} vehicles")
        return df

    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean and filter noisy data.

        Drops duplicate rows, fills missing numeric values with the column
        median, and clips extreme outliers in the numeric sensor columns.
        """
        print("Cleaning data...")
        # Remove duplicates
        df = df.drop_duplicates()
        # Handle missing values (numeric columns only; median is robust)
        df = df.fillna(df.median(numeric_only=True))
        # Clip extreme outliers. NOTE: the bounds use the 1st/99th
        # percentiles (not true quartiles), widened by 3x their spread,
        # so only truly pathological readings are clipped.
        # Restrict to numeric columns so a stray string column cannot
        # crash quantile().
        numeric_sensor_cols = [
            col for col in df.select_dtypes(include=np.number).columns
            if col not in self._NON_FEATURE_COLS
        ]
        for col in numeric_sensor_cols:
            q_low = df[col].quantile(0.01)
            q_high = df[col].quantile(0.99)
            spread = q_high - q_low
            df[col] = df[col].clip(q_low - 3 * spread, q_high + 3 * spread)
        print(f"βœ“ Cleaned data: {len(df)} records remaining")
        return df

    def apply_moving_average(self, df: pd.DataFrame, window: int = 5) -> pd.DataFrame:
        """Add '<col>_ma' rolling-mean columns to reduce sensor noise.

        The rolling window is computed per vehicle so readings from one
        vehicle never smooth into another's.
        """
        print(f"Applying moving average filter (window={window})...")
        sensor_cols = self._sensor_columns(df)
        for col in sensor_cols:
            df[f'{col}_ma'] = df.groupby('vehicle_id')[col].transform(
                lambda x: x.rolling(window=window, min_periods=1).mean()
            )
        print(f"βœ“ Applied moving average to {len(sensor_cols)} sensors")
        return df

    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create domain-specific features.

        Adds per-sensor rate-of-change ('_rate') and rolling-std ('_std')
        columns, plus vehicle-domain indicators. Assumes the raw data
        includes engine_temp, coolant_temp, rpm, battery_voltage and the
        four tire_pressure_* columns.
        """
        print("Engineering features...")
        # Raw sensors only: skip the '_ma' columns added earlier.
        sensor_cols = self._sensor_columns(df, exclude_derived=True)
        for col in sensor_cols:
            # First difference per vehicle: how fast the reading changes.
            df[f'{col}_rate'] = df.groupby('vehicle_id')[col].diff()
            # Short-window variability per vehicle.
            df[f'{col}_std'] = df.groupby('vehicle_id')[col].transform(
                lambda x: x.rolling(window=10, min_periods=1).std()
            )
        # Domain indicators:
        # Gap between engine and coolant temperature (cooling efficiency).
        df['temp_differential'] = df['engine_temp'] - df['coolant_temp']
        # Std across the four tires flags a single deflating tire.
        df['tire_pressure_imbalance'] = df[['tire_pressure_fl', 'tire_pressure_fr',
                                            'tire_pressure_rl', 'tire_pressure_rr']].std(axis=1)
        # High RPM combined with high temperature -> mechanical stress.
        df['engine_stress'] = (df['rpm'] / 1000) * (df['engine_temp'] / 100)
        # 12.6 V is the nominal healthy resting voltage of a 12 V battery.
        df['battery_health'] = df['battery_voltage'] / 12.6  # Normalized to ideal voltage
        # diff()/rolling() leave NaNs at each vehicle's first rows.
        df = df.fillna(0)
        print(f"βœ“ Engineered features: {df.shape[1]} total columns")
        return df

    def normalize_features(self, df: pd.DataFrame, fit: bool = True) -> pd.DataFrame:
        """Standardize feature columns with the stored StandardScaler.

        With fit=True the scaler is fitted and the feature-column list is
        recorded. With fit=False the previously recorded column list is
        reused, so new data is transformed with exactly the columns (and
        order) the scaler was fitted on.
        """
        print("Normalizing features...")
        if fit or self.feature_columns is None:
            self.feature_columns = self._sensor_columns(df)
        if fit:
            df[self.feature_columns] = self.scaler.fit_transform(df[self.feature_columns])
        else:
            df[self.feature_columns] = self.scaler.transform(df[self.feature_columns])
        print(f"βœ“ Normalized {len(self.feature_columns)} features")
        return df

    def split_data(self, df: pd.DataFrame, test_size: float = 0.2, val_size: float = 0.1):
        """Split into train/val/test sets at the *vehicle* level.

        Splitting by vehicle_id (not by row) prevents leakage of one
        vehicle's time series across splits. val_size and test_size are
        fractions of the full vehicle population.
        """
        print("Splitting data...")
        vehicle_ids = df['vehicle_id'].unique()
        # First split: train+val vs test
        train_val_ids, test_ids = train_test_split(
            vehicle_ids, test_size=test_size, random_state=42
        )
        # Second split: train vs val. Rescale val_size because it is a
        # fraction of the whole, not of the remaining train+val pool.
        train_ids, val_ids = train_test_split(
            train_val_ids, test_size=val_size / (1 - test_size), random_state=42
        )
        train_df = df[df['vehicle_id'].isin(train_ids)]
        val_df = df[df['vehicle_id'].isin(val_ids)]
        test_df = df[df['vehicle_id'].isin(test_ids)]
        print(f"βœ“ Train: {len(train_df)} records ({len(train_ids)} vehicles)")
        print(f"βœ“ Val: {len(val_df)} records ({len(val_ids)} vehicles)")
        print(f"βœ“ Test: {len(test_df)} records ({len(test_ids)} vehicles)")
        return train_df, val_df, test_df

    def save_processed_data(self, train_df: pd.DataFrame, val_df: pd.DataFrame,
                            test_df: pd.DataFrame, output_dir: str = 'data/processed') -> None:
        """Save split CSVs plus the fitted scaler and feature-column list.

        The pickled scaler/feature_columns are what inference-time code
        must load to reproduce this preprocessing exactly.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        print(f"Saving processed data to {output_path}...")
        train_df.to_csv(output_path / 'train.csv', index=False)
        val_df.to_csv(output_path / 'val.csv', index=False)
        test_df.to_csv(output_path / 'test.csv', index=False)
        # Save scaler
        with open(output_path / 'scaler.pkl', 'wb') as f:
            pickle.dump(self.scaler, f)
        # Save feature columns
        with open(output_path / 'feature_columns.pkl', 'wb') as f:
            pickle.dump(self.feature_columns, f)
        print("βœ“ Saved all processed datasets and preprocessing artifacts")
        # Print class balance per split so drift is visible at a glance.
        print("\nDataset Statistics:")
        print(f"Train anomaly rate: {train_df['anomaly'].mean():.2%}")
        print(f"Val anomaly rate: {val_df['anomaly'].mean():.2%}")
        print(f"Test anomaly rate: {test_df['anomaly'].mean():.2%}")

    def preprocess_pipeline(self):
        """Run the complete preprocessing pipeline end to end.

        Returns (train_df, val_df, test_df) and writes all artifacts to
        data/processed/.
        """
        print("=" * 60)
        print("VEHICLE DATA PREPROCESSING PIPELINE")
        print("=" * 60)
        df = self.load_data()
        df = self.clean_data(df)
        df = self.apply_moving_average(df, window=5)
        df = self.engineer_features(df)
        df = self.normalize_features(df, fit=True)
        train_df, val_df, test_df = self.split_data(df)
        self.save_processed_data(train_df, val_df, test_df)
        print("\n" + "=" * 60)
        print("PREPROCESSING COMPLETE!")
        print("=" * 60)
        return train_df, val_df, test_df
def _main():
    """Entry point: run the end-to-end preprocessing pipeline."""
    return VehicleDataPreprocessor().preprocess_pipeline()


if __name__ == '__main__':
    train_df, val_df, test_df = _main()