File size: 3,318 Bytes
f10fe83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import logging
from langgraph.graph import StateGraph, END
from typing import Literal

from .agents import router_node, planner_node, search_node
from .agents import scrape_node, analyzer_node, ranker_node, synthesizer_node, verifier_node
from .models import AgentState

# Module-level logger named after this module; the routing functions below log through it.
logger = logging.getLogger(__name__)


def route_query(state: AgentState) -> Literal["planner", "synthesize"]:
    """Pick the node that follows the router.

    Reads ``next_step`` from the state, treating it as ``"research"`` when
    the key is absent. The values ``"direct"`` and ``"clarify"`` bypass the
    research pipeline and go straight to ``"synthesize"``; every other value
    continues to ``"planner"``.
    """
    decision = state.get("next_step", "research")
    skip_research = decision == "direct" or decision == "clarify"
    if not skip_research:
        return "planner"

    logger.info("[WORKFLOW] Routing to direct answer (skipping research)")
    return "synthesize"


def should_skip_to_synthesize(state: AgentState) -> Literal["scrape", "synthesize"]:
    """Decide whether the search step's output is worth scraping.

    Returns ``"synthesize"`` when the state carries a truthy ``error`` or has
    no (or empty) ``search_results``; otherwise returns ``"scrape"``. Each
    skip is logged as a warning.
    """
    failure = state.get("error")
    if failure:
        logger.warning(f"Error detected in state, skipping to synthesize: {failure}")
        return "synthesize"

    if state.get("search_results"):
        return "scrape"

    logger.warning("No search results, skipping to synthesize")
    return "synthesize"


def should_skip_to_synthesize_after_scrape(state: AgentState) -> Literal["analyzer", "synthesize"]:
    """Decide whether the scrape step's output should be analyzed.

    Returns ``"synthesize"`` when the state carries a truthy ``error`` or has
    no (or empty) ``scraped_content``; otherwise returns ``"analyzer"``. Each
    skip is logged as a warning.
    """
    error = state.get("error")
    if error:
        # Include the error value so the log says why the pipeline was
        # short-circuited, not just that it was.
        logger.warning(f"Error detected after scrape, skipping to synthesize: {error}")
        return "synthesize"
    if not state.get("scraped_content"):
        logger.warning("No scraped content, skipping analyzer")
        return "synthesize"
    return "analyzer"


def should_retry_or_rank(state: AgentState) -> Literal["search", "ranker"]:
    """Choose between another search pass and ranking after analysis.

    An ``analysis_round`` equal to ``0`` is the retry signal and sends the
    graph back to ``"search"``. Any other value, or a missing key (read as
    ``1``), proceeds to ``"ranker"``.
    """
    # Per the original author's note, analyzer_node sets analysis_round = 0
    # to explicitly request a retry.
    retry_requested = state.get("analysis_round", 1) == 0
    if not retry_requested:
        return "ranker"

    attempt = state.get('retry_count', 1)
    logger.info(f"[WORKFLOW] Retry triggered, going back to search (retry #{attempt})")
    return "search"


def check_reflexion(state: AgentState) -> Literal["synthesize", END]:
    """Decide whether the verifier's result loops back to the synthesizer.

    Returns ``"synthesize"`` when the state's ``status`` is
    ``"pending_reflexion"`` and fewer than 3 ``reflexion_steps`` have been
    recorded (a missing counter is read as ``0``); otherwise returns ``END``.
    """
    # Read the counter once so the guard and the log message agree; a missing
    # key is reported as 0 instead of "None".
    steps = state.get("reflexion_steps", 0)
    if state.get("status") == "pending_reflexion" and steps < 3:
        logger.info(f"[WORKFLOW] Reflexion Loop triggered (Attempt {steps})")
        return "synthesize"
    return END

def create_research_graph() -> StateGraph:
    """Assemble and compile the research workflow graph.

    Registers the eight agent nodes on a ``StateGraph`` over ``AgentState``,
    makes ``router`` the entry point, and wires the edges:

    * ``router`` branches via ``route_query``.
    * ``planner`` always leads to ``search``.
    * ``search``, ``scrape`` and ``analyzer`` branch via their respective
      ``should_*`` functions.
    * ``ranker`` leads to ``synthesize``, which leads to ``verifier``.
    * ``verifier`` branches via ``check_reflexion``.

    Returns:
        The object produced by ``workflow.compile()``.
    """
    # NOTE(review): the return annotation says StateGraph, but the value is
    # the result of compile() -- confirm the intended type.
    graph = StateGraph(AgentState)

    node_table = (
        ("router", router_node),
        ("planner", planner_node),
        ("search", search_node),
        ("scrape", scrape_node),
        ("analyzer", analyzer_node),
        ("ranker", ranker_node),
        ("synthesize", synthesizer_node),
        ("verifier", verifier_node),
    )
    for node_name, node_fn in node_table:
        graph.add_node(node_name, node_fn)

    # Execution starts at the router, which picks the first branch.
    graph.set_entry_point("router")
    graph.add_conditional_edges("router", route_query)

    # Research pipeline: plan -> search -> scrape -> analyze -> rank.
    graph.add_edge("planner", "search")
    graph.add_conditional_edges("search", should_skip_to_synthesize)
    graph.add_conditional_edges("scrape", should_skip_to_synthesize_after_scrape)
    graph.add_conditional_edges("analyzer", should_retry_or_rank)

    # Answer production and verification loop.
    graph.add_edge("ranker", "synthesize")
    graph.add_edge("synthesize", "verifier")
    graph.add_conditional_edges("verifier", check_reflexion)

    compiled = graph.compile()
    return compiled


# Graph instance built once, when this module is imported.
research_graph = create_research_graph()