Ryan2219's picture
Upload 70 files
e1ced8e verified
"""LangGraph definition — compliance workflow with parallel fan-out/fan-in and investigation loop."""
from __future__ import annotations
from langgraph.graph import END, StateGraph
from config import MAX_INVESTIGATION_ROUNDS
from nodes.code_lookup import initial_code_lookup, targeted_code_lookup
from nodes.compliance_analyst import compliance_analyst
from nodes.compliance_planner import compliance_planner
from nodes.cropper import ProgressCallback, execute_crops
from nodes.annotator import annotate_crops
from nodes.deliberation import deliberation
from nodes.final_verdict import final_verdict
from state import ComplianceState
from tools.crop_cache import CropCache
from tools.image_store import ImageStore
def _build_compliance_graph(
    image_store: ImageStore,
    crop_cache: CropCache | None = None,
    progress_callback: ProgressCallback | None = None,
) -> StateGraph:
    """Assemble the (uncompiled) compliance analysis graph.

    Topology, as wired below::

        compliance_planner
          |-- execute_crops --> annotate_crops (optional) --+
          `-- initial_code_lookup --------------------------+
                                                            v
                                                  compliance_analyst
                                                            |
              +---------------------+-----------------------+
              |                     |                       |
           (crops)               (code)                  (done)
              |                     |                       |
        execute_crops      targeted_code_lookup    deliberation (optional)
        (back to analyst   (back to analyst)                |
         via crop branch)                             final_verdict
                                                            |
                                                           END

    Args:
        image_store: Forwarded to the crop, annotation, analyst and
            deliberation nodes.
        crop_cache: Forwarded to ``execute_crops`` only.
        progress_callback: Forwarded to ``execute_crops`` only.

    Returns:
        The populated ``StateGraph``; it has not been compiled.
    """

    # ---- Closures binding the shared resources into node callables ----
    def _execute_crops(state: ComplianceState) -> dict:
        return execute_crops(state, image_store, crop_cache, progress_callback)

    def _annotate_crops(state: ComplianceState) -> dict:
        return annotate_crops(state, image_store)

    def _compliance_analyst(state: ComplianceState) -> dict:
        return compliance_analyst(state, image_store)

    def _deliberation(state: ComplianceState) -> dict:
        return deliberation(state, image_store)

    # ---- Routing predicates ----
    def _after_crops(state: ComplianceState) -> str:
        """Route to annotation only if it is enabled and some task asks for it."""
        if state.get("enable_annotation", True):
            for task in state.get("crop_tasks", []):
                if task.get("annotate") and task.get("annotation_prompt"):
                    return "annotate_crops"
        return "compliance_analyst"

    def _after_analyst(state: ComplianceState) -> str:
        """Pick the next step: gather more evidence, deliberate, or conclude."""
        rounds_used = state.get("investigation_round", 0)
        rounds_allowed = state.get("max_rounds", MAX_INVESTIGATION_ROUNDS)
        keep_investigating = (
            state.get("needs_more_investigation", False)
            and rounds_used < rounds_allowed
        )
        if keep_investigating:
            # Code queries take precedence over crop requests when both exist.
            if state.get("additional_code_queries", []):
                return "targeted_code_lookup"
            if state.get("additional_crop_tasks", []):
                return "execute_crops"
        if state.get("enable_consensus", False):
            return "deliberation"
        return "final_verdict"

    # ---- Nodes ----
    graph = StateGraph(ComplianceState)
    node_table = (
        ("compliance_planner", compliance_planner),
        ("execute_crops", _execute_crops),
        ("annotate_crops", _annotate_crops),
        ("initial_code_lookup", initial_code_lookup),
        ("compliance_analyst", _compliance_analyst),
        ("targeted_code_lookup", targeted_code_lookup),
        ("deliberation", _deliberation),
        ("final_verdict", final_verdict),
    )
    for node_name, node_fn in node_table:
        graph.add_node(node_name, node_fn)

    # ---- Edges ----
    # The planner always runs first, then fans out to both branches.
    graph.set_entry_point("compliance_planner")
    for fan_out_target in ("execute_crops", "initial_code_lookup"):
        graph.add_edge("compliance_planner", fan_out_target)

    # Crop branch: optional annotation step before reaching the analyst.
    crop_routes = ("annotate_crops", "compliance_analyst")
    graph.add_conditional_edges(
        "execute_crops",
        _after_crops,
        {route: route for route in crop_routes},
    )
    graph.add_edge("annotate_crops", "compliance_analyst")

    # Code-lookup branch joins the analyst as well (fan-in).
    # NOTE(review): the two branches reach compliance_analyst after a different
    # number of hops (one for code lookup, one or two for crops) -- confirm
    # against LangGraph's scheduling that the analyst is triggered as intended.
    graph.add_edge("initial_code_lookup", "compliance_analyst")

    # Analyst outcome: loop for more evidence, deliberate, or go to verdict.
    analyst_routes = (
        "execute_crops",
        "targeted_code_lookup",
        "deliberation",
        "final_verdict",
    )
    graph.add_conditional_edges(
        "compliance_analyst",
        _after_analyst,
        {route: route for route in analyst_routes},
    )

    # Remaining fixed transitions: lookup -> analyst, deliberation -> verdict,
    # verdict -> END.
    fixed_transitions = (
        ("targeted_code_lookup", "compliance_analyst"),
        ("deliberation", "final_verdict"),
        ("final_verdict", END),
    )
    for source, target in fixed_transitions:
        graph.add_edge(source, target)

    return graph
def compile_compliance_graph(
    image_store: ImageStore,
    crop_cache: CropCache | None = None,
    progress_callback: ProgressCallback | None = None,
):
    """Build the compliance graph and return its compiled form.

    Args:
        image_store: Passed through to ``_build_compliance_graph``.
        crop_cache: Passed through to ``_build_compliance_graph``.
        progress_callback: Passed through to ``_build_compliance_graph``.

    Returns:
        The result of calling ``.compile()`` on the built graph.
    """
    uncompiled = _build_compliance_graph(image_store, crop_cache, progress_callback)
    return uncompiled.compile()