Spaces:
Running
Running
| """ | |
| LangGraph multi-agent orchestration for SentinelAI. | |
| Compiles a small state machine that mirrors enrich β detect β correlate stages. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any | |
| try: | |
| from langgraph.graph import END, StateGraph | |
| except ImportError: # pragma: no cover | |
| StateGraph = None # type: ignore[misc, assignment] | |
| END = None # type: ignore[misc, assignment] | |
| def build_soc_graph(handlers: dict[str, Any]): | |
| """handlers: optional async callables enrich/detect/correlate receiving mutable state dict.""" | |
| if StateGraph is None or END is None: | |
| return None | |
| graph = StateGraph(dict) | |
| async def enrich_node(state: dict[str, Any]) -> dict[str, Any]: | |
| fn = handlers.get("enrich") | |
| if fn: | |
| await fn(state) | |
| notes = list(state.get("notes", [])) | |
| notes.append("enrich_complete") | |
| return {**state, "notes": notes} | |
| async def detect_node(state: dict[str, Any]) -> dict[str, Any]: | |
| fn = handlers.get("detect") | |
| if fn: | |
| await fn(state) | |
| notes = list(state.get("notes", [])) | |
| notes.append("detect_complete") | |
| return {**state, "notes": notes} | |
| async def correlate_node(state: dict[str, Any]) -> dict[str, Any]: | |
| fn = handlers.get("correlate") | |
| if fn: | |
| await fn(state) | |
| notes = list(state.get("notes", [])) | |
| notes.append("correlate_complete") | |
| return {**state, "notes": notes} | |
| graph.add_node("enrich", enrich_node) | |
| graph.add_node("detect", detect_node) | |
| graph.add_node("correlate", correlate_node) | |
| graph.set_entry_point("enrich") | |
| graph.add_edge("enrich", "detect") | |
| graph.add_edge("detect", "correlate") | |
| graph.add_edge("correlate", END) | |
| return graph.compile() | |