# VehicleDiagnosticsAgent/src/agents/data_ingestion_agent.py
# saadmannan's picture
# Prepare project for Hugging Face Space deployment - Add app.py with Gradio interface - Update requirements.txt with torch dependencies - Configure LFS for large files (models, data) - Update README with comprehensive documentation
# d2173d1
"""
Data Ingestion Agent - Loads and prepares sensor data for analysis
"""
import pandas as pd
import numpy as np
from pathlib import Path
import pickle
from typing import Dict, List, Optional
class DataIngestionAgent:
    """
    Agent responsible for loading and preparing vehicle sensor data.

    The agent reads ``test.csv`` from ``data_dir`` and, when present, the
    pickled ``scaler.pkl`` and ``feature_columns.pkl`` artifacts stored in
    the same directory.
    """

    # Columns that identify or label a reading rather than measure something.
    _METADATA_COLUMNS = ('vehicle_id', 'timestamp', 'anomaly')

    def __init__(self, data_dir='data/processed'):
        """
        Args:
            data_dir: Directory holding ``test.csv`` and the optional
                preprocessing artifacts
        """
        self.data_dir = Path(data_dir)
        self.scaler = None
        self.feature_columns = None
        self._load_preprocessing_artifacts()

    def _load_preprocessing_artifacts(self):
        """Load scaler and feature columns when their pickle files exist"""
        scaler_path = self.data_dir / 'scaler.pkl'
        features_path = self.data_dir / 'feature_columns.pkl'

        # NOTE: pickle.load can execute arbitrary code. These artifacts must
        # only ever come from a trusted source (e.g. the project's own
        # preprocessing step), never from user-supplied uploads.
        if scaler_path.exists():
            with open(scaler_path, 'rb') as f:
                self.scaler = pickle.load(f)

        if features_path.exists():
            with open(features_path, 'rb') as f:
                self.feature_columns = pickle.load(f)

    def _has_feature_columns(self) -> bool:
        """Return True when a non-empty feature column collection is loaded"""
        # A bare truthiness test raises for pandas Index / NumPy arrays, so
        # check for None and length explicitly.
        return self.feature_columns is not None and len(self.feature_columns) > 0

    def _sensor_columns(self, vehicle_data: pd.DataFrame) -> List:
        """Return the numeric, non-metadata column labels of ``vehicle_data``"""
        return [
            col for col, dtype in vehicle_data.dtypes.items()
            if col not in self._METADATA_COLUMNS
            and pd.api.types.is_numeric_dtype(dtype)
        ]

    def load_test_data(self) -> pd.DataFrame:
        """
        Load test dataset

        Returns:
            DataFrame read from ``<data_dir>/test.csv``

        Raises:
            FileNotFoundError: If the CSV file does not exist
        """
        test_path = self.data_dir / 'test.csv'
        if not test_path.exists():
            raise FileNotFoundError(f"Test data not found at {test_path}")
        return pd.read_csv(test_path)

    def get_vehicle_data(self, vehicle_id: int, df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        Get sensor data for a specific vehicle

        Args:
            vehicle_id: ID of the vehicle
            df: Optional dataframe to filter from, otherwise loads test data

        Returns:
            DataFrame (a copy) with vehicle sensor data

        Raises:
            ValueError: If no row matches ``vehicle_id``
        """
        if df is None:
            df = self.load_test_data()

        vehicle_data = df[df['vehicle_id'] == vehicle_id].copy()
        if vehicle_data.empty:
            raise ValueError(f"No data found for vehicle_id {vehicle_id}")
        return vehicle_data

    def get_latest_readings(self, vehicle_id: int, n_readings: int = 50,
                            df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        Get the latest N sensor readings for a vehicle

        Args:
            vehicle_id: ID of the vehicle
            n_readings: Number of recent readings to retrieve
            df: Optional dataframe to filter from, otherwise loads test data

        Returns:
            DataFrame with latest sensor readings

        Raises:
            ValueError: If no row matches ``vehicle_id``
        """
        # "Latest" means the last rows in file/frame order; rows are not
        # re-sorted by timestamp here.
        return self.get_vehicle_data(vehicle_id, df).tail(n_readings)

    def prepare_for_analysis(self, vehicle_data: pd.DataFrame) -> Dict:
        """
        Prepare vehicle data for downstream agents

        Args:
            vehicle_data: Raw vehicle sensor data; must contain the
                'vehicle_id' and 'timestamp' columns

        Returns:
            Dictionary containing prepared data and metadata

        Raises:
            ValueError: If ``vehicle_data`` has no rows
            KeyError: If a loaded feature column is missing from
                ``vehicle_data``
        """
        if len(vehicle_data) == 0:
            raise ValueError("Cannot prepare empty vehicle data for analysis")

        vehicle_id = vehicle_data['vehicle_id'].iloc[0]

        # Extract features
        if self._has_feature_columns():
            feature_names = list(self.feature_columns)
            missing = [col for col in feature_names if col not in vehicle_data.columns]
            if missing:
                raise KeyError(f"Feature columns missing from vehicle data: {missing}")
        else:
            # Fallback: use all numeric columns except metadata
            feature_names = self._sensor_columns(vehicle_data)
        features = vehicle_data[feature_names].values

        # Get ground truth if available
        ground_truth = vehicle_data['anomaly'].values if 'anomaly' in vehicle_data.columns else None

        timestamps = vehicle_data['timestamp']
        return {
            'vehicle_id': vehicle_id,
            'features': features,
            'feature_names': feature_names,
            'timestamps': timestamps.values,
            'raw_data': vehicle_data,
            'ground_truth': ground_truth,
            'num_readings': len(vehicle_data),
            'time_range': (timestamps.min(), timestamps.max()),
        }

    def get_sensor_summary(self, vehicle_data: pd.DataFrame) -> Dict:
        """
        Get summary statistics for sensor readings

        Only numeric, non-metadata columns are summarised.

        Args:
            vehicle_data: Vehicle sensor data

        Returns:
            Dictionary mapping each sensor column to its 'mean', 'std',
            'min', 'max' and 'latest' values as floats
        """
        summary = {}
        for col in self._sensor_columns(vehicle_data):
            series = vehicle_data[col]
            summary[col] = {
                'mean': float(series.mean()),
                'std': float(series.std()),
                'min': float(series.min()),
                'max': float(series.max()),
                # An empty frame has no last row; report NaN like the
                # other statistics do instead of raising IndexError.
                'latest': float(series.iloc[-1]) if len(series) else float('nan'),
            }
        return summary

    def run(self, vehicle_id: int, n_readings: Optional[int] = None) -> Dict:
        """
        Main execution method for the Data Ingestion Agent

        Args:
            vehicle_id: ID of the vehicle to analyze
            n_readings: Optional number of recent readings to analyze;
                None (or 0) analyzes every reading of the vehicle

        Returns:
            Dictionary containing prepared data for downstream agents,
            including a 'sensor_summary' entry
        """
        banner = '=' * 60
        print(f"\n{banner}")
        print(f"DATA INGESTION AGENT - Vehicle {vehicle_id}")
        print(banner)

        # Load vehicle data
        if n_readings:
            vehicle_data = self.get_latest_readings(vehicle_id, n_readings)
            print(f"✓ Loaded latest {n_readings} readings for vehicle {vehicle_id}")
        else:
            vehicle_data = self.get_vehicle_data(vehicle_id)
            print(f"✓ Loaded all {len(vehicle_data)} readings for vehicle {vehicle_id}")

        # Prepare data for analysis
        prepared_data = self.prepare_for_analysis(vehicle_data)
        print(f"✓ Prepared {prepared_data['num_readings']} readings for analysis")
        print(f" Time range: {prepared_data['time_range'][0]} to {prepared_data['time_range'][1]}")
        print(f" Features: {len(prepared_data['feature_names'])}")

        # Get sensor summary
        prepared_data['sensor_summary'] = self.get_sensor_summary(vehicle_data)
        print("✓ Generated sensor summary statistics")
        print(f"{banner}\n")

        return prepared_data
if __name__ == '__main__':
    # Manual smoke test: ingest the first vehicle found in the test set and
    # show a short preview of its sensor statistics.
    ingestion_agent = DataIngestionAgent()

    # The first row of the test split decides which vehicle is analysed.
    first_vehicle = ingestion_agent.load_test_data()['vehicle_id'].iloc[0]
    output = ingestion_agent.run(first_vehicle, n_readings=100)

    print("\nSample sensor summary:")
    # Only the first three sensors are printed to keep the output short.
    preview = list(output['sensor_summary'].items())[:3]
    for name, metrics in preview:
        print(f" {name}: mean={metrics['mean']:.2f}, std={metrics['std']:.2f}")