"""
GAIA Agent LangGraph Workflow
Main orchestration workflow for the GAIA benchmark agent system
"""
| |
|
| | import logging |
| | from typing import Dict, Any, List, Literal |
| | from langgraph.graph import StateGraph, END |
| | from langgraph.checkpoint.memory import MemorySaver |
| |
|
| | from agents.state import GAIAAgentState, AgentRole, QuestionType |
| | from agents.router import RouterAgent |
| | from agents.web_researcher import WebResearchAgent |
| | from agents.file_processor_agent import FileProcessorAgent |
| | from agents.reasoning_agent import ReasoningAgent |
| | from agents.synthesizer import SynthesizerAgent |
| | from models.qwen_client import QwenClient |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
class GAIAWorkflow:
    """
    Main GAIA agent workflow using LangGraph.

    Orchestrates a router -> specialized agents -> synthesizer pipeline:
    the router classifies the question and selects agents, each selected
    agent records its result in the shared state, and the synthesizer
    combines those results into a final answer. The synthesizer loops
    back to any selected agent that has not yet executed.
    """

    def __init__(self, llm_client: QwenClient):
        self.llm_client = llm_client

        # All agents share a single LLM client instance.
        self.router = RouterAgent(llm_client)
        self.web_researcher = WebResearchAgent(llm_client)
        self.file_processor = FileProcessorAgent(llm_client)
        self.reasoning_agent = ReasoningAgent(llm_client)
        self.synthesizer = SynthesizerAgent(llm_client)

        self.workflow = self._create_workflow()

        # MemorySaver keeps per-thread checkpoints in memory; state is
        # resumable via the thread_id passed in process_question().
        self.app = self.workflow.compile(checkpointer=MemorySaver())

    def _create_workflow(self) -> StateGraph:
        """Build and return the (uncompiled) LangGraph state graph."""
        workflow = StateGraph(GAIAAgentState)

        workflow.add_node("router", self._router_node)
        workflow.add_node("web_researcher", self._web_researcher_node)
        workflow.add_node("file_processor", self._file_processor_node)
        workflow.add_node("reasoning_agent", self._reasoning_agent_node)
        workflow.add_node("synthesizer", self._synthesizer_node)

        workflow.set_entry_point("router")

        workflow.add_conditional_edges(
            "router",
            self._route_to_agents,
            {
                "web_researcher": "web_researcher",
                "file_processor": "file_processor",
                "reasoning_agent": "reasoning_agent",
                # Multi-agent questions start with web research; the
                # synthesizer loop below dispatches the remaining agents.
                "multi_agent": "web_researcher",
                "synthesizer": "synthesizer",
            },
        )

        # Every worker agent hands its results to the synthesizer.
        workflow.add_edge("web_researcher", "synthesizer")
        workflow.add_edge("file_processor", "synthesizer")
        workflow.add_edge("reasoning_agent", "synthesizer")

        # BUG FIX: the original mapped a single "need_more_agents" label to
        # file_processor only, although _check_if_complete emitted it for a
        # pending REASONING_AGENT as well. With the reasoning agent pending,
        # the already-executed file processor re-ran on every pass and the
        # graph looped until the recursion limit. Distinct labels now route
        # each pending agent to its own node.
        workflow.add_conditional_edges(
            "synthesizer",
            self._check_if_complete,
            {
                "complete": END,
                "need_web_researcher": "web_researcher",
                "need_file_processor": "file_processor",
                "need_reasoning_agent": "reasoning_agent",
            },
        )

        return workflow

    def _router_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """Router node - classifies question and selects agents."""
        logger.info("Executing router node")
        return self.router.route_question(state)

    def _web_researcher_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """Web researcher node."""
        logger.info("Executing web researcher node")
        return self.web_researcher.process(state)

    def _file_processor_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """File processor node."""
        logger.info("Executing file processor node")
        return self.file_processor.process(state)

    def _reasoning_agent_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """Reasoning agent node."""
        logger.info("Executing reasoning agent node")
        return self.reasoning_agent.process(state)

    def _synthesizer_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """Synthesizer node - combines agent results."""
        logger.info("Executing synthesizer node")
        return self.synthesizer.process(state)

    def _route_to_agents(self, state: GAIAAgentState) -> str:
        """Map the router's agent selection to a conditional-edge label.

        Returns:
            One of the labels registered on the router's conditional edges:
            a single agent's node name, "multi_agent" when several agents
            were selected, or "synthesizer" when none were.
        """
        # The synthesizer always runs last; it is not a routing target here.
        agent_roles = [
            agent for agent in state.selected_agents
            if agent != AgentRole.SYNTHESIZER
        ]

        if not agent_roles:
            return "synthesizer"

        if len(agent_roles) > 1:
            # Multiple agents: start with web research, the rest are
            # dispatched by _check_if_complete after each synthesis pass.
            return "multi_agent"

        single_agent_labels = {
            AgentRole.WEB_RESEARCHER: "web_researcher",
            AgentRole.FILE_PROCESSOR: "file_processor",
            AgentRole.REASONING_AGENT: "reasoning_agent",
        }
        return single_agent_labels.get(agent_roles[0], "synthesizer")

    def _check_if_complete(self, state: GAIAAgentState) -> str:
        """Decide whether processing is finished or another agent must run.

        Returns:
            "complete" when the state is final or no selected agent is
            pending; otherwise a "need_<agent>" label that the synthesizer's
            conditional edges map to the matching agent node.
        """
        if state.is_complete:
            return "complete"

        # Agents that were selected by the router but have not yet
        # recorded a result.
        executed_agents = set(state.agent_results.keys())
        remaining_agents = [
            agent for agent in state.selected_agents
            if agent not in executed_agents and agent != AgentRole.SYNTHESIZER
        ]

        if not remaining_agents:
            return "complete"

        pending_labels = {
            AgentRole.WEB_RESEARCHER: "need_web_researcher",
            AgentRole.FILE_PROCESSOR: "need_file_processor",
            AgentRole.REASONING_AGENT: "need_reasoning_agent",
        }
        # Unknown roles terminate instead of looping.
        return pending_labels.get(remaining_agents[0], "complete")

    def process_question(self, question: str, file_path: str = None, file_name: str = None,
                         task_id: str = None, difficulty_level: int = 1) -> GAIAAgentState:
        """
        Process a GAIA question through the complete workflow.

        Args:
            question: The question to process
            file_path: Optional path to associated file
            file_name: Optional name of associated file
            task_id: Optional task identifier
            difficulty_level: Question difficulty (1-3)

        Returns:
            GAIAAgentState with final results; on failure, a state with
            the error recorded, is_complete=True and
            requires_human_review=True.
        """
        logger.info(f"Processing question: {question[:100]}...")

        initial_state = GAIAAgentState()
        # NOTE(review): hash() is salted per process (PYTHONHASHSEED), so
        # generated task_ids are not stable across runs — acceptable only
        # as a fallback when no task_id is supplied.
        initial_state.task_id = task_id or f"workflow_{hash(question) % 10000}"
        initial_state.question = question
        initial_state.file_path = file_path
        initial_state.file_name = file_name
        initial_state.difficulty_level = difficulty_level

        try:
            # thread_id keys the MemorySaver checkpoint for this run.
            final_state = self.app.invoke(
                initial_state,
                config={"configurable": {"thread_id": initial_state.task_id}}
            )

            logger.info(f"Workflow complete: {final_state.final_answer[:100]}...")
            return final_state

        except Exception as e:
            error_msg = f"Workflow execution failed: {str(e)}"
            logger.error(error_msg)

            # Return a terminal error state instead of raising so callers
            # always get a GAIAAgentState back.
            initial_state.add_error(error_msg)
            initial_state.final_answer = "Workflow execution failed"
            initial_state.final_confidence = 0.0
            initial_state.final_reasoning = error_msg
            initial_state.is_complete = True
            initial_state.requires_human_review = True

            return initial_state

    def get_workflow_visualization(self) -> str:
        """Return a text diagram of the workflow for logging/debugging."""
        return """
GAIA Agent Workflow:

        +--------------+
        |    Router    |  <- Entry Point
        +------+-------+
               |
       +-- Web Researcher ---+
       +-- File Processor ---+
       +-- Reasoning Agent --+
               |             |
               v             v
        +--------------+   +---------------+
        | Synthesizer  |<--| Agent Results |
        +------+-------+   +---------------+
               |
               v
        +--------------+
        |     END      |
        +--------------+

Flow:
1. Router classifies question and selects appropriate agent(s)
2. Selected agents process question in parallel/sequence
3. Synthesizer combines results into final answer
4. Workflow completes with final state
"""
| |
|
| | |
class SimpleGAIAWorkflow:
    """
    Simplified workflow that doesn't require LangGraph for basic cases.

    Runs the pipeline strictly sequentially — router, then each selected
    agent in order, then the synthesizer. Useful for testing and
    lightweight deployments.
    """

    def __init__(self, llm_client: QwenClient):
        self.llm_client = llm_client
        self.router = RouterAgent(llm_client)
        self.web_researcher = WebResearchAgent(llm_client)
        self.file_processor = FileProcessorAgent(llm_client)
        self.reasoning_agent = ReasoningAgent(llm_client)
        self.synthesizer = SynthesizerAgent(llm_client)

    def process_question(self, question: str, file_path: str = None, file_name: str = None,
                         task_id: str = None, difficulty_level: int = 1) -> GAIAAgentState:
        """Process a question with the simplified sequential workflow.

        Returns:
            GAIAAgentState carrying the final answer; on any exception a
            terminal error state is returned instead of raising.
        """
        state = GAIAAgentState()
        state.task_id = task_id or f"simple_{hash(question) % 10000}"
        state.question = question
        state.file_path = file_path
        state.file_name = file_name
        state.difficulty_level = difficulty_level

        try:
            # Step 1: let the router pick the agents for this question.
            state = self.router.route_question(state)

            # Step 2: run each selected worker agent in selection order.
            # Dispatch table replaces the original if/elif chain; roles
            # without an entry (e.g. SYNTHESIZER) are skipped, as before.
            handlers = {
                AgentRole.WEB_RESEARCHER: self.web_researcher.process,
                AgentRole.FILE_PROCESSOR: self.file_processor.process,
                AgentRole.REASONING_AGENT: self.reasoning_agent.process,
            }
            for role in state.selected_agents:
                handler = handlers.get(role)
                if handler is not None:
                    state = handler(state)

            # Step 3: synthesize all agent results into the final answer.
            return self.synthesizer.process(state)

        except Exception as e:
            error_msg = f"Simple workflow failed: {str(e)}"
            state.add_error(error_msg)
            state.final_answer = "Processing failed"
            state.final_confidence = 0.0
            state.final_reasoning = error_msg
            state.is_complete = True
            return state