# agentbee/src/agent/graph.py
# Last commit e7b4937 (mangubee): "fix: correct author name formatting in multiple files"
"""
LangGraph Agent Core - StateGraph Definition
Author: @mangubee
Date: 2026-01-01
Stage 1: Skeleton with placeholder nodes
Stage 2: Tool integration (CURRENT)
Stage 3: Planning and reasoning logic implementation
Based on:
- Level 3: Sequential workflow with dynamic planning
- Level 4: Goal-based reasoning, coarse-grained generalist
- Level 6: LangGraph framework
"""
import logging
import os
from pathlib import Path
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, END
from src.config import Settings
from src.tools import (
TOOLS,
search,
parse_file,
safe_eval,
analyze_image,
youtube_transcript,
transcribe_audio,
)
from src.agent.llm_client import (
plan_question,
select_tools_with_function_calling,
synthesize_answer,
)
# ============================================================================
# Logging Setup
# ============================================================================
logger = logging.getLogger(__name__)
# ============================================================================
# Helper Functions
# ============================================================================
def is_vision_question(question: str) -> bool:
    """
    Report whether a question appears to need the vision analysis tool.

    The check is a case-insensitive substring match against a fixed set of
    visual-content keywords (images, videos, YouTube links, ...).

    Args:
        question: GAIA question text

    Returns:
        True if any vision keyword occurs in the question, False otherwise
    """
    lowered = question.lower()
    markers = (
        "image",
        "video",
        "youtube",
        "photo",
        "picture",
        "watch",
        "screenshot",
        "visual",
    )
    # Plain substring test: "watch" also matches e.g. "stopwatch".
    for marker in markers:
        if marker in lowered:
            return True
    return False
# ============================================================================
# Agent State Definition
# ============================================================================
class AgentState(TypedDict):
    """
    Shared state handed from node to node in the GAIA agent graph.

    Carries the question from input, through planning and tool execution,
    to the final answer, together with any errors recorded on the way.
    """

    # Input question from GAIA
    question: str
    # Paths of attached files for file-based questions; None when there are none
    file_paths: Optional[List[str]]
    # Execution plan text written by the planning node
    plan: Optional[str]
    # Tool invocations chosen by the execution node
    tool_calls: List[dict]
    # Per-tool outcome records written by the execution node
    tool_results: List[dict]
    # Evidence strings collected from tool results
    evidence: List[str]
    # Final factoid answer
    answer: Optional[str]
    # Error messages accumulated from failures in any node
    errors: List[str]
# ============================================================================
# Environment Validation
# ============================================================================
def validate_environment() -> List[str]:
    """
    Report which of the expected API keys are absent from the environment.

    A key counts as missing when its environment variable is unset or empty.

    Returns:
        Labels of the missing keys, in a fixed order (empty if all present)
    """
    expected = (
        ("GOOGLE_API_KEY", "Gemini"),
        ("HF_TOKEN", "HuggingFace"),
        ("ANTHROPIC_API_KEY", "Claude"),
        ("TAVILY_API_KEY", "Search"),
    )
    return [
        f"{env_name} ({provider})"
        for env_name, provider in expected
        if not os.getenv(env_name)
    ]
# ============================================================================
# Fallback Tool Selection
# ============================================================================
def fallback_tool_selection(
    question: str, plan: str, file_paths: Optional[List[str]] = None
) -> List[dict]:
    """
    MVP fallback: keyword-based tool selection for when LLM selection fails.

    Matching is plain substring search over the lower-cased question and plan,
    so it is deliberately coarse. Attached files are routed by extension.
    If nothing matches, a web search on the full question is returned.

    Args:
        question: The user question
        plan: The execution plan (None is treated as empty text)
        file_paths: Optional list of downloaded file paths

    Returns:
        List of tool calls, each {"tool": <name>, "params": {...}}; never empty
    """
    import re

    logger.info(
        "[fallback_tool_selection] Using keyword-based fallback for tool selection"
    )
    tool_calls = []
    # plan can be None if planning never produced text; don't crash on .lower()
    combined = f"{question.lower()} {(plan or '').lower()}"

    # Search tool: keywords like "search", "find", "look up", "who", "what", "when", "where"
    search_keywords = [
        "search",
        "find",
        "look up",
        "who is",
        "what is",
        "when",
        "where",
        "google",
    ]
    if any(keyword in combined for keyword in search_keywords):
        # Extract search query - use first sentence or full question
        query = question.split(".")[0] if "." in question else question
        tool_calls.append({"tool": "web_search", "params": {"query": query}})
        logger.info(
            f"[fallback_tool_selection] Added web_search tool with query: {query}"
        )

    # Math tool: keywords like "calculate", "compute", "+", "-", "*", "/", "="
    math_keywords = [
        "calculate",
        "compute",
        "math",
        "sum",
        "multiply",
        "divide",
        "+",
        "-",
        "*",
        "/",
        "=",
    ]
    if any(keyword in combined for keyword in math_keywords):
        # Scan every run of digits/operators/parens. The first run in ordinary
        # text is usually just whitespace, so a run is only accepted when it
        # contains "<digit> <operator> <digit>"; otherwise the calculator
        # would be queued with an empty or meaningless expression.
        for candidate in re.finditer(r"[\d\s\+\-\*/\(\)\.]+", question):
            # Drop surrounding whitespace and a trailing sentence period
            expression = candidate.group().strip().rstrip(".").strip()
            if re.search(r"\d[\s\)]*[\+\-\*/][\s\(]*[\+\-]?\.?\d", expression):
                tool_calls.append(
                    {"tool": "calculator", "params": {"expression": expression}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added calculator tool with expression: {expression}"
                )
                break

    # File tool: if file_paths available, route each file by its extension
    image_exts = (".png", ".jpg", ".jpeg")
    document_exts = (
        ".pdf",
        ".xlsx",
        ".xls",
        ".csv",
        ".json",
        ".txt",
        ".docx",
        ".doc",
    )
    if file_paths:
        for file_path in file_paths:
            file_ext = Path(file_path).suffix.lower()
            if file_ext in image_exts:
                tool_calls.append(
                    {"tool": "vision", "params": {"image_path": file_path}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added vision tool for image: {file_path}"
                )
            elif file_ext in document_exts:
                tool_calls.append(
                    {"tool": "parse_file", "params": {"file_path": file_path}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added parse_file tool for: {file_path}"
                )
            else:
                # Make the skip visible instead of silently ignoring the file
                logger.warning(
                    f"[fallback_tool_selection] No fallback tool for file type '{file_ext}': {file_path}"
                )
    else:
        # Keyword-based file detection (legacy)
        file_keywords = ["file", "parse", "read", "csv", "json", "txt", "document"]
        if any(keyword in combined for keyword in file_keywords):
            logger.warning(
                "[fallback_tool_selection] File operation detected but no file_paths available"
            )

    # Image keywords only matter when no files were supplied; attached images
    # were already routed to the vision tool above.
    image_keywords = ["image", "picture", "photo", "analyze image", "vision"]
    if not file_paths and any(keyword in combined for keyword in image_keywords):
        logger.warning(
            "[fallback_tool_selection] Image operation detected but no file_paths available"
        )

    if not tool_calls:
        logger.warning(
            "[fallback_tool_selection] No tools selected by fallback - adding default search"
        )
        # Default: just search the question
        tool_calls.append({"tool": "web_search", "params": {"query": question}})

    logger.info(
        f"[fallback_tool_selection] Fallback selected {len(tool_calls)} tool(s)"
    )
    return tool_calls
# ============================================================================
# Graph Node Functions
# ============================================================================
def plan_node(state: AgentState) -> AgentState:
    """
    Planning node: ask the LLM planner for an execution plan.

    Passes the question, the available tool definitions and any attached
    file paths to plan_question and stores the returned plan in the state.
    On any failure the error is logged and recorded in state["errors"], and
    a placeholder plan string is stored so later nodes still have a value.

    Args:
        state: Current agent state with question

    Returns:
        The same state object, with "plan" (and possibly "errors") updated
    """
    try:
        generated = plan_question(
            question=state["question"],
            available_tools=TOOLS,
            file_paths=state.get("file_paths"),
        )
        state["plan"] = generated
        logger.info(f"[plan] ✓ {len(generated)} chars")
    except Exception as exc:
        detail = f"{type(exc).__name__}: {exc}"
        logger.error(f"[plan] ✗ {detail}")
        state["errors"].append(f"Planning error: {detail}")
        state["plan"] = "Error: Unable to create plan"
    return state
def _tool_functions() -> dict:
    """Return the mapping from tool-call names to the callables implementing them."""
    return {
        "web_search": search,
        "parse_file": parse_file,
        "calculator": safe_eval,
        "vision": analyze_image,
        "youtube_transcript": youtube_transcript,
        "transcribe_audio": transcribe_audio,
    }


def _format_evidence(result):
    """Turn one tool result into the evidence entry used for answer synthesis."""
    if not isinstance(result, dict):
        return result if isinstance(result, str) else str(result)
    # Results carrying an "answer" key contribute that value directly
    if "answer" in result:
        return result["answer"]
    # Results carrying a non-empty "results" list are rendered as readable text
    results_list = result.get("results")
    if not results_list:
        return str(result)
    formatted = []
    for item in results_list[:3]:  # only the first 3 hits are kept
        # `or ""` guards against keys that are present but None
        title = str(item.get("title") or "")[:100]
        url = str(item.get("url") or "")[:100]
        snippet = str(item.get("snippet") or "")[:200]
        formatted.append(f"Title: {title}\nURL: {url}\nSnippet: {snippet}")
    return "\n\n".join(formatted)


def _run_tool_calls(
    tool_calls: List[dict],
    state: AgentState,
    tool_results: List[dict],
    evidence: List[str],
    fallback: bool = False,
) -> None:
    """Run each tool call in isolation, recording results, evidence and errors."""
    tool_functions = _tool_functions()
    total = len(tool_calls)
    for idx, tool_call in enumerate(tool_calls, 1):
        # Defaults so the failure record is well-formed even when the tool
        # call itself is malformed (missing key, not a dict).
        tool_name = "<unknown>"
        params = None
        try:
            tool_name = tool_call["tool"]
            params = tool_call["params"]
            tool_func = tool_functions.get(tool_name)
            if not tool_func:
                raise ValueError(f"Tool '{tool_name}' not found in TOOL_FUNCTIONS")
            result = tool_func(**params)
            evidence_item = _format_evidence(result)
        except Exception as tool_error:
            if fallback:
                logger.error(f"[execute] Fallback ✗ {tool_name}: {tool_error}")
            else:
                logger.error(f"[execute] ✗ {tool_name}: {tool_error}")
            tool_results.append(
                {
                    "tool": tool_name,
                    "params": params,
                    "error": str(tool_error),
                    "status": "failed",
                }
            )
            error_text = str(tool_error)
            if tool_name == "vision" and (
                "quota" in error_text.lower() or "429" in error_text
            ):
                state["errors"].append("Vision failed: LLM quota exhausted")
            else:
                state["errors"].append(f"{tool_name}: {type(tool_error).__name__}")
            # One bad tool call must not prevent the remaining ones from running
            continue

        tool_results.append(
            {
                "tool": tool_name,
                "params": params,
                "result": result,
                "status": "success",
            }
        )
        evidence.append(evidence_item)
        if fallback:
            logger.info(f"[execute] Fallback {tool_name} ✓")
        else:
            logger.info(f"[{idx}/{total}] {tool_name} ✓")


def execute_node(state: AgentState) -> AgentState:
    """
    Execution node: select tools for the plan, run them and collect evidence.

    Tool selection is delegated to select_tools_with_function_calling. If it
    returns nothing or a non-list, or raises before any tool was selected,
    the keyword-based fallback_tool_selection is used instead. Each tool call
    runs in isolation: a failing or malformed call is recorded in
    "tool_results" and "errors" and the remaining calls still run.

    Args:
        state: Current agent state with question and plan

    Returns:
        The same state object with "tool_calls", "tool_results" and
        "evidence" set (always, even after errors)
    """
    tool_results: List[dict] = []
    evidence: List[str] = []
    tool_calls: List[dict] = []
    try:
        selected = select_tools_with_function_calling(
            question=state["question"],
            plan=state["plan"],
            available_tools=TOOLS,
            file_paths=state.get("file_paths"),
        )
        # Validate the selection before trusting it as a list of tool calls
        if not selected:
            logger.warning("[execute] No tools selected, using fallback")
            state["errors"].append("Tool selection returned no tools - using fallback")
            tool_calls = fallback_tool_selection(
                state["question"], state.get("plan") or "", state.get("file_paths")
            )
        elif not isinstance(selected, list):
            logger.error(f"[execute] Invalid type: {type(selected)}, using fallback")
            state["errors"].append(
                f"Tool selection returned invalid type: {type(selected)}"
            )
            tool_calls = fallback_tool_selection(
                state["question"], state.get("plan") or "", state.get("file_paths")
            )
        else:
            tool_calls = selected
            logger.info(f"[execute] {len(tool_calls)} tool(s) selected")

        _run_tool_calls(tool_calls, state, tool_results, evidence)
        logger.info(f"[execute] {len(tool_results)} tools, {len(evidence)} evidence")
    except Exception as e:
        logger.error(f"[execute] ✗ {type(e).__name__}: {str(e)}")
        if is_vision_question(state["question"]) and (
            "quota" in str(e).lower() or "429" in str(e)
        ):
            state["errors"].append("Vision unavailable (quota exhausted)")
        else:
            state["errors"].append(f"Execution error: {type(e).__name__}")
        # Try fallback if we don't have any tool_calls yet
        if not tool_calls:
            try:
                tool_calls = fallback_tool_selection(
                    state["question"], state.get("plan") or "", state.get("file_paths")
                )
                _run_tool_calls(
                    tool_calls, state, tool_results, evidence, fallback=True
                )
            except Exception as fallback_error:
                logger.error(f"[execute] Fallback failed: {fallback_error}")

    # Always update state, even if there were errors
    state["tool_calls"] = tool_calls
    state["tool_results"] = tool_results
    state["evidence"] = evidence
    return state
def answer_node(state: AgentState) -> AgentState:
    """
    Answer synthesis node: produce the final factoid answer from evidence.

    With no evidence, the answer is an "ERROR: No evidence." string that
    includes the recorded errors. Otherwise synthesize_answer is called with
    the question and evidence. Any exception is logged, appended to
    state["errors"] and turned into an "ERROR: ..." answer.

    Args:
        state: Current agent state with question, evidence and errors

    Returns:
        The same state object with "answer" set
    """
    recorded_errors = state["errors"]
    if recorded_errors:
        logger.warning(f"[answer] Errors: {recorded_errors}")

    try:
        if state["evidence"]:
            final_answer = synthesize_answer(
                question=state["question"], evidence=state["evidence"]
            )
            state["answer"] = final_answer
            logger.info(f"[answer] ✓ {final_answer}")
        else:
            if state["errors"]:
                error_summary = "; ".join(state["errors"])
            else:
                error_summary = "No errors logged"
            state["answer"] = f"ERROR: No evidence. {error_summary}"
            logger.error(f"[answer] ✗ No evidence - {error_summary}")
    except Exception as exc:
        detail = f"{type(exc).__name__}: {exc}"
        logger.error(f"[answer] ✗ {detail}")
        state["errors"].append(f"Answer synthesis error: {detail}")
        state["answer"] = f"ERROR: Answer synthesis failed - {detail}"
    return state
# ============================================================================
# StateGraph Construction
# ============================================================================
def create_gaia_graph() -> StateGraph:
    """
    Build and compile the LangGraph StateGraph for the GAIA agent.

    The workflow is strictly sequential:
        plan → execute → answer → END

    Returns:
        Compiled graph ready for execution
    """
    # NOTE(review): this Settings instance is not used below; presumably it is
    # constructed so configuration problems surface early - confirm.
    settings = Settings()

    workflow = StateGraph(AgentState)

    # Register the three processing nodes
    for node_name, node_fn in (
        ("plan", plan_node),
        ("execute", execute_node),
        ("answer", answer_node),
    ):
        workflow.add_node(node_name, node_fn)

    # Wire the nodes into a straight line ending at END
    workflow.set_entry_point("plan")
    for source, target in (
        ("plan", "execute"),
        ("execute", "answer"),
        ("answer", END),
    ):
        workflow.add_edge(source, target)

    compiled_graph = workflow.compile()
    print("[create_gaia_graph] StateGraph compiled successfully")
    return compiled_graph
# ============================================================================
# Agent Wrapper Class
# ============================================================================
class GAIAAgent:
    """
    GAIA Benchmark Agent - Main interface.

    Wraps the compiled LangGraph graph and exposes a simple call interface.
    Compatible with existing BasicAgent interface in app.py.
    """

    def __init__(self):
        """Check API keys (warning only, never fatal) and compile the graph."""
        print("GAIAAgent initializing...")
        # Validate environment - check API keys
        missing_keys = validate_environment()
        if missing_keys:
            warning_msg = f"⚠️ WARNING: Missing API keys: {', '.join(missing_keys)}"
            print(warning_msg)
            logger.warning(warning_msg)
            print(
                " Agent may fail to answer questions. Set keys in environment variables."
            )
        else:
            print("✓ All API keys present")
        self.graph = create_gaia_graph()
        self.last_state = None  # Store last execution state for diagnostics
        print("GAIAAgent initialized successfully")

    def __call__(self, question: str, file_path: Optional[str] = None) -> str:
        """
        Process question and return answer.

        Supports optional file attachment for file-based questions.

        Args:
            question: GAIA question text
            file_path: Optional path to downloaded file attachment

        Returns:
            Factoid answer string; "Error: No answer generated" when the
            graph finished without producing an answer
        """
        print(f"GAIAAgent processing question (first 50 chars): {question[:50]}...")
        if file_path:
            print(f"GAIAAgent processing file: {file_path}")

        # Initialize state
        initial_state: AgentState = {
            "question": question,
            "file_paths": [file_path] if file_path else None,
            "plan": None,
            "tool_calls": [],
            "tool_results": [],
            "evidence": [],
            "answer": None,
            "errors": [],
        }

        # Invoke graph
        final_state = self.graph.invoke(initial_state)

        # Store state for diagnostics
        self.last_state = final_state

        # The "answer" key is seeded with None above, so a dict.get default
        # would never apply; check for None explicitly to honour `-> str`.
        answer = final_state.get("answer")
        if answer is None:
            answer = "Error: No answer generated"
        print(f"GAIAAgent returning answer: {answer}")
        return answer