VehicleDiagnosticsAgent / src /orchestrator.py
saadmannan's picture
Prepare project for Hugging Face Space deployment - Add app.py with Gradio interface - Update requirements.txt with torch dependencies - Configure LFS for large files (models, data) - Update README with comprehensive documentation
d2173d1
"""
Multi-Agent Orchestrator using LangGraph
Coordinates the execution of all diagnostic agents
"""
from typing import Dict, TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
from agents.data_ingestion_agent import DataIngestionAgent
from agents.anomaly_detection_agent import AnomalyDetectionAgent
from agents.root_cause_agent import RootCauseAnalysisAgent
from agents.maintenance_recommendation_agent import MaintenanceRecommendationAgent
from agents.report_generation_agent import ReportGenerationAgent
class DiagnosticState(TypedDict):
"""State object passed between agents"""
vehicle_id: int
n_readings: int
prepared_data: Dict
anomaly_result: Dict
root_cause_result: Dict
maintenance_result: Dict
report_result: Dict
error: str
class VehicleDiagnosticOrchestrator:
"""
Orchestrates the multi-agent vehicle diagnostic workflow using LangGraph
"""
def __init__(self):
self.ingestion_agent = DataIngestionAgent()
self.anomaly_agent = AnomalyDetectionAgent()
self.root_cause_agent = RootCauseAnalysisAgent()
self.maintenance_agent = MaintenanceRecommendationAgent()
self.report_agent = ReportGenerationAgent()
self.workflow = self._build_workflow()
def _build_workflow(self) -> StateGraph:
"""Build the LangGraph workflow"""
# Define the workflow graph
workflow = StateGraph(DiagnosticState)
# Add nodes for each agent
workflow.add_node("data_ingestion", self._run_data_ingestion)
workflow.add_node("anomaly_detection", self._run_anomaly_detection)
workflow.add_node("root_cause_analysis", self._run_root_cause_analysis)
workflow.add_node("maintenance_recommendation", self._run_maintenance_recommendation)
workflow.add_node("report_generation", self._run_report_generation)
# Define the workflow edges (sequential execution)
workflow.set_entry_point("data_ingestion")
workflow.add_edge("data_ingestion", "anomaly_detection")
workflow.add_edge("anomaly_detection", "root_cause_analysis")
workflow.add_edge("root_cause_analysis", "maintenance_recommendation")
workflow.add_edge("maintenance_recommendation", "report_generation")
workflow.add_edge("report_generation", END)
return workflow.compile()
def _run_data_ingestion(self, state: DiagnosticState) -> DiagnosticState:
"""Execute Data Ingestion Agent"""
try:
prepared_data = self.ingestion_agent.run(
state['vehicle_id'],
state.get('n_readings')
)
state['prepared_data'] = prepared_data
except Exception as e:
state['error'] = f"Data Ingestion Error: {str(e)}"
return state
def _run_anomaly_detection(self, state: DiagnosticState) -> DiagnosticState:
"""Execute Anomaly Detection Agent"""
try:
if 'error' not in state:
anomaly_result = self.anomaly_agent.run(state['prepared_data'])
state['anomaly_result'] = anomaly_result
except Exception as e:
state['error'] = f"Anomaly Detection Error: {str(e)}"
return state
def _run_root_cause_analysis(self, state: DiagnosticState) -> DiagnosticState:
"""Execute Root Cause Analysis Agent"""
try:
if 'error' not in state:
root_cause_result = self.root_cause_agent.run(state['anomaly_result'])
state['root_cause_result'] = root_cause_result
except Exception as e:
state['error'] = f"Root Cause Analysis Error: {str(e)}"
return state
def _run_maintenance_recommendation(self, state: DiagnosticState) -> DiagnosticState:
"""Execute Maintenance Recommendation Agent"""
try:
if 'error' not in state:
maintenance_result = self.maintenance_agent.run(state['root_cause_result'])
state['maintenance_result'] = maintenance_result
except Exception as e:
state['error'] = f"Maintenance Recommendation Error: {str(e)}"
return state
def _run_report_generation(self, state: DiagnosticState) -> DiagnosticState:
"""Execute Report Generation Agent"""
try:
if 'error' not in state:
report_result = self.report_agent.run(
state['vehicle_id'],
state['prepared_data'],
state['anomaly_result'],
state['root_cause_result'],
state['maintenance_result']
)
state['report_result'] = report_result
except Exception as e:
state['error'] = f"Report Generation Error: {str(e)}"
return state
def diagnose_vehicle(self, vehicle_id: int, n_readings: int = None) -> Dict:
"""
Run complete diagnostic workflow for a vehicle
Args:
vehicle_id: ID of the vehicle to diagnose
n_readings: Optional number of recent readings to analyze
Returns:
Dictionary containing complete diagnostic results
"""
print("\n" + "="*60)
print("VEHICLE DIAGNOSTIC ORCHESTRATOR")
print("="*60)
print(f"Starting diagnostic workflow for Vehicle {vehicle_id}")
print("="*60 + "\n")
# Initialize state
initial_state = {
'vehicle_id': vehicle_id,
'n_readings': n_readings
}
# Execute workflow
final_state = self.workflow.invoke(initial_state)
# Check for errors
if 'error' in final_state:
print(f"\n❌ Error occurred: {final_state['error']}")
return {
'success': False,
'error': final_state['error'],
'vehicle_id': vehicle_id
}
print("\n" + "="*60)
print("DIAGNOSTIC WORKFLOW COMPLETED SUCCESSFULLY")
print("="*60)
# Return comprehensive results
return {
'success': True,
'vehicle_id': vehicle_id,
'prepared_data': final_state.get('prepared_data'),
'anomaly_result': final_state.get('anomaly_result'),
'root_cause_result': final_state.get('root_cause_result'),
'maintenance_result': final_state.get('maintenance_result'),
'report': final_state.get('report_result')
}
def diagnose_multiple_vehicles(self, vehicle_ids: list, n_readings: int = None) -> Dict:
"""
Run diagnostics for multiple vehicles
Args:
vehicle_ids: List of vehicle IDs
n_readings: Optional number of recent readings to analyze
Returns:
Dictionary mapping vehicle IDs to diagnostic results
"""
results = {}
print(f"\n{'='*60}")
print(f"BATCH DIAGNOSTICS - {len(vehicle_ids)} vehicles")
print(f"{'='*60}\n")
for i, vehicle_id in enumerate(vehicle_ids, 1):
print(f"\nProcessing vehicle {i}/{len(vehicle_ids)}: {vehicle_id}")
results[vehicle_id] = self.diagnose_vehicle(vehicle_id, n_readings)
print(f"\n{'='*60}")
print(f"BATCH DIAGNOSTICS COMPLETED")
print(f"{'='*60}")
# Summary statistics
successful = sum(1 for r in results.values() if r['success'])
with_anomalies = sum(1 for r in results.values()
if r['success'] and r.get('anomaly_result', {}).get('anomaly_detected'))
print(f"\nSummary:")
print(f" Total vehicles: {len(vehicle_ids)}")
print(f" Successfully analyzed: {successful}")
print(f" Vehicles with anomalies: {with_anomalies}")
return results
def main():
"""Test the orchestrator"""
orchestrator = VehicleDiagnosticOrchestrator()
# Load test data to get vehicle IDs
from agents.data_ingestion_agent import DataIngestionAgent
ingestion_agent = DataIngestionAgent()
test_df = ingestion_agent.load_test_data()
# Get a vehicle with anomalies
test_vehicle_id = None
for vid in test_df['vehicle_id'].unique()[:10]:
if test_df[test_df['vehicle_id'] == vid]['anomaly'].sum() > 0:
test_vehicle_id = vid
break
if test_vehicle_id:
# Run single vehicle diagnostic
result = orchestrator.diagnose_vehicle(test_vehicle_id, n_readings=200)
if result['success']:
print("\n" + "="*60)
print("DIAGNOSTIC REPORT PREVIEW")
print("="*60)
report = result['report']['full_report']
print(report[:2000] + "\n...\n")
print("\nNatural Language Summary:")
print("-"*60)
print(result['report']['natural_language_summary'])
if __name__ == '__main__':
main()