| import os |
| import base64 |
| import requests |
| import json |
| import traceback |
| import datetime |
| import subprocess |
| import tempfile |
| import time |
| from typing import TypedDict, List, Dict, Any, Optional, Union |
| from langchain_core import tools |
| from langgraph.graph import StateGraph, START, END |
| from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFacePipeline |
| from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage |
| from langchain_core.tools import tool |
| from langchain_community.document_loaders import WikipediaLoader |
| from ddgs import DDGS |
| from dotenv import load_dotenv |
| from groq import Groq |
| from langchain_groq import ChatGroq |
| from langchain_community.document_loaders.image import UnstructuredImageLoader |
| from langchain_community.document_loaders import WebBaseLoader |
| from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
| try: |
| import cv2 |
| except ImportError: |
| cv2 = None |
|
|
| |
|
|
# Process-wide cache for the Whisper model; stays None until first requested.
whisper_model = None


def get_whisper():
    """Return the shared Whisper model, loading it on first use.

    The ``whisper`` package is imported inside the function, so it is only
    needed once a transcription is actually requested. The "base" model is
    stored in the module-level ``whisper_model`` and reused by later calls.
    """
    global whisper_model
    if whisper_model is not None:
        return whisper_model

    import whisper

    whisper_model = whisper.load_model("base")
    return whisper_model
|
|
# Load variables from a .env file into the process environment; the API keys
# are read later with os.getenv(). override=True is passed so that values from
# the .env file replace variables that are already set in the environment.
load_dotenv(override=True)
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
# Lower-cased substrings of an exception message that mark a failure as
# provider-side (rate limit, outage, missing/retired model, exhausted credits),
# meaning the next model or tier should be tried instead of aborting.
_RETRYABLE_ERROR_MARKERS = (
    "rate_limit", "429", "500", "503", "overloaded", "not_found", "404",
    "402", "credits", "decommissioned", "invalid_request_error",
)


def smart_invoke(msgs, use_tools=False, start_tier=0):
    """
    Invoke the first working chat model from a tiered fallback list.

    Tier order: five free OpenRouter models -> Gemini (with alternative model
    names) -> Groq. A tier is skipped when its API-key environment variable is
    unset. When a call fails with a recognised provider error such as 429
    (rate limit), 402 (credits), 404 (model not found) or 5xx, the next model
    of the tier, and then the next tier, is tried.

    Args:
        msgs: Messages passed unchanged to the model's ``invoke``.
        use_tools: When true, bind the module-level ``tools`` list to the
            model before invoking it.
        start_tier: Index of the first tier to try, so callers can skip tiers
            that already failed earlier in the same run.

    Returns:
        ``(response, tier_index)`` for the first successful call, or
        ``(None, 0)`` when no tier could be attempted at all (no API key set,
        or ``start_tier`` is past the end of the list).

    Raises:
        Exception: The last retryable error once every tier is exhausted, or,
            immediately, any error that is not recognised as retryable.
    """
    openrouter_url = "https://openrouter.ai/api/v1"
    gemini_alternatives = ["gemini-2.5-flash", "gemini-2.0-flash", "gemini-flash-latest", "gemini-pro-latest"]

    tiers_config = [
        {"name": "Qwen3-Next-80B", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "qwen/qwen3-next-80b-a3b-instruct:free", "base_url": openrouter_url},
        {"name": "Gemma-3-27B", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "google/gemma-3-27b-it:free", "base_url": openrouter_url},
        {"name": "NVIDIA-Nemotron-Super", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "nvidia/nemotron-3-super-120b-a12b:free", "base_url": openrouter_url},
        {"name": "OpenRouter-FreeRouter", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "openrouter/free", "base_url": openrouter_url},
        {"name": "DeepSeek-R1", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "deepseek/deepseek-r1:free", "base_url": openrouter_url},
        {"name": "Gemini-Flash", "key": "GOOGLE_API_KEY", "provider": "google", "model_name": "gemini-2.0-flash", "alternatives": gemini_alternatives},
        {"name": "Groq", "key": "GROQ_API_KEY", "provider": "groq", "model_name": "llama-3.3-70b-versatile"},
    ]

    def create_model_instance(m_name, provider, api_key, b_url=None):
        """Build the chat model for one provider; None if the provider is unknown."""
        if provider == "openai":
            # Imported lazily: langchain_openai is only needed for these tiers.
            from langchain_openai import ChatOpenAI
            return ChatOpenAI(model=m_name, openai_api_key=api_key, openai_api_base=b_url, temperature=0)
        if provider == "google":
            from langchain_google_genai import ChatGoogleGenerativeAI
            return ChatGoogleGenerativeAI(model=m_name, temperature=0)
        if provider == "groq":
            from langchain_groq import ChatGroq
            return ChatGroq(model=m_name, temperature=0, max_retries=2)
        return None

    last_exception = None
    for i in range(start_tier, len(tiers_config)):
        tier = tiers_config[i]
        api_key = os.getenv(tier["key"])
        if not api_key:
            continue

        # Primary model first, then the alternatives. dict.fromkeys drops
        # repeated names while keeping order, so a model that is both primary
        # and listed as an alternative is not called twice.
        model_names = list(dict.fromkeys([tier["model_name"], *tier.get("alternatives", [])]))
        last_position = len(model_names) - 1

        for position, m_name in enumerate(model_names):
            current_model = create_model_instance(m_name, tier["provider"], api_key, tier.get("base_url"))
            if current_model is None:
                # Unknown provider in the configuration: nothing to call.
                continue
            if use_tools:
                current_model = current_model.bind_tools(tools)

            try:
                print(f"--- Calling {tier['name']} ({m_name}) ---")
                return current_model.invoke(msgs), i
            except Exception as e:
                err_str = str(e).lower()
                if not any(marker in err_str for marker in _RETRYABLE_ERROR_MARKERS):
                    # Not a provider-side failure: surface it unchanged.
                    raise
                last_exception = e
                model_missing = "not_found" in err_str or "404" in err_str
                if model_missing and position != last_position:
                    print(f"--- {tier['name']} model {m_name} not found. Trying alternative... ---")
                else:
                    print(f"--- {tier['name']} Error: {e}. Trying next model/tier... ---")

    if last_exception is not None:
        print("CRITICAL: All fallback tiers failed.")
        raise last_exception

    # Nothing was attempted: no API key is configured for the remaining tiers.
    print("CRITICAL: No model tier could be tried (missing API keys or start_tier out of range).")
    return None, 0
|
|
# NOTE: the docstring below is what the `tool` decorator exposes to the model
# as the tool description, so its wording is kept as-is.
@tool
def web_search(keywords: str) -> str:
    """
    Uses duckduckgo to search the top 5 result on web

    Use cases:
    - Identify personal information
    - Information search
    - Finding organisation information
    - Obtain the latest news

    Args:
        keywords: keywords used to search the web

    Returns:
        Search result (Header + body + url)
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            with DDGS() as ddgs:
                results = ddgs.text(keywords, max_results=5) or []
                # .get() so that one hit missing a field does not raise and
                # throw away the rest of the result set.
                entries = [
                    f"Results: {hit.get('title', '')}\n{hit.get('body', '')}\n{hit.get('href', '')}\n\n"
                    for hit in results
                ]
        except Exception as e:
            if attempt < max_retries - 1:
                # Exponential back-off (1s, 2s) before the next attempt.
                time.sleep(2 ** attempt)
                continue
            return f"Search failed after {max_retries} attempts: {str(e)}"

        if not entries:
            # Explicit message instead of an empty string, like wiki_search.
            return "No search results found."
        return "".join(entries)
|
|
# NOTE: the docstring below is what the `tool` decorator exposes to the model
# as the tool description, so its wording is kept as-is.
@tool
def wiki_search(query: str) -> str:
    """
    Search Wikipedia for a query and return up to 3 results.

    Use cases:
    When the question requires the use of information from wikipedia

    Args:
        query: The search query
    """
    try:
        search_docs = WikipediaLoader(query=query, load_max_docs=3, doc_content_chars_max=15000).load()
    except Exception as e:
        # Report the failure as text, like the other search/read tools do.
        return f"Error searching Wikipedia: {e}"

    if not search_docs:
        return "No Wikipedia results found."

    # Each article is wrapped in an opening/closing <Document> pair; the
    # opening tag must not be self-closing or the closing tag is orphaned.
    return "\n\n---\n\n".join(
        f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("title", "Unknown Title")}">\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
|
|
def get_vision_models():
    """Returns a list of vision models to try, in order of preference.

    Only entries whose API-key environment variable is set are instantiated.

    Returns:
        A list of ``{"name": <label>, "model": <chat model>}`` dicts in
        preference order; empty when no key is configured.
    """
    configs = [
        {"name": "OpenRouter-Qwen3-VL", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "qwen/qwen3-vl-235b-thinking:free", "base_url": "https://openrouter.ai/api/v1"},
        {"name": "NVIDIA-Nemotron-VL", "key": "NVIDIA_API_KEY", "provider": "openai", "model_name": "nvidia/nemotron-nano-2-vl:free", "base_url": "https://integrate.api.nvidia.com/v1"},
        {"name": "OpenRouter-Gemma-3-27b-it", "key": "OPENROUTER_API_KEY", "provider": "openai", "model_name": "google/gemma-3-27b-it:free", "base_url": "https://openrouter.ai/api/v1"},
        {"name": "Google-Gemini-2.0-Flash", "key": "GOOGLE_API_KEY", "provider": "google", "model_name": "gemini-2.0-flash"},
        {"name": "Google-Gemini-Flash-Latest", "key": "GOOGLE_API_KEY", "provider": "google", "model_name": "gemini-flash-latest"},
    ]
    models = []
    for cfg in configs:
        api_key = os.getenv(cfg["key"])
        if not api_key:
            continue

        provider = cfg["provider"]
        if provider == "openai":
            from langchain_openai import ChatOpenAI
            model = ChatOpenAI(model=cfg["model_name"], openai_api_key=api_key, openai_api_base=cfg.get("base_url"), temperature=0)
        elif provider == "google":
            from langchain_google_genai import ChatGoogleGenerativeAI
            model = ChatGoogleGenerativeAI(model=cfg["model_name"], temperature=0)
        elif provider == "groq":
            from langchain_groq import ChatGroq
            model = ChatGroq(model=cfg["model_name"], temperature=0)
        else:
            # Unknown provider: skip it rather than append an unbound name or
            # the model left over from the previous iteration.
            continue

        models.append({"name": cfg["name"], "model": model})
    return models
|
|
# NOTE: the docstring below is what the `tool` decorator exposes to the model
# as the tool description, so its wording is kept as-is.
@tool
def analyze_image(image_path: str, question: str) -> str:
    """
    EXTERNAL SIGHT API: Sends an image path to a Vision Model to answer a specific question.
    YOU MUST CALL THIS TOOL ANY TIME an image (.png, .jpg, .jpeg) is attached to the prompt.
    NEVER claim you cannot see images. Use this tool instead.

    Args:
        image_path: The local path or URL to the image file.
        question: Specific question describing what you want the vision model to look for.
    """
    import mimetypes

    try:
        mime_type = ""
        if image_path.startswith(("http://", "https://")):
            # The tool description promises URL support: download the bytes so
            # every provider receives the same inline base64 payload.
            http_response = requests.get(image_path, timeout=30)
            http_response.raise_for_status()
            image_bytes = http_response.content
            mime_type = http_response.headers.get("Content-Type", "").split(";")[0].strip()
        else:
            if not os.path.exists(image_path):
                return f"Error: Image file not found at {image_path}"
            with open(image_path, "rb") as image_file:
                image_bytes = image_file.read()

        if not mime_type.startswith("image/"):
            # Derive the type from the file name so PNG/GIF/WebP are not
            # mislabelled as JPEG; JPEG remains the fallback.
            guessed_type = mimetypes.guess_type(image_path)[0]
            mime_type = guessed_type if guessed_type and guessed_type.startswith("image/") else "image/jpeg"

        encoded_image = base64.b64encode(image_bytes).decode('utf-8')

        message = HumanMessage(
            content=[
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:{mime_type};base64,{encoded_image}"},
                },
            ]
        )

        vision_models = get_vision_models()
        if not vision_models:
            return "Error: No vision models configured (missing API keys)."

        # Try each configured vision model in order; the first success wins.
        last_err = None
        for item in vision_models:
            try:
                m_name = getattr(item['model'], 'model', 'unknown')
                print(f"--- Calling Vision Model: {item['name']} ({m_name}) ---")
                response = item['model'].invoke([message])
                return extract_text_from_content(response.content)
            except Exception as e:
                print(f"Vision Model {item['name']} failed.")
                traceback.print_exc()
                last_err = e
        return f"Error analyzing image: All vision models failed. Last error: {str(last_err)}"
    except Exception as e:
        traceback.print_exc()
        return f"Error reading/processing image: {str(e)}"
|
|
# NOTE: the docstring below is what the `tool` decorator exposes to the model
# as the tool description, so its wording is kept as-is.
@tool
def analyze_audio(audio_path: str, question: str) -> str:
    """
    Transcribes an audio file (.mp3, .wav, .m4a) to answer questions about what is spoken.

    Args:
        audio_path: The local path to the audio file.
        question: The specific question to ask.
    """
    # `question` belongs to the tool signature but is not used here: the raw
    # transcript is returned and the calling model answers from it.
    if not os.path.exists(audio_path):
        # Report a missing file as such instead of as an ffmpeg problem.
        return f"Error: Audio file not found at {audio_path}"

    try:
        model = get_whisper()
        result = model.transcribe(audio_path)
        transcript = result["text"]
        return f"Audio Transcript:\n{transcript}"
    except Exception as e:
        return f"Error analyzing audio: {str(e)}. Tip: Whisper requires 'ffmpeg' installed on your system."
|
|
# NOTE: the docstring below is what the `tool` decorator exposes to the model
# as the tool description, so its wording is kept as-is.
@tool
def analyze_video(video_path: str, question: str) -> str:
    """
    EXTERNAL SIGHT/HEARING API: Sends a video file to an external Vision/Audio model.
    YOU MUST CALL THIS TOOL ANY TIME a video (.mp4, .avi) is attached to the prompt.
    NEVER claim you cannot analyze videos. Use this tool instead.

    Args:
        video_path: The local path to the video file.
        question: Specific question describing what you want to extract from the video.
    """
    if cv2 is None:
        return "Error: cv2 is not installed. Please install opencv-python."

    downloaded_video = None
    cap = None

    try:
        # Remote videos are first downloaded to a temp file with yt-dlp.
        if video_path.startswith("http"):
            print(f"Downloading video from URL: {video_path}")
            # pid + nanosecond timestamp keeps concurrent or back-to-back
            # calls from sharing one target file.
            downloaded_video = os.path.join(
                tempfile.gettempdir(), f"video_{os.getpid()}_{time.time_ns()}.mp4"
            )
            try:
                subprocess.run(["yt-dlp", "-f", "best[ext=mp4]/mp4", "-o", downloaded_video, video_path], check=True, timeout=120)
                video_path = downloaded_video
            except Exception as e:
                return f"Error downloading video from URL: {str(e)}. Tip: Check if yt-dlp is installed and the URL is valid."

        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            return "Error: Could not read video frames."

        # Up to five evenly spaced frames; the set removes the duplicate
        # indices that appear when the video has fewer than five frames.
        frame_indices = sorted({int(i * total_frames / 5) for i in range(5)})
        extracted_descriptions = []

        vision_models = get_vision_models()

        for idx_num, frame_idx in enumerate(frame_indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if not ret:
                continue

            encoded_ok, buffer = cv2.imencode('.jpg', frame)
            if not encoded_ok:
                continue
            encoded_image = base64.b64encode(buffer).decode('utf-8')

            msg = HumanMessage(
                content=[
                    {"type": "text", "text": f"Describe what is happening in this video frame concisely. Focus on aspects related to: {question}"},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}},
                ]
            )

            # First vision model that answers wins; otherwise keep the placeholder.
            desc = "No description available."
            for item in vision_models:
                try:
                    print(f"--- Calling Vision Model for Frame {idx_num+1}: {item['name']} ---")
                    # Normalise list-style content to text, as analyze_image does.
                    desc = extract_text_from_content(item['model'].invoke([msg]).content)
                    break
                except Exception as e:
                    print(f"Vision Model {item['name']} failed for frame: {e}")

            extracted_descriptions.append(f"Frame {idx_num + 1}: {desc}")

        # Release the capture before the audio pass reads the same file.
        cap.release()
        cap = None

        video_context = "\n".join(extracted_descriptions)

        # Audio is best-effort: a transcription failure is noted, not fatal.
        try:
            whisper_mod = get_whisper()
            trans_result = whisper_mod.transcribe(video_path)
            transcript = trans_result.get("text", "")
            if transcript.strip():
                video_context += f"\n\nVideo Audio Transcript:\n{transcript}"
        except Exception as e:
            video_context += f"\n\n(No audio transcript generated: {e})"

        return f"Video Summary based on extracted frames and audio:\n{video_context}"
    except Exception as e:
        err_msg = str(e)
        if "No address associated with hostname" in err_msg or "Failed to resolve" in err_msg:
            return "Error: The environment cannot access the internet (DNS failure). Please use 'web_search' or 'wiki_search' to find information about this video content instead of trying to download it."
        return f"Error analyzing video: {err_msg}"
    finally:
        # Runs on every exit path, so the capture handle and the downloaded
        # temp file are released even on an early return or an exception.
        if cap is not None:
            cap.release()
        if downloaded_video and os.path.exists(downloaded_video):
            try:
                os.remove(downloaded_video)
            except OSError:
                # Best-effort cleanup of a temp file; nothing useful to report.
                pass
|
|
# NOTE: the docstring below is what the `tool` decorator exposes to the model
# as the tool description, so its wording is kept as-is.
@tool
def read_url(url: str) -> str:
    """
    Reads and extracts text from a specific webpage URL.
    Use this if a web search snippet doesn't contain enough detail.
    """
    try:
        pages = WebBaseLoader(url).load()
        # Only the first loaded document is returned, capped at 15000 characters.
        return pages[0].page_content[:15000] if pages else "No content could be extracted from this URL."
    except Exception as e:
        return f"Error reading URL: {e}"
|
|
# NOTE: the docstring below is what the `tool` decorator exposes to the model
# as the tool description, so its wording is kept as-is.
@tool
def run_python_script(code: str) -> str:
    """
    Executes a Python script locally and returns the stdout and stderr.
    Use this to perform complex math, data analysis (e.g. pandas), or file processing.
    When given a file path, you can write python code to read and analyze it.
    """
    import sys

    # SECURITY NOTE(review): `code` is written by the model and executed with
    # this process's privileges and environment (including API keys). Nothing
    # here sandboxes it; run the agent in an isolated environment.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
        f.write(code)
        temp_file_name = f.name

    try:
        result = subprocess.run(
            # Same interpreter as the agent, so the script sees the same
            # installed packages and does not rely on "python" being on PATH.
            [sys.executable or "python", temp_file_name],
            capture_output=True,
            text=True,
            timeout=60
        )

        output = result.stdout
        if result.stderr:
            output += f"\nErrors:\n{result.stderr}"

        return (output or "Script executed successfully with no output.")[:15000]
    except subprocess.TimeoutExpired:
        return "Script execution timed out after 60 seconds."
    except Exception as e:
        return f"Failed to execute script: {str(e)}"
    finally:
        # One cleanup point for every exit path.
        try:
            os.remove(temp_file_name)
        except OSError:
            pass
|
|
# NOTE: the docstring below is what the `tool` decorator exposes to the model
# as the tool description, so its wording is kept as-is.
@tool
def read_document(file_path: str) -> str:
    """
    Reads the text contents of a local document (.txt, .csv, .json, .md).
    For binary files like .xlsx or .pdf, use run_python_script to process them instead.
    """
    max_chars = 15000
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            # Read one character past the limit: enough to tell whether the
            # file was truncated without loading a huge file into memory.
            content = f.read(max_chars + 1)
        if len(content) > max_chars:
            return content[:max_chars] + "... (truncated)"
        return content
    except Exception as e:
        return f"Error reading document: {str(e)}. Tip: You can try running a python script to read it!"
|
|
# Answer-format instructions that end in the "FINAL ANSWER:" template.
# NOTE(review): this constant is not referenced anywhere else in the visible
# part of this file (answer_message builds its own system prompt) - confirm
# whether it is still needed before removing or editing it.
system_prompt = """
You are a helpful assistant tasked with answering questions using a set of tools.
Now, I will ask you a question. Report your thoughts, and finish your answer with the following template:
FINAL ANSWER: [YOUR FINAL ANSWER].
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
Your answer should only start with "FINAL ANSWER: ", then follows with the answer.
"""
|
|
class AgentState(TypedDict):
    """State dictionary handed between the graph nodes."""

    # Conversation so far. ToolMessage is part of the union because
    # answer_message appends tool results to this list before returning it.
    messages: List[Union[HumanMessage, AIMessage, SystemMessage, ToolMessage]]
|
|
def read_message(state: AgentState) -> AgentState:
    """Log the content of the latest message and pass the messages through unchanged."""
    history = state["messages"]
    latest = history[-1].content if history else ''
    print(f"Processing question: {latest}")

    return {"messages": history}
|
|
def restart_required(state: AgentState) -> AgentState:
    """Log the content of the latest message and return the messages as they are.

    NOTE(review): the body matches read_message, and build_graph as visible
    here does not register this function as a node - confirm whether it is
    still used.
    """
    conversation = state["messages"]
    if conversation:
        shown = conversation[-1].content
    else:
        shown = ''
    print(f"Processing question: {shown}")

    return {"messages": conversation}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
# Every tool the agent may call; smart_invoke binds this list to the model
# when it is called with use_tools=True.
tools = [
    web_search,
    wiki_search,
    analyze_image,
    analyze_audio,
    analyze_video,
    read_url,
    run_python_script,
    read_document,
]
# Name -> tool lookup used to dispatch the tool calls requested by the model.
tools_by_name = {registered.name: registered for registered in tools}
def extract_text_from_content(content: Any) -> str:
    """Extracts a simple string from various possible AIMessage content formats.

    Args:
        content: A plain string, a list of parts (strings and/or dicts that
            may carry a ``"text"`` entry), or any other object.

    Returns:
        The string itself; for a list, the concatenation of every string part
        and every string ``"text"`` entry, in order; otherwise ``str(content)``.
        Parts without usable text (image parts, ``None`` text, non-string
        values) are skipped instead of raising.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    text_parts = []
    for part in content:
        if isinstance(part, dict):
            # Only the "text" entry of a dict part contributes.
            part = part.get("text")
        if isinstance(part, str):
            text_parts.append(part)
    return "".join(text_parts)
|
|
# System prompt for the reasoning loop; {current_date} is filled in per call.
_ANSWER_SYSTEM_PROMPT = """
You are a master of the GAIA benchmark, a general AI assistant designed to solve complex multi-step tasks.
Think carefully and logically. Use your tools effectively. Use your internal monologue to plan your steps.

TODAY'S EXACT DATE is {current_date}. Keep this in mind for all time-sensitive queries.

CRITICAL RULES:
1. If you see a path like `[Attached File Local Path: ...]` followed by an image, video, or audio file, YOU MUST USE THE CORRESPONDING TOOL (analyze_image, analyze_video, analyze_audio) IMMEDIATELY in your next step.
2. Plan your steps ahead. 12 steps is your LIMIT for the reasoning loop, so make every step count.
3. If a tool fails (e.g., 429 or 402), the system will automatically try another model for you, so just keep going!
4. Be concise and accurate. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list.
5. CHAIN-OF-THOUGHT: For complex questions, show your reasoning step by step before giving the final answer.
6. USE TOOLS AGGRESSIVELY: If a question requires computation, file reading, or web search, use the appropriate tools - don't try to answer from memory.
7. VERIFY YOUR ANSWER: Double-check calculations and facts using tools when uncertain.
"""

# File extensions mapped to the tool that can open them.
_ATTACHMENT_TOOLS = {
    ".png": "analyze_image", ".jpg": "analyze_image", ".jpeg": "analyze_image",
    ".mp3": "analyze_audio", ".wav": "analyze_audio", ".m4a": "analyze_audio",
    ".mp4": "analyze_video", ".avi": "analyze_video",
}


def _attachment_reminder(text):
    """Return a reminder naming the right tool for an attached file, or None if there is none."""
    marker = "[Attached File Local Path:"
    if marker not in text:
        return None
    path = text.split(marker, 1)[1].split("]", 1)[0].strip()
    tool_name = _ATTACHMENT_TOOLS.get(os.path.splitext(path)[1].lower())
    if tool_name is None:
        # Not an image/audio/video file: point at the generic file tools.
        return (
            f"IMPORTANT: I see an attached file path ({path}) in the message. "
            "I MUST inspect it with read_document or run_python_script before answering."
        )
    return (
        f"IMPORTANT: I see an attached file path ({path}) in the message. "
        f"I MUST call the {tool_name} tool IMMEDIATELY in my next step to inspect it."
    )


def _context_window(messages, head=2, tail=6, threshold=10):
    """Return the first `head` and roughly the last `tail` messages once the history exceeds `threshold`."""
    if len(messages) <= threshold:
        return messages
    start = len(messages) - tail
    # A tool result must stay behind the AI message that requested it, so move
    # the cut back until the tail no longer starts with a ToolMessage.
    while start > head and isinstance(messages[start], ToolMessage):
        start -= 1
    return messages[:head] + messages[start:]


def answer_message(state: AgentState) -> AgentState:
    """Answer the question in ``state`` with a tool-using reasoning loop.

    Steps:
      1. Put the system prompt in front of the conversation and, when a human
         message carries an ``[Attached File Local Path: ...]`` marker, add a
         reminder naming the tool that fits the file type.
      2. Run up to 12 model/tool rounds through ``smart_invoke`` until the
         model replies without tool calls; if it never does, ask once more for
         a best answer without tools.
      3. Send the draft through a strict formatter prompt to get the bare
         final answer.

    Args:
        state: Agent state whose ``"messages"`` list holds the conversation.

    Returns:
        A state dict whose ``"messages"`` contains the system prompt, the
        conversation, every intermediate AI/tool message, the draft answer
        and, last, the formatted final answer.

    Raises:
        RuntimeError: If ``smart_invoke`` yields no response because no model
            tier is usable.
        Exception: Whatever ``smart_invoke`` raises once all tiers have failed.
    """
    current_date = datetime.datetime.now().strftime("%Y-%m-%d")
    system_msg = SystemMessage(_ANSWER_SYSTEM_PROMPT.format(current_date=current_date))
    messages = [system_msg] + list(state["messages"])

    for msg in state["messages"]:
        if not isinstance(msg, HumanMessage):
            continue
        # Content may be a list of parts; reduce it to text before searching.
        reminder = _attachment_reminder(extract_text_from_content(msg.content))
        if reminder:
            messages.append(HumanMessage(content=reminder))

    no_model_error = "No language model is available: check that at least one provider API key is set."
    max_steps = 12
    draft_response = None
    current_tier = 0

    for step in range(max_steps):
        if step > 0:
            # Pause between rounds to go easy on provider rate limits.
            time.sleep(3)

        print(f"--- ReAct Step {step + 1} ---")

        ai_msg, current_tier = smart_invoke(_context_window(messages), use_tools=True, start_tier=current_tier)
        if ai_msg is None:
            raise RuntimeError(no_model_error)
        messages.append(ai_msg)

        tool_calls = getattr(ai_msg, "tool_calls", None) or []
        if not tool_calls:
            draft_response = ai_msg
            print(f"Model found answer or stopped tools: {ai_msg.content}")
            break

        for call_number, tool_call in enumerate(tool_calls):
            name = tool_call["name"]
            args = tool_call["args"]
            # Fall back to a synthetic id when the provider sent none, so a
            # ToolMessage is never built with a None id.
            tool_call_id = tool_call.get("id") or f"call_{step}_{call_number}"
            print(f"Calling tool: {name} with args: {args}")
            try:
                tool_result = tools_by_name[name].invoke(args)
            except Exception as e:
                # Unknown tool names and tool failures are fed back as text.
                tool_result = f"Error executing tool {name}: {str(e)}"

            messages.append(ToolMessage(content=str(tool_result), tool_call_id=tool_call_id, name=name))

    # A draft produced inside the loop is already in `messages`.
    draft_recorded = draft_response is not None
    if draft_response is None:
        print("Max reasoning steps reached. Forcing answer extraction.")
        forced_msg = HumanMessage(content="You have reached the maximum reasoning steps. Please provide your best final answer based on the current context without any more tool calls.")
        messages.append(forced_msg)
        # Continue from the tier that last worked instead of restarting at 0.
        draft_response, _ = smart_invoke(messages, use_tools=False, start_tier=current_tier)
        if draft_response is None:
            raise RuntimeError(no_model_error)

    formatting_sys = SystemMessage(
        content=(
            "You are a strict output formatter for the GAIA benchmark. "
            "Given a verbose draft answer, extract ONLY the final exact answer required. "
            "Return nothing else. DO NOT include prefixes like 'The answer is'. "
            "Strip trailing whitespace only. "
            "If the answer is a number, just return the number. "
            "If the answer is a list or set of elements, return them as a COMMA-SEPARATED list (e.g., 'a, b, c'). "
            "Preserve necessary punctuation within answers (e.g., 'Dr. Smith' should keep the period)."
        )
    )
    final_response, _ = smart_invoke([formatting_sys, HumanMessage(content=extract_text_from_content(draft_response.content))], use_tools=False, start_tier=current_tier)
    if final_response is None:
        raise RuntimeError(no_model_error)
    print(f"Draft response: {draft_response.content}")
    print(f"Strict Final response: {final_response.content}")

    # Callers read the final answer as plain text.
    if not isinstance(final_response.content, str):
        final_response.content = extract_text_from_content(final_response.content)

    if not draft_recorded:
        messages.append(draft_response)
    messages.append(final_response)
    return {"messages": messages}
|
|
|
|
def build_graph():
    """Build and compile the agent graph: START -> read_message -> answer_message -> END."""
    graph = StateGraph(AgentState)

    # Nodes first, in the same order as the flow.
    for node_name, node_fn in (
        ("read_message", read_message),
        ("answer_message", answer_message),
    ):
        graph.add_node(node_name, node_fn)

    # A single straight path through both nodes.
    for source, target in (
        (START, "read_message"),
        ("read_message", "answer_message"),
        ("answer_message", END),
    ):
        graph.add_edge(source, target)

    return graph.compile()
|
|