| """LangGraph definition — the core workflow graph with conditional routing."""
|
| from __future__ import annotations
|
|
|
| from collections.abc import Callable
|
|
|
| from langgraph.graph import END, StateGraph
|
|
|
| from config import MAX_INVESTIGATION_ROUNDS
|
| from nodes.analyzer import analyze_findings
|
| from nodes.annotator import annotate_crops
|
| from nodes.consensus import consensus_review
|
| from nodes.cropper import ProgressCallback, execute_crops
|
| from nodes.ingest import ingest_pdf
|
| from nodes.planner import plan_and_select
|
| from nodes.synthesizer import synthesize_answer
|
| from state import DrawingReaderState
|
| from tools.crop_cache import CropCache
|
| from tools.image_store import ImageStore
|
|
|
|
|
def _build_question_graph(
    image_store: ImageStore,
    crop_cache: CropCache | None = None,
    progress_callback: ProgressCallback | None = None,
) -> StateGraph:
    """Build the per-question investigation graph (not yet compiled).

    Wiring::

        plan_and_select -> execute_crops
        execute_crops    -> annotate_crops | analyze_findings
        annotate_crops   -> analyze_findings
        analyze_findings -> execute_crops | consensus_review | synthesize_answer
        consensus_review -> synthesize_answer -> END

    The ``image_store``, ``crop_cache``, and ``progress_callback`` are
    injected into nodes that need them via closures.

    Args:
        image_store: Forwarded to the crop, annotate, analyze and consensus
            nodes.
        crop_cache: Optional cache, forwarded to the crop node only.
        progress_callback: Optional callback, forwarded to the crop node only.

    Returns:
        The assembled ``StateGraph``; it is returned without being compiled.
    """

    # --- Node wrappers: bind the injected dependencies via closures. ---

    def _execute_crops(state: DrawingReaderState) -> dict:
        return execute_crops(state, image_store, crop_cache, progress_callback)

    def _annotate_crops(state: DrawingReaderState) -> dict:
        return annotate_crops(state, image_store)

    def _analyze_findings(state: DrawingReaderState) -> dict:
        return analyze_findings(state, image_store)

    def _consensus_review(state: DrawingReaderState) -> dict:
        return consensus_review(state, image_store)

    # --- Routers: return the name of the next node. ---

    def _after_crops(state: DrawingReaderState) -> str:
        """Route to annotation only if it is enabled and some task requests it."""
        if not state.get("enable_annotation", True):
            return "analyze_findings"
        # ``.get(key, [])`` only covers a *missing* key. If the key is present
        # with an explicit ``None`` value the iteration below would raise
        # TypeError, so fall back to an empty list for any falsy value.
        crop_tasks = state.get("crop_tasks") or []
        wants_annotation = any(
            t.get("annotate") and t.get("annotation_prompt") for t in crop_tasks
        )
        return "annotate_crops" if wants_annotation else "analyze_findings"

    def _after_analysis(state: DrawingReaderState) -> str:
        """Loop for another crop round, go to consensus, or synthesize."""
        needs_more = state.get("needs_more_investigation", False)
        round_num = state.get("investigation_round", 0)
        max_rounds = state.get("max_rounds", MAX_INVESTIGATION_ROUNDS)
        enable_consensus = state.get("enable_consensus", False)

        # The round budget takes priority: once it is exhausted we stop
        # looping even if the analyzer still asks for more investigation.
        if needs_more and round_num < max_rounds:
            return "execute_crops"
        if enable_consensus:
            return "consensus_review"
        return "synthesize_answer"

    # --- Graph assembly. ---

    graph = StateGraph(DrawingReaderState)

    graph.add_node("plan_and_select", plan_and_select)
    graph.add_node("execute_crops", _execute_crops)
    graph.add_node("annotate_crops", _annotate_crops)
    graph.add_node("analyze_findings", _analyze_findings)
    graph.add_node("consensus_review", _consensus_review)
    graph.add_node("synthesize_answer", synthesize_answer)

    graph.set_entry_point("plan_and_select")
    graph.add_edge("plan_and_select", "execute_crops")

    graph.add_conditional_edges(
        "execute_crops",
        _after_crops,
        {"annotate_crops": "annotate_crops", "analyze_findings": "analyze_findings"},
    )
    graph.add_edge("annotate_crops", "analyze_findings")

    graph.add_conditional_edges(
        "analyze_findings",
        _after_analysis,
        {
            "execute_crops": "execute_crops",
            "consensus_review": "consensus_review",
            "synthesize_answer": "synthesize_answer",
        },
    )

    graph.add_edge("consensus_review", "synthesize_answer")
    graph.add_edge("synthesize_answer", END)

    return graph
|
|
|
|
|
def build_ingest_graph() -> StateGraph:
    """Assemble the one-time PDF ingestion graph.

    The graph has a single ``ingest_pdf`` node that is both the entry point
    and the last step before ``END``. It is returned without being compiled.
    """
    node_name = "ingest_pdf"

    ingest_graph = StateGraph(DrawingReaderState)
    ingest_graph.add_node(node_name, ingest_pdf)
    ingest_graph.set_entry_point(node_name)
    ingest_graph.add_edge(node_name, END)
    return ingest_graph
|
|
|
|
|
def compile_question_graph(
    image_store: ImageStore,
    crop_cache: CropCache | None = None,
    progress_callback: ProgressCallback | None = None,
):
    """Build the per-question graph and return it compiled.

    All three arguments are passed through unchanged to
    ``_build_question_graph``; the result of calling ``.compile()`` on the
    built graph is returned.
    """
    question_graph = _build_question_graph(
        image_store,
        crop_cache,
        progress_callback,
    )
    return question_graph.compile()
|
|
|
|
|
def compile_ingest_graph():
    """Build the ingest graph and return it compiled.

    Returns the result of calling ``.compile()`` on the graph produced by
    ``build_ingest_graph``.
    """
    ingest_graph = build_ingest_graph()
    return ingest_graph.compile()
|
|
|