|
|
|
|
|
""" |
|
|
GAIA Agent LangGraph Workflow |
|
|
Main orchestration workflow for the GAIA benchmark agent system |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from typing import Dict, Any, List, Literal |
|
|
from langgraph.graph import StateGraph, END |
|
|
from langgraph.checkpoint.memory import MemorySaver |
|
|
|
|
|
from agents.state import GAIAAgentState, AgentRole, QuestionType |
|
|
from agents.router import RouterAgent |
|
|
from agents.web_researcher import WebResearchAgent |
|
|
from agents.file_processor_agent import FileProcessorAgent |
|
|
from agents.reasoning_agent import ReasoningAgent |
|
|
from agents.synthesizer import SynthesizerAgent |
|
|
from models.qwen_client import QwenClient |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class GAIAWorkflow:
    """
    Main GAIA agent workflow using LangGraph.

    Orchestrates a router → specialized agents → synthesizer pipeline:

    * ``router`` classifies the question and fills ``state.selected_agents``.
    * The first selected specialist that has a graph node (web researcher,
      file processor or reasoning agent) runs, then hands off to
      ``synthesizer``. With no routable specialist the router goes straight
      to ``synthesizer``.
    * After each synthesis pass the graph ends when ``state.is_complete`` is
      set or every selected specialist has a result; otherwise the next
      pending specialist runs and the synthesizer runs again.
    """

    def __init__(self, llm_client: QwenClient):
        """
        Build every agent and compile the LangGraph application.

        Args:
            llm_client: LLM client shared by all agents.
        """
        self.llm_client = llm_client

        # All agents share the same LLM client.
        self.router = RouterAgent(llm_client)
        self.web_researcher = WebResearchAgent(llm_client)
        self.file_processor = FileProcessorAgent(llm_client)
        self.reasoning_agent = ReasoningAgent(llm_client)
        self.synthesizer = SynthesizerAgent(llm_client)

        self.workflow = self._create_workflow()

        # In-memory checkpointer; process_question() supplies a thread_id per task.
        self.app = self.workflow.compile(checkpointer=MemorySaver())

    def _create_workflow(self) -> StateGraph:
        """Create the (uncompiled) LangGraph workflow."""
        workflow = StateGraph(GAIAAgentState)

        workflow.add_node("router", self._router_node)
        workflow.add_node("web_researcher", self._web_researcher_node)
        workflow.add_node("file_processor", self._file_processor_node)
        workflow.add_node("reasoning_agent", self._reasoning_agent_node)
        workflow.add_node("synthesizer", self._synthesizer_node)

        workflow.set_entry_point("router")

        # Router → first routable selected specialist, or straight to the
        # synthesizer when none was selected.
        workflow.add_conditional_edges(
            "router",
            self._route_to_agents,
            {
                "web_researcher": "web_researcher",
                "file_processor": "file_processor",
                "reasoning_agent": "reasoning_agent",
                "synthesizer": "synthesizer",
            },
        )

        # Every specialist hands its result to the synthesizer.
        workflow.add_edge("web_researcher", "synthesizer")
        workflow.add_edge("file_processor", "synthesizer")
        workflow.add_edge("reasoning_agent", "synthesizer")

        # After synthesis: finish, or run the next selected specialist that
        # has not produced a result yet (which then returns to the synthesizer).
        workflow.add_conditional_edges(
            "synthesizer",
            self._check_if_complete,
            {
                "complete": END,
                "web_researcher": "web_researcher",
                "file_processor": "file_processor",
                "reasoning_agent": "reasoning_agent",
            },
        )

        return workflow

    @staticmethod
    def _node_for_role(role):
        """Return the graph node name that runs ``role``, or None if no node does."""
        # Sequential == comparisons (not a dict lookup) so that roles stored as
        # plain strings still match str-valued enum members.
        for candidate, node_name in (
            (AgentRole.WEB_RESEARCHER, "web_researcher"),
            (AgentRole.FILE_PROCESSOR, "file_processor"),
            (AgentRole.REASONING_AGENT, "reasoning_agent"),
        ):
            if role == candidate:
                return node_name
        return None

    def _router_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """Router node: classify the question and select agents."""
        logger.info("🧭 Executing router node")
        return self.router.route_question(state)

    def _web_researcher_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """Web researcher node."""
        logger.info("🌐 Executing web researcher node")
        return self.web_researcher.process(state)

    def _file_processor_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """File processor node."""
        logger.info("📁 Executing file processor node")
        return self.file_processor.process(state)

    def _reasoning_agent_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """Reasoning agent node."""
        logger.info("🧠 Executing reasoning agent node")
        return self.reasoning_agent.process(state)

    def _synthesizer_node(self, state: GAIAAgentState) -> GAIAAgentState:
        """Synthesizer node: combine agent results."""
        logger.info("🔗 Executing synthesizer node")
        return self.synthesizer.process(state)

    def _route_to_agents(self, state: GAIAAgentState) -> str:
        """
        Choose the first node to run after the router.

        Returns the node of the first entry in ``state.selected_agents`` that
        is not the synthesizer and has a graph node. Returns "synthesizer"
        when there is no such entry. Any further selected agents are
        scheduled by ``_check_if_complete`` after each synthesis pass.
        """
        for role in state.selected_agents:
            if role == AgentRole.SYNTHESIZER:
                continue
            node_name = self._node_for_role(role)
            if node_name is not None:
                return node_name
        return "synthesizer"

    def _check_if_complete(self, state: GAIAAgentState) -> str:
        """
        Decide what runs after a synthesis pass.

        Returns:
            "complete" if ``state.is_complete`` is set, or if every selected
            (non-synthesizer) agent that has a graph node already appears as
            an ``agent_role`` in ``state.agent_results``. Otherwise returns
            the node name of the first such pending agent.
        """
        if state.is_complete:
            return "complete"

        executed_agents = set(result.agent_role for result in state.agent_results)

        for role in state.selected_agents:
            if role in executed_agents or role == AgentRole.SYNTHESIZER:
                continue
            node_name = self._node_for_role(role)
            if node_name is not None:
                # NOTE(review): if an agent runs without recording a result
                # under its own role, it is picked again here; LangGraph's
                # recursion limit is then the only stop. Confirm agents
                # always record a result.
                return node_name

        return "complete"

    def process_question(self, question: str, file_path: str = None, file_name: str = None,
                         task_id: str = None, difficulty_level: int = 1) -> GAIAAgentState:
        """
        Process a GAIA question through the complete workflow.

        Args:
            question: The question to process.
            file_path: Optional path to the associated file.
            file_name: Optional name of the associated file.
            task_id: Optional task identifier, used as the state's question_id.
            difficulty_level: Question difficulty (1-3).

        Returns:
            The final GAIAAgentState. If anything raises, the initial state is
            returned instead, with the error recorded via ``add_error``,
            ``final_answer`` set to "Workflow execution failed", zero
            confidence, ``is_complete`` and ``requires_human_review`` set.
        """
        logger.info(f"🚀 Processing question: {question[:100]}...")

        initial_state = GAIAAgentState(
            question=question,
            # NOTE(review): str hashes are salted per process (PYTHONHASHSEED),
            # so this fallback id is not stable across runs.
            question_id=task_id or f"workflow_{hash(question) % 10000}",
            file_name=file_name,
            file_content=None,
        )
        initial_state.file_path = file_path
        initial_state.difficulty_level = difficulty_level

        try:
            final_state = self.app.invoke(
                initial_state,
                # The checkpointer needs a thread id; one thread per task.
                config={"configurable": {"thread_id": initial_state.task_id}},
            )
            # str() so a None / non-string answer cannot raise here and turn a
            # successful run into a reported failure.
            logger.info(f"✅ Workflow complete: {str(final_state.final_answer)[:100]}...")
            return final_state

        except Exception as e:
            error_msg = f"Workflow execution failed: {str(e)}"
            # logger.exception also records the traceback.
            logger.exception(error_msg)

            # Return a terminal, clearly-failed state instead of raising.
            initial_state.add_error(error_msg)
            initial_state.final_answer = "Workflow execution failed"
            initial_state.final_confidence = 0.0
            initial_state.final_reasoning = error_msg
            initial_state.is_complete = True
            initial_state.requires_human_review = True
            return initial_state

    def get_workflow_visualization(self) -> str:
        """Get a text representation of the workflow."""
        return """
        GAIA Agent Workflow:

        ┌─────────────┐
        │   Router    │ ← Entry Point
        └──────┬──────┘
               │
               ├─ Web Researcher ──┐
               ├─ File Processor ──┤
               ├─ Reasoning Agent ─┤
               │                   │
               ▼                   ▼
        ┌─────────────┐     ┌───────────────┐
        │ Synthesizer │ ←───┤ Agent Results │
        └──────┬──────┘     └───────────────┘
               │  (loops back to the next pending agent until complete)
               ▼
        ┌─────────────┐
        │     END     │
        └─────────────┘

        Flow:
        1. Router classifies question and selects appropriate agent(s)
        2. The first selected agent runs, then the Synthesizer combines results
        3. While not complete, the next selected agent without a result runs
           and the Synthesizer runs again
        4. Workflow completes with final state
        """
|
|
|
|
|
|
|
|
class SimpleGAIAWorkflow:
    """
    Simplified workflow that doesn't require LangGraph for basic cases.

    Runs the router, then each selected agent in order, then the synthesizer,
    as plain sequential calls. Useful for testing and lightweight deployments.
    """

    def __init__(self, llm_client: QwenClient):
        """
        Build every agent around one shared LLM client.

        Args:
            llm_client: LLM client shared by all agents.
        """
        self.llm_client = llm_client
        self.router = RouterAgent(llm_client)
        self.web_researcher = WebResearchAgent(llm_client)
        self.file_processor = FileProcessorAgent(llm_client)
        self.reasoning_agent = ReasoningAgent(llm_client)
        self.synthesizer = SynthesizerAgent(llm_client)

    def process_question(self, question: str, file_path: str = None, file_name: str = None,
                         task_id: str = None, difficulty_level: int = 1) -> GAIAAgentState:
        """
        Process a question with the simplified sequential workflow.

        Args:
            question: The question to process.
            file_path: Optional path to the associated file.
            file_name: Optional name of the associated file.
            task_id: Optional task identifier, used as the state's question_id.
            difficulty_level: Question difficulty (1-3).

        Returns:
            The final GAIAAgentState. If any step raises, the latest state is
            returned with the error recorded via ``add_error``,
            ``final_answer`` set to "Processing failed", zero confidence,
            ``is_complete`` and ``requires_human_review`` set.
        """
        state = GAIAAgentState(
            question=question,
            # NOTE(review): str hashes are salted per process, so this fallback
            # id is not stable across runs.
            question_id=task_id or f"simple_{hash(question) % 10000}",
            file_name=file_name,
            file_content=None,
        )
        state.file_path = file_path
        state.difficulty_level = difficulty_level

        try:
            state = self.router.route_question(state)

            # Run the selected specialists in the router's order; a selected
            # SYNTHESIZER entry is ignored because synthesis always runs last.
            for agent_role in state.selected_agents:
                if agent_role == AgentRole.WEB_RESEARCHER:
                    state = self.web_researcher.process(state)
                elif agent_role == AgentRole.FILE_PROCESSOR:
                    state = self.file_processor.process(state)
                elif agent_role == AgentRole.REASONING_AGENT:
                    state = self.reasoning_agent.process(state)

            state = self.synthesizer.process(state)
            return state

        except Exception as e:
            error_msg = f"Simple workflow failed: {str(e)}"
            # Log with traceback; previously the failure was only stored in state.
            logger.exception(error_msg)
            state.add_error(error_msg)
            state.final_answer = "Processing failed"
            state.final_confidence = 0.0
            state.final_reasoning = error_msg
            state.is_complete = True
            # Failed runs are flagged for human review, matching the LangGraph workflow.
            state.requires_human_review = True
            return state
|
|
|
|
|
def create_gaia_workflow(llm_client, tools_dict):
    """
    Create an enhanced GAIA workflow with multi-phase planning and iterative refinement.

    Graph shape:

    * ``router`` runs first. Its output is expected to carry an
      ``agent_sequence`` (list of node names) that drives the order in which
      the specialist agents run.
    * After each specialist, the next sequence entry is picked by
      ``should_continue_to_next_agent``.
    * When the next step would be ``synthesizer``, ``check_quality_and_refinement``
      decides whether to run one ``refine_approach`` pass first.
    * ``refine_approach`` always hands off to ``synthesizer``, so at most one
      refinement pass runs. ``synthesizer`` ends the graph.

    Args:
        llm_client: LLM client shared by all agents.
        tools_dict: Currently unused; kept for interface compatibility.

    Returns:
        The compiled LangGraph application.
    """
    # Agent instances shared by the node closures below.
    router = RouterAgent(llm_client)
    web_researcher = WebResearchAgent(llm_client)
    file_processor = FileProcessorAgent(llm_client)
    reasoning_agent = ReasoningAgent(llm_client)
    synthesizer = SynthesizerAgent(llm_client)

    # Specialist nodes that a planned agent_sequence can point at.
    agent_nodes = ("web_researcher", "file_processor", "reasoning_agent")

    def router_node(state: GAIAAgentState) -> GAIAAgentState:
        """Enhanced router with multi-phase analysis."""
        logger.info("🧭 Router: Starting multi-phase analysis")
        # NOTE(review): GAIAWorkflow calls router.route_question(); confirm
        # RouterAgent.process() is the intended entry point here.
        return router.process(state)

    def web_researcher_node(state: GAIAAgentState) -> GAIAAgentState:
        """Web researcher with multi-step planning."""
        logger.info("🌐 Web Researcher: Starting enhanced research")
        return web_researcher.process(state)

    def file_processor_node(state: GAIAAgentState) -> GAIAAgentState:
        """File processor with step-by-step analysis."""
        logger.info("📁 File Processor: Starting file analysis")
        return file_processor.process(state)

    def reasoning_agent_node(state: GAIAAgentState) -> GAIAAgentState:
        """Reasoning agent with systematic approach."""
        logger.info("🧠 Reasoning Agent: Starting analysis")
        return reasoning_agent.process(state)

    def synthesizer_node(state: GAIAAgentState) -> GAIAAgentState:
        """Enhanced synthesizer with verification."""
        logger.info("🎯 Synthesizer: Starting GAIA-compliant synthesis")
        return synthesizer.process(state)

    def should_continue_to_next_agent(state: GAIAAgentState) -> str:
        """
        Pick the next node from the router's planned ``agent_sequence``.

        The position in the sequence is the number of results recorded so far
        (``len(state.agent_results)``). An exhausted sequence or an unknown
        name maps to "synthesizer". Without a sequence, returns
        "web_researcher" if nothing has run yet, else "synthesizer".
        """
        agent_sequence = getattr(state, 'agent_sequence', [])

        if not agent_sequence:
            logger.warning("No agent sequence found, using fallback routing")
            if not state.agent_results:
                return "web_researcher"
            return "synthesizer"

        executed_count = len(state.agent_results)
        if executed_count >= len(agent_sequence):
            return "synthesizer"

        next_agent = agent_sequence[executed_count]

        agent_mapping = {
            'web_researcher': 'web_researcher',
            'file_processor': 'file_processor',
            'reasoning_agent': 'reasoning_agent',
            'synthesizer': 'synthesizer'
        }
        return agent_mapping.get(next_agent, 'synthesizer')

    def check_quality_and_refinement(state: GAIAAgentState) -> str:
        """
        Decide whether to run one refinement pass before synthesis.

        Returns "refine_approach" when the mean confidence of the recorded
        results is below 0.3 and ``state.refinement_attempted`` is not truthy.
        Otherwise returns "synthesizer".
        """
        if not state.agent_results:
            return "synthesizer"

        avg_confidence = sum(r.confidence for r in state.agent_results) / len(state.agent_results)

        if avg_confidence < 0.3 and not getattr(state, 'refinement_attempted', False):
            logger.info(f"Low confidence ({avg_confidence:.2f}), attempting refinement")
            # No state mutation here: LangGraph only applies updates returned by
            # nodes, not changes made inside a routing function. Refinement is
            # bounded by the topology instead (refine_approach -> synthesizer).
            return "refine_approach"

        return "synthesizer"

    def refinement_node(state: GAIAAgentState) -> GAIAAgentState:
        """
        Run one extra agent pass when the first results look weak.

        If the router produced a ``router_analysis`` whose strategy does not
        set ``fallback_needed`` to a falsy value, run the web researcher when
        it has no result yet, else the reasoning agent when it has no result
        yet. In every other case run the reasoning agent.
        """
        logger.info("🔄 Attempting result refinement")
        state.add_processing_step("Workflow: Attempting refinement due to low confidence")

        router_analysis = getattr(state, 'router_analysis', {})
        if router_analysis:
            strategy = router_analysis.get('strategy', {})
            fallback_needed = strategy.get('fallback_needed', True)
            if fallback_needed:
                if not any(r.agent_role == AgentRole.WEB_RESEARCHER for r in state.agent_results):
                    return web_researcher.process(state)
                if not any(r.agent_role == AgentRole.REASONING_AGENT for r in state.agent_results):
                    return reasoning_agent.process(state)

        return reasoning_agent.process(state)

    def make_post_agent_router(current_node: str):
        """Return the conditional-edge function used after ``current_node`` runs."""

        def route_after_agent(state: GAIAAgentState) -> str:
            next_node = should_continue_to_next_agent(state)
            if next_node == current_node:
                # The plan still points at the agent that just ran, which
                # normally means it recorded no result. Routing back to it
                # would repeat until LangGraph's recursion limit.
                logger.warning(
                    f"{current_node} did not advance the agent sequence; moving on to synthesis"
                )
                next_node = "synthesizer"
            if next_node == "synthesizer":
                # Give weak results one chance to be refined before synthesis.
                return check_quality_and_refinement(state)
            return next_node

        return route_after_agent

    workflow = StateGraph(GAIAAgentState)

    workflow.add_node("router", router_node)
    workflow.add_node("web_researcher", web_researcher_node)
    workflow.add_node("file_processor", file_processor_node)
    workflow.add_node("reasoning_agent", reasoning_agent_node)
    workflow.add_node("refine_approach", refinement_node)
    workflow.add_node("synthesizer", synthesizer_node)

    workflow.set_entry_point("router")

    # Router -> first planned agent (or synthesizer).
    workflow.add_conditional_edges(
        "router",
        should_continue_to_next_agent,
        {name: name for name in (*agent_nodes, "synthesizer")},
    )

    # Each specialist -> next planned agent, refinement, or synthesizer.
    # Every target the router function can return is in the path map.
    post_agent_targets = (*agent_nodes, "synthesizer", "refine_approach")
    for node_name in agent_nodes:
        workflow.add_conditional_edges(
            node_name,
            make_post_agent_router(node_name),
            {target: target for target in post_agent_targets if target != node_name},
        )

    # A single refinement pass, then synthesis.
    workflow.add_edge("refine_approach", "synthesizer")

    workflow.add_edge("synthesizer", END)

    return workflow.compile()
|
|
|
|
|
def create_simple_workflow(llm_client, tools_dict):
    """
    Enhanced simple workflow with better planning and execution.

    Builds a single-node LangGraph whose node runs the router, then the
    planned agents in order, then the synthesizer.

    Args:
        llm_client: LLM client shared by all agents.
        tools_dict: Currently unused; kept for interface compatibility.

    Returns:
        The compiled LangGraph application.
    """
    router = RouterAgent(llm_client)
    web_researcher = WebResearchAgent(llm_client)
    # Needed so 'file_processor' entries in the plan are actually executed.
    file_processor = FileProcessorAgent(llm_client)
    reasoning_agent = ReasoningAgent(llm_client)
    synthesizer = SynthesizerAgent(llm_client)

    # Used when the router gives no (or an empty) plan; a tuple so it cannot
    # be mutated between calls.
    default_sequence = ('web_researcher', 'reasoning_agent')

    def process_with_planning(state: GAIAAgentState) -> GAIAAgentState:
        """
        Route, run the planned agents, then synthesize.

        Agents run in ``state.agent_sequence`` order until the sequence ends,
        a 'synthesizer' entry is reached, or the latest result has confidence
        above 0.8. Unknown names run nothing.
        """
        logger.info("🚀 Starting simple workflow with enhanced planning")

        state = router.process(state)

        # `or` also covers a plan that is present but None or empty.
        agent_sequence = getattr(state, 'agent_sequence', None) or default_sequence

        for agent_name in agent_sequence:
            if agent_name == 'web_researcher':
                state = web_researcher.process(state)
            elif agent_name == 'file_processor':
                state = file_processor.process(state)
            elif agent_name == 'reasoning_agent':
                state = reasoning_agent.process(state)
            elif agent_name == 'synthesizer':
                break

            # Stop early once an agent is confident enough.
            if state.agent_results and state.agent_results[-1].confidence > 0.8:
                logger.info("High confidence result achieved, proceeding to synthesis")
                break

        state = synthesizer.process(state)

        return state

    workflow = StateGraph(GAIAAgentState)
    workflow.add_node("process", process_with_planning)
    workflow.set_entry_point("process")
    workflow.add_edge("process", END)

    return workflow.compile()