Spaces:
Sleeping
Sleeping
File size: 7,755 Bytes
d2173d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
"""
Data preprocessing and feature engineering for vehicle sensor data
"""
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from pathlib import Path
import pickle
class VehicleDataPreprocessor:
"""Preprocess and engineer features from vehicle sensor data"""
def __init__(self, data_path='data/raw/vehicle_sensor_data.csv'):
self.data_path = Path(data_path)
self.scaler = StandardScaler()
self.feature_columns = None
self.target_column = 'anomaly'
def load_data(self):
"""Load raw sensor data"""
print(f"Loading data from {self.data_path}...")
df = pd.read_csv(self.data_path)
print(f"β Loaded {len(df)} records for {df['vehicle_id'].nunique()} vehicles")
return df
def clean_data(self, df):
"""Clean and filter noisy data"""
print("Cleaning data...")
# Remove duplicates
df = df.drop_duplicates()
# Handle missing values
df = df.fillna(df.median(numeric_only=True))
# Remove outliers using IQR method for key sensors
sensor_cols = [col for col in df.columns if col not in ['vehicle_id', 'timestamp', 'anomaly']]
for col in sensor_cols:
Q1 = df[col].quantile(0.01)
Q3 = df[col].quantile(0.99)
IQR = Q3 - Q1
lower_bound = Q1 - 3 * IQR
upper_bound = Q3 + 3 * IQR
df[col] = df[col].clip(lower_bound, upper_bound)
print(f"β Cleaned data: {len(df)} records remaining")
return df
def apply_moving_average(self, df, window=5):
"""Apply moving average filter to reduce noise"""
print(f"Applying moving average filter (window={window})...")
sensor_cols = [col for col in df.columns if col not in ['vehicle_id', 'timestamp', 'anomaly']]
# Group by vehicle and apply rolling average
for col in sensor_cols:
df[f'{col}_ma'] = df.groupby('vehicle_id')[col].transform(
lambda x: x.rolling(window=window, min_periods=1).mean()
)
print(f"β Applied moving average to {len(sensor_cols)} sensors")
return df
def engineer_features(self, df):
"""Create domain-specific features"""
print("Engineering features...")
# Rate of change features
sensor_cols = [col for col in df.columns if col not in ['vehicle_id', 'timestamp', 'anomaly'] and not col.endswith('_ma')]
for col in sensor_cols:
# Rate of change
df[f'{col}_rate'] = df.groupby('vehicle_id')[col].diff()
# Rolling statistics
df[f'{col}_std'] = df.groupby('vehicle_id')[col].transform(
lambda x: x.rolling(window=10, min_periods=1).std()
)
# Domain-specific features
# Temperature differential
df['temp_differential'] = df['engine_temp'] - df['coolant_temp']
# Tire pressure imbalance
df['tire_pressure_imbalance'] = df[['tire_pressure_fl', 'tire_pressure_fr',
'tire_pressure_rl', 'tire_pressure_rr']].std(axis=1)
# Engine stress indicator
df['engine_stress'] = (df['rpm'] / 1000) * (df['engine_temp'] / 100)
# Battery health indicator
df['battery_health'] = df['battery_voltage'] / 12.6 # Normalized to ideal voltage
# Fill NaN values created by diff and rolling operations
df = df.fillna(0)
print(f"β Engineered features: {df.shape[1]} total columns")
return df
def normalize_features(self, df, fit=True):
"""Normalize sensor values"""
print("Normalizing features...")
# Select feature columns (exclude metadata and target)
exclude_cols = ['vehicle_id', 'timestamp', 'anomaly']
self.feature_columns = [col for col in df.columns if col not in exclude_cols]
if fit:
df[self.feature_columns] = self.scaler.fit_transform(df[self.feature_columns])
else:
df[self.feature_columns] = self.scaler.transform(df[self.feature_columns])
print(f"β Normalized {len(self.feature_columns)} features")
return df
def split_data(self, df, test_size=0.2, val_size=0.1):
"""Split data into train, validation, and test sets"""
print("Splitting data...")
# Split by vehicle to avoid data leakage
vehicle_ids = df['vehicle_id'].unique()
# First split: train+val vs test
train_val_ids, test_ids = train_test_split(
vehicle_ids, test_size=test_size, random_state=42
)
# Second split: train vs val
train_ids, val_ids = train_test_split(
train_val_ids, test_size=val_size/(1-test_size), random_state=42
)
train_df = df[df['vehicle_id'].isin(train_ids)]
val_df = df[df['vehicle_id'].isin(val_ids)]
test_df = df[df['vehicle_id'].isin(test_ids)]
print(f"β Train: {len(train_df)} records ({len(train_ids)} vehicles)")
print(f"β Val: {len(val_df)} records ({len(val_ids)} vehicles)")
print(f"β Test: {len(test_df)} records ({len(test_ids)} vehicles)")
return train_df, val_df, test_df
def save_processed_data(self, train_df, val_df, test_df, output_dir='data/processed'):
"""Save processed datasets"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
print(f"Saving processed data to {output_path}...")
train_df.to_csv(output_path / 'train.csv', index=False)
val_df.to_csv(output_path / 'val.csv', index=False)
test_df.to_csv(output_path / 'test.csv', index=False)
# Save scaler
with open(output_path / 'scaler.pkl', 'wb') as f:
pickle.dump(self.scaler, f)
# Save feature columns
with open(output_path / 'feature_columns.pkl', 'wb') as f:
pickle.dump(self.feature_columns, f)
print("β Saved all processed datasets and preprocessing artifacts")
# Print statistics
print("\nDataset Statistics:")
print(f"Train anomaly rate: {train_df['anomaly'].mean():.2%}")
print(f"Val anomaly rate: {val_df['anomaly'].mean():.2%}")
print(f"Test anomaly rate: {test_df['anomaly'].mean():.2%}")
def preprocess_pipeline(self):
"""Run complete preprocessing pipeline"""
print("="*60)
print("VEHICLE DATA PREPROCESSING PIPELINE")
print("="*60)
# Load data
df = self.load_data()
# Clean data
df = self.clean_data(df)
# Apply filters
df = self.apply_moving_average(df, window=5)
# Engineer features
df = self.engineer_features(df)
# Normalize features
df = self.normalize_features(df, fit=True)
# Split data
train_df, val_df, test_df = self.split_data(df)
# Save processed data
self.save_processed_data(train_df, val_df, test_df)
print("\n" + "="*60)
print("PREPROCESSING COMPLETE!")
print("="*60)
return train_df, val_df, test_df
if __name__ == '__main__':
preprocessor = VehicleDataPreprocessor()
train_df, val_df, test_df = preprocessor.preprocess_pipeline()
|