import logging from langgraph.graph import StateGraph, END from typing import Literal from .agents import router_node, planner_node, search_node from .agents import scrape_node, analyzer_node, ranker_node, synthesizer_node, verifier_node from .models import AgentState logger = logging.getLogger(__name__) def route_query(state: AgentState) -> Literal["planner", "synthesize"]: """Routes the query based on the router's decision.""" next_step = state.get("next_step", "research") if next_step in ("direct", "clarify"): logger.info("[WORKFLOW] Routing to direct answer (skipping research)") return "synthesize" return "planner" def should_skip_to_synthesize(state: AgentState) -> Literal["scrape", "synthesize"]: if state.get("error"): logger.warning(f"Error detected in state, skipping to synthesize: {state['error']}") return "synthesize" if not state.get("search_results"): logger.warning("No search results, skipping to synthesize") return "synthesize" return "scrape" def should_skip_to_synthesize_after_scrape(state: AgentState) -> Literal["analyzer", "synthesize"]: if state.get("error"): logger.warning(f"Error detected after scrape, skipping to synthesize") return "synthesize" if not state.get("scraped_content"): logger.warning("No scraped content, skipping analyzer") return "synthesize" return "analyzer" def should_retry_or_rank(state: AgentState) -> Literal["search", "ranker"]: # The analyzer_node sets analysis_round = 0 to explicitly signal a retry if state.get("analysis_round", 1) == 0: logger.info(f"[WORKFLOW] Retry triggered, going back to search (retry #{state.get('retry_count', 1)})") return "search" return "ranker" def check_reflexion(state: AgentState) -> Literal["synthesize", END]: """Routes back to synthesizer if verifier produced a critique.""" if state.get("status") == "pending_reflexion" and state.get("reflexion_steps", 0) < 3: logger.info(f"[WORKFLOW] Reflexion Loop triggered (Attempt {state.get('reflexion_steps')})") return "synthesize" return END def create_research_graph() -> StateGraph: workflow = StateGraph(AgentState) workflow.add_node("router", router_node) workflow.add_node("planner", planner_node) workflow.add_node("search", search_node) workflow.add_node("scrape", scrape_node) workflow.add_node("analyzer", analyzer_node) workflow.add_node("ranker", ranker_node) workflow.add_node("synthesize", synthesizer_node) workflow.add_node("verifier", verifier_node) # Entry point is now the router workflow.set_entry_point("router") # Conditional routing after the router workflow.add_conditional_edges("router", route_query) workflow.add_edge("planner", "search") workflow.add_conditional_edges("search", should_skip_to_synthesize) workflow.add_conditional_edges("scrape", should_skip_to_synthesize_after_scrape) workflow.add_conditional_edges("analyzer", should_retry_or_rank) workflow.add_edge("ranker", "synthesize") workflow.add_edge("synthesize", "verifier") workflow.add_conditional_edges("verifier", check_reflexion) return workflow.compile() research_graph = create_research_graph()