# Ryan2219's picture
# Upload 8 files
# 3efd9e8 verified
"""LangGraph definition — the core workflow graph with conditional routing."""
from __future__ import annotations
from collections.abc import Callable
from langgraph.graph import END, StateGraph
from config import MAX_INVESTIGATION_ROUNDS
from nodes.analyzer import analyze_findings
from nodes.annotator import annotate_crops
from nodes.consensus import consensus_review
from nodes.cropper import ProgressCallback, execute_crops
from nodes.ingest import ingest_pdf
from nodes.planner import plan_and_select
from nodes.synthesizer import synthesize_answer
from state import DrawingReaderState
from tools.crop_cache import CropCache
from tools.image_store import ImageStore
def _build_question_graph(
    image_store: ImageStore,
    crop_cache: CropCache | None = None,
    progress_callback: ProgressCallback | None = None,
) -> StateGraph:
    """Assemble the (uncompiled) investigation graph used for one question.

    ``image_store``, ``crop_cache`` and ``progress_callback`` are captured by
    the closures below, so the wrapped node functions registered on the graph
    take only the state as their argument.

    Wiring, as built here::

        plan_and_select -> execute_crops -> [annotate_crops] -> analyze_findings
        analyze_findings -> execute_crops | consensus_review | synthesize_answer
        consensus_review -> synthesize_answer -> END
    """

    # ---- Closures binding ImageStore / CropCache / callback to the nodes ----
    def _execute_crops(state: DrawingReaderState) -> dict:
        return execute_crops(state, image_store, crop_cache, progress_callback)

    def _annotate_crops(state: DrawingReaderState) -> dict:
        return annotate_crops(state, image_store)

    def _analyze_findings(state: DrawingReaderState) -> dict:
        return analyze_findings(state, image_store)

    def _consensus_review(state: DrawingReaderState) -> dict:
        return consensus_review(state, image_store)

    # ---- Routers ----
    def _after_crops(state: DrawingReaderState) -> str:
        # Annotation runs only when it is enabled AND at least one crop task
        # both requests annotation and carries a prompt for it.
        if state.get("enable_annotation", True):
            for task in state.get("crop_tasks", []):
                if task.get("annotate") and task.get("annotation_prompt"):
                    return "annotate_crops"
        return "analyze_findings"

    def _after_analysis(state: DrawingReaderState) -> str:
        # Loop back for another crop round while more investigation is
        # requested and the round budget has not been used up.
        if state.get("needs_more_investigation", False):
            rounds_done = state.get("investigation_round", 0)
            round_limit = state.get("max_rounds", MAX_INVESTIGATION_ROUNDS)
            if rounds_done < round_limit:
                return "execute_crops"
        # Otherwise optionally pass through consensus before synthesizing.
        if state.get("enable_consensus", False):
            return "consensus_review"
        return "synthesize_answer"

    # ---- Graph assembly ----
    workflow = StateGraph(DrawingReaderState)

    # Register nodes (same order as the pipeline runs).
    node_table = (
        ("plan_and_select", plan_and_select),
        ("execute_crops", _execute_crops),
        ("annotate_crops", _annotate_crops),
        ("analyze_findings", _analyze_findings),
        ("consensus_review", _consensus_review),
        ("synthesize_answer", synthesize_answer),
    )
    for node_name, node_fn in node_table:
        workflow.add_node(node_name, node_fn)

    # ---- Edges ----
    workflow.set_entry_point("plan_and_select")
    workflow.add_edge("plan_and_select", "execute_crops")

    # After cropping: annotate, or skip straight to analysis.
    crop_targets = ("annotate_crops", "analyze_findings")
    workflow.add_conditional_edges(
        "execute_crops",
        _after_crops,
        {target: target for target in crop_targets},
    )
    workflow.add_edge("annotate_crops", "analyze_findings")

    # After analysis: another crop round, consensus review, or the answer.
    analysis_targets = ("execute_crops", "consensus_review", "synthesize_answer")
    workflow.add_conditional_edges(
        "analyze_findings",
        _after_analysis,
        {target: target for target in analysis_targets},
    )
    workflow.add_edge("consensus_review", "synthesize_answer")
    workflow.add_edge("synthesize_answer", END)

    return workflow
def build_ingest_graph() -> StateGraph:
    """Assemble the (uncompiled) PDF ingestion graph.

    The graph has a single ``ingest_pdf`` node that serves as the entry
    point and leads directly to ``END``.
    """
    ingest_node = "ingest_pdf"
    ingest_graph = StateGraph(DrawingReaderState)
    ingest_graph.add_node(ingest_node, ingest_pdf)
    ingest_graph.set_entry_point(ingest_node)
    ingest_graph.add_edge(ingest_node, END)
    return ingest_graph
def compile_question_graph(
    image_store: ImageStore,
    crop_cache: CropCache | None = None,
    progress_callback: ProgressCallback | None = None,
):
    """Build the question graph and return its compiled form.

    All three arguments are forwarded unchanged to
    ``_build_question_graph``; the result of calling ``.compile()`` on the
    graph it returns is handed back to the caller.
    """
    question_graph = _build_question_graph(
        image_store=image_store,
        crop_cache=crop_cache,
        progress_callback=progress_callback,
    )
    return question_graph.compile()
def compile_ingest_graph():
    """Build the ingest graph and return its compiled form."""
    ingest_graph = build_ingest_graph()
    return ingest_graph.compile()