import os import re from groq import Groq from ddgs import DDGS from bs4 import BeautifulSoup import requests from utils import BaseAgent, SimpleRateLimiter class GaiaAgent(BaseAgent): """Simple but effective agent for GAIA benchmark""" def __init__(self): self.client = Groq(api_key=os.environ.get("GROQ_API_KEY")) # Modèles Groq valides (en ordre de préférence) self.models = [ "openai/gpt-oss-120b", # ← en priorité "qwen/qwen3.6-27b", # fallback 1 "llama-3.3-70b-versatile", # fallback 2 "llama-3.1-8b-instant" # fallback 3 ] self.rate_limiter = SimpleRateLimiter() self.current_model = self.models[0] def web_search(self, query, max_results=3): """Search the web using DDGS (DuckDuckGo)""" print(f"[web_search] query: {query[:80]}...") results = [] try: ddgs = DDGS() for r in ddgs.text(query, max_results=max_results): results.append({ "title": r.get("title"), "url": r.get("href"), "snippet": r.get("body") }) except Exception as e: print(f"[web_search] error: {e}") return results def fetch_page_text(self, url, max_chars=5000): """Fetch and clean page text from URL""" try: r = requests.get(url, timeout=10, headers={"User-Agent": "Mozilla/5.0"}) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") # Remove noise for tag in soup(["script", "style", "nav", "footer", "header"]): tag.decompose() text = soup.get_text(separator=" ", strip=True) text = re.sub(r"\s+", " ", text) return text[:max_chars] except Exception as e: print(f"[fetch_page_text] error: {e}") return "" def gather_web_context(self, question): """Gather web context for a question""" results = self.web_search(question, max_results=4) context_blocks = [] for i, r in enumerate(results[:2]): # Use top 2 results page_text = self.fetch_page_text(r["url"]) content = page_text if len(page_text) > 200 else r.get("snippet", "") if content: context_blocks.append(f"SOURCE {i+1}: {r['title']}\nCONTENT: {content[:2000]}") return "\n\n---\n\n".join(context_blocks) if context_blocks else "" def extract_answer(self, text): """Extract final answer from model output""" if not text: return "" # Look for FINAL ANSWER: marker match = re.search(r"final\s+answer\s*:\s*(.*?)(?:\n|$)", text, re.IGNORECASE | re.DOTALL) if match: return match.group(1).strip().split("\n")[0].strip() # Fallback: take last non-empty line lines = [l.strip() for l in text.split("\n") if l.strip()] return lines[-1] if lines else "" def run(self, question: str, file_content: str = "") -> str: """Run the agent on a question""" print(f"\n{'='*70}") print(f"[agent] question: {question[:100]}...") # Rate limit before API call self.rate_limiter.wait_if_needed() context_parts = [] # Add file context if provided if file_content: context_parts.append(f"FILE CONTENT:\n{file_content[:3000]}") # Add web context web_context = self.gather_web_context(question) if web_context: context_parts.append(f"WEB SEARCH:\n{web_context}") context = "\n\n===\n\n".join(context_parts) if context_parts else "(no context)" prompt = f"""You are answering a question from GAIA, an automated evaluation benchmark. IMPORTANT: Your response MUST end with a line starting with "FINAL ANSWER:" followed by ONLY the answer. - After "FINAL ANSWER:", provide only the answer with no explanation - For lists, use comma-separated format - For numbers, use exact format requested - For names, use exact spelling Question: {question} Context: {context} Remember to end with: FINAL ANSWER: """ # Try models in order with fallback answer = "" for model in self.models: self.current_model = model print(f"[agent] trying model: {model}...") try: response = self.client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], temperature=0, max_tokens=1000 ) output = response.choices[0].message.content print(f"[agent] ✓ got response ({len(output)} chars)") answer = self.extract_answer(output) if answer and len(answer) > 0: print(f"[agent] ✓ got answer from {model}") break else: print(f"[agent] ✗ model {model} returned empty answer, trying next...") except Exception as e: error_msg = str(e) if "does not exist" in error_msg or "not found" in error_msg: print(f"[agent] ✗ model {model} not found, trying next...") elif "overload" in error_msg.lower() or "rate limit" in error_msg.lower(): print(f"[agent] ✗ rate limit on {model}, trying next...") else: print(f"[agent] ✗ error with {model}: {e}") continue if not answer or len(answer) == 0: print(f"[agent] retrying with shorter prompt...") self.rate_limiter.wait_if_needed() try: response = self.client.chat.completions.create( model=self.models[0], messages=[{"role": "user", "content": f"Answer briefly:\n{question}\n\nFINAL ANSWER:"}], temperature=0, max_tokens=200 ) output = response.choices[0].message.content answer = self.extract_answer(output) except Exception as e: print(f"[agent] retry error: {e}") if not answer: answer = "I am unable to answer" print(f"[agent] final answer: '{answer}'") print(f"{'='*70}\n") return answer def __call__(self, question: str, file_content: str = "") -> str: return self.run(question, file_content)