import base64
import io
import json
import os
import re
import subprocess
import sys
import tempfile
from html.parser import HTMLParser

import anthropic
import gradio as gr
import pandas as pd
import requests
from huggingface_hub import InferenceClient

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


# ── helpers ───────────────────────────────────────────────────────────────────
class _TextExtractor(HTMLParser):
    """Collect the visible text of an HTML document.

    Text inside ``script``/``style``/``nav``/``footer``/``head`` elements is
    dropped.  A depth counter (rather than a boolean) is used so that a skip
    tag nested inside another skip tag does not re-enable collection when the
    inner one closes.
    """

    _SKIP_TAGS = frozenset({"script", "style", "nav", "footer", "head"})

    def __init__(self):
        super().__init__()
        self.parts = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIP_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        # Clamp at zero so a stray closing tag cannot push the counter negative.
        if tag in self._SKIP_TAGS and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        text = data.strip()
        if text and not self._skip_depth:
            self.parts.append(text)


class _SnippetParser(HTMLParser):
    """Extract the text of every element whose class contains ``result__snippet``.

    The element is considered closed only when the end tag matching the tag
    that opened it is seen, so inline markup inside a snippet (for example
    ``<b>`` highlights) no longer truncates the snippet.
    """

    def __init__(self):
        super().__init__()
        self.results = []
        self._tag = None    # tag name of the snippet element currently open
        self._depth = 0     # nesting depth of same-named tags inside it
        self._buf = []

    def handle_starttag(self, tag, attrs):
        if self._tag is None:
            # A valueless ``class`` attribute is reported as None.
            classes = dict(attrs).get("class") or ""
            if "result__snippet" in classes:
                self._tag, self._depth, self._buf = tag, 1, []
        elif tag == self._tag:
            self._depth += 1

    def handle_data(self, data):
        if self._tag is not None:
            self._buf.append(data)

    def handle_endtag(self, tag):
        if self._tag is None or tag != self._tag:
            return
        self._depth -= 1
        if self._depth == 0:
            text = " ".join("".join(self._buf).split())
            if text:
                self.results.append(text)
            self._tag = None


def _strip_html(html: str) -> str:
    """Return the visible text of *html* as a single space-joined string."""
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()  # flush any text still buffered by the parser
    return " ".join(parser.parts)


def _remove_quietly(path) -> None:
    """Delete *path* if it is set, ignoring filesystem errors."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError:
        pass


# ── agent ─────────────────────────────────────────────────────────────────────
class BasicAgent:
    """Tool-calling agent that answers a task question via the Anthropic API.

    Calling the instance runs a bounded tool-use loop: the model may request
    any of the tools declared in ``TOOLS``; each request is executed by
    ``_dispatch`` and the result is fed back until the model produces a final
    text answer.
    """

    # Matches the video id in watch, youtu.be, shorts, embed and live URLs.
    _YT_ID_RE = re.compile(
        r"(?:[?&]v=|youtu\.be/|/shorts/|/embed/|/live/)([^&?#/]+)"
    )
    # Word boundaries matter: a plain substring test for "ip" also matches
    # "transcript", which made almost every error look like an IP block.
    _YT_BLOCK_RE = re.compile(r"blocked|\bip\b|\bcloud\b")

    def __init__(self):
        # Use Anthropic API — no HF credits needed
        self.anthropic_client = anthropic.Anthropic(
            api_key=os.environ.get("ANTHROPIC_API_KEY", "")
        )
        self.model = "claude-sonnet-4-20250514"

        # Keep HF client only for Whisper ASR
        hf_token = self._get_hf_token()
        self.hf_token = hf_token
        self.hf_client = InferenceClient(token=hf_token) if hf_token else None
        self.api_url = DEFAULT_API_URL
        print(f"✅ Agent initialised with model: {self.model}")

    def _get_hf_token(self):
        """Return the first non-empty Hugging Face token from the environment, or None."""
        for var in ("HF_TOKEN", "HUGGING_FACE_HUB_TOKEN", "HUGGINGFACE_HUB_TOKEN"):
            token = os.getenv(var, "").strip()
            if token:
                return token
        return None

    @staticmethod
    def _clean_content_type(ct: str) -> str:
        """Return the bare, lower-cased media type of a Content-Type header value."""
        return ct.split(";")[0].strip().lower()

    # ── raw file fetch ────────────────────────────────────────────────────────
    def _fetch_file(self, task_id: str):
        """Return (bytes, content_type) or (None, '')."""
        try:
            r = requests.get(f"{self.api_url}/files/{task_id}", timeout=15)
        except requests.RequestException as e:
            # Best effort: a failed download is reported as "no file".
            print(f" file fetch failed for {task_id}: {e}")
            return None, ""
        if r.status_code == 200 and r.content:
            return r.content, r.headers.get("Content-Type", "")
        return None, ""

    # ── tool implementations ──────────────────────────────────────────────────
    def tool_check_file(self, task_id: str) -> str:
        """Report whether a file is attached to the task and which tool reads it."""
        fb, ct = self._fetch_file(task_id)
        if not fb:
            return "NO_FILE"
        ct_clean = self._clean_content_type(ct)
        return (
            f"FILE_EXISTS type={ct_clean} size={len(fb)}_bytes. "
            f"Use the right tool: image→analyse_image, python→run_python_file, "
            f"excel/xlsx→read_excel_file, audio→transcribe_audio, "
            f"text/pdf→read_text_file."
        )

    def tool_analyse_image(self, task_id: str, question: str) -> str:
        """Analyse image using Claude's vision."""
        fb, ct = self._fetch_file(task_id)
        if not fb:
            return "No image found."
        ct_clean = self._clean_content_type(ct)
        if "image" not in ct_clean:
            return f"File is not an image (type={ct_clean})."

        b64 = base64.b64encode(fb).decode()
        # Map content type to Anthropic media type
        media_map = {
            "image/jpeg": "image/jpeg",
            "image/jpg": "image/jpeg",
            "image/png": "image/png",
            "image/gif": "image/gif",
            "image/webp": "image/webp",
        }
        media_type = media_map.get(ct_clean, "image/jpeg")
        try:
            response = self.anthropic_client.messages.create(
                model=self.model,
                max_tokens=800,
                messages=[{
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": b64,
                            },
                        },
                        {"type": "text", "text": question},
                    ],
                }],
            )
            return response.content[0].text
        except Exception as e:
            return f"Vision error: {e}"

    def tool_run_python_file(self, task_id: str) -> str:
        """Download and execute Python file, return stdout.

        Returns stdout if non-empty, otherwise stderr, otherwise "No output.".
        The temporary script is always removed afterwards.
        """
        fb, _ = self._fetch_file(task_id)
        if not fb:
            return "No file found."
        code = fb.decode("utf-8", errors="ignore")
        fname = None
        try:
            # Write as UTF-8 explicitly: the code was decoded as UTF-8 and the
            # platform default encoding may not be able to represent it.
            with tempfile.NamedTemporaryFile(
                suffix=".py", delete=False, mode="w", encoding="utf-8"
            ) as f:
                f.write(code)
                fname = f.name
            # SECURITY NOTE(review): this runs downloaded code unsandboxed with
            # the privileges of this process.
            result = subprocess.run(
                [sys.executable or "python3", fname],
                capture_output=True,
                text=True,
                timeout=30,
            )
            out = result.stdout.strip()
            err = result.stderr.strip()
            if out:
                return f"STDOUT:\n{out}"
            if err:
                return f"STDERR:\n{err}"
            return "No output."
        except Exception as e:
            return f"Execution error: {e}"
        finally:
            _remove_quietly(fname)

    def tool_read_excel_file(self, task_id: str, question: str) -> str:
        """Load Excel/CSV and return a text preview followed by the question."""
        fb, ct = self._fetch_file(task_id)
        if not fb:
            return "No file found."
        try:
            ct_clean = self._clean_content_type(ct)
            buf = io.BytesIO(fb)
            if "csv" in ct_clean or "text" in ct_clean:
                df = pd.read_csv(buf)
            else:
                df = pd.read_excel(buf)
            preview = df.to_string(max_rows=80, max_cols=20)
            return (
                f"SPREADSHEET DATA:\n{preview}\n\n"
                f"Answer the following about this data: {question}"
            )
        except Exception as e:
            return f"Excel read error: {e}"

    def tool_transcribe_audio(self, task_id: str) -> str:
        """Transcribe audio using HF Whisper.

        The audio is written to a temporary file (removed afterwards) whose
        suffix is derived from the response content type.
        """
        fb, ct = self._fetch_file(task_id)
        if not fb:
            return "No file found."
        if not self.hf_client:
            # Checked before touching the disk so no temp file is created.
            return "No HF token available for audio transcription."

        ext_map = {
            "audio/mpeg": ".mp3",
            "audio/mp3": ".mp3",
            "audio/wav": ".wav",
            "audio/x-wav": ".wav",
            "audio/ogg": ".ogg",
            "audio/flac": ".flac",
            "audio/m4a": ".m4a",
            "audio/mp4": ".mp4",
        }
        fname = None
        try:
            ext = ext_map.get(self._clean_content_type(ct), ".mp3")
            with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as f:
                f.write(fb)
                fname = f.name
            asr_client = InferenceClient(
                model="openai/whisper-large-v3",
                token=self.hf_token,
            )
            with open(fname, "rb") as audio_f:
                result = asr_client.automatic_speech_recognition(audio_f)
            return result.text if hasattr(result, "text") else str(result)
        except Exception as e:
            return f"Transcription error: {e}"
        finally:
            _remove_quietly(fname)

    def tool_read_text_file(self, task_id: str) -> str:
        """Return up to 6000 characters of the attached text or PDF file."""
        fb, ct = self._fetch_file(task_id)
        if not fb:
            return "No file found."
        try:
            if "pdf" in self._clean_content_type(ct):
                try:
                    import pdfminer.high_level
                    return pdfminer.high_level.extract_text(io.BytesIO(fb))[:6000]
                except ImportError:
                    # pdfminer is optional; fall through to a raw decode.
                    pass
            return fb.decode("utf-8", errors="ignore")[:6000]
        except Exception as e:
            return f"Read error: {e}"

    def tool_search_web(self, query: str) -> str:
        """Return up to six DuckDuckGo result snippets for *query*."""
        try:
            hdrs = {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 Chrome/124.0 Safari/537.36"
                )
            }
            r = requests.get(
                "https://html.duckduckgo.com/html/",
                params={"q": query},
                headers=hdrs,
                timeout=12,
            )
            parser = _SnippetParser()
            parser.feed(r.text)
            parser.close()
            return "\n\n".join(parser.results[:6]) or "No results."
        except Exception as e:
            return f"Search error: {e}"

    def tool_fetch_webpage(self, url: str) -> str:
        """Return up to 8000 characters of the visible text at *url*."""
        try:
            hdrs = {"User-Agent": "Mozilla/5.0 Chrome/124.0"}
            r = requests.get(url, headers=hdrs, timeout=18)
            r.raise_for_status()
            return _strip_html(r.text)[:8000]
        except Exception as e:
            return f"Fetch error: {e}"

    def tool_fetch_wikipedia(self, title: str) -> str:
        """Return the Wikipedia summary for *title*, or article text as fallback.

        The REST summary endpoint is tried first; if it does not answer 200 the
        action API ``extracts`` query is used and its HTML is stripped.
        """
        try:
            slug = requests.utils.quote(title.replace(" ", "_"))
            r = requests.get(
                f"https://en.wikipedia.org/api/rest_v1/page/summary/{slug}",
                timeout=12,
            )
            if r.status_code == 200:
                return r.json().get("extract", "Not found.")

            r2 = requests.get(
                "https://en.wikipedia.org/w/api.php",
                params={
                    "action": "query",
                    "prop": "extracts",
                    "titles": title,
                    "format": "json",
                    "redirects": 1,
                },
                timeout=12,
            )
            pages = r2.json().get("query", {}).get("pages", {})
            for page in pages.values():
                text = _strip_html(page.get("extract", ""))
                if text:
                    return text[:7000]
        except Exception as e:
            return f"Wikipedia error: {e}"
        return "Not found."

    def tool_youtube_transcript(self, video_url: str) -> str:
        """Return up to 6000 characters of the transcript of a YouTube video.

        Returns "Bad URL." when no video id can be extracted, and a message
        starting with "BLOCKED:" when the failure looks like an IP block.
        """
        try:
            from youtube_transcript_api import YouTubeTranscriptApi

            match = self._YT_ID_RE.search(video_url)
            if not match:
                return "Bad URL."
            video_id = match.group(1)
            if hasattr(YouTubeTranscriptApi, "get_transcript"):
                entries = YouTubeTranscriptApi.get_transcript(video_id)
                texts = [e["text"] for e in entries]
            else:
                # NOTE(review): newer youtube-transcript-api releases expose an
                # instance API instead — confirm against the installed version.
                texts = [s.text for s in YouTubeTranscriptApi().fetch(video_id)]
            return " ".join(texts)[:6000]
        except Exception as e:
            err = str(e)
            blocked = (
                "blocked" in type(e).__name__.lower()
                or self._YT_BLOCK_RE.search(err.lower()) is not None
            )
            if blocked:
                return (
                    "BLOCKED: YouTube blocks cloud IPs. "
                    "Use search_web to find transcript or description of this video."
                )
            return f"Transcript error: {err}"

    # ── Anthropic tool definitions ────────────────────────────────────────────
    TOOLS = [
        {
            "name": "check_file",
            "description": (
                "ALWAYS call this first. Checks if a file is attached to the task. "
                "Returns NO_FILE or the file type and which tool to use next."
            ),
            "input_schema": {
                "type": "object",
                "properties": {"task_id": {"type": "string"}},
                "required": ["task_id"],
            },
        },
        {
            "name": "analyse_image",
            "description": (
                "Analyse an image file attached to the task using vision. "
                "Use for chess boards, diagrams, photos, screenshots."
            ),
            "input_schema": {
                "type": "object",
                "properties": {
                    "task_id": {"type": "string"},
                    "question": {
                        "type": "string",
                        "description": "What to find or answer from the image.",
                    },
                },
                "required": ["task_id", "question"],
            },
        },
        {
            "name": "run_python_file",
            "description": (
                "Execute the Python file attached to the task and return its output. "
                "The stdout IS the answer."
            ),
            "input_schema": {
                "type": "object",
                "properties": {"task_id": {"type": "string"}},
                "required": ["task_id"],
            },
        },
        {
            "name": "read_excel_file",
            "description": "Read an Excel or CSV file and answer a question about its data.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "task_id": {"type": "string"},
                    "question": {"type": "string"},
                },
                "required": ["task_id", "question"],
            },
        },
        {
            "name": "transcribe_audio",
            "description": (
                "Transcribe an audio file using Whisper. "
                "Use for voice memos, recordings, audio questions."
            ),
            "input_schema": {
                "type": "object",
                "properties": {"task_id": {"type": "string"}},
                "required": ["task_id"],
            },
        },
        {
            "name": "read_text_file",
            "description": "Read a text or PDF file attached to the task.",
            "input_schema": {
                "type": "object",
                "properties": {"task_id": {"type": "string"}},
                "required": ["task_id"],
            },
        },
        {
            "name": "youtube_transcript",
            "description": (
                "Fetch YouTube video transcript. "
                "If cloud-blocked, use search_web instead."
            ),
            "input_schema": {
                "type": "object",
                "properties": {"video_url": {"type": "string"}},
                "required": ["video_url"],
            },
        },
        {
            "name": "search_web",
            "description": "Search the web via DuckDuckGo. Returns top result snippets.",
            "input_schema": {
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"],
            },
        },
        {
            "name": "fetch_webpage",
            "description": "Fetch and read the full text of any URL.",
            "input_schema": {
                "type": "object",
                "properties": {"url": {"type": "string"}},
                "required": ["url"],
            },
        },
        {
            "name": "fetch_wikipedia",
            "description": (
                "Fetch a Wikipedia article by exact title via REST API. "
                "Always prefer this over fetch_webpage for Wikipedia."
            ),
            "input_schema": {
                "type": "object",
                "properties": {"title": {"type": "string"}},
                "required": ["title"],
            },
        },
    ]

    def _dispatch(self, fn: str, args: dict, task_id: str, question: str) -> str:
        """Run the tool named *fn* with *args* and return its text result.

        ``task_id`` and ``question`` are the defaults used when the model
        omitted them from *args*.  Unknown names yield "Unknown tool.".
        """
        tid = args.get("task_id", task_id)
        qn = args.get("question", question)
        handlers = {
            "check_file": lambda: self.tool_check_file(tid),
            "analyse_image": lambda: self.tool_analyse_image(tid, qn),
            "run_python_file": lambda: self.tool_run_python_file(tid),
            "read_excel_file": lambda: self.tool_read_excel_file(tid, qn),
            "transcribe_audio": lambda: self.tool_transcribe_audio(tid),
            "read_text_file": lambda: self.tool_read_text_file(tid),
            "youtube_transcript": lambda: self.tool_youtube_transcript(
                args.get("video_url", "")),
            "search_web": lambda: self.tool_search_web(args.get("query", "")),
            "fetch_webpage": lambda: self.tool_fetch_webpage(args.get("url", "")),
            "fetch_wikipedia": lambda: self.tool_fetch_wikipedia(args.get("title", "")),
        }
        handler = handlers.get(fn)
        return handler() if handler else "Unknown tool."

    # ── system prompt ─────────────────────────────────────────────────────────
    SYSTEM = """You are a precise research agent solving GAIA benchmark tasks.

MANDATORY WORKFLOW:

STEP 1 — Call check_file(task_id) first for every task.
  • NO_FILE → go to STEP 2.
  • image file → call analyse_image(task_id, question).
  • python file → call run_python_file(task_id). Its output IS the answer.
  • excel/csv file → call read_excel_file(task_id, question).
  • audio file → call transcribe_audio(task_id), then answer from transcript.
  • text/pdf file → call read_text_file(task_id), then answer from content.
  NEVER return "NO_FILE" or tool status strings as your final answer.

STEP 2 — Gather information.
  • YouTube URL → call youtube_transcript(url). If BLOCKED → search_web.
  • Wikipedia question → fetch_wikipedia("Exact Article Title").
    Discography → count ONLY solo studio albums (not collaborations/live/EP).
  • LibreTexts 1.E → fetch_webpage:
    https://chem.libretexts.org/Bookshelves/Introductory_Chemistry/Introductory_Chemistry_(LibreTexts)/02%3A_Measurement_and_Problem_Solving/2.E%3A_Measurement_and_Problem_Solving_(Exercises)
  • Sports stats → search_web then fetch_webpage for exact numbers.
  • Any other question → search_web, then fetch_webpage for details.

STEP 3 — Try at least 2-3 different search queries before concluding.
  Never say "I was unable to find." Always use tools to find the answer.

STEP 4 — Final answer: ONLY the value. No explanation. No preamble.
  Numbers: just digits. Names: just the name. Lists: comma-separated."""

    # ── main call ─────────────────────────────────────────────────────────────
    def __call__(self, question: str, task_id: str = "") -> str:
        """Answer *question* for *task_id* and return the final answer text.

        Runs up to 10 model rounds.  An answer containing a refusal or
        tool-status phrase is rejected and the model is asked to try again.
        If no answer was produced after the last round, one extra request
        with tool use disabled forces a final answer.  Returns "Error." when
        an API call fails.
        """
        print(f"▶ Task {task_id[:8]}: {question[:80]}")
        messages = [
            {
                "role": "user",
                "content": f"task_id: {task_id}\n\nTask: {question}",
            },
        ]
        bad_phrases = (
            "no_file", "file_exists", "i was unable", "i couldn't",
            "i can't access", "please provide", "you might want",
            "i'm unable", "i cannot", "i am unable",
        )

        for _round in range(10):
            try:
                resp = self.anthropic_client.messages.create(
                    model=self.model,
                    max_tokens=1500,
                    system=self.SYSTEM,
                    tools=self.TOOLS,
                    messages=messages,
                )
            except Exception as e:
                print(f" Anthropic API error: {e}")
                return "Error."

            # Collect text and tool use blocks
            tool_uses = [b for b in resp.content if b.type == "tool_use"]
            text_blocks = [b for b in resp.content if b.type == "text"]

            # Append assistant message
            messages.append({"role": "assistant", "content": resp.content})

            if resp.stop_reason == "end_turn" or not tool_uses:
                # Final answer
                answer = text_blocks[0].text.strip() if text_blocks else ""
                lowered = answer.lower()
                if any(phrase in lowered for phrase in bad_phrases):
                    messages.append({
                        "role": "user",
                        "content": (
                            "That is not acceptable. Use your tools to find the "
                            "real answer. Return ONLY the final value."
                        ),
                    })
                    continue
                return answer

            # Execute tool calls and collect results
            tool_results = []
            for tb in tool_uses:
                fn = tb.name
                args = tb.input if isinstance(tb.input, dict) else {}
                try:
                    result = self._dispatch(fn, args, task_id, question)
                except Exception as e:
                    # Report the failure to the model instead of aborting the task.
                    result = f"Tool error: {e}"
                print(f" {fn} → {str(result)[:80]}")
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tb.id,
                    "content": result or "Empty result.",
                })
            messages.append({"role": "user", "content": tool_results})

        # Force final answer after max rounds
        try:
            messages.append({
                "role": "user",
                "content": "Final answer only — just the value, no explanation.",
            })
            # The history contains tool_use/tool_result blocks, so the tool
            # definitions still have to be sent; tool_choice "none" stops the
            # model from requesting yet another tool call.
            resp = self.anthropic_client.messages.create(
                model=self.model,
                max_tokens=100,
                system=self.SYSTEM,
                tools=self.TOOLS,
                tool_choice={"type": "none"},
                messages=messages,
            )
            text_blocks = [b for b in resp.content if b.type == "text"]
            return text_blocks[0].text.strip() if text_blocks else "Error."
        except Exception:
            return "Error."
# ── Gradio UI ─────────────────────────────────────────────────────────────────
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on every question and submit the answers for scoring.

    Args:
        profile: The logged-in Hugging Face profile, or None when the user is
            not logged in.

    Returns:
        A ``(status, results)`` tuple: a status message and a DataFrame of the
        answers, or ``None`` in place of the DataFrame when nothing was run.
    """
    if not profile:
        return "Please login to Hugging Face first.", None

    username = profile.username
    space_id = os.getenv("SPACE_ID", "")
    api_url = DEFAULT_API_URL

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Init failed: {e}", None

    try:
        qs = requests.get(f"{api_url}/questions", timeout=15)
        qs.raise_for_status()
        questions_data = qs.json()
    except Exception as e:
        return f"Error fetching questions: {e}", None

    # Nothing to answer: do not post an empty submission.
    if not isinstance(questions_data, list) or not questions_data:
        return "Fetched questions list is empty or invalid.", None

    results_log, answers_payload = [], []
    for item in questions_data:
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id", "")
        question_text = item.get("question", "") or ""
        if not task_id:
            # An answer without a task id cannot be submitted.
            print(f" skipping item without task_id: {item}")
            continue
        try:
            answer = agent(question_text, task_id=task_id)
        except Exception as e:
            answer = f"Error: {e}"
        print(f" → {answer[:60]}")
        answers_payload.append({"task_id": task_id, "submitted_answer": answer})
        results_log.append({
            "Task ID": task_id,
            "Question": question_text[:120],
            "Answer": answer,
        })

    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    try:
        r = requests.post(
            f"{api_url}/submit",
            json={
                "username": username.strip(),
                "agent_code": f"https://huggingface.co/spaces/{space_id}/tree/main",
                "answers": answers_payload,
            },
            timeout=60,
        )
        r.raise_for_status()
        res = r.json()
        status = (
            f"✅ Submitted!\n"
            f"Score: {res.get('score')}% "
            f"({res.get('correct_count')}/{res.get('total_attempted')})\n"
            f"Message: {res.get('message')}"
        )
    except Exception as e:
        status = f"Submission failed: {e}"

    return status, pd.DataFrame(results_log)


with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 GAIA Agent — Claude Sonnet")
    # Two trailing spaces before each newline are Markdown hard line breaks;
    # with a single space the three lines render as one paragraph.
    gr.Markdown(
        "**LLM:** `claude-sonnet-4-20250514` (Anthropic API)  \n"
        "**Vision:** Claude native vision  \n"
        "**ASR:** `openai/whisper-large-v3` (HF)"
    )
    gr.LoginButton()
    run_button = gr.Button("🚀 Run Evaluation & Submit", variant="primary")
    status_output = gr.Textbox(label="Status", lines=5)
    results_table = gr.DataFrame(label="Results")
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])

if __name__ == "__main__":
    demo.launch()