File size: 6,402 Bytes
e1ced8e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 | """LangGraph definition β compliance workflow with parallel fan-out/fan-in and investigation loop."""
from __future__ import annotations
from langgraph.graph import END, StateGraph
from config import MAX_INVESTIGATION_ROUNDS
from nodes.code_lookup import initial_code_lookup, targeted_code_lookup
from nodes.compliance_analyst import compliance_analyst
from nodes.compliance_planner import compliance_planner
from nodes.cropper import ProgressCallback, execute_crops
from nodes.annotator import annotate_crops
from nodes.deliberation import deliberation
from nodes.final_verdict import final_verdict
from state import ComplianceState
from tools.crop_cache import CropCache
from tools.image_store import ImageStore
def _build_compliance_graph(
image_store: ImageStore,
crop_cache: CropCache | None = None,
progress_callback: ProgressCallback | None = None,
) -> StateGraph:
"""Build the compliance analysis graph with parallel fan-out/fan-in.
Architecture:
compliance_planner
βββ execute_crops βββΊ annotate_crops (optional) βββ
βββ initial_code_lookup ββββββββββββββββββββββββββ€
βΌ
compliance_analyst ββββ
β β β β
β β β β
(crops)(code)(done) β
β β β β
ββββββ β β
β β β
βΌ βΌ β
deliberation (opt) β
β β
final_verdict ββββββββ
β
END
"""
# ---- Wrap nodes that need ImageStore / CropCache / callback ----
def _execute_crops(state: ComplianceState) -> dict:
return execute_crops(state, image_store, crop_cache, progress_callback)
def _annotate_crops(state: ComplianceState) -> dict:
return annotate_crops(state, image_store)
def _compliance_analyst(state: ComplianceState) -> dict:
return compliance_analyst(state, image_store)
def _deliberation(state: ComplianceState) -> dict:
return deliberation(state, image_store)
# ---- Build graph ----
graph = StateGraph(ComplianceState)
# Add all nodes
graph.add_node("compliance_planner", compliance_planner)
graph.add_node("execute_crops", _execute_crops)
graph.add_node("annotate_crops", _annotate_crops)
graph.add_node("initial_code_lookup", initial_code_lookup)
graph.add_node("compliance_analyst", _compliance_analyst)
graph.add_node("targeted_code_lookup", targeted_code_lookup)
graph.add_node("deliberation", _deliberation)
graph.add_node("final_verdict", final_verdict)
# ---- Edges ----
# Entry: planner is always first
graph.set_entry_point("compliance_planner")
# Parallel fan-out: planner β both execute_crops AND initial_code_lookup
graph.add_edge("compliance_planner", "execute_crops")
graph.add_edge("compliance_planner", "initial_code_lookup")
# After crops: optionally annotate, then go to compliance_analyst
def _after_crops(state: ComplianceState) -> str:
if not state.get("enable_annotation", True):
return "compliance_analyst"
crop_tasks = state.get("crop_tasks", [])
if not any(t.get("annotate") and t.get("annotation_prompt") for t in crop_tasks):
return "compliance_analyst"
return "annotate_crops"
graph.add_conditional_edges(
"execute_crops",
_after_crops,
{"annotate_crops": "annotate_crops", "compliance_analyst": "compliance_analyst"},
)
graph.add_edge("annotate_crops", "compliance_analyst")
# Code lookup also feeds into compliance_analyst (fan-in)
graph.add_edge("initial_code_lookup", "compliance_analyst")
# After analysis: loop for more evidence, deliberate, or go to verdict
def _after_analyst(state: ComplianceState) -> str:
needs_more = state.get("needs_more_investigation", False)
round_num = state.get("investigation_round", 0)
max_rounds = state.get("max_rounds", MAX_INVESTIGATION_ROUNDS)
has_additional_crops = bool(state.get("additional_crop_tasks", []))
has_additional_code = bool(state.get("additional_code_queries", []))
enable_consensus = state.get("enable_consensus", False)
if needs_more and round_num < max_rounds:
if has_additional_code:
return "targeted_code_lookup"
if has_additional_crops:
return "execute_crops"
if enable_consensus:
return "deliberation"
return "final_verdict"
graph.add_conditional_edges(
"compliance_analyst",
_after_analyst,
{
"execute_crops": "execute_crops",
"targeted_code_lookup": "targeted_code_lookup",
"deliberation": "deliberation",
"final_verdict": "final_verdict",
},
)
# Targeted code lookup feeds back to analyst
graph.add_edge("targeted_code_lookup", "compliance_analyst")
# Deliberation β verdict
graph.add_edge("deliberation", "final_verdict")
# Verdict β END
graph.add_edge("final_verdict", END)
return graph
def compile_compliance_graph(
image_store: ImageStore,
crop_cache: CropCache | None = None,
progress_callback: ProgressCallback | None = None,
):
"""Return a compiled, ready-to-invoke compliance graph."""
return _build_compliance_graph(image_store, crop_cache, progress_callback).compile()
|