Debashis committed on
Commit
b0a4e08
·
1 Parent(s): a6952ae

Simplify LangGraph orchestrator - focus on core example

Browse files
backend/src/agents/langgraph_orchestrator.py ADDED
@@ -0,0 +1,248 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ LangGraph-based Multi-Agent Orchestration for Incident Management
3
+ """
4
+
5
+ from typing import TypedDict, Optional, List
6
+ from datetime import datetime
7
+ import logging
8
+ import json
9
+ from langgraph.graph import StateGraph, END
10
+ from langgraph.checkpoint.memory import MemorySaver
11
+ from langchain_core.messages import HumanMessage
12
+ from langchain_ollama import ChatOllama
13
+
14
+ logger = logging.getLogger(__name__)
15
+
16
+
17
class AlertState(TypedDict):
    """State dictionary handed from one workflow agent to the next.

    Every agent receives the full state and returns a partial dict holding
    only the keys it produced.
    """

    # Alert payload exactly as it was given to the workflow.
    raw_alert: dict

    # Produced by the ingestion step.
    normalized_alert: Optional[dict]
    alert_id: Optional[str]

    # Produced by the correlation step.
    similar_alerts: List[dict]
    incident_id: Optional[str]

    # Produced by the analysis step.
    root_cause: Optional[str]
    confidence: float
    recommendations: List[str]

    # One human-readable entry appended by each step that ran.
    execution_log: List[str]
28
+
29
+
30
class AlertIngestionAgent:
    """Normalize a raw alert and give it a stable, content-derived id."""

    async def __call__(self, state: "AlertState") -> dict:
        """Build the normalized alert from ``state["raw_alert"]``.

        Args:
            state: Workflow state; ``raw_alert`` must be present.

        Returns:
            Partial state update with ``normalized_alert``, ``alert_id`` and
            the extended ``execution_log``.
        """
        import hashlib  # local import keeps this block self-contained

        alert = state["raw_alert"]

        # The built-in hash() of a str is salted per interpreter process, so
        # it yields a different id on every run and cannot be used to
        # recognise the same alert twice. Hash a canonical JSON form instead;
        # sorting keys makes the id independent of dict ordering.
        try:
            canonical = json.dumps(alert, sort_keys=True, default=str)
        except (TypeError, ValueError):
            # e.g. keys of mixed, unorderable types - fall back to the repr.
            canonical = str(alert)
        digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]

        normalized = {
            "id": f"alert_{digest}",
            "timestamp": alert.get("timestamp"),
            "source": alert.get("source"),
            "severity": alert.get("severity", "medium"),
            "message": alert.get("message", ""),
        }

        log_msg = f"✓ Ingested alert from {alert.get('source')}"
        return {
            "normalized_alert": normalized,
            "alert_id": normalized["id"],
            "execution_log": state.get("execution_log", []) + [log_msg],
        }
49
+
50
+
51
class CorrelationAgent:
    """Group an alert into an incident keyed by its message text."""

    async def __call__(self, state: "AlertState") -> dict:
        """Derive the incident id for the normalized alert.

        Args:
            state: Workflow state; ``normalized_alert`` must be present and
                may be ``None``.

        Returns:
            Partial state update with ``similar_alerts`` (the alert itself, or
            an empty list when there is none), ``incident_id`` and the
            extended ``execution_log``.
        """
        import hashlib  # local import keeps this block self-contained

        alert = state["normalized_alert"]
        if alert:
            similar = [alert]
            # hash() of a str is salted per process, so it would assign the
            # same message a different incident on every run. A sha256 digest
            # of the message is stable across processes and restarts.
            message = alert.get("message") or ""
            digest = hashlib.sha256(str(message).encode("utf-8")).hexdigest()[:16]
            incident_id = f"incident_{digest}"
        else:
            similar = []
            incident_id = None

        log_msg = f"✓ Found {len(similar)} related alerts, incident: {incident_id}"
        return {
            "similar_alerts": similar,
            "incident_id": incident_id,
            "execution_log": state.get("execution_log", []) + [log_msg],
        }
65
+
66
+
67
class AnalysisAgent:
    """Ask an Ollama-served chat model for the root cause of an alert."""

    def __init__(self, model: str = "mistral", base_url: str = "http://localhost:11434"):
        """Create the chat model client.

        Args:
            model: Name of the Ollama model to query.
            base_url: Base URL of the Ollama server.
        """
        self.llm = ChatOllama(model=model, base_url=base_url)

    @staticmethod
    def _parse_analysis(content) -> dict:
        """Extract the JSON object from a model reply and coerce its fields.

        Raises:
            ValueError: If the reply is not text, holds no JSON object, or
                the object is not valid JSON (``json.JSONDecodeError`` is a
                ``ValueError`` subclass).
        """
        if not isinstance(content, str):
            raise ValueError("model reply is not text")

        # Models often wrap the JSON in a markdown fence or add prose around
        # it, so parse only the outermost {...} span.
        start, end = content.find("{"), content.rfind("}")
        if start == -1 or end <= start:
            raise ValueError("no JSON object found in model reply")
        parsed = json.loads(content[start:end + 1])

        try:
            confidence = float(parsed.get("confidence", 0.0))
        except (TypeError, ValueError):
            confidence = 0.0

        recommendations = parsed.get("recommendations") or []
        if not isinstance(recommendations, list):
            # A single recommendation returned as a bare value.
            recommendations = [recommendations]

        return {
            "root_cause": parsed.get("root_cause"),
            "confidence": confidence,
            "recommendations": recommendations,
        }

    async def __call__(self, state: "AlertState") -> dict:
        """Analyze the normalized alert with the LLM.

        Args:
            state: Workflow state; ``normalized_alert`` must be present and
                may be ``None``.

        Returns:
            Partial state update with ``root_cause``, ``confidence`` and
            ``recommendations``, plus the extended ``execution_log`` when an
            analysis was attempted. A failed analysis is reported through the
            log entry and placeholder values rather than raised.
        """
        alert = state["normalized_alert"]
        if not alert:
            return {"root_cause": None, "confidence": 0.0, "recommendations": []}

        prompt = (
            "Analyze this incident and provide root cause:\n"
            "\n"
            f"Alert: {alert.get('message')}\n"
            f"Severity: {alert.get('severity')}\n"
            f"Source: {alert.get('source')}\n"
            "\n"
            'Respond in JSON: {"root_cause": "...", "confidence": 0.8, '
            '"recommendations": ["..."]}'
        )

        try:
            # Await the async API: the synchronous invoke() would block the
            # event loop for the whole duration of the model call.
            response = await self.llm.ainvoke(prompt)
            result = self._parse_analysis(response.content)
            log_msg = "✓ Analysis complete"
        except Exception as e:  # best effort: the workflow must keep going
            result = {"root_cause": "Analysis error", "confidence": 0.0, "recommendations": []}
            log_msg = f"✗ Analysis failed: {str(e)}"

        return {
            "root_cause": result.get("root_cause"),
            "confidence": result.get("confidence", 0.0),
            "recommendations": result.get("recommendations", []),
            "execution_log": state.get("execution_log", []) + [log_msg],
        }
100
+
101
+
102
class ResponseAgent:
    """Publish the outcome of the workflow.

    At present the result is only written to the module logger; no external
    notification channel is called.
    """

    async def __call__(self, state: "AlertState") -> dict:
        """Log the incident summary and record the step in the execution log.

        Args:
            state: Workflow state; missing keys are reported as ``None``.

        Returns:
            Partial state update containing only the extended
            ``execution_log``.
        """
        response_data = {
            "incident_id": state.get("incident_id"),
            "root_cause": state.get("root_cause"),
            "confidence": state.get("confidence"),
            "recommendations": state.get("recommendations"),
        }

        # Serialise only when the record will actually be emitted, and let
        # logging do the interpolation instead of a pre-built f-string.
        if logger.isEnabledFor(logging.INFO):
            logger.info("Response: %s", json.dumps(response_data, indent=2))

        log_msg = f"✓ Response sent: {state.get('incident_id')}"
        return {
            "execution_log": state.get("execution_log", []) + [log_msg],
        }
120
+
121
+
122
class IncidentManagementWorkflow:
    """LangGraph workflow that runs the four agents in a fixed sequence.

    The compiled graph is ingest -> correlate -> analyze -> respond -> END and
    is checkpointed with an in-memory saver.
    """

    def __init__(self):
        self.ingestion_agent = AlertIngestionAgent()
        self.correlation_agent = CorrelationAgent()
        self.analysis_agent = AnalysisAgent()
        self.response_agent = ResponseAgent()
        self.graph = self._build_graph()

    def _build_graph(self):
        """Assemble and compile the linear four-node state graph."""
        workflow = StateGraph(AlertState)

        # Add nodes
        workflow.add_node("ingest", self._ingest_node)
        workflow.add_node("correlate", self._correlate_node)
        workflow.add_node("analyze", self._analyze_node)
        workflow.add_node("respond", self._respond_node)

        # Add edges
        workflow.add_edge("ingest", "correlate")
        workflow.add_edge("correlate", "analyze")
        workflow.add_edge("analyze", "respond")
        workflow.add_edge("respond", END)

        # Set entry point
        workflow.set_entry_point("ingest")

        # Compile with memory checkpoint
        return workflow.compile(checkpointer=MemorySaver())

    async def _ingest_node(self, state: "AlertState"):
        """Graph node: delegate to the ingestion agent."""
        return await self.ingestion_agent(state)

    async def _correlate_node(self, state: "AlertState"):
        """Graph node: delegate to the correlation agent."""
        return await self.correlation_agent(state)

    async def _analyze_node(self, state: "AlertState"):
        """Graph node: delegate to the analysis agent."""
        return await self.analysis_agent(state)

    async def _respond_node(self, state: "AlertState"):
        """Graph node: delegate to the response agent."""
        return await self.response_agent(state)

    async def process_alert(self, raw_alert: dict, thread_id: Optional[str] = None):
        """Run one alert through the whole graph and return the final state.

        Args:
            raw_alert: Alert payload to process.
            thread_id: Checkpoint thread to run under. When omitted, a fresh
                random id is generated so each alert is checkpointed on its
                own thread.

        Returns:
            Whatever the compiled graph's ``ainvoke`` returns.
        """
        import uuid  # local import keeps this block self-contained

        initial_state: "AlertState" = {
            "raw_alert": raw_alert,
            "normalized_alert": None,
            "alert_id": None,
            "similar_alerts": [],
            "incident_id": None,
            "root_cause": None,
            "confidence": 0.0,
            "recommendations": [],
            "execution_log": [],
        }

        # The graph is compiled with a checkpointer, and LangGraph refuses to
        # run such a graph unless the config carries a ``thread_id``.
        config = {"configurable": {"thread_id": thread_id or uuid.uuid4().hex}}
        return await self.graph.ainvoke(initial_state, config=config)

    def visualize_workflow(self):
        """Return a text diagram of the workflow (the caller prints it)."""
        banner_width = 53
        box_width = 22
        steps = (
            ("1. Ingest Agent", "Normalize & deduplicate"),
            ("2. Correlate Agent", "Find related alerts"),
            ("3. Analysis Agent", "Ollama LLM analysis"),
            ("4. Response Agent", "Send notifications"),
        )
        arrow = ["    │", "    ▼"]

        lines = [
            "",
            "┌" + "─" * banner_width + "┐",
            "│" + "LangGraph Incident Management Workflow".center(banner_width) + "│",
            "└" + "─" * banner_width + "┘",
            "",
            "Alert Input",
        ]
        lines += arrow
        for title, note in steps:
            lines += [
                "┌" + "─" * box_width + "┐",
                # box_width minus the one-space padding on each side
                f"│ {title:<{box_width - 2}} │  → {note}",
                "└" + "─" * box_width + "┘",
            ]
            lines += arrow
        lines += ["Response Output", ""]
        return "\n".join(lines)
214
+
215
+
216
+ # Usage Example
217
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the sample alert through the workflow and print the outcome."""
        workflow = IncidentManagementWorkflow()

        # Print workflow
        print(workflow.visualize_workflow())

        # Sample alert
        sample_alert = {
            "timestamp": "2024-01-15T10:30:00Z",
            "source": "prometheus",
            "severity": "high",
            "message": "CPU usage exceeded 90% on prod-server-01",
        }

        # Execute workflow
        print("Processing alert...")
        result = await workflow.process_alert(sample_alert)

        # The confidence comes from model output, so it may be missing or
        # non-numeric; the percent format would raise on such a value.
        try:
            confidence_text = f"{float(result.get('confidence')):.0%}"
        except (TypeError, ValueError):
            confidence_text = "n/a"

        # Display results
        print("\n✓ Workflow Complete!")
        print(f"Incident ID: {result.get('incident_id')}")
        print(f"Root Cause: {result.get('root_cause')}")
        print(f"Confidence: {confidence_text}")
        print(f"Recommendations: {result.get('recommendations')}")
        print("\nExecution Log:")
        for log in result.get("execution_log", []):
            print(f"  {log}")

    asyncio.run(main())