| | """ |
| | LangGraph Agent Core - StateGraph Definition |
| | Author: @mangobee |
| | Date: 2026-01-01 |
| | |
| | Stage 1: Skeleton with placeholder nodes |
| | Stage 2: Tool integration (CURRENT) |
| | Stage 3: Planning and reasoning logic implementation |
| | |
| | Based on: |
| | - Level 3: Sequential workflow with dynamic planning |
| | - Level 4: Goal-based reasoning, coarse-grained generalist |
| | - Level 6: LangGraph framework |
| | """ |
| |
|
| | import logging |
| | import os |
| | from typing import TypedDict, List, Optional |
| | from langgraph.graph import StateGraph, END |
| | from src.config import Settings |
| | from src.tools import TOOLS, search, parse_file, safe_eval, analyze_image |
| | from src.agent.llm_client import ( |
| | plan_question, |
| | select_tools_with_function_calling, |
| | synthesize_answer, |
| | ) |
| |
|
| | |
| | |
| | |
# Module-level logger; configuration (handlers/level) is inherited from the application root.
logger = logging.getLogger(__name__)
| |
|
| | |
| | |
| | |
| |
|
def is_vision_question(question: str) -> bool:
    """
    Detect if question requires vision analysis tool.

    Vision questions typically contain keywords about visual content like
    images, videos, or YouTube links. This is a cheap substring heuristic,
    not a classifier.

    Args:
        question: GAIA question text

    Returns:
        True if question likely requires vision tool, False otherwise
    """
    lowered = question.lower()
    keywords = ("image", "video", "youtube", "photo", "picture", "watch", "screenshot", "visual")
    return any(word in lowered for word in keywords)
| |
|
| | |
| | |
| | |
| |
|
| |
|
class AgentState(TypedDict):
    """
    State structure for GAIA agent workflow.

    Tracks question processing from input through planning, execution, to final answer.
    """

    # Raw GAIA question text (set once when the workflow starts).
    question: str
    # Paths to files referenced by the question, if any — passed to plan_question.
    file_paths: Optional[List[str]]
    # LLM-generated execution plan (filled by plan_node; error stub on failure).
    plan: Optional[str]
    # Tool invocations selected for execution: {"tool": name, "params": dict} (filled by execute_node).
    tool_calls: List[dict]
    # Per-tool outcome dicts: tool/params plus "result" (status "success") or "error" (status "failed").
    tool_results: List[dict]
    # "[tool_name] result" strings collected for answer synthesis.
    evidence: List[str]
    # Final factoid answer (filled by answer_node; "ERROR: ..." string on failure).
    answer: Optional[str]
    # Accumulated human-readable error messages from all nodes.
    errors: List[str]
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def validate_environment() -> List[str]:
    """
    Check which API keys are available at startup.

    An empty string counts as missing (os.getenv returns a falsy value).

    Returns:
        List of missing API key names (empty if all present)
    """
    # (env var, human-readable label) pairs; checked in a fixed order so the
    # returned list is deterministic.
    required = [
        ("GOOGLE_API_KEY", "GOOGLE_API_KEY (Gemini)"),
        ("HF_TOKEN", "HF_TOKEN (HuggingFace)"),
        ("ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY (Claude)"),
        ("TAVILY_API_KEY", "TAVILY_API_KEY (Search)"),
    ]
    return [label for key, label in required if not os.getenv(key)]
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def fallback_tool_selection(question: str, plan: str) -> List[dict]:
    """
    MVP Fallback: Simple keyword-based tool selection when LLM fails.

    This is a temporary hack to get basic functionality working.
    Uses simple keyword matching against the question and plan text.

    Fix over the previous revision: the calculator expression regex required
    only characters from [digits, whitespace, operators], so on input like
    "What is 2 + 2?" it matched the first bare space and emitted an empty
    expression. The pattern now anchors on a digit and the stripped match is
    checked before a calculator call is added.

    Args:
        question: The user question
        plan: The execution plan

    Returns:
        List of tool calls with basic parameters
    """
    import re  # local import: only needed on this fallback path

    logger.info("[fallback_tool_selection] Using keyword-based fallback for tool selection")

    tool_calls = []
    combined = f"{question.lower()} {plan.lower()}"

    # Web search: generic lookup phrasing.
    search_keywords = ["search", "find", "look up", "who is", "what is", "when", "where", "google"]
    if any(keyword in combined for keyword in search_keywords):
        # Use the first sentence as the query to keep it focused.
        query = question.split('.')[0] if '.' in question else question
        tool_calls.append({
            "tool": "web_search",
            "params": {"query": query}
        })
        logger.info(f"[fallback_tool_selection] Added web_search tool with query: {query}")

    # Calculator: arithmetic vocabulary or operator characters.
    math_keywords = ["calculate", "compute", "math", "sum", "multiply", "divide", "+", "-", "*", "/", "="]
    if any(keyword in combined for keyword in math_keywords):
        # Require the match to start with a digit so whitespace/punctuation
        # runs cannot produce an empty or meaningless expression.
        expr_match = re.search(r'\d[\d\s\+\-\*/\(\)\.]*', question)
        if expr_match:
            expression = expr_match.group().strip()
            if expression:
                tool_calls.append({
                    "tool": "calculator",
                    "params": {"expression": expression}
                })
                logger.info(f"[fallback_tool_selection] Added calculator tool with expression: {expression}")

    # File parsing: detected but not actionable — the fallback has no way to
    # recover a concrete filename from free text.
    file_keywords = ["file", "parse", "read", "csv", "json", "txt", "document"]
    if any(keyword in combined for keyword in file_keywords):
        logger.warning("[fallback_tool_selection] File operation detected but cannot extract filename")

    # Vision: same limitation — no image path can be extracted here.
    image_keywords = ["image", "picture", "photo", "analyze image", "vision"]
    if any(keyword in combined for keyword in image_keywords):
        logger.warning("[fallback_tool_selection] Image operation detected but cannot extract image path")

    # Last resort: always return at least one tool so execution can proceed.
    if not tool_calls:
        logger.warning("[fallback_tool_selection] No tools selected by fallback - adding default search")
        tool_calls.append({
            "tool": "web_search",
            "params": {"query": question}
        })

    logger.info(f"[fallback_tool_selection] Fallback selected {len(tool_calls)} tool(s)")
    return tool_calls
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def plan_node(state: AgentState) -> AgentState:
    """
    Planning node: Analyze question and generate execution plan.

    Stage 3: Dynamic planning with LLM
    - LLM analyzes question and available tools
    - Generates step-by-step execution plan
    - Identifies which tools to use and in what order

    Args:
        state: Current agent state with question

    Returns:
        Updated state with execution plan
    """
    logger.info("[plan_node] ========== PLAN NODE START ==========")
    logger.info("[plan_node] Question: %s", state["question"])
    logger.info("[plan_node] File paths: %s", state.get("file_paths"))
    logger.info("[plan_node] Available tools: %s", list(TOOLS.keys()))

    try:
        # Delegate plan generation to the LLM client.
        logger.info("[plan_node] Calling plan_question() with LLM...")
        generated = plan_question(
            question=state["question"],
            available_tools=TOOLS,
            file_paths=state.get("file_paths"),
        )
        state["plan"] = generated
        logger.info("[plan_node] ✓ Plan created successfully (%d chars)", len(generated))
        logger.debug("[plan_node] Plan content: %s", generated)
    except Exception as exc:
        # Record the failure but keep the workflow moving with a stub plan.
        logger.error("[plan_node] ✗ Planning failed: %s: %s", type(exc).__name__, str(exc), exc_info=True)
        state["errors"].append(f"Planning error: {type(exc).__name__}: {str(exc)}")
        state["plan"] = "Error: Unable to create plan"

    logger.info("[plan_node] ========== PLAN NODE END ==========")
    return state
| |
|
| |
|
def execute_node(state: AgentState) -> AgentState:
    """
    Execution node: Execute tools based on plan.

    Stage 3: Dynamic tool selection and execution
    - LLM selects tools via function calling
    - Extracts parameters from question
    - Executes tools and collects results
    - Handles errors with retry logic (in tools)

    Fixes over the previous revision:
    - The tool-name -> callable map is defined once and reused on the
      exception-fallback path (it was duplicated verbatim).
    - On the fallback path, tool name/params are read with .get() BEFORE the
      inner try, so a malformed tool_call can no longer raise NameError
      inside the except handler and abort the remaining fallback tools.

    Args:
        state: Current agent state with plan

    Returns:
        Updated state with tool execution results and evidence
    """
    logger.info(f"[execute_node] ========== EXECUTE NODE START ==========")
    logger.info(f"[execute_node] Plan: {state['plan']}")
    logger.info(f"[execute_node] Question: {state['question']}")

    # Map tool names (as selected by the LLM) to their implementations.
    # Defined once; also used by the exception-fallback path below.
    TOOL_FUNCTIONS = {
        "web_search": search,
        "parse_file": parse_file,
        "calculator": safe_eval,
        "vision": analyze_image,
    }

    tool_results = []
    evidence = []
    tool_calls = []

    try:
        # Ask the LLM to choose tools + parameters via function calling.
        logger.info(f"[execute_node] Calling select_tools_with_function_calling()...")
        tool_calls = select_tools_with_function_calling(
            question=state["question"], plan=state["plan"], available_tools=TOOLS
        )

        # Validate the LLM output; fall back to keyword matching when it is
        # empty or of the wrong type.
        if not tool_calls:
            logger.warning(f"[execute_node] ⚠ LLM returned empty tool_calls list - using fallback")
            state["errors"].append("Tool selection returned no tools - using fallback keyword matching")
            tool_calls = fallback_tool_selection(state["question"], state["plan"])
            logger.info(f"[execute_node] Fallback returned {len(tool_calls)} tool(s)")
        elif not isinstance(tool_calls, list):
            logger.error(f"[execute_node] ✗ Invalid tool_calls type: {type(tool_calls)} - using fallback")
            state["errors"].append(f"Tool selection returned invalid type: {type(tool_calls)} - using fallback")
            tool_calls = fallback_tool_selection(state["question"], state["plan"])
        else:
            logger.info(f"[execute_node] ✓ LLM selected {len(tool_calls)} tool(s)")
            logger.debug(f"[execute_node] Tool calls: {tool_calls}")

        # Execute each selected tool; individual failures are recorded in
        # state["errors"] / tool_results but do not stop the loop.
        for idx, tool_call in enumerate(tool_calls, 1):
            tool_name = tool_call["tool"]
            params = tool_call["params"]

            logger.info(f"[execute_node] --- Tool {idx}/{len(tool_calls)}: {tool_name} ---")
            logger.info(f"[execute_node] Parameters: {params}")

            try:
                tool_func = TOOL_FUNCTIONS.get(tool_name)
                if not tool_func:
                    raise ValueError(f"Tool '{tool_name}' not found in TOOL_FUNCTIONS")

                logger.info(f"[execute_node] Executing {tool_name}...")
                result = tool_func(**params)
                logger.info(f"[execute_node] ✓ {tool_name} completed successfully")
                logger.debug(f"[execute_node] Result: {result[:200] if isinstance(result, str) else result}...")

                tool_results.append(
                    {
                        "tool": tool_name,
                        "params": params,
                        "result": result,
                        "status": "success",
                    }
                )
                # Evidence strings are what answer_node synthesizes from.
                evidence.append(f"[{tool_name}] {result}")

            except Exception as tool_error:
                logger.error(f"[execute_node] ✗ Tool {tool_name} failed: {type(tool_error).__name__}: {str(tool_error)}", exc_info=True)
                tool_results.append(
                    {
                        "tool": tool_name,
                        "params": params,
                        "error": str(tool_error),
                        "status": "failed",
                    }
                )

                # Give vision quota failures a clearer, user-facing message.
                if tool_name == "vision" and ("quota" in str(tool_error).lower() or "429" in str(tool_error)):
                    state["errors"].append(f"Vision analysis failed: LLM quota exhausted. Vision requires multimodal LLM (Gemini/Claude).")
                else:
                    state["errors"].append(f"Tool {tool_name} failed: {type(tool_error).__name__}: {str(tool_error)}")

        logger.info(f"[execute_node] Summary: {len(tool_results)} tool(s) executed, {len(evidence)} evidence items collected")
        logger.debug(f"[execute_node] Evidence: {evidence}")

    except Exception as e:
        logger.error(f"[execute_node] ✗ Execution failed: {type(e).__name__}: {str(e)}", exc_info=True)

        # Vision questions hitting LLM quota limits get a graceful skip
        # instead of a generic execution error.
        if is_vision_question(state["question"]) and ("quota" in str(e).lower() or "429" in str(e)):
            logger.warning(f"[execute_node] Vision question detected with quota error - providing graceful skip")
            state["errors"].append("Vision analysis unavailable (LLM quota exhausted). Vision questions require multimodal LLMs.")
        else:
            state["errors"].append(f"Execution error: {type(e).__name__}: {str(e)}")

        # Last-resort fallback: if tool selection itself blew up before any
        # tools were chosen, try keyword-based selection with best-effort runs.
        if not tool_calls:
            logger.info(f"[execute_node] Attempting fallback after exception...")
            try:
                tool_calls = fallback_tool_selection(state["question"], state.get("plan", ""))
                logger.info(f"[execute_node] Fallback after exception returned {len(tool_calls)} tool(s)")

                for tool_call in tool_calls:
                    # Read name/params defensively BEFORE the try so the
                    # except clause below never references an unbound name.
                    tool_name = tool_call.get("tool", "<unknown>")
                    params = tool_call.get("params", {})
                    try:
                        tool_func = TOOL_FUNCTIONS.get(tool_name)
                        if tool_func:
                            result = tool_func(**params)
                            tool_results.append({
                                "tool": tool_name,
                                "params": params,
                                "result": result,
                                "status": "success"
                            })
                            evidence.append(f"[{tool_name}] {result}")
                            logger.info(f"[execute_node] Fallback tool {tool_name} executed successfully")
                    except Exception as tool_error:
                        logger.error(f"[execute_node] Fallback tool {tool_name} failed: {tool_error}")
            except Exception as fallback_error:
                logger.error(f"[execute_node] Fallback also failed: {fallback_error}")

    # Persist execution artifacts back into the shared state.
    state["tool_calls"] = tool_calls
    state["tool_results"] = tool_results
    state["evidence"] = evidence

    logger.info(f"[execute_node] ========== EXECUTE NODE END ==========")
    return state
| |
|
| |
|
def answer_node(state: AgentState) -> AgentState:
    """
    Answer synthesis node: Generate final factoid answer.

    Stage 3: Synthesize answer from evidence
    - LLM analyzes collected evidence
    - Resolves conflicts if present
    - Generates factoid answer in GAIA format

    Args:
        state: Current agent state with evidence from tools

    Returns:
        Updated state with final factoid answer
    """
    collected = state["evidence"]
    problems = state["errors"]

    logger.info("[answer_node] ========== ANSWER NODE START ==========")
    logger.info("[answer_node] Evidence items collected: %s", len(collected))
    logger.debug("[answer_node] Evidence: %s", collected)
    logger.info("[answer_node] Errors accumulated: %s", len(problems))
    if problems:
        logger.warning("[answer_node] Error list: %s", problems)

    try:
        # Guard: without evidence there is nothing to synthesize from, so
        # surface the accumulated errors directly in the answer string.
        if not collected:
            logger.warning(
                "[answer_node] ✗ No evidence collected, cannot generate answer"
            )
            detail = "; ".join(problems) if problems else "No errors logged - check API keys and logs"
            state["answer"] = f"ERROR: No evidence collected. Details: {detail}"
            logger.error("[answer_node] Returning error answer: %s", state["answer"])
            return state

        # Let the LLM condense the evidence into a factoid answer.
        logger.info("[answer_node] Calling synthesize_answer() with %s evidence items...", len(collected))
        final = synthesize_answer(
            question=state["question"], evidence=collected
        )
        state["answer"] = final
        logger.info("[answer_node] ✓ Answer generated successfully: %s", final)

    except Exception as exc:
        logger.error("[answer_node] ✗ Answer synthesis failed: %s: %s", type(exc).__name__, str(exc), exc_info=True)
        problems.append(f"Answer synthesis error: {type(exc).__name__}: {str(exc)}")
        state["answer"] = f"ERROR: Answer synthesis failed - {type(exc).__name__}: {str(exc)}"

    logger.info("[answer_node] ========== ANSWER NODE END ==========")
    return state
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def create_gaia_graph() -> StateGraph:
    """
    Create LangGraph StateGraph for GAIA agent.

    Implements sequential workflow (Level 3 decision):
        question → plan → execute → answer

    Returns:
        Compiled StateGraph ready for execution
    """
    # FIX: the previous revision bound this to an unused local. The call is
    # kept in case Settings() validates configuration on construction —
    # NOTE(review): confirm whether the call is needed at all.
    Settings()

    # Build the linear workflow graph over the shared AgentState dict.
    graph = StateGraph(AgentState)

    graph.add_node("plan", plan_node)
    graph.add_node("execute", execute_node)
    graph.add_node("answer", answer_node)

    # Sequential edges: plan → execute → answer → END.
    graph.set_entry_point("plan")
    graph.add_edge("plan", "execute")
    graph.add_edge("execute", "answer")
    graph.add_edge("answer", END)

    compiled_graph = graph.compile()

    print("[create_gaia_graph] StateGraph compiled successfully")
    return compiled_graph
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
class GAIAAgent:
    """
    GAIA Benchmark Agent - Main interface.

    Wraps LangGraph StateGraph and provides simple call interface.
    Compatible with existing BasicAgent interface in app.py.
    """

    def __init__(self):
        """Initialize agent and compile StateGraph.

        Warns (but does not fail) when API keys are missing, then compiles
        the workflow graph once so each call only pays for invocation.
        """
        print("GAIAAgent initializing...")

        # Surface missing credentials early; the agent still starts so that
        # partially-configured environments can exercise non-LLM paths.
        missing_keys = validate_environment()
        if missing_keys:
            warning_msg = f"⚠️ WARNING: Missing API keys: {', '.join(missing_keys)}"
            print(warning_msg)
            logger.warning(warning_msg)
            print(" Agent may fail to answer questions. Set keys in environment variables.")
        else:
            print("✓ All API keys present")

        self.graph = create_gaia_graph()
        # Final state of the most recent invocation, kept for debugging.
        self.last_state = None
        print("GAIAAgent initialized successfully")

    def __call__(self, question: str) -> str:
        """
        Process question and return answer.

        Args:
            question: GAIA question text

        Returns:
            Factoid answer string
        """
        print(f"GAIAAgent processing question (first 50 chars): {question[:50]}...")

        # Fresh state per question; nodes mutate this dict as it flows
        # through plan → execute → answer.
        initial_state: AgentState = {
            "question": question,
            "file_paths": None,
            "plan": None,
            "tool_calls": [],
            "tool_results": [],
            "evidence": [],
            "answer": None,
            "errors": [],
        }

        final_state = self.graph.invoke(initial_state)

        # Retain for post-run inspection.
        self.last_state = final_state

        # FIX: "answer" always exists in the state (initialized to None
        # above), so a dict.get() default could never trigger — check for
        # None explicitly instead of relying on a missing key.
        answer = final_state.get("answer")
        if answer is None:
            answer = "Error: No answer generated"
        print(f"GAIAAgent returning answer: {answer}")

        return answer
| |
|