File size: 6,890 Bytes
d2173d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""
Data Ingestion Agent - Loads and prepares sensor data for analysis
"""
import pandas as pd
import numpy as np
from pathlib import Path
import pickle
from typing import Dict, List, Optional


class DataIngestionAgent:
    """
    Agent responsible for loading and preparing vehicle sensor data.

    Reads ``test.csv`` and the optional preprocessing artifacts
    ``scaler.pkl`` / ``feature_columns.pkl`` from ``data_dir`` and packages
    per-vehicle readings into a dictionary for downstream agents.
    """

    # Columns that identify or label a reading rather than measure something;
    # they are never used as features nor included in sensor summaries.
    _METADATA_COLUMNS = ('vehicle_id', 'timestamp', 'anomaly')

    def __init__(self, data_dir='data/processed'):
        self.data_dir = Path(data_dir)
        self.scaler = None           # set from scaler.pkl if present (not used by this class)
        self.feature_columns = None  # set from feature_columns.pkl if present
        self._load_preprocessing_artifacts()

    def _load_preprocessing_artifacts(self):
        """Load the scaler and feature-column list if their files exist.

        Missing files are silently skipped and leave the attribute as None.
        """
        scaler_path = self.data_dir / 'scaler.pkl'
        features_path = self.data_dir / 'feature_columns.pkl'

        # SECURITY: pickle.load can execute arbitrary code; data_dir must only
        # ever point at trusted, locally produced artifacts.
        if scaler_path.exists():
            with open(scaler_path, 'rb') as f:
                self.scaler = pickle.load(f)

        if features_path.exists():
            with open(features_path, 'rb') as f:
                self.feature_columns = pickle.load(f)

    def _has_feature_columns(self) -> bool:
        """Return True when a non-empty feature-column list was loaded."""
        # Explicit None/len checks instead of truthiness: truth-testing a
        # pandas Index or numpy array (common pickled forms) raises ValueError.
        return self.feature_columns is not None and len(self.feature_columns) > 0

    def _numeric_sensor_columns(self, vehicle_data: pd.DataFrame) -> List[str]:
        """Return non-metadata numeric columns of vehicle_data, in column order."""
        return [
            col for col in vehicle_data.columns
            if col not in self._METADATA_COLUMNS
            and pd.api.types.is_numeric_dtype(vehicle_data[col])
        ]

    def load_test_data(self) -> pd.DataFrame:
        """Load the test dataset from ``<data_dir>/test.csv``.

        Raises:
            FileNotFoundError: If test.csv does not exist.
        """
        test_path = self.data_dir / 'test.csv'
        if not test_path.exists():
            raise FileNotFoundError(f"Test data not found at {test_path}")

        return pd.read_csv(test_path)

    def get_vehicle_data(self, vehicle_id: int, df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        Get sensor data for a specific vehicle

        Args:
            vehicle_id: ID of the vehicle
            df: Optional dataframe to filter from, otherwise loads test data

        Returns:
            A copy of the rows whose 'vehicle_id' equals vehicle_id

        Raises:
            ValueError: If no rows match vehicle_id
        """
        if df is None:
            df = self.load_test_data()

        vehicle_data = df[df['vehicle_id'] == vehicle_id].copy()

        if vehicle_data.empty:
            raise ValueError(f"No data found for vehicle_id {vehicle_id}")

        return vehicle_data

    def get_latest_readings(self, vehicle_id: int, n_readings: int = 50) -> pd.DataFrame:
        """
        Get the last N sensor readings for a vehicle (from the test dataset)

        Args:
            vehicle_id: ID of the vehicle
            n_readings: Number of trailing rows to retrieve

        Returns:
            DataFrame with at most n_readings rows
        """
        vehicle_data = self.get_vehicle_data(vehicle_id)
        # NOTE(review): tail() takes the last rows in file order; this assumes
        # test.csv is sorted chronologically per vehicle — confirm upstream.
        return vehicle_data.tail(n_readings)

    def prepare_for_analysis(self, vehicle_data: pd.DataFrame) -> Dict:
        """
        Prepare vehicle data for downstream agents

        Args:
            vehicle_data: Raw vehicle sensor data; must contain 'vehicle_id'
                and 'timestamp' columns

        Returns:
            Dictionary with keys vehicle_id, features, feature_names,
            timestamps, raw_data, ground_truth (None when there is no
            'anomaly' column), num_readings and time_range (min, max timestamp)

        Raises:
            ValueError: If vehicle_data has no rows
        """
        if vehicle_data.empty:
            raise ValueError("Cannot prepare an empty vehicle_data frame for analysis")

        vehicle_id = vehicle_data['vehicle_id'].iloc[0]

        # Prefer the feature list saved at training time; otherwise fall back
        # to every numeric column that is not metadata.
        if self._has_feature_columns():
            feature_names = self.feature_columns
        else:
            feature_names = self._numeric_sensor_columns(vehicle_data)
        features = vehicle_data[feature_names].values

        # Ground-truth labels are only present in labelled datasets.
        ground_truth = vehicle_data['anomaly'].values if 'anomaly' in vehicle_data.columns else None

        timestamps = vehicle_data['timestamp']
        return {
            'vehicle_id': vehicle_id,
            'features': features,
            'feature_names': feature_names,
            'timestamps': timestamps.values,
            'raw_data': vehicle_data,
            'ground_truth': ground_truth,
            'num_readings': len(vehicle_data),
            'time_range': (timestamps.min(), timestamps.max()),
        }

    def get_sensor_summary(self, vehicle_data: pd.DataFrame) -> Dict:
        """
        Get summary statistics for sensor readings

        Only numeric, non-metadata columns are summarised; non-numeric columns
        (e.g. strings, datetimes) are skipped rather than failing float().

        Args:
            vehicle_data: Vehicle sensor data

        Returns:
            Mapping of column name -> {'mean', 'std', 'min', 'max', 'latest'}
            where 'latest' is the value in the last row
        """
        summary = {}
        for col in self._numeric_sensor_columns(vehicle_data):
            series = vehicle_data[col]
            summary[col] = {
                'mean': float(series.mean()),
                'std': float(series.std()),
                'min': float(series.min()),
                'max': float(series.max()),
                'latest': float(series.iloc[-1]),
            }

        return summary

    def run(self, vehicle_id: int, n_readings: Optional[int] = None) -> Dict:
        """
        Main execution method for the Data Ingestion Agent

        Args:
            vehicle_id: ID of the vehicle to analyze
            n_readings: Optional number of recent readings to analyze;
                None or 0 means use all readings

        Returns:
            Dictionary from prepare_for_analysis plus a 'sensor_summary' key
        """
        print(f"\n{'='*60}")
        print(f"DATA INGESTION AGENT - Vehicle {vehicle_id}")
        print(f"{'='*60}")

        # Load vehicle data
        if n_readings:
            vehicle_data = self.get_latest_readings(vehicle_id, n_readings)
            # Report the actual row count: a vehicle may have fewer than n_readings rows.
            print(f"✓ Loaded latest {len(vehicle_data)} readings for vehicle {vehicle_id}")
        else:
            vehicle_data = self.get_vehicle_data(vehicle_id)
            print(f"✓ Loaded all {len(vehicle_data)} readings for vehicle {vehicle_id}")

        # Prepare data for analysis
        prepared_data = self.prepare_for_analysis(vehicle_data)
        print(f"✓ Prepared {prepared_data['num_readings']} readings for analysis")
        print(f"  Time range: {prepared_data['time_range'][0]} to {prepared_data['time_range'][1]}")
        print(f"  Features: {len(prepared_data['feature_names'])}")

        # Attach per-sensor summary statistics
        prepared_data['sensor_summary'] = self.get_sensor_summary(vehicle_data)

        print("✓ Generated sensor summary statistics")
        print(f"{'='*60}\n")

        return prepared_data


if __name__ == '__main__':
    # Smoke test: run the ingestion pipeline on the first vehicle in the test split.
    ingestion_agent = DataIngestionAgent()

    sample_df = ingestion_agent.load_test_data()
    first_vehicle = sample_df['vehicle_id'].iloc[0]

    output = ingestion_agent.run(first_vehicle, n_readings=100)

    # Show a preview of the first three sensors' statistics.
    print("\nSample sensor summary:")
    preview = list(output['sensor_summary'].items())[:3]
    for name, stats in preview:
        print(f"  {name}: mean={stats['mean']:.2f}, std={stats['std']:.2f}")