| import os |
| import re |
| from groq import Groq |
| from ddgs import DDGS |
| from bs4 import BeautifulSoup |
| import requests |
| from utils import BaseAgent, SimpleRateLimiter |
|
|
class GaiaAgent(BaseAgent):
    """Simple but effective agent for the GAIA benchmark.

    For each question the agent collects context (optional file content plus
    text from a DuckDuckGo search), sends one prompt to a list of Groq-hosted
    models in order until one yields an answer, and returns the text that
    follows the ``FINAL ANSWER:`` marker in the model output.
    """

    # Matches "final answer:" in any case and captures the rest of that line.
    # The ``\s*`` after the colon may cross a newline, so an answer written on
    # the line after the marker is captured as well.
    _FINAL_ANSWER_RE = re.compile(r"final\s+answer\s*:\s*(.*)", re.IGNORECASE)

    # Prompt sent to every model in the fallback chain.
    _PROMPT_TEMPLATE = (
        "You are answering a question from GAIA, an automated evaluation"
        " benchmark.\n"
        "\n"
        'IMPORTANT: Your response MUST end with a line starting with'
        ' "FINAL ANSWER:" followed by ONLY the answer.\n'
        '- After "FINAL ANSWER:", provide only the answer with no'
        " explanation\n"
        "- For lists, use comma-separated format\n"
        "- For numbers, use exact format requested\n"
        "- For names, use exact spelling\n"
        "\n"
        "Question:\n"
        "{question}\n"
        "\n"
        "Context:\n"
        "{context}\n"
        "\n"
        "Remember to end with:\n"
        "FINAL ANSWER: <answer>\n"
    )

    def __init__(self):
        """Create the Groq client, the model fallback list and the limiter.

        The API key is read from the ``GROQ_API_KEY`` environment variable.
        """
        # NOTE(review): BaseAgent.__init__ is not invoked here; confirm that
        # the base class does not need initialisation.
        self.client = Groq(api_key=os.environ.get("GROQ_API_KEY"))

        # Tried in this order by ``run``; the first non-empty answer wins.
        self.models = [
            "openai/gpt-oss-120b",
            "qwen/qwen3.6-27b",
            "llama-3.3-70b-versatile",
            "llama-3.1-8b-instant",
        ]
        self.rate_limiter = SimpleRateLimiter()
        self.current_model = self.models[0]

    def web_search(self, query, max_results=3):
        """Search the web using DDGS (DuckDuckGo).

        Args:
            query: Search string.
            max_results: Upper bound passed to ``DDGS.text``.

        Returns:
            A list of dicts with ``title``, ``url`` and ``snippet`` keys.
            Any exception is logged; results collected so far are returned.
        """
        print(f"[web_search] query: {query[:80]}...")
        results = []
        try:
            ddgs = DDGS()
            for hit in ddgs.text(query, max_results=max_results):
                results.append({
                    "title": hit.get("title"),
                    "url": hit.get("href"),
                    "snippet": hit.get("body"),
                })
        except Exception as e:
            # Best effort: a failed search must not abort the whole run.
            print(f"[web_search] error: {e}")
        return results

    def fetch_page_text(self, url, max_chars=5000):
        """Fetch a URL and return its visible text.

        Args:
            url: Page address to download.
            max_chars: Maximum number of characters to return.

        Returns:
            Whitespace-normalised page text truncated to ``max_chars``, or
            ``""`` if the request or parsing fails.
        """
        try:
            r = requests.get(
                url, timeout=10, headers={"User-Agent": "Mozilla/5.0"}
            )
            r.raise_for_status()
            soup = BeautifulSoup(r.text, "html.parser")

            # Drop elements that carry no article text.
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()

            text = soup.get_text(separator=" ", strip=True)
            text = re.sub(r"\s+", " ", text)
            return text[:max_chars]
        except Exception as e:
            print(f"[fetch_page_text] error: {e}")
            return ""

    def gather_web_context(self, question):
        """Build a context string from the top web results for a question.

        Only the first two search results are used. For each, the page text
        is preferred; the search snippet is used when the page yields 200
        characters or fewer.

        Returns:
            The source blocks joined by a ``---`` separator, or ``""`` when
            nothing usable was found.
        """
        results = self.web_search(question, max_results=4)
        context_blocks = []

        for i, result in enumerate(results[:2]):
            url = result.get("url")
            # A result without a URL cannot be fetched; use its snippet.
            page_text = self.fetch_page_text(url) if url else ""
            if len(page_text) > 200:
                content = page_text
            else:
                # The snippet may be stored as None by web_search.
                content = result.get("snippet") or ""
            if content:
                context_blocks.append(
                    f"SOURCE {i+1}: {result['title']}\n"
                    f"CONTENT: {content[:2000]}"
                )

        return "\n\n---\n\n".join(context_blocks) if context_blocks else ""

    def extract_answer(self, text):
        """Extract the final answer from model output.

        The LAST non-empty ``FINAL ANSWER:`` occurrence is used, because the
        prompt asks the model to end with that line and earlier mentions may
        belong to its reasoning. Markdown emphasis or backticks around the
        answer are removed. If the marker never appears, the last non-blank
        line of the output is returned.

        Args:
            text: Raw model output; may be empty or ``None``.

        Returns:
            The extracted answer, or ``""`` if none could be found.
        """
        if not text:
            return ""

        candidates = self._FINAL_ANSWER_RE.findall(text)
        if candidates:
            for candidate in reversed(candidates):
                # "**FINAL ANSWER:** 42" leaves "** 42" after the marker.
                cleaned = candidate.strip().strip("*`").strip()
                if cleaned:
                    return cleaned
            # Marker present but nothing after it: report no answer so the
            # caller moves on to the next model.
            return ""

        lines = [line.strip() for line in text.split("\n") if line.strip()]
        return lines[-1] if lines else ""

    def _build_prompt(self, question, file_content):
        """Assemble the full prompt from the question, file and web context."""
        context_parts = []

        if file_content:
            context_parts.append(f"FILE CONTENT:\n{file_content[:3000]}")

        web_context = self.gather_web_context(question)
        if web_context:
            context_parts.append(f"WEB SEARCH:\n{web_context}")

        if context_parts:
            context = "\n\n===\n\n".join(context_parts)
        else:
            context = "(no context)"

        return self._PROMPT_TEMPLATE.format(question=question, context=context)

    def _complete(self, model, prompt, max_tokens):
        """Send one user message to ``model`` and return the reply text."""
        response = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=max_tokens,
        )
        # The message content can be None; normalise it so callers can
        # safely take its length.
        return response.choices[0].message.content or ""

    def run(self, question: str, file_content: str = "") -> str:
        """Run the agent on a question.

        Args:
            question: The question to answer.
            file_content: Optional text of an attached file; only the first
                3000 characters are placed in the prompt.

        Returns:
            The extracted answer, or ``"I am unable to answer"`` when every
            model and the short-prompt retry fail to produce one.
        """
        print(f"\n{'='*70}")
        print(f"[agent] question: {question[:100]}...")

        self.rate_limiter.wait_if_needed()
        prompt = self._build_prompt(question, file_content)

        answer = ""
        for model in self.models:
            self.current_model = model
            print(f"[agent] trying model: {model}...")
            try:
                output = self._complete(model, prompt, max_tokens=1000)
            except Exception as e:
                # Any failure just moves on to the next model; the message
                # is classified only to make the log easier to read.
                error_msg = str(e).lower()
                if "does not exist" in error_msg or "not found" in error_msg:
                    print(f"[agent] ✗ model {model} not found, trying next...")
                elif "overload" in error_msg or "rate limit" in error_msg:
                    print(f"[agent] ✗ rate limit on {model}, trying next...")
                else:
                    print(f"[agent] ✗ error with {model}: {e}")
                continue

            print(f"[agent] ✓ got response ({len(output)} chars)")
            answer = self.extract_answer(output)
            if answer:
                print(f"[agent] ✓ got answer from {model}")
                break
            print(
                f"[agent] ✗ model {model} returned empty answer,"
                " trying next..."
            )

        if not answer:
            # Last resort: ask the primary model again without any context.
            print("[agent] retrying with shorter prompt...")
            self.rate_limiter.wait_if_needed()
            self.current_model = self.models[0]
            try:
                output = self._complete(
                    self.models[0],
                    f"Answer briefly:\n{question}\n\nFINAL ANSWER:",
                    max_tokens=200,
                )
                answer = self.extract_answer(output)
            except Exception as e:
                print(f"[agent] retry error: {e}")

        if not answer:
            answer = "I am unable to answer"

        print(f"[agent] final answer: '{answer}'")
        print(f"{'='*70}\n")
        return answer

    def __call__(self, question: str, file_content: str = "") -> str:
        """Make the agent callable; equivalent to ``run``."""
        return self.run(question, file_content)