|
|
""" |
|
|
LangGraph Agent Core - StateGraph Definition |
|
|
Author: @mangubee |
|
|
Date: 2026-01-01 |
|
|
|
|
|
Stage 1: Skeleton with placeholder nodes |
|
|
Stage 2: Tool integration (CURRENT) |
|
|
Stage 3: Planning and reasoning logic implementation |
|
|
|
|
|
Based on: |
|
|
- Level 3: Sequential workflow with dynamic planning |
|
|
- Level 4: Goal-based reasoning, coarse-grained generalist |
|
|
- Level 6: LangGraph framework |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import os |
|
|
from pathlib import Path |
|
|
from typing import TypedDict, List, Optional |
|
|
from langgraph.graph import StateGraph, END |
|
|
from src.config import Settings |
|
|
from src.tools import ( |
|
|
TOOLS, |
|
|
search, |
|
|
parse_file, |
|
|
safe_eval, |
|
|
analyze_image, |
|
|
youtube_transcript, |
|
|
transcribe_audio, |
|
|
) |
|
|
from src.agent.llm_client import ( |
|
|
plan_question, |
|
|
select_tools_with_function_calling, |
|
|
synthesize_answer, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-wide logger used by every node and helper in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_vision_question(question: str) -> bool:
    """
    Heuristically decide whether a question calls for visual analysis.

    Performs a case-insensitive substring scan for vision-related terms
    (images, videos, YouTube links, screenshots, ...).

    Args:
        question: GAIA question text

    Returns:
        True if question likely requires vision tool, False otherwise
    """
    lowered = question.lower()
    for term in (
        "image",
        "video",
        "youtube",
        "photo",
        "picture",
        "watch",
        "screenshot",
        "visual",
    ):
        if term in lowered:
            return True
    return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AgentState(TypedDict):
    """
    State structure for GAIA agent workflow.

    Tracks question processing from input through planning, execution, to final answer.
    Each graph node (plan_node, execute_node, answer_node) mutates this dict in place.
    """

    # Original GAIA question text.
    question: str
    # Paths to downloaded attachment files; None when the question has no files.
    file_paths: Optional[List[str]]
    # LLM-generated execution plan; None until plan_node runs.
    plan: Optional[str]
    # Selected tool invocations, each shaped {"tool": name, "params": {...}}.
    tool_calls: List[dict]
    # Per-invocation outcomes with a "status" of "success" or "failed".
    tool_results: List[dict]
    # Normalized evidence strings fed into answer synthesis.
    evidence: List[str]
    # Final factoid answer; None until answer_node runs.
    answer: Optional[str]
    # Accumulated error descriptions appended by any node.
    errors: List[str]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_environment() -> List[str]:
    """
    Check which API keys are available at startup.

    A key counts as missing when the environment variable is unset or empty.

    Returns:
        List of missing API key names (empty if all present)
    """
    # (env var, human-readable label) pairs, in reporting order.
    required_keys = (
        ("GOOGLE_API_KEY", "GOOGLE_API_KEY (Gemini)"),
        ("HF_TOKEN", "HF_TOKEN (HuggingFace)"),
        ("ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY (Claude)"),
        ("TAVILY_API_KEY", "TAVILY_API_KEY (Search)"),
    )
    return [label for env_var, label in required_keys if not os.getenv(env_var)]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fallback_tool_selection(
    question: str, plan: str, file_paths: Optional[List[str]] = None
) -> List[dict]:
    """
    MVP Fallback: Simple keyword-based tool selection when LLM fails.
    Enhanced to use actual file paths when available.

    This is a temporary hack to get basic functionality working.
    Uses simple keyword matching against the question and plan text.

    Bug fix vs. previous version: the math-expression regex could match a
    whitespace/hyphen-only span (e.g. triggered by the "-" keyword inside a
    hyphenated word like "self-driving"), emitting a calculator call with an
    empty or digit-free expression. A candidate expression must now contain
    at least one digit.

    Args:
        question: The user question
        plan: The execution plan
        file_paths: Optional list of downloaded file paths

    Returns:
        List of tool calls with basic parameters
    """
    import re

    logger.info(
        "[fallback_tool_selection] Using keyword-based fallback for tool selection"
    )

    tool_calls = []
    question_lower = question.lower()
    plan_lower = plan.lower()
    combined = f"{question_lower} {plan_lower}"

    # --- Web search: triggered by lookup-style phrasing -----------------
    search_keywords = [
        "search",
        "find",
        "look up",
        "who is",
        "what is",
        "when",
        "where",
        "google",
    ]
    if any(keyword in combined for keyword in search_keywords):
        # Use the first sentence as the query when the question has one.
        query = question.split(".")[0] if "." in question else question
        tool_calls.append({"tool": "web_search", "params": {"query": query}})
        logger.info(
            f"[fallback_tool_selection] Added web_search tool with query: {query}"
        )

    # --- Calculator: triggered by arithmetic wording or operators -------
    math_keywords = [
        "calculate",
        "compute",
        "math",
        "sum",
        "multiply",
        "divide",
        "+",
        "-",
        "*",
        "/",
        "=",
    ]
    if any(keyword in combined for keyword in math_keywords):
        # Grab the first run of digits/operators from the question.
        expr_match = re.search(r"[\d\s\+\-\*/\(\)\.]+", question)
        if expr_match:
            expression = expr_match.group().strip()
            # Only emit a calculator call when the candidate actually
            # contains a digit (the character class alone also matches
            # bare whitespace and hyphens).
            if expression and any(ch.isdigit() for ch in expression):
                tool_calls.append(
                    {"tool": "calculator", "params": {"expression": expression}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added calculator tool with expression: {expression}"
                )

    # --- File-driven tools: choose by extension when files are present --
    if file_paths:
        for file_path in file_paths:
            file_ext = Path(file_path).suffix.lower()
            if file_ext in [".png", ".jpg", ".jpeg"]:
                tool_calls.append(
                    {"tool": "vision", "params": {"image_path": file_path}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added vision tool for image: {file_path}"
                )
            elif file_ext in [
                ".pdf",
                ".xlsx",
                ".xls",
                ".csv",
                ".json",
                ".txt",
                ".docx",
                ".doc",
            ]:
                tool_calls.append(
                    {"tool": "parse_file", "params": {"file_path": file_path}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added parse_file tool for: {file_path}"
                )
    else:
        # File-ish wording but no actual files to act on — just warn.
        file_keywords = ["file", "parse", "read", "csv", "json", "txt", "document"]
        if any(keyword in combined for keyword in file_keywords):
            logger.warning(
                "[fallback_tool_selection] File operation detected but no file_paths available"
            )

    # --- Image wording without attachments: warn only -------------------
    # (Image files themselves are already handled by the extension loop above.)
    image_keywords = ["image", "picture", "photo", "analyze image", "vision"]
    if any(keyword in combined for keyword in image_keywords):
        if file_paths:
            pass
        else:
            logger.warning(
                "[fallback_tool_selection] Image operation detected but no file_paths available"
            )

    # --- Last resort: always return at least one tool call ---------------
    if not tool_calls:
        logger.warning(
            "[fallback_tool_selection] No tools selected by fallback - adding default search"
        )
        tool_calls.append({"tool": "web_search", "params": {"query": question}})

    logger.info(
        f"[fallback_tool_selection] Fallback selected {len(tool_calls)} tool(s)"
    )
    return tool_calls
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def plan_node(state: AgentState) -> AgentState:
    """
    Planning node: Analyze question and generate execution plan.

    Stage 3: Dynamic planning with LLM
    - LLM analyzes question and available tools
    - Generates step-by-step execution plan
    - Identifies which tools to use and in what order

    Args:
        state: Current agent state with question

    Returns:
        Updated state with execution plan (or an error marker on failure)
    """
    try:
        generated_plan = plan_question(
            question=state["question"],
            available_tools=TOOLS,
            file_paths=state.get("file_paths"),
        )
    except Exception as exc:
        # Record the failure but keep the pipeline moving with a stub plan.
        detail = f"{type(exc).__name__}: {str(exc)}"
        logger.error(f"[plan] ✗ {detail}")
        state["errors"].append(f"Planning error: {detail}")
        state["plan"] = "Error: Unable to create plan"
    else:
        state["plan"] = generated_plan
        logger.info(f"[plan] ✓ {len(generated_plan)} chars")
    return state
|
|
|
|
|
|
|
|
def _extract_evidence(result) -> str:
    """
    Normalize a tool result into a single evidence string.

    Dict results with an "answer" key contribute the answer directly;
    search-style dicts with a "results" list are formatted as up to three
    Title/URL/Snippet entries (fields truncated to 100/100/200 chars).
    Anything else — including empty results lists — is stringified.
    """
    if isinstance(result, dict):
        if "answer" in result:
            return result["answer"]
        if "results" in result:
            results_list = result.get("results", [])
            if results_list:
                formatted = []
                for r in results_list[:3]:
                    title = r.get("title", "")[:100]
                    url = r.get("url", "")[:100]
                    snippet = r.get("snippet", "")[:200]
                    formatted.append(
                        f"Title: {title}\nURL: {url}\nSnippet: {snippet}"
                    )
                return "\n\n".join(formatted)
            return str(result)
        return str(result)
    if isinstance(result, str):
        return result
    return str(result)


def execute_node(state: AgentState) -> AgentState:
    """
    Execution node: Execute tools based on plan.

    Flow:
    1. Ask the LLM (function calling) to select tool invocations.
    2. If selection returns nothing or an invalid type, fall back to
       keyword-based selection (fallback_tool_selection).
    3. Run each tool, collecting results and normalized evidence; failures
       are recorded per-tool without aborting the batch.
    4. If the whole selection/execution step raises, retry once with the
       keyword fallback; failures in that retry are only logged.

    Args:
        state: Current agent state with question and plan

    Returns:
        Updated state with tool_calls, tool_results, and evidence
    """
    # Dispatch table from selector tool names to implementations.
    # Defined once at function top so the exception-recovery path below
    # reuses it (previously duplicated).
    TOOL_FUNCTIONS = {
        "web_search": search,
        "parse_file": parse_file,
        "calculator": safe_eval,
        "vision": analyze_image,
        "youtube_transcript": youtube_transcript,
        "transcribe_audio": transcribe_audio,
    }

    tool_results: List[dict] = []
    evidence: List[str] = []
    tool_calls: List[dict] = []

    try:
        tool_calls = select_tools_with_function_calling(
            question=state["question"],
            plan=state["plan"],
            available_tools=TOOLS,
            file_paths=state.get("file_paths"),
        )

        # Validate the selection; fall back to keyword matching when the
        # LLM returned nothing or something that is not a list.
        if not tool_calls:
            logger.warning("[execute] No tools selected, using fallback")
            state["errors"].append("Tool selection returned no tools - using fallback")
            tool_calls = fallback_tool_selection(
                state["question"], state["plan"], state.get("file_paths")
            )
        elif not isinstance(tool_calls, list):
            logger.error(f"[execute] Invalid type: {type(tool_calls)}, using fallback")
            state["errors"].append(
                f"Tool selection returned invalid type: {type(tool_calls)}"
            )
            tool_calls = fallback_tool_selection(
                state["question"], state["plan"], state.get("file_paths")
            )
        else:
            logger.info(f"[execute] {len(tool_calls)} tool(s) selected")

        # Run the selected tools one by one; a single failure does not
        # abort the remaining calls.
        for idx, tool_call in enumerate(tool_calls, 1):
            tool_name = tool_call["tool"]
            params = tool_call["params"]

            try:
                tool_func = TOOL_FUNCTIONS.get(tool_name)
                if not tool_func:
                    raise ValueError(f"Tool '{tool_name}' not found in TOOL_FUNCTIONS")

                result = tool_func(**params)
                logger.info(f"[{idx}/{len(tool_calls)}] {tool_name} ✓")

                tool_results.append(
                    {
                        "tool": tool_name,
                        "params": params,
                        "result": result,
                        "status": "success",
                    }
                )
                evidence.append(_extract_evidence(result))

            except Exception as tool_error:
                logger.error(f"[execute] ✗ {tool_name}: {tool_error}")
                tool_results.append(
                    {
                        "tool": tool_name,
                        "params": params,
                        "error": str(tool_error),
                        "status": "failed",
                    }
                )
                # Vision quota exhaustion gets a dedicated, user-readable error.
                if tool_name == "vision" and (
                    "quota" in str(tool_error).lower() or "429" in str(tool_error)
                ):
                    state["errors"].append("Vision failed: LLM quota exhausted")
                else:
                    state["errors"].append(f"{tool_name}: {type(tool_error).__name__}")

        logger.info(f"[execute] {len(tool_results)} tools, {len(evidence)} evidence")

    except Exception as e:
        logger.error(f"[execute] ✗ {type(e).__name__}: {str(e)}")

        if is_vision_question(state["question"]) and (
            "quota" in str(e).lower() or "429" in str(e)
        ):
            state["errors"].append("Vision unavailable (quota exhausted)")
        else:
            state["errors"].append(f"Execution error: {type(e).__name__}")

        # Recovery: if nothing was selected before the failure, try the
        # keyword fallback once. Failures here are logged but not recorded
        # in tool_results/errors (best-effort pass).
        if not tool_calls:
            try:
                tool_calls = fallback_tool_selection(
                    state["question"], state.get("plan", ""), state.get("file_paths")
                )

                for tool_call in tool_calls:
                    try:
                        tool_name = tool_call["tool"]
                        params = tool_call["params"]
                        tool_func = TOOL_FUNCTIONS.get(tool_name)
                        if tool_func:
                            result = tool_func(**params)
                            tool_results.append(
                                {
                                    "tool": tool_name,
                                    "params": params,
                                    "result": result,
                                    "status": "success",
                                }
                            )
                            evidence.append(_extract_evidence(result))
                            logger.info(f"[execute] Fallback {tool_name} ✓")
                    except Exception as tool_error:
                        logger.error(f"[execute] Fallback {tool_name} ✗ {tool_error}")
            except Exception as fallback_error:
                logger.error(f"[execute] Fallback failed: {fallback_error}")

    state["tool_calls"] = tool_calls
    state["tool_results"] = tool_results
    state["evidence"] = evidence
    return state
|
|
|
|
|
|
|
|
def answer_node(state: AgentState) -> AgentState:
    """Answer synthesis node: condense gathered evidence into the final factoid answer."""
    # Surface accumulated errors up front for easier log correlation.
    if state["errors"]:
        logger.warning(f"[answer] Errors: {state['errors']}")

    try:
        if not state["evidence"]:
            # Nothing to synthesize from — report the collected errors instead.
            if state["errors"]:
                error_summary = "; ".join(state["errors"])
            else:
                error_summary = "No errors logged"
            state["answer"] = f"ERROR: No evidence. {error_summary}"
            logger.error(f"[answer] ✗ No evidence - {error_summary}")
            return state

        state["answer"] = synthesize_answer(
            question=state["question"], evidence=state["evidence"]
        )
        logger.info(f"[answer] ✓ {state['answer']}")

    except Exception as exc:
        detail = f"{type(exc).__name__}: {str(exc)}"
        logger.error(f"[answer] ✗ {detail}")
        state["errors"].append(f"Answer synthesis error: {detail}")
        state["answer"] = f"ERROR: Answer synthesis failed - {detail}"

    return state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_gaia_graph() -> StateGraph:
    """
    Create LangGraph StateGraph for GAIA agent.

    Implements sequential workflow (Level 3 decision):
    question → plan → execute → answer

    Returns:
        Compiled StateGraph ready for execution
    """
    # NOTE(review): previously bound to an unused local (`settings`). The
    # instantiation is kept in case Settings() has env-loading/validation
    # side effects — TODO confirm and remove the call if it has none.
    Settings()

    graph = StateGraph(AgentState)

    # Register the three pipeline nodes.
    graph.add_node("plan", plan_node)
    graph.add_node("execute", execute_node)
    graph.add_node("answer", answer_node)

    # Wire the fixed sequential path: plan → execute → answer → END.
    graph.set_entry_point("plan")
    graph.add_edge("plan", "execute")
    graph.add_edge("execute", "answer")
    graph.add_edge("answer", END)

    compiled_graph = graph.compile()

    print("[create_gaia_graph] StateGraph compiled successfully")
    return compiled_graph
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GAIAAgent:
    """
    GAIA Benchmark Agent - Main interface.

    Thin wrapper around the compiled LangGraph StateGraph that exposes the
    same callable interface as BasicAgent in app.py.
    """

    def __init__(self):
        """Validate the environment, then build and compile the StateGraph."""
        print("GAIAAgent initializing...")

        # Warn early about missing credentials; initialization continues
        # regardless so the failure surfaces at question time, not import time.
        missing_keys = validate_environment()
        if not missing_keys:
            print("✓ All API keys present")
        else:
            warning_msg = f"⚠️ WARNING: Missing API keys: {', '.join(missing_keys)}"
            print(warning_msg)
            logger.warning(warning_msg)
            print(
                " Agent may fail to answer questions. Set keys in environment variables."
            )

        self.graph = create_gaia_graph()
        # Last final state, retained for post-run inspection/debugging.
        self.last_state = None
        print("GAIAAgent initialized successfully")

    def __call__(self, question: str, file_path: Optional[str] = None) -> str:
        """
        Process question and return answer.
        Supports optional file attachment for file-based questions.

        Args:
            question: GAIA question text
            file_path: Optional path to downloaded file attachment

        Returns:
            Factoid answer string
        """
        print(f"GAIAAgent processing question (first 50 chars): {question[:50]}...")
        if file_path:
            print(f"GAIAAgent processing file: {file_path}")

        # Seed every AgentState field so nodes can mutate without key checks.
        seed_state: AgentState = {
            "question": question,
            "file_paths": [file_path] if file_path else None,
            "plan": None,
            "tool_calls": [],
            "tool_results": [],
            "evidence": [],
            "answer": None,
            "errors": [],
        }

        result_state = self.graph.invoke(seed_state)
        self.last_state = result_state

        answer = result_state.get("answer", "Error: No answer generated")
        print(f"GAIAAgent returning answer: {answer}")
        return answer
|
|
|