import os
import base64
import requests
import json
import traceback
import datetime
import subprocess
import tempfile
import time
from typing import TypedDict, List, Dict, Any, Optional, Union
from langchain_core import tools
from langgraph.graph import StateGraph, START, END
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFacePipeline
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langchain_community.document_loaders import WikipediaLoader
from ddgs import DDGS
from dotenv import load_dotenv
from groq import Groq
from langchain_groq import ChatGroq
from langchain_community.document_loaders.image import UnstructuredImageLoader
from langchain_community.document_loaders import WebBaseLoader
from langchain_google_genai import ChatGoogleGenerativeAI
try:
import cv2
except ImportError:
cv2 = None
# os.environ["USER_AGENT"] = "gaia-agent/1.0"
whisper_model = None
def get_whisper():
global whisper_model
if whisper_model is None:
import whisper
# Lazy load the smallest, fastest model
whisper_model = whisper.load_model("base")
return whisper_model
load_dotenv(override=True)
# Base Hugging Face LLM used by the chat wrapper
# base_llm = HuggingFaceEndpoint(
# repo_id="openai/gpt-oss-20b:hyperbolic",
# # deepseek-ai/DeepSeek-OCR:novita
# task="text-generation",
# temperature=0.0,
# huggingfacehub_api_token=os.getenv("HUGGINGFACEHUB_API_TOKEN"),
# )
# Model initializations moved to smart_invoke for lazy loading to prevent import errors if keys are missing.
def smart_invoke(msgs, use_tools=False, start_tier=0):
"""
Tiered fallback: OpenRouter -> Gemini -> Groq -> NVIDIA -> Vercel.
Retries next tier if a 429 (rate limit), 402 (credits), or 404 (model found) error occurs.
"""
# Adaptive Gemini names verified via list_models (REST API)
gemini_alternatives = ["gemini-2.5-flash", "gemini-2.0-flash", "gemini-flash-latest", "gemini-pro-latest"]
tiers_config = [
{"name": "Qwen3-Next-80B", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "qwen/qwen3-next-80b-a3b-instruct:free", "base_url": "https://openrouter.ai/api/v1"},
{"name": "Gemma-3-27B", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "google/gemma-3-27b-it:free", "base_url": "https://openrouter.ai/api/v1"},
{"name": "NVIDIA-Nemotron-Super", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "nvidia/nemotron-3-super-120b-a12b:free", "base_url": "https://openrouter.ai/api/v1"},
{"name": "OpenRouter-FreeRouter", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "openrouter/free", "base_url": "https://openrouter.ai/api/v1"},
{"name": "DeepSeek-R1", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "deepseek/deepseek-r1:free", "base_url": "https://openrouter.ai/api/v1"},
{"name": "Gemini-Flash", "key": "GOOGLE_API_KEY", "provider": "google", "model_name": "gemini-2.0-flash", "alternatives": gemini_alternatives},
{"name": "Groq", "key": "GROQ_API_KEY", "provider": "groq", "model_name": "llama-3.3-70b-versatile"},
]
last_exception = None
for i in range(start_tier, len(tiers_config)):
tier = tiers_config[i]
api_key = os.getenv(tier["key"])
if not api_key:
continue
def create_model_instance(m_name, provider, b_url=None):
if provider == "openai":
from langchain_openai import ChatOpenAI
return ChatOpenAI(model=m_name, openai_api_key=api_key, openai_api_base=b_url, temperature=0)
elif provider == "google":
from langchain_google_genai import ChatGoogleGenerativeAI
return ChatGoogleGenerativeAI(model=m_name, temperature=0)
elif provider == "groq":
from langchain_groq import ChatGroq
return ChatGroq(model=m_name, temperature=0, max_retries=2)
return None
primary_model = create_model_instance(tier["model_name"], tier["provider"], tier.get("base_url"))
if use_tools:
primary_model = primary_model.bind_tools(tools)
models_to_try = [primary_model]
if "alternatives" in tier:
for alt_name in tier["alternatives"]:
alt_model = create_model_instance(alt_name, tier["provider"], tier.get("base_url"))
if use_tools:
alt_model = alt_model.bind_tools(tools)
models_to_try.append(alt_model)
for current_model in models_to_try:
try:
model_name = getattr(current_model, "model", tier["name"])
print(f"--- Calling {tier['name']} ({model_name}) ---")
return current_model.invoke(msgs), i
except Exception as e:
err_str = str(e).lower()
# If it's a 404 (not found) and we have more alternatives, continue to the next alternative
if any(x in err_str for x in ["not_found", "404"]) and current_model != models_to_try[-1]:
print(f"--- {tier['name']} model {model_name} not found. Trying alternative... ---")
continue
# Catch other fallback triggers
if any(x in err_str for x in ["rate_limit", "429", "500", "503", "overloaded", "not_found", "404", "402", "credits", "decommissioned", "invalid_request_error"]):
print(f"--- {tier['name']} Error: {e}. Trying next model/tier... ---")
last_exception = e
# If this tier has more alternatives, continue to the next one
if current_model != models_to_try[-1]:
continue
break # Move to next tier
raise e
if last_exception:
print("CRITICAL: All fallback tiers failed.")
raise last_exception
return None, 0
@tool
def web_search(keywords: str) -> str:
"""
Uses duckduckgo to search the top 5 result on web
Use cases:
- Identify personal information
- Information search
- Finding organisation information
- Obtain the latest news
Args:
keywords: keywords used to search the web
Returns:
Search result (Header + body + url)
"""
max_retries = 3
for attempt in range(max_retries):
try:
with DDGS() as ddgs:
output = ""
results = ddgs.text(keywords, max_results = 5)
for result in results:
output += f"Results: {result['title']}\n{result['body']}\n{result['href']}\n\n"
return output
except Exception as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
return f"Search failed after {max_retries} attempts: {str(e)}"
@tool
def wiki_search(query: str) -> str:
"""
Search Wikipedia for a query and return up to 3 results.
Use cases:
When the question requires the use of information from wikipedia
Args:
query: The search query
"""
search_docs = WikipediaLoader(query=query, load_max_docs=3, doc_content_chars_max=15000).load()
if not search_docs:
return "No Wikipedia results found."
formatted_search_docs = "\n\n---\n\n".join(
[
f'\n{doc.page_content}\n'
for doc in search_docs
])
return formatted_search_docs
def get_vision_models():
"""Returns a list of vision models to try, in order of preference."""
configs = [
{"name": "OpenRouter-Qwen3-VL", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "qwen/qwen3-vl-235b-thinking:free", "base_url": "https://openrouter.ai/api/v1"},
{"name": "NVIDIA-Nemotron-VL", "key": "NVIDIA_API_KEY", "provider": "openai", "model_name": "nvidia/nemotron-nano-2-vl:free", "base_url": "https://integrate.api.nvidia.com/v1"},
{"name": "OpenRouter-Gemma-3-27b-it", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "google/gemma-3-27b-it:free", "base_url": "https://openrouter.ai/api/v1"},
{"name": "Google-Gemini-2.0-Flash", "key": "GOOGLE_API_KEY", "provider": "google", "model_name": "gemini-2.0-flash"},
{"name": "Google-Gemini-Flash-Latest", "key": "GOOGLE_API_KEY", "provider": "google", "model_name": "gemini-flash-latest"},
]
models = []
for cfg in configs:
api_key = os.getenv(cfg["key"])
if not api_key:
continue
if cfg["provider"] == "openai":
from langchain_openai import ChatOpenAI
m = ChatOpenAI(model=cfg["model_name"], openai_api_key=api_key, openai_api_base=cfg.get("base_url"), temperature=0)
elif cfg["provider"] == "google":
from langchain_google_genai import ChatGoogleGenerativeAI
m = ChatGoogleGenerativeAI(model=cfg["model_name"], temperature=0)
elif cfg["provider"] == "groq":
from langchain_groq import ChatGroq
m = ChatGroq(model=cfg["model_name"], temperature=0)
models.append({"name": cfg["name"], "model": m})
return models
@tool
def analyze_image(image_path: str, question: str) -> str:
"""
EXTERNAL SIGHT API: Sends an image path to a Vision Model to answer a specific question.
YOU MUST CALL THIS TOOL ANY TIME an image (.png, .jpg, .jpeg) is attached to the prompt.
NEVER claim you cannot see images. Use this tool instead.
Args:
image_path: The local path or URL to the image file.
question: Specific question describing what you want the vision model to look for.
"""
try:
if not os.path.exists(image_path):
return f"Error: Image file not found at {image_path}"
# If it's a local file, we encode it to base64
with open(image_path, "rb") as image_file:
encoded_image = base64.b64encode(image_file.read()).decode('utf-8')
message = HumanMessage(
content=[
{"type": "text", "text": question},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"},
},
]
)
vision_models = get_vision_models()
if not vision_models:
return "Error: No vision models configured (missing API keys)."
last_err = None
for item in vision_models:
try:
m_name = getattr(item['model'], 'model', 'unknown')
print(f"--- Calling Vision Model: {item['name']} ({m_name}) ---")
response = item['model'].invoke([message])
return extract_text_from_content(response.content)
except Exception as e:
print(f"Vision Model {item['name']} failed.")
traceback.print_exc()
last_err = e
return f"Error analyzing image: All vision models failed. Last error: {str(last_err)}"
except Exception as e:
traceback.print_exc()
return f"Error reading/processing image: {str(e)}"
@tool
def analyze_audio(audio_path: str, question: str) -> str:
"""
Transcribes an audio file (.mp3, .wav, .m4a) to answer questions about what is spoken.
Args:
audio_path: The local path to the audio file.
question: The specific question to ask.
"""
try:
model = get_whisper()
result = model.transcribe(audio_path)
transcript = result["text"]
return f"Audio Transcript:\n{transcript}"
except Exception as e:
return f"Error analyzing audio: {str(e)}. Tip: You requires 'ffmpeg' installed on your system."
@tool
def analyze_video(video_path: str, question: str) -> str:
"""
EXTERNAL SIGHT/HEARING API: Sends a video file to an external Vision/Audio model.
YOU MUST CALL THIS TOOL ANY TIME a video (.mp4, .avi) is attached to the prompt.
NEVER claim you cannot analyze videos. Use this tool instead.
Args:
video_path: The local path to the video file.
question: Specific question describing what you want to extract from the video.
"""
if cv2 is None:
return "Error: cv2 is not installed. Please install opencv-python."
temp_dir = tempfile.gettempdir()
downloaded_video = None
try:
# Check if video_path is a URL
if video_path.startswith("http"):
print(f"Downloading video from URL: {video_path}")
downloaded_video = os.path.join(temp_dir, f"video_{int(time.time())}.mp4")
try:
# Use yt-dlp to download the video
# Note: --ffmpeg-location could be used if we knew where it was, but we assume it's in path or missing
subprocess.run(["yt-dlp", "-f", "best[ext=mp4]/mp4", "-o", downloaded_video, video_path], check=True, timeout=120)
video_path = downloaded_video
except Exception as e:
return f"Error downloading video from URL: {str(e)}. Tip: Check if yt-dlp is installed and the URL is valid."
# 1. Extract frames evenly spaced throughout the video
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_frames == 0:
return "Error: Could not read video frames."
# Take 5 frames as a summary
frame_indices = [int(i * total_frames / 5) for i in range(5)]
extracted_descriptions = []
vision_models = get_vision_models()
# Ensure Groq-Llama is at the front for video if preferred, but we'll use the default order for now.
for idx_num, frame_idx in enumerate(frame_indices):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if ret:
# Convert frame to base64
_, buffer = cv2.imencode('.jpg', frame)
encoded_image = base64.b64encode(buffer).decode('utf-8')
# Ask a vision model to describe the frame (with fallback)
msg = HumanMessage(
content=[
{"type": "text", "text": f"Describe what is happening in this video frame concisely. Focus on aspects related to: {question}"},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}},
]
)
desc = "No description available."
for item in vision_models:
try:
print(f"--- Calling Vision Model for Frame {idx_num+1}: {item['name']} ---")
desc = item['model'].invoke([msg]).content
break
except Exception as e:
print(f"Vision Model {item['name']} failed for frame: {e}")
continue
extracted_descriptions.append(f"Frame {idx_num + 1}: {desc}")
cap.release()
# 2. Compile the context for the agent
video_context = "\n".join(extracted_descriptions)
# 3. Transcribe audio if possible
try:
whisper_mod = get_whisper()
trans_result = whisper_mod.transcribe(video_path)
transcript = trans_result.get("text", "")
if transcript.strip():
video_context += f"\n\nVideo Audio Transcript:\n{transcript}"
except Exception as e:
video_context += f"\n\n(No audio transcript generated: {e})"
return f"Video Summary based on extracted frames and audio:\n{video_context}"
except Exception as e:
err_msg = str(e)
if "No address associated with hostname" in err_msg or "Failed to resolve" in err_msg:
return f"Error: The environment cannot access the internet (DNS failure). Please use 'web_search' or 'wiki_search' to find information about this video content instead of trying to download it."
return f"Error analyzing video: {err_msg}"
finally:
if downloaded_video and os.path.exists(downloaded_video):
try:
os.remove(downloaded_video)
except:
pass
@tool
def read_url(url: str) -> str:
"""
Reads and extracts text from a specific webpage URL.
Use this if a web search snippet doesn't contain enough detail.
"""
try:
loader = WebBaseLoader(url)
docs = loader.load()
# Truncate to first 15000 characters to fit context
if not docs:
return "No content could be extracted from this URL."
return docs[0].page_content[:15000]
except Exception as e:
return f"Error reading URL: {e}"
@tool
def run_python_script(code: str) -> str:
"""
Executes a Python script locally and returns the stdout and stderr.
Use this to perform complex math, data analysis (e.g. pandas), or file processing.
When given a file path, you can write python code to read and analyze it.
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
temp_file_name = f.name
try:
result = subprocess.run(
["python", temp_file_name],
capture_output=True,
text=True,
timeout=60
)
os.remove(temp_file_name)
output = result.stdout
if result.stderr:
output += f"\nErrors:\n{result.stderr}"
return (output or "Script executed successfully with no output.")[:15000]
except subprocess.TimeoutExpired:
os.remove(temp_file_name)
return "Script execution timed out after 60 seconds."
except Exception as e:
if os.path.exists(temp_file_name):
os.remove(temp_file_name)
return f"Failed to execute script: {str(e)}"
@tool
def read_document(file_path: str) -> str:
"""
Reads the text contents of a local document (.txt, .csv, .json, .md).
For binary files like .xlsx or .pdf, use run_python_script to process them instead.
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if len(content) > 15000:
return content[:15000] + "... (truncated)"
return content
except Exception as e:
return f"Error reading document: {str(e)}. Tip: You can try running a python script to read it!"
system_prompt = """
You are a helpful assistant tasked with answering questions using a set of tools.
Now, I will ask you a question. Report your thoughts, and finish your answer with the following template:
FINAL ANSWER: [YOUR FINAL ANSWER].
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
Your answer should only start with "FINAL ANSWER: ", then follows with the answer.
"""
class AgentState(TypedDict):
messages: List[Union[HumanMessage, AIMessage, SystemMessage]]
def read_message(state: AgentState) -> AgentState:
messages = state["messages"]
print(f"Processing question: {messages[-1].content if messages else ''}")
# Just pass the messages through to the next node
return {"messages": messages}
def restart_required(state: AgentState) -> AgentState:
messages = state["messages"]
print(f"Processing question: {messages[-1].content if messages else ''}")
# Just pass the messages through to the next node
return {"messages": messages}
# def tool_message(state: AgentState) -> AgentState:
# messages = state["messages"]
# prompt = f"""
# You are a GAIA question answering expert.
# Your task is to decide whether to use a tool or not.
# If you need to use a tool, answer ONLY:
# CALL_TOOL:
# If you do not need to use a tool, answer ONLY:
# NO_TOOL
# Here is the question:
# {messages}
# """
# return {"messages": messages}
# response = model_with_tools.invoke(prompt)
# return {"messages": messages + [response]}
# Augment the LLM with tools
tools = [web_search, wiki_search, analyze_image, analyze_audio, analyze_video, read_url, run_python_script, read_document]
tools_by_name = {tool.name: tool for tool in tools}
def extract_text_from_content(content: Any) -> str:
"""Extracts a simple string from various possible AIMessage content formats."""
if isinstance(content, str):
return content
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, str):
text_parts.append(part)
elif isinstance(part, dict) and "text" in part:
text_parts.append(part["text"])
elif isinstance(part, dict) and "type" in part and part["type"] == "text":
text_parts.append(part.get("text", ""))
return "".join(text_parts)
return str(content)
def answer_message(state: AgentState) -> AgentState:
messages = state["messages"]
current_date = datetime.datetime.now().strftime("%Y-%m-%d")
prompt = [SystemMessage(f"""
You are a master of the GAIA benchmark, a general AI assistant designed to solve complex multi-step tasks.
Think carefully and logically. Use your tools effectively. Use your internal monologue to plan your steps.
TODAY'S EXACT DATE is {current_date}. Keep this in mind for all time-sensitive queries.
CRITICAL RULES:
1. If you see a path like `[Attached File Local Path: ...]` followed by an image, video, or audio file, YOU MUST USE THE CORRESPONDING TOOL (analyze_image, analyze_video, analyze_audio) IMMEDIATELY in your next step.
2. Plan your steps ahead. 12 steps is your LIMIT for the reasoning loop, so make every step count.
3. If a tool fails (e.g., 429 or 402), the system will automatically try another model for you, so just keep going!
4. Be concise and accurate. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list.
5. CHAIN-OF-THOUGHT: For complex questions, show your reasoning step by step before giving the final answer.
6. USE TOOLS AGGRESSIVELY: If a question requires computation, file reading, or web search, use the appropriate tools - don't try to answer from memory.
7. VERIFY YOUR ANSWER: Double-check calculations and facts using tools when uncertain.
""")]
messages = prompt + messages
# Force tool usage if image path is detected
for msg in state["messages"]:
if isinstance(msg, HumanMessage) and "[Attached File Local Path:" in msg.content:
messages.append(HumanMessage(content="IMPORTANT: I see an image path in the message. I MUST call the analyze_image tool IMMEDIATELY in my next step to see it."))
# Multi-step ReAct Loop (Up to 12 reasoning steps)
max_steps = 12
draft_response = None
current_tier = 0
for step in range(max_steps):
if step > 0:
time.sleep(3)
print(f"--- ReAct Step {step + 1} ---")
# Max history truncation to avoid 413 Request Too Large errors
safe_messages = messages[:2] + messages[-6:] if len(messages) > 10 else messages
ai_msg, current_tier = smart_invoke(safe_messages, use_tools=True, start_tier=current_tier)
messages.append(ai_msg)
# Check if the model requested tools
tool_calls = getattr(ai_msg, "tool_calls", None) or []
if not tool_calls:
# Model decided it has enough info to answer
draft_response = ai_msg
print(f"Model found answer or stopped tools: {ai_msg.content}")
break
# Execute requested tools and append their text output into the conversation
for tool_call in tool_calls:
name = tool_call["name"]
args = tool_call["args"]
tool_call_id = tool_call.get("id")
print(f"Calling tool: {name} with args: {args}")
try:
tool = tools_by_name[name]
tool_result = tool.invoke(args)
except Exception as e:
tool_result = f"Error executing tool {name}: {str(e)}"
# Using ToolMessage allows the model to map the result back perfectly to its request
messages.append(ToolMessage(content=str(tool_result), tool_call_id=tool_call_id, name=name))
# If we exhausted all steps without an answer, force a draft response
if draft_response is None:
print("Max reasoning steps reached. Forcing answer extraction.")
forced_msg = HumanMessage(content="You have reached the maximum reasoning steps. Please provide your best final answer based on the current context without any more tool calls.")
messages.append(forced_msg)
draft_response, _ = smart_invoke(messages, use_tools=False)
# Third pass: strict GAIA formatting extraction
formatting_sys = SystemMessage(
content=(
"You are a strict output formatter for the GAIA benchmark. "
"Given a verbose draft answer, extract ONLY the final exact answer required. "
"Return nothing else. DO NOT include prefixes like 'The answer is'. "
"Strip trailing whitespace only. "
"If the answer is a number, just return the number. "
"If the answer is a list or set of elements, return them as a COMMA-SEPARATED list (e.g., 'a, b, c'). "
"Preserve necessary punctuation within answers (e.g., 'Dr. Smith' should keep the period)."
)
)
final_response, _ = smart_invoke([formatting_sys, HumanMessage(content=extract_text_from_content(draft_response.content))], use_tools=False, start_tier=current_tier)
print(f"Draft response: {draft_response.content}")
print(f"Strict Final response: {final_response.content}")
# Return messages including the final AIMessage so BasicAgent reads .content
# Ensure final_response has string content for basic agents
if not isinstance(final_response.content, str):
final_response.content = extract_text_from_content(final_response.content)
messages.append(draft_response)
messages.append(final_response)
return {"messages": messages}
def build_graph():
agent_graph = StateGraph(AgentState)
# Add nodes
agent_graph.add_node("read_message", read_message)
agent_graph.add_node("answer_message", answer_message)
# Add edges
agent_graph.add_edge(START, "read_message")
agent_graph.add_edge("read_message", "answer_message")
# Final edge
agent_graph.add_edge("answer_message", END)
# Compile and return the executable graph for use in app.py
compiled_graph = agent_graph.compile()
return compiled_graph