Spaces:
Sleeping
Sleeping
File size: 9,209 Bytes
d2173d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
"""
Multi-Agent Orchestrator using LangGraph
Coordinates the execution of all diagnostic agents
"""
import operator
from typing import Annotated, Dict, Optional, TypedDict

from langgraph.graph import END, StateGraph

from agents.data_ingestion_agent import DataIngestionAgent
from agents.anomaly_detection_agent import AnomalyDetectionAgent
from agents.root_cause_agent import RootCauseAnalysisAgent
from agents.maintenance_recommendation_agent import MaintenanceRecommendationAgent
from agents.report_generation_agent import ReportGenerationAgent
class DiagnosticState(TypedDict, total=False):
    """State object passed between agents.

    Declared with ``total=False`` because the workflow is started from a
    state that holds only ``vehicle_id`` and ``n_readings``; every other key
    is added by the step that produces it, and ``error`` is set only when a
    step fails.
    """

    # ID of the vehicle being diagnosed.
    vehicle_id: int
    # Number of recent readings to analyze; None when the caller gave none.
    n_readings: Optional[int]
    # Result returned by the data ingestion agent.
    prepared_data: Dict
    # Result returned by the anomaly detection agent.
    anomaly_result: Dict
    # Result returned by the root cause analysis agent.
    root_cause_result: Dict
    # Result returned by the maintenance recommendation agent.
    maintenance_result: Dict
    # Result returned by the report generation agent.
    report_result: Dict
    # Message describing the first step that failed, if any.
    error: str
class VehicleDiagnosticOrchestrator:
    """
    Orchestrates the multi-agent vehicle diagnostic workflow using LangGraph.

    Five agents run in a fixed sequence: data ingestion, anomaly detection,
    root cause analysis, maintenance recommendation and report generation.
    Each step stores its output in the shared state. The first failure is
    recorded under ``'error'`` and every later step is skipped.
    """

    def __init__(self):
        """Create the five agents and build the compiled workflow."""
        self.ingestion_agent = DataIngestionAgent()
        self.anomaly_agent = AnomalyDetectionAgent()
        self.root_cause_agent = RootCauseAnalysisAgent()
        self.maintenance_agent = MaintenanceRecommendationAgent()
        self.report_agent = ReportGenerationAgent()
        self.workflow = self._build_workflow()

    def _build_workflow(self) -> StateGraph:
        """Build the LangGraph workflow.

        Returns:
            The object produced by ``StateGraph.compile()``.
        """
        # NOTE(review): the annotation says StateGraph, but the value returned
        # is the result of compile() - confirm the precise compiled type.
        workflow = StateGraph(DiagnosticState)

        # Steps in execution order; each name is also the graph node name.
        steps = (
            ("data_ingestion", self._run_data_ingestion),
            ("anomaly_detection", self._run_anomaly_detection),
            ("root_cause_analysis", self._run_root_cause_analysis),
            ("maintenance_recommendation", self._run_maintenance_recommendation),
            ("report_generation", self._run_report_generation),
        )
        for node_name, handler in steps:
            workflow.add_node(node_name, handler)

        # Chain the nodes sequentially: first -> ... -> last -> END
        workflow.set_entry_point(steps[0][0])
        for (source, _), (target, _) in zip(steps, steps[1:]):
            workflow.add_edge(source, target)
        workflow.add_edge(steps[-1][0], END)

        return workflow.compile()

    @staticmethod
    def _has_error(state: DiagnosticState) -> bool:
        """Return True when an earlier step recorded a failure.

        Uses truthiness rather than key presence so a state that carries
        ``error=None`` (or an empty string) is not mistaken for a failure.
        """
        return bool(state.get('error'))

    @staticmethod
    def _record_step(state: DiagnosticState, label: str, result_key: str,
                     action) -> DiagnosticState:
        """Run one agent call and store its result or its failure.

        Args:
            state: Workflow state, updated in place.
            label: Step name used as the prefix of the error message.
            result_key: State key that receives the value returned by
                ``action``.
            action: Zero-argument callable performing the agent call.

        Returns:
            The same ``state`` object.
        """
        try:
            state[result_key] = action()
        except Exception as e:
            # Workflow boundary: any failure (including a missing input key)
            # is recorded so that later steps skip and the caller gets a
            # structured error instead of a traceback.
            state['error'] = f"{label} Error: {str(e)}"
        return state

    def _run_data_ingestion(self, state: DiagnosticState) -> DiagnosticState:
        """Execute Data Ingestion Agent and store ``prepared_data``."""
        return self._record_step(
            state, "Data Ingestion", 'prepared_data',
            lambda: self.ingestion_agent.run(
                state['vehicle_id'],
                state.get('n_readings'),
            ),
        )

    def _run_anomaly_detection(self, state: DiagnosticState) -> DiagnosticState:
        """Execute Anomaly Detection Agent and store ``anomaly_result``."""
        if self._has_error(state):
            return state
        return self._record_step(
            state, "Anomaly Detection", 'anomaly_result',
            lambda: self.anomaly_agent.run(state['prepared_data']),
        )

    def _run_root_cause_analysis(self, state: DiagnosticState) -> DiagnosticState:
        """Execute Root Cause Analysis Agent and store ``root_cause_result``."""
        if self._has_error(state):
            return state
        return self._record_step(
            state, "Root Cause Analysis", 'root_cause_result',
            lambda: self.root_cause_agent.run(state['anomaly_result']),
        )

    def _run_maintenance_recommendation(self, state: DiagnosticState) -> DiagnosticState:
        """Execute Maintenance Recommendation Agent and store ``maintenance_result``."""
        if self._has_error(state):
            return state
        return self._record_step(
            state, "Maintenance Recommendation", 'maintenance_result',
            lambda: self.maintenance_agent.run(state['root_cause_result']),
        )

    def _run_report_generation(self, state: DiagnosticState) -> DiagnosticState:
        """Execute Report Generation Agent and store ``report_result``."""
        if self._has_error(state):
            return state
        return self._record_step(
            state, "Report Generation", 'report_result',
            lambda: self.report_agent.run(
                state['vehicle_id'],
                state['prepared_data'],
                state['anomaly_result'],
                state['root_cause_result'],
                state['maintenance_result'],
            ),
        )

    def diagnose_vehicle(self, vehicle_id: int, n_readings: Optional[int] = None) -> Dict:
        """
        Run complete diagnostic workflow for a vehicle

        Args:
            vehicle_id: ID of the vehicle to diagnose
            n_readings: Optional number of recent readings to analyze

        Returns:
            Dictionary containing complete diagnostic results. On failure it
            holds ``success=False``, ``error`` and ``vehicle_id``; on success
            it holds ``success=True``, ``vehicle_id``, each step's result and
            the report under ``'report'``.
        """
        print("\n" + "="*60)
        print("VEHICLE DIAGNOSTIC ORCHESTRATOR")
        print("="*60)
        print(f"Starting diagnostic workflow for Vehicle {vehicle_id}")
        print("="*60 + "\n")

        # Initialize state
        initial_state = {
            'vehicle_id': vehicle_id,
            'n_readings': n_readings,
        }

        # Execute workflow
        final_state = self.workflow.invoke(initial_state)

        # Check for errors (truthiness, so error=None is not a failure)
        error = final_state.get('error')
        if error:
            print(f"\n❌ Error occurred: {error}")
            return {
                'success': False,
                'error': error,
                'vehicle_id': vehicle_id,
            }

        print("\n" + "="*60)
        print("DIAGNOSTIC WORKFLOW COMPLETED SUCCESSFULLY")
        print("="*60)

        # Return comprehensive results
        return {
            'success': True,
            'vehicle_id': vehicle_id,
            'prepared_data': final_state.get('prepared_data'),
            'anomaly_result': final_state.get('anomaly_result'),
            'root_cause_result': final_state.get('root_cause_result'),
            'maintenance_result': final_state.get('maintenance_result'),
            'report': final_state.get('report_result'),
        }

    def diagnose_multiple_vehicles(self, vehicle_ids: list, n_readings: Optional[int] = None) -> Dict:
        """
        Run diagnostics for multiple vehicles

        Args:
            vehicle_ids: List of vehicle IDs
            n_readings: Optional number of recent readings to analyze

        Returns:
            Dictionary mapping vehicle IDs to diagnostic results
        """
        results = {}
        total = len(vehicle_ids)
        banner = "=" * 60

        print(f"\n{banner}")
        print(f"BATCH DIAGNOSTICS - {total} vehicles")
        print(f"{banner}\n")

        for i, vehicle_id in enumerate(vehicle_ids, 1):
            print(f"\nProcessing vehicle {i}/{total}: {vehicle_id}")
            results[vehicle_id] = self.diagnose_vehicle(vehicle_id, n_readings)

        print(f"\n{banner}")
        print("BATCH DIAGNOSTICS COMPLETED")
        print(f"{banner}")

        # Summary statistics
        succeeded = [r for r in results.values() if r['success']]
        # A successful result always has the 'anomaly_result' key, but its
        # value may be None, so default with `or {}` rather than get(..., {}).
        with_anomalies = sum(
            1 for r in succeeded
            if (r.get('anomaly_result') or {}).get('anomaly_detected')
        )

        print("\nSummary:")
        print(f" Total vehicles: {total}")
        print(f" Successfully analyzed: {len(succeeded)}")
        print(f" Vehicles with anomalies: {with_anomalies}")

        return results
def main():
    """Test the orchestrator on one vehicle taken from the test data.

    Scans the first ten distinct vehicle IDs in the test data for one whose
    ``anomaly`` column sums to more than zero, runs the full diagnostic on it
    with 200 readings, and prints a preview of the resulting report.
    """
    orchestrator = VehicleDiagnosticOrchestrator()

    # Load test data to get vehicle IDs
    test_df = DataIngestionAgent().load_test_data()

    # Get a vehicle with anomalies
    test_vehicle_id = None
    for vid in test_df['vehicle_id'].unique()[:10]:
        if test_df[test_df['vehicle_id'] == vid]['anomaly'].sum() > 0:
            test_vehicle_id = vid
            break

    # Compare against None explicitly: a vehicle ID of 0 is falsy but valid.
    if test_vehicle_id is None:
        print("No vehicle with anomalies found among the first 10 test vehicles")
        return

    # Run single vehicle diagnostic
    result = orchestrator.diagnose_vehicle(test_vehicle_id, n_readings=200)
    if not result['success']:
        return

    print("\n" + "="*60)
    print("DIAGNOSTIC REPORT PREVIEW")
    print("="*60)
    report = result['report']['full_report']
    print(report[:2000] + "\n...\n")
    print("\nNatural Language Summary:")
    print("-"*60)
    print(result['report']['natural_language_summary'])


if __name__ == '__main__':
    main()
|