VehicleDiagnosticsAgent / src /agents /root_cause_agent.py
saadmannan's picture
Prepare project for Hugging Face Space deployment - Add app.py with Gradio interface - Update requirements.txt with torch dependencies - Configure LFS for large files (models, data) - Update README with comprehensive documentation
d2173d1
"""
Root Cause Analysis Agent - Identifies the root cause of detected anomalies
"""
import numpy as np
from typing import Dict, List, Tuple
class RootCauseAnalysisAgent:
    """
    Agent responsible for determining the root cause of detected anomalies.

    The agent matches anomalous sensors against a table of known fault
    patterns (``self.fault_patterns``), reports which known sensor pairs are
    anomalous together, and summarises how long the anomaly window spans.
    """

    # Absolute deviation at (or above) which an anomalous sensor contributes
    # a confidence of 1.0; smaller deviations scale linearly.
    DEVIATION_FULL_CONFIDENCE = 5.0
    # Number of most recent rows of raw data inspected for threshold breaches.
    RECENT_WINDOW = 20
    # Fraction of the recent window that must breach a threshold before a
    # non-anomalous sensor is still counted as matching a fault pattern.
    MIN_BREACH_RATIO = 0.3
    # Index span (last - first anomaly) above which progression is 'gradual'.
    GRADUAL_MIN_DURATION = 50

    # Sensor pairs treated as related, with a fixed correlation strength.
    KNOWN_CORRELATIONS = (
        ('engine_temp', 'coolant_temp', 0.9),
        ('engine_temp', 'oil_pressure', -0.7),
        ('rpm', 'engine_temp', 0.6),
        ('battery_voltage', 'battery_health', 0.95),
        ('tire_pressure_fl', 'tire_pressure_fr', 0.8),
        ('tire_pressure_rl', 'tire_pressure_rr', 0.8),
    )

    def __init__(self):
        # Fault patterns and their associated root causes.
        #
        # 'sensors'    : sensors whose anomaly points at this fault.
        # 'thresholds' : per-sensor limits compared against raw data values;
        #                a positive limit is breached by values above it, a
        #                non-positive limit by values below it.
        #                NOTE(review): the magnitudes suggest standardized
        #                (z-score-like) sensor values - confirm upstream.
        # A sensor listed without a threshold (e.g. 'engine_temp' under
        # 'engine_stress') can only match by being flagged as anomalous.
        self.fault_patterns = {
            'engine_overheating': {
                'sensors': ['engine_temp', 'coolant_temp', 'temp_differential'],
                'thresholds': {'engine_temp': 1.5, 'coolant_temp': 1.5, 'temp_differential': 1.0},
                'description': 'Engine temperature exceeds safe operating limits',
                'severity': 'critical',
                'fault_codes': ['P0217', 'P0218', 'P0219']
            },
            'cooling_system_failure': {
                'sensors': ['coolant_temp', 'engine_temp'],
                'thresholds': {'coolant_temp': 2.0, 'engine_temp': 1.8},
                'description': 'Cooling system not maintaining proper temperature',
                'severity': 'critical',
                'fault_codes': ['P0217', 'P0128']
            },
            'oil_pressure_low': {
                'sensors': ['oil_pressure'],
                'thresholds': {'oil_pressure': -1.5},
                'description': 'Oil pressure below safe operating range',
                'severity': 'critical',
                'fault_codes': ['P0520', 'P0521', 'P0522']
            },
            'battery_degradation': {
                'sensors': ['battery_voltage', 'battery_health'],
                'thresholds': {'battery_voltage': -1.0, 'battery_health': -1.0},
                'description': 'Battery voltage or health declining',
                'severity': 'high',
                'fault_codes': ['P0560', 'P0562', 'P0563']
            },
            'tire_pressure_issue': {
                'sensors': ['tire_pressure_fl', 'tire_pressure_fr', 'tire_pressure_rl',
                            'tire_pressure_rr', 'tire_pressure_imbalance'],
                'thresholds': {'tire_pressure_fl': -1.5, 'tire_pressure_fr': -1.5,
                               'tire_pressure_rl': -1.5, 'tire_pressure_rr': -1.5,
                               'tire_pressure_imbalance': 1.5},
                'description': 'One or more tires have incorrect pressure',
                'severity': 'medium',
                'fault_codes': ['C1234', 'C1235']
            },
            'excessive_vibration': {
                'sensors': ['vibration_level'],
                'thresholds': {'vibration_level': 2.0},
                'description': 'Abnormal vibration detected',
                'severity': 'high',
                'fault_codes': ['P0300', 'P0301']
            },
            'fuel_system_issue': {
                'sensors': ['fuel_pressure'],
                'thresholds': {'fuel_pressure': -1.5},
                'description': 'Fuel pressure outside normal range',
                'severity': 'high',
                'fault_codes': ['P0087', 'P0088']
            },
            'engine_stress': {
                'sensors': ['engine_stress', 'rpm', 'engine_temp'],
                'thresholds': {'engine_stress': 2.0, 'rpm': 2.0},
                'description': 'Engine operating under excessive stress',
                'severity': 'medium',
                'fault_codes': ['P0101', 'P0102']
            }
        }

    def _recent_breach_ratio(self, raw_data, sensor: str, threshold) -> float:
        """
        Return the fraction of recent values of ``sensor`` breaching ``threshold``.

        Returns 0.0 when there is nothing to evaluate: no raw data, no
        threshold configured for the sensor, the sensor is not a column of
        ``raw_data``, or the column has no rows.
        """
        if raw_data is None or threshold is None or sensor not in raw_data.columns:
            return 0.0

        recent_values = raw_data[sensor].tail(self.RECENT_WINDOW)
        if len(recent_values) == 0:
            # Avoid a 0/0 division (NaN + runtime warning) on an empty column.
            return 0.0

        # Positive thresholds are upper limits, the rest are lower limits.
        if threshold > 0:
            breached = recent_values > threshold
        else:
            breached = recent_values < threshold
        return float(breached.sum()) / len(recent_values)

    def analyze_sensor_patterns(self, anomalous_sensors: Dict, raw_data) -> List[Dict]:
        """
        Analyze anomalous sensor patterns to identify root causes

        A fault pattern is reported when at least one of its sensors either
        is present in ``anomalous_sensors`` or, failing that, breaches its
        configured threshold in more than ``MIN_BREACH_RATIO`` of the last
        ``RECENT_WINDOW`` rows of ``raw_data``.

        Args:
            anomalous_sensors: Dictionary of sensors showing anomalous
                behavior; each value must provide a numeric 'deviation'.
            raw_data: Raw sensor data DataFrame (one column per sensor)

        Returns:
            List of identified root causes with confidence scores, sorted by
            descending confidence. Ties keep the declaration order of
            ``self.fault_patterns``.
        """
        identified_causes = []

        for fault_name, fault_info in self.fault_patterns.items():
            matching_sensors = []
            confidence_scores = []

            for sensor in fault_info['sensors']:
                if sensor in anomalous_sensors:
                    # Confidence grows with the size of the deviation. The
                    # magnitude is used so that a signed (negative) deviation,
                    # e.g. a pressure drop, cannot yield a negative score.
                    deviation = anomalous_sensors[sensor]['deviation']
                    confidence = min(abs(deviation) / self.DEVIATION_FULL_CONFIDENCE, 1.0)
                    matching_sensors.append(sensor)
                    confidence_scores.append(confidence)
                    continue

                # Not flagged as anomalous: fall back to a threshold check on
                # the most recent raw values.
                breach_ratio = self._recent_breach_ratio(
                    raw_data, sensor, fault_info['thresholds'].get(sensor)
                )
                if breach_ratio > self.MIN_BREACH_RATIO:
                    matching_sensors.append(sensor)
                    confidence_scores.append(breach_ratio)

            # Any matching sensor makes this fault a potential root cause.
            if matching_sensors:
                identified_causes.append({
                    'fault_name': fault_name,
                    'description': fault_info['description'],
                    'severity': fault_info['severity'],
                    'confidence': float(np.mean(confidence_scores)),
                    'affected_sensors': matching_sensors,
                    'fault_codes': fault_info['fault_codes'],
                    'num_sensors_affected': len(matching_sensors)
                })

        # Highest confidence first (list.sort is stable, so ties keep order).
        identified_causes.sort(key=lambda cause: cause['confidence'], reverse=True)
        return identified_causes

    def correlate_sensor_failures(self, anomalous_sensors: Dict) -> List[Tuple[str, str, float]]:
        """
        Find correlations between anomalous sensors

        Only the fixed pairs in ``KNOWN_CORRELATIONS`` are considered; the
        strength is the tabulated constant, not a value computed from data.

        Args:
            anomalous_sensors: Dictionary of anomalous sensors

        Returns:
            List of (sensor1, sensor2, strength) for every known pair whose
            two sensors are both anomalous.
        """
        return [
            (sensor1, sensor2, corr_strength)
            for sensor1, sensor2, corr_strength in self.KNOWN_CORRELATIONS
            if sensor1 in anomalous_sensors and sensor2 in anomalous_sensors
        ]

    def determine_failure_sequence(self, anomaly_indices: List[int],
                                   anomalous_sensors: Dict,
                                   timestamps: np.ndarray) -> Dict:
        """
        Determine the sequence of failures

        Args:
            anomaly_indices: Indices where anomalies occurred (a list or a
                NumPy array)
            anomalous_sensors: Dictionary of anomalous sensors
            timestamps: Array of timestamps, indexable by the anomaly indices

        Returns:
            Dictionary describing failure sequence. With no anomaly indices
            it is ``{'sequence': [], 'duration': 0}``; otherwise it holds the
            first/last anomaly timestamps, the duration as an index span,
            the progression label and the affected sensor names.
        """
        # An explicit length check is required: `not array` raises for NumPy
        # arrays holding more than one element.
        if anomaly_indices is None or len(anomaly_indices) == 0:
            return {'sequence': [], 'duration': 0}

        first_anomaly = int(min(anomaly_indices))
        last_anomaly = int(max(anomaly_indices))
        # Duration is measured in index positions (timesteps), not in
        # timestamp units.
        duration = last_anomaly - first_anomaly

        return {
            'first_anomaly_time': int(timestamps[first_anomaly]),
            'last_anomaly_time': int(timestamps[last_anomaly]),
            'duration': int(duration),
            'progression': 'gradual' if duration > self.GRADUAL_MIN_DURATION else 'sudden',
            'affected_sensors': list(anomalous_sensors.keys())
        }

    def run(self, anomaly_result: Dict) -> Dict:
        """
        Main execution method for the Root Cause Analysis Agent

        Prints a human-readable report to stdout while analysing.

        Args:
            anomaly_result: Results from Anomaly Detection Agent. Must hold
                'vehicle_id' and 'anomaly_detected'; when an anomaly was
                detected it must also hold 'anomalous_sensors', 'raw_data',
                'anomaly_indices' and 'timestamps'.

        Returns:
            Dictionary containing root cause analysis with the keys
            'vehicle_id', 'root_causes', 'correlations', 'failure_sequence',
            'analysis_summary' and 'primary_cause' (None when no root cause
            was identified).
        """
        rule = '=' * 60
        vehicle_id = anomaly_result['vehicle_id']

        print(f"\n{rule}")
        print(f"ROOT CAUSE ANALYSIS AGENT - Vehicle {vehicle_id}")
        print(rule)

        if not anomaly_result['anomaly_detected']:
            print("✓ No anomalies detected - no root cause analysis needed")
            print(f"{rule}\n")
            # Same keys as the full result so callers can read
            # 'primary_cause' without guarding against a KeyError.
            return {
                'vehicle_id': vehicle_id,
                'root_causes': [],
                'correlations': [],
                'failure_sequence': {},
                'analysis_summary': 'No anomalies detected',
                'primary_cause': None
            }

        anomalous_sensors = anomaly_result['anomalous_sensors']
        raw_data = anomaly_result['raw_data']
        anomaly_indices = anomaly_result['anomaly_indices']
        timestamps = anomaly_result['timestamps']

        print(f"Analyzing {len(anomalous_sensors)} anomalous sensors...")

        # Identify root causes
        root_causes = self.analyze_sensor_patterns(anomalous_sensors, raw_data)
        print(f"✓ Identified {len(root_causes)} potential root causes")
        if root_causes:
            print("\nTop root causes:")
            for rank, cause in enumerate(root_causes[:3], 1):
                print(f" {rank}. {cause['fault_name']} ({cause['severity']} severity)")
                print(f" Confidence: {cause['confidence']:.2%}")
                print(f" Description: {cause['description']}")
                print(f" Fault codes: {', '.join(cause['fault_codes'])}")

        # Find sensor correlations
        correlations = self.correlate_sensor_failures(anomalous_sensors)
        if correlations:
            print(f"\n✓ Found {len(correlations)} correlated sensor failures")
            for sensor1, sensor2, strength in correlations:
                # Separator keeps the two sensor names readable.
                print(f" - {sensor1} <-> {sensor2} (correlation: {strength:.2f})")

        # Determine failure sequence
        failure_sequence = self.determine_failure_sequence(
            anomaly_indices, anomalous_sensors, timestamps
        )
        print(f"\n✓ Failure progression: {failure_sequence.get('progression', 'unknown')}")
        print(f" Duration: {failure_sequence.get('duration', 0)} timesteps")

        # Generate analysis summary from the highest-confidence cause.
        primary_cause = root_causes[0] if root_causes else None
        if primary_cause is not None:
            summary = (f"Primary issue: {primary_cause['description']} "
                       f"({primary_cause['severity']} severity, "
                       f"{primary_cause['confidence']:.0%} confidence)")
        else:
            summary = "Anomalies detected but root cause unclear"

        print(f"\n✓ Analysis summary: {summary}")
        print(f"{rule}\n")

        return {
            'vehicle_id': vehicle_id,
            'root_causes': root_causes,
            'correlations': correlations,
            'failure_sequence': failure_sequence,
            'analysis_summary': summary,
            'primary_cause': primary_cause
        }
if __name__ == '__main__':
    # Manual smoke test: run ingestion -> anomaly detection -> root cause
    # analysis for the first vehicle (among the first 10) that has at least
    # one row flagged in the 'anomaly' column of the test data.
    from data_ingestion_agent import DataIngestionAgent
    from anomaly_detection_agent import AnomalyDetectionAgent

    # Load and prepare data
    ingestion_agent = DataIngestionAgent()
    test_df = ingestion_agent.load_test_data()

    # Find a vehicle with anomalies
    test_vehicle_id = None
    for vid in test_df['vehicle_id'].unique()[:10]:
        if test_df[test_df['vehicle_id'] == vid]['anomaly'].sum() > 0:
            test_vehicle_id = vid
            break

    # Compare against None explicitly: a vehicle id of 0 is falsy but valid.
    if test_vehicle_id is not None:
        prepared_data = ingestion_agent.run(test_vehicle_id)

        # Detect anomalies
        detection_agent = AnomalyDetectionAgent()
        anomaly_result = detection_agent.run(prepared_data)

        # Analyze root cause
        rca_agent = RootCauseAnalysisAgent()
        result = rca_agent.run(anomaly_result)

        # .get() tolerates a result without 'primary_cause' (returned when
        # the detector finds no anomaly for this vehicle).
        primary_cause = result.get('primary_cause')
        print("\nRoot Cause Analysis Summary:")
        print(f" Primary cause: {primary_cause['fault_name'] if primary_cause else 'None'}")
        print(f" Root causes found: {len(result['root_causes'])}")
    else:
        print("No vehicle with anomalies found among the first 10 vehicles")