File size: 6,402 Bytes
e1ced8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""LangGraph definition β€” compliance workflow with parallel fan-out/fan-in and investigation loop."""
from __future__ import annotations

from langgraph.graph import END, StateGraph

from config import MAX_INVESTIGATION_ROUNDS
from nodes.code_lookup import initial_code_lookup, targeted_code_lookup
from nodes.compliance_analyst import compliance_analyst
from nodes.compliance_planner import compliance_planner
from nodes.cropper import ProgressCallback, execute_crops
from nodes.annotator import annotate_crops
from nodes.deliberation import deliberation
from nodes.final_verdict import final_verdict
from state import ComplianceState
from tools.crop_cache import CropCache
from tools.image_store import ImageStore


def _build_compliance_graph(
    image_store: ImageStore,
    crop_cache: CropCache | None = None,
    progress_callback: ProgressCallback | None = None,
) -> StateGraph:
    """Assemble the (uncompiled) compliance-analysis ``StateGraph``.

    Topology, exactly as wired below::

        compliance_planner   -> execute_crops            (fan-out)
        compliance_planner   -> initial_code_lookup      (fan-out)
        execute_crops        -> annotate_crops | compliance_analyst
        annotate_crops       -> compliance_analyst
        initial_code_lookup  -> compliance_analyst       (fan-in)
        compliance_analyst   -> targeted_code_lookup | execute_crops
                                | deliberation | final_verdict
        targeted_code_lookup -> compliance_analyst       (investigation loop)
        deliberation         -> final_verdict
        final_verdict        -> END

    Args:
        image_store: Forwarded to the crop, annotation, analyst and
            deliberation nodes.
        crop_cache: Forwarded to ``execute_crops`` only; may be ``None``.
        progress_callback: Forwarded to ``execute_crops`` only; may be ``None``.

    Returns:
        The populated ``StateGraph``; the caller is responsible for
        compiling it.
    """

    # ---- Closures binding the shared resources into node callables ----
    # The nested function names are kept stable on purpose: LangGraph may
    # derive internal names from the callables it is handed.
    def _execute_crops(state: ComplianceState) -> dict:
        return execute_crops(state, image_store, crop_cache, progress_callback)

    def _annotate_crops(state: ComplianceState) -> dict:
        return annotate_crops(state, image_store)

    def _compliance_analyst(state: ComplianceState) -> dict:
        return compliance_analyst(state, image_store)

    def _deliberation(state: ComplianceState) -> dict:
        return deliberation(state, image_store)

    # ---- Routers ----
    def _after_crops(state: ComplianceState) -> str:
        """Route to ``annotate_crops`` only if annotation is on and requested."""
        if state.get("enable_annotation", True):
            for task in state.get("crop_tasks", []):
                # A task qualifies only when it is flagged AND has a prompt.
                if task.get("annotate") and task.get("annotation_prompt"):
                    return "annotate_crops"
        return "compliance_analyst"

    def _after_analyst(state: ComplianceState) -> str:
        """Pick the next hop: gather more evidence, deliberate, or conclude."""
        if state.get("needs_more_investigation", False):
            rounds_done = state.get("investigation_round", 0)
            round_limit = state.get("max_rounds", MAX_INVESTIGATION_ROUNDS)
            if rounds_done < round_limit:
                # NOTE(review): code queries take precedence when both kinds
                # of follow-up are pending; whether the crop tasks are still
                # picked up on a later round depends on node code not
                # visible here - confirm.
                if state.get("additional_code_queries", []):
                    return "targeted_code_lookup"
                if state.get("additional_crop_tasks", []):
                    return "execute_crops"
        # No (further) investigation: optional consensus step, else verdict.
        return "deliberation" if state.get("enable_consensus", False) else "final_verdict"

    # ---- Graph assembly ----
    workflow = StateGraph(ComplianceState)

    node_table = (
        ("compliance_planner", compliance_planner),
        ("execute_crops", _execute_crops),
        ("annotate_crops", _annotate_crops),
        ("initial_code_lookup", initial_code_lookup),
        ("compliance_analyst", _compliance_analyst),
        ("targeted_code_lookup", targeted_code_lookup),
        ("deliberation", _deliberation),
        ("final_verdict", final_verdict),
    )
    for node_name, node_fn in node_table:
        workflow.add_node(node_name, node_fn)

    # The planner always runs first, then fans out to both branches.
    workflow.set_entry_point("compliance_planner")
    workflow.add_edge("compliance_planner", "execute_crops")
    workflow.add_edge("compliance_planner", "initial_code_lookup")

    # Crop branch: annotate when requested, otherwise straight to the analyst.
    crop_targets = {
        target: target for target in ("annotate_crops", "compliance_analyst")
    }
    workflow.add_conditional_edges("execute_crops", _after_crops, crop_targets)
    workflow.add_edge("annotate_crops", "compliance_analyst")

    # Code-lookup branch joins the crop branch at the analyst (fan-in).
    # NOTE(review): the annotate_crops path is one hop longer than this one,
    # and both arrive through independent edges - confirm that LangGraph does
    # not invoke compliance_analyst twice when annotation is taken.
    workflow.add_edge("initial_code_lookup", "compliance_analyst")

    # Analyst outcome: loop for more evidence, deliberate, or go to verdict.
    analyst_targets = {
        target: target
        for target in (
            "execute_crops",
            "targeted_code_lookup",
            "deliberation",
            "final_verdict",
        )
    }
    workflow.add_conditional_edges(
        "compliance_analyst", _after_analyst, analyst_targets
    )

    # Targeted lookups loop back into the analyst.
    workflow.add_edge("targeted_code_lookup", "compliance_analyst")

    # Deliberation -> verdict -> END.
    workflow.add_edge("deliberation", "final_verdict")
    workflow.add_edge("final_verdict", END)

    return workflow


def compile_compliance_graph(
    image_store: ImageStore,
    crop_cache: CropCache | None = None,
    progress_callback: ProgressCallback | None = None,
):
    """Build the compliance graph and return its compiled form.

    Args:
        image_store: Passed through to ``_build_compliance_graph``.
        crop_cache: Passed through to ``_build_compliance_graph``; optional.
        progress_callback: Passed through to ``_build_compliance_graph``;
            optional.

    Returns:
        Whatever ``StateGraph.compile()`` yields for the assembled graph.
    """
    workflow = _build_compliance_graph(image_store, crop_cache, progress_callback)
    return workflow.compile()