""" LangGraph Agent Core - StateGraph Definition Author: @mangubee Date: 2026-01-01 Stage 1: Skeleton with placeholder nodes Stage 2: Tool integration (CURRENT) Stage 3: Planning and reasoning logic implementation Based on: - Level 3: Sequential workflow with dynamic planning - Level 4: Goal-based reasoning, coarse-grained generalist - Level 6: LangGraph framework """ import logging import os from pathlib import Path from typing import TypedDict, List, Optional from langgraph.graph import StateGraph, END from src.config import Settings from src.tools import ( TOOLS, search, parse_file, safe_eval, analyze_image, youtube_transcript, transcribe_audio, ) from src.agent.llm_client import ( plan_question, select_tools_with_function_calling, synthesize_answer, ) # ============================================================================ # Logging Setup # ============================================================================ logger = logging.getLogger(__name__) # ============================================================================ # Helper Functions # ============================================================================ def is_vision_question(question: str) -> bool: """ Detect if question requires vision analysis tool. Vision questions typically contain keywords about visual content like images, videos, or YouTube links. Args: question: GAIA question text Returns: True if question likely requires vision tool, False otherwise """ vision_keywords = [ "image", "video", "youtube", "photo", "picture", "watch", "screenshot", "visual", ] return any(keyword in question.lower() for keyword in vision_keywords) # ============================================================================ # Agent State Definition # ============================================================================ class AgentState(TypedDict): """ State structure for GAIA agent workflow. Tracks question processing from input through planning, execution, to final answer. """ question: str # Input question from GAIA file_paths: Optional[List[str]] # Optional file paths for file-based questions plan: Optional[str] # Generated execution plan (Stage 3) tool_calls: List[dict] # Tool invocation tracking (Stage 3) tool_results: List[dict] # Tool execution results (Stage 3) evidence: List[str] # Evidence collected from tools (Stage 3) answer: Optional[str] # Final factoid answer errors: List[str] # Error messages from failures # ============================================================================ # Environment Validation # ============================================================================ def validate_environment() -> List[str]: """ Check which API keys are available at startup. Returns: List of missing API key names (empty if all present) """ missing = [] if not os.getenv("GOOGLE_API_KEY"): missing.append("GOOGLE_API_KEY (Gemini)") if not os.getenv("HF_TOKEN"): missing.append("HF_TOKEN (HuggingFace)") if not os.getenv("ANTHROPIC_API_KEY"): missing.append("ANTHROPIC_API_KEY (Claude)") if not os.getenv("TAVILY_API_KEY"): missing.append("TAVILY_API_KEY (Search)") return missing # ============================================================================ # Helper Functions # ============================================================================ def fallback_tool_selection( question: str, plan: str, file_paths: Optional[List[str]] = None ) -> List[dict]: """ MVP Fallback: Simple keyword-based tool selection when LLM fails. Enhanced to use actual file paths when available. This is a temporary hack to get basic functionality working. Uses simple keyword matching to select tools. Args: question: The user question plan: The execution plan file_paths: Optional list of downloaded file paths Returns: List of tool calls with basic parameters """ logger.info( "[fallback_tool_selection] Using keyword-based fallback for tool selection" ) tool_calls = [] question_lower = question.lower() plan_lower = plan.lower() combined = f"{question_lower} {plan_lower}" # Search tool: keywords like "search", "find", "look up", "who", "what", "when", "where" search_keywords = [ "search", "find", "look up", "who is", "what is", "when", "where", "google", ] if any(keyword in combined for keyword in search_keywords): # Extract search query - use first sentence or full question query = question.split(".")[0] if "." in question else question tool_calls.append({"tool": "web_search", "params": {"query": query}}) logger.info( f"[fallback_tool_selection] Added web_search tool with query: {query}" ) # Math tool: keywords like "calculate", "compute", "+", "-", "*", "/", "=" math_keywords = [ "calculate", "compute", "math", "sum", "multiply", "divide", "+", "-", "*", "/", "=", ] if any(keyword in combined for keyword in math_keywords): # Try to extract expression - look for patterns with numbers and operators import re # Look for mathematical expressions expr_match = re.search(r"[\d\s\+\-\*/\(\)\.]+", question) if expr_match: expression = expr_match.group().strip() tool_calls.append( {"tool": "calculator", "params": {"expression": expression}} ) logger.info( f"[fallback_tool_selection] Added calculator tool with expression: {expression}" ) # File tool: if file_paths available, use them if file_paths: for file_path in file_paths: # Determine file type and appropriate tool file_ext = Path(file_path).suffix.lower() if file_ext in [".png", ".jpg", ".jpeg"]: tool_calls.append( {"tool": "vision", "params": {"image_path": file_path}} ) logger.info( f"[fallback_tool_selection] Added vision tool for image: {file_path}" ) elif file_ext in [ ".pdf", ".xlsx", ".xls", ".csv", ".json", ".txt", ".docx", ".doc", ]: tool_calls.append( {"tool": "parse_file", "params": {"file_path": file_path}} ) logger.info( f"[fallback_tool_selection] Added parse_file tool for: {file_path}" ) else: # Keyword-based file detection (legacy) file_keywords = ["file", "parse", "read", "csv", "json", "txt", "document"] if any(keyword in combined for keyword in file_keywords): logger.warning( "[fallback_tool_selection] File operation detected but no file_paths available" ) # Image tool: keywords like "image", "picture", "photo", "analyze", "vision" image_keywords = ["image", "picture", "photo", "analyze image", "vision"] if any(keyword in combined for keyword in image_keywords): if file_paths: # Already handled above in file_paths check pass else: logger.warning( "[fallback_tool_selection] Image operation detected but no file_paths available" ) if not tool_calls: logger.warning( "[fallback_tool_selection] No tools selected by fallback - adding default search" ) # Default: just search the question tool_calls.append({"tool": "web_search", "params": {"query": question}}) logger.info( f"[fallback_tool_selection] Fallback selected {len(tool_calls)} tool(s)" ) return tool_calls # ============================================================================ # Graph Node Functions (Placeholders for Stage 1) # ============================================================================ def plan_node(state: AgentState) -> AgentState: """ Planning node: Analyze question and generate execution plan. Stage 3: Dynamic planning with LLM - LLM analyzes question and available tools - Generates step-by-step execution plan - Identifies which tools to use and in what order Args: state: Current agent state with question Returns: Updated state with execution plan """ try: plan = plan_question( question=state["question"], available_tools=TOOLS, file_paths=state.get("file_paths"), ) state["plan"] = plan logger.info(f"[plan] ✓ {len(plan)} chars") except Exception as e: logger.error(f"[plan] ✗ {type(e).__name__}: {str(e)}") state["errors"].append(f"Planning error: {type(e).__name__}: {str(e)}") state["plan"] = "Error: Unable to create plan" return state def execute_node(state: AgentState) -> AgentState: """Execution node: Execute tools based on plan.""" # Map tool names to actual functions TOOL_FUNCTIONS = { "web_search": search, "parse_file": parse_file, "calculator": safe_eval, "vision": analyze_image, "youtube_transcript": youtube_transcript, "transcribe_audio": transcribe_audio, } tool_results = [] evidence = [] tool_calls = [] try: tool_calls = select_tools_with_function_calling( question=state["question"], plan=state["plan"], available_tools=TOOLS, file_paths=state.get("file_paths"), ) # Validate tool_calls result if not tool_calls: logger.warning("[execute] No tools selected, using fallback") state["errors"].append("Tool selection returned no tools - using fallback") tool_calls = fallback_tool_selection( state["question"], state["plan"], state.get("file_paths") ) elif not isinstance(tool_calls, list): logger.error(f"[execute] Invalid type: {type(tool_calls)}, using fallback") state["errors"].append( f"Tool selection returned invalid type: {type(tool_calls)}" ) tool_calls = fallback_tool_selection( state["question"], state["plan"], state.get("file_paths") ) else: logger.info(f"[execute] {len(tool_calls)} tool(s) selected") # Execute each tool call for idx, tool_call in enumerate(tool_calls, 1): tool_name = tool_call["tool"] params = tool_call["params"] try: tool_func = TOOL_FUNCTIONS.get(tool_name) if not tool_func: raise ValueError(f"Tool '{tool_name}' not found in TOOL_FUNCTIONS") result = tool_func(**params) logger.info(f"[{idx}/{len(tool_calls)}] {tool_name} ✓") tool_results.append( { "tool": tool_name, "params": params, "result": result, "status": "success", } ) # Extract evidence - handle different result formats if isinstance(result, dict): # Vision tool returns {"answer": "..."} if "answer" in result: evidence.append(result["answer"]) # Search tools return {"results": [...], "source": "...", "query": "..."} elif "results" in result: # Format search results as readable text results_list = result.get("results", []) if results_list: # Take first 3 results and format them formatted = [] for r in results_list[:3]: title = r.get("title", "")[:100] url = r.get("url", "")[:100] snippet = r.get("snippet", "")[:200] formatted.append( f"Title: {title}\nURL: {url}\nSnippet: {snippet}" ) evidence.append("\n\n".join(formatted)) else: evidence.append(str(result)) else: evidence.append(str(result)) elif isinstance(result, str): evidence.append(result) else: evidence.append(str(result)) except Exception as tool_error: logger.error(f"[execute] ✗ {tool_name}: {tool_error}") tool_results.append( { "tool": tool_name, "params": params, "error": str(tool_error), "status": "failed", } ) if tool_name == "vision" and ( "quota" in str(tool_error).lower() or "429" in str(tool_error) ): state["errors"].append(f"Vision failed: LLM quota exhausted") else: state["errors"].append(f"{tool_name}: {type(tool_error).__name__}") logger.info(f"[execute] {len(tool_results)} tools, {len(evidence)} evidence") except Exception as e: logger.error(f"[execute] ✗ {type(e).__name__}: {str(e)}") if is_vision_question(state["question"]) and ( "quota" in str(e).lower() or "429" in str(e) ): state["errors"].append("Vision unavailable (quota exhausted)") else: state["errors"].append(f"Execution error: {type(e).__name__}") # Try fallback if we don't have any tool_calls yet if not tool_calls: try: tool_calls = fallback_tool_selection( state["question"], state.get("plan", ""), state.get("file_paths") ) TOOL_FUNCTIONS = { "web_search": search, "parse_file": parse_file, "calculator": safe_eval, "vision": analyze_image, "youtube_transcript": youtube_transcript, "transcribe_audio": transcribe_audio, } for tool_call in tool_calls: try: tool_name = tool_call["tool"] params = tool_call["params"] tool_func = TOOL_FUNCTIONS.get(tool_name) if tool_func: result = tool_func(**params) tool_results.append( { "tool": tool_name, "params": params, "result": result, "status": "success", } ) if isinstance(result, dict): if "answer" in result: evidence.append(result["answer"]) elif "results" in result: results_list = result.get("results", []) if results_list: formatted = [] for r in results_list[:3]: title = r.get("title", "")[:100] url = r.get("url", "")[:100] snippet = r.get("snippet", "")[:200] formatted.append( f"Title: {title}\nURL: {url}\nSnippet: {snippet}" ) evidence.append("\n\n".join(formatted)) else: evidence.append(str(result)) else: evidence.append(str(result)) elif isinstance(result, str): evidence.append(result) else: evidence.append(str(result)) logger.info(f"[execute] Fallback {tool_name} ✓") except Exception as tool_error: logger.error(f"[execute] Fallback {tool_name} ✗ {tool_error}") except Exception as fallback_error: logger.error(f"[execute] Fallback failed: {fallback_error}") # Always update state, even if there were errors state["tool_calls"] = tool_calls state["tool_results"] = tool_results state["evidence"] = evidence return state def answer_node(state: AgentState) -> AgentState: """Answer synthesis node: Generate final factoid answer from evidence.""" if state["errors"]: logger.warning(f"[answer] Errors: {state['errors']}") try: if not state["evidence"]: error_summary = ( "; ".join(state["errors"]) if state["errors"] else "No errors logged" ) state["answer"] = f"ERROR: No evidence. {error_summary}" logger.error(f"[answer] ✗ No evidence - {error_summary}") return state answer = synthesize_answer( question=state["question"], evidence=state["evidence"] ) state["answer"] = answer logger.info(f"[answer] ✓ {answer}") except Exception as e: logger.error(f"[answer] ✗ {type(e).__name__}: {str(e)}") state["errors"].append(f"Answer synthesis error: {type(e).__name__}: {str(e)}") state["answer"] = ( f"ERROR: Answer synthesis failed - {type(e).__name__}: {str(e)}" ) return state # ============================================================================ # StateGraph Construction # ============================================================================ def create_gaia_graph() -> StateGraph: """ Create LangGraph StateGraph for GAIA agent. Implements sequential workflow (Level 3 decision): question → plan → execute → answer Returns: Compiled StateGraph ready for execution """ settings = Settings() # Initialize StateGraph with AgentState graph = StateGraph(AgentState) # Add nodes (placeholder implementations) graph.add_node("plan", plan_node) graph.add_node("execute", execute_node) graph.add_node("answer", answer_node) # Define sequential workflow edges graph.set_entry_point("plan") graph.add_edge("plan", "execute") graph.add_edge("execute", "answer") graph.add_edge("answer", END) # Compile graph compiled_graph = graph.compile() print("[create_gaia_graph] StateGraph compiled successfully") return compiled_graph # ============================================================================ # Agent Wrapper Class # ============================================================================ class GAIAAgent: """ GAIA Benchmark Agent - Main interface. Wraps LangGraph StateGraph and provides simple call interface. Compatible with existing BasicAgent interface in app.py. """ def __init__(self): """Initialize agent and compile StateGraph.""" print("GAIAAgent initializing...") # Validate environment - check API keys missing_keys = validate_environment() if missing_keys: warning_msg = f"⚠️ WARNING: Missing API keys: {', '.join(missing_keys)}" print(warning_msg) logger.warning(warning_msg) print( " Agent may fail to answer questions. Set keys in environment variables." ) else: print("✓ All API keys present") self.graph = create_gaia_graph() self.last_state = None # Store last execution state for diagnostics print("GAIAAgent initialized successfully") def __call__(self, question: str, file_path: Optional[str] = None) -> str: """ Process question and return answer. Supports optional file attachment for file-based questions. Args: question: GAIA question text file_path: Optional path to downloaded file attachment Returns: Factoid answer string """ print(f"GAIAAgent processing question (first 50 chars): {question[:50]}...") if file_path: print(f"GAIAAgent processing file: {file_path}") # Initialize state initial_state: AgentState = { "question": question, "file_paths": [file_path] if file_path else None, "plan": None, "tool_calls": [], "tool_results": [], "evidence": [], "answer": None, "errors": [], } # Invoke graph final_state = self.graph.invoke(initial_state) # Store state for diagnostics self.last_state = final_state # Extract answer answer = final_state.get("answer", "Error: No answer generated") print(f"GAIAAgent returning answer: {answer}") return answer