"""LangGraph definition — compliance workflow with parallel fan-out/fan-in and investigation loop.""" from __future__ import annotations from langgraph.graph import END, StateGraph from config import MAX_INVESTIGATION_ROUNDS from nodes.code_lookup import initial_code_lookup, targeted_code_lookup from nodes.compliance_analyst import compliance_analyst from nodes.compliance_planner import compliance_planner from nodes.cropper import ProgressCallback, execute_crops from nodes.annotator import annotate_crops from nodes.deliberation import deliberation from nodes.final_verdict import final_verdict from state import ComplianceState from tools.crop_cache import CropCache from tools.image_store import ImageStore def _build_compliance_graph( image_store: ImageStore, crop_cache: CropCache | None = None, progress_callback: ProgressCallback | None = None, ) -> StateGraph: """Build the compliance analysis graph with parallel fan-out/fan-in. Architecture: compliance_planner ├── execute_crops ──► annotate_crops (optional) ──┐ └── initial_code_lookup ─────────────────────────┤ ▼ compliance_analyst ◄──┐ │ │ │ │ │ │ │ │ (crops)(code)(done) │ │ │ │ │ └────┘ │ │ │ │ │ ▼ ▼ │ deliberation (opt) │ │ │ final_verdict ───────┘ │ END """ # ---- Wrap nodes that need ImageStore / CropCache / callback ---- def _execute_crops(state: ComplianceState) -> dict: return execute_crops(state, image_store, crop_cache, progress_callback) def _annotate_crops(state: ComplianceState) -> dict: return annotate_crops(state, image_store) def _compliance_analyst(state: ComplianceState) -> dict: return compliance_analyst(state, image_store) def _deliberation(state: ComplianceState) -> dict: return deliberation(state, image_store) # ---- Build graph ---- graph = StateGraph(ComplianceState) # Add all nodes graph.add_node("compliance_planner", compliance_planner) graph.add_node("execute_crops", _execute_crops) graph.add_node("annotate_crops", _annotate_crops) graph.add_node("initial_code_lookup", initial_code_lookup) graph.add_node("compliance_analyst", _compliance_analyst) graph.add_node("targeted_code_lookup", targeted_code_lookup) graph.add_node("deliberation", _deliberation) graph.add_node("final_verdict", final_verdict) # ---- Edges ---- # Entry: planner is always first graph.set_entry_point("compliance_planner") # Parallel fan-out: planner → both execute_crops AND initial_code_lookup graph.add_edge("compliance_planner", "execute_crops") graph.add_edge("compliance_planner", "initial_code_lookup") # After crops: optionally annotate, then go to compliance_analyst def _after_crops(state: ComplianceState) -> str: if not state.get("enable_annotation", True): return "compliance_analyst" crop_tasks = state.get("crop_tasks", []) if not any(t.get("annotate") and t.get("annotation_prompt") for t in crop_tasks): return "compliance_analyst" return "annotate_crops" graph.add_conditional_edges( "execute_crops", _after_crops, {"annotate_crops": "annotate_crops", "compliance_analyst": "compliance_analyst"}, ) graph.add_edge("annotate_crops", "compliance_analyst") # Code lookup also feeds into compliance_analyst (fan-in) graph.add_edge("initial_code_lookup", "compliance_analyst") # After analysis: loop for more evidence, deliberate, or go to verdict def _after_analyst(state: ComplianceState) -> str: needs_more = state.get("needs_more_investigation", False) round_num = state.get("investigation_round", 0) max_rounds = state.get("max_rounds", MAX_INVESTIGATION_ROUNDS) has_additional_crops = bool(state.get("additional_crop_tasks", [])) has_additional_code = bool(state.get("additional_code_queries", [])) enable_consensus = state.get("enable_consensus", False) if needs_more and round_num < max_rounds: if has_additional_code: return "targeted_code_lookup" if has_additional_crops: return "execute_crops" if enable_consensus: return "deliberation" return "final_verdict" graph.add_conditional_edges( "compliance_analyst", _after_analyst, { "execute_crops": "execute_crops", "targeted_code_lookup": "targeted_code_lookup", "deliberation": "deliberation", "final_verdict": "final_verdict", }, ) # Targeted code lookup feeds back to analyst graph.add_edge("targeted_code_lookup", "compliance_analyst") # Deliberation → verdict graph.add_edge("deliberation", "final_verdict") # Verdict → END graph.add_edge("final_verdict", END) return graph def compile_compliance_graph( image_store: ImageStore, crop_cache: CropCache | None = None, progress_callback: ProgressCallback | None = None, ): """Return a compiled, ready-to-invoke compliance graph.""" return _build_compliance_graph(image_store, crop_cache, progress_callback).compile()