# NextQuest.ai / src /workflow.py
# ajeet9843
# Deploy to Hugging Face Spaces
# f10fe83
import logging
from langgraph.graph import StateGraph, END
from typing import Literal
from .agents import router_node, planner_node, search_node
from .agents import scrape_node, analyzer_node, ranker_node, synthesizer_node, verifier_node
from .models import AgentState
logger = logging.getLogger(__name__)
def route_query(state: AgentState) -> Literal["planner", "synthesize"]:
    """Pick the node that runs after the router.

    Reads ``next_step`` from the state, treating a missing key as
    ``"research"``. The values ``"direct"`` and ``"clarify"`` bypass the
    research pipeline and return ``"synthesize"``; every other value
    returns ``"planner"``.
    """
    decision = state.get("next_step", "research")
    if decision not in ("direct", "clarify"):
        return "planner"

    logger.info("[WORKFLOW] Routing to direct answer (skipping research)")
    return "synthesize"
def should_skip_to_synthesize(state: AgentState) -> Literal["scrape", "synthesize"]:
    """Decide whether the search step produced anything worth scraping.

    Returns ``"synthesize"`` when the state holds a truthy ``error`` or has
    no (or empty) ``search_results``; otherwise returns ``"scrape"``. The
    error check takes precedence over the empty-results check.
    """
    failure = state.get("error")
    if failure:
        logger.warning(f"Error detected in state, skipping to synthesize: {failure}")
    elif not state.get("search_results"):
        logger.warning("No search results, skipping to synthesize")
    else:
        return "scrape"
    # Both warning branches fall through to the same short-circuit target.
    return "synthesize"
def should_skip_to_synthesize_after_scrape(state: AgentState) -> Literal["analyzer", "synthesize"]:
    """Decide whether the scrape step produced anything worth analyzing.

    Returns ``"synthesize"`` when the state holds a truthy ``error`` or has
    no (or empty) ``scraped_content``; otherwise returns ``"analyzer"``.
    The error check takes precedence over the empty-content check.
    """
    if state.get("error"):
        logger.warning("Error detected after scrape, skipping to synthesize")
    elif not state.get("scraped_content"):
        logger.warning("No scraped content, skipping analyzer")
    else:
        return "analyzer"
    # Both warning branches fall through to the same short-circuit target.
    return "synthesize"
def should_retry_or_rank(state: AgentState) -> Literal["search", "ranker"]:
    """Choose between another search pass and ranking once analysis is done.

    Returns ``"search"`` when ``analysis_round`` is exactly 0 and
    ``"ranker"`` otherwise. A missing ``analysis_round`` defaults to 1, so
    it never triggers a retry.
    """
    # analyzer_node sets analysis_round to 0 as its explicit retry signal.
    retry_requested = state.get("analysis_round", 1) == 0
    if not retry_requested:
        return "ranker"

    attempt = state.get("retry_count", 1)
    logger.info(f"[WORKFLOW] Retry triggered, going back to search (retry #{attempt})")
    return "search"
def check_reflexion(state: AgentState) -> Literal["synthesize", END]:
    """Decide whether the verifier's output sends the answer back for revision.

    Returns ``"synthesize"`` while ``status`` equals ``"pending_reflexion"``
    and ``reflexion_steps`` (default 0) is below 3; otherwise returns
    ``END``, which finishes the run.
    """
    awaiting_revision = state.get("status") == "pending_reflexion"
    # reflexion_steps is only read when a revision is pending, as before.
    if not (awaiting_revision and state.get("reflexion_steps", 0) < 3):
        return END

    attempt = state.get("reflexion_steps")
    logger.info(f"[WORKFLOW] Reflexion Loop triggered (Attempt {attempt})")
    return "synthesize"
def create_research_graph() -> StateGraph:
    """Assemble the research workflow and return the compiled graph.

    Wiring, as established by the edges registered below:

    * ``router``     -> ``planner`` or ``synthesize``   (``route_query``)
    * ``planner``    -> ``search``
    * ``search``     -> ``scrape`` or ``synthesize``    (``should_skip_to_synthesize``)
    * ``scrape``     -> ``analyzer`` or ``synthesize``  (``should_skip_to_synthesize_after_scrape``)
    * ``analyzer``   -> ``search`` or ``ranker``        (``should_retry_or_rank``)
    * ``ranker``     -> ``synthesize``
    * ``synthesize`` -> ``verifier``
    * ``verifier``   -> ``synthesize`` or ``END``       (``check_reflexion``)

    Conditional edges are registered without an explicit path map, so the
    strings returned by each routing function are used as destination node
    names.

    NOTE(review): the annotation says ``StateGraph`` but the value returned
    is the result of ``compile()`` — confirm the intended return type.
    """
    graph = StateGraph(AgentState)

    # Node name -> callable, registered in this order.
    node_table = (
        ("router", router_node),
        ("planner", planner_node),
        ("search", search_node),
        ("scrape", scrape_node),
        ("analyzer", analyzer_node),
        ("ranker", ranker_node),
        ("synthesize", synthesizer_node),
        ("verifier", verifier_node),
    )
    for node_name, node_fn in node_table:
        graph.add_node(node_name, node_fn)

    # Every run starts at the router, which decides whether research is needed.
    graph.set_entry_point("router")
    graph.add_conditional_edges("router", route_query)

    # Research pipeline, with early exits to the synthesizer on failure.
    graph.add_edge("planner", "search")
    graph.add_conditional_edges("search", should_skip_to_synthesize)
    graph.add_conditional_edges("scrape", should_skip_to_synthesize_after_scrape)
    graph.add_conditional_edges("analyzer", should_retry_or_rank)
    graph.add_edge("ranker", "synthesize")

    # Answer generation followed by verification and the optional reflexion loop.
    graph.add_edge("synthesize", "verifier")
    graph.add_conditional_edges("verifier", check_reflexion)

    return graph.compile()
# Module-level graph instance, built once when this module is imported.
research_graph = create_research_graph()