Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import base64 | |
| import subprocess | |
| import tempfile | |
| import requests | |
| import pandas as pd | |
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| import anthropic | |
# Base URL of the scoring service. The agent appends /files/<task_id> to it and
# the Gradio runner appends /questions and /submit.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# ── helpers ──────────────────────────────────────────────────────────────────
def _strip_html(html: str) -> str:
    """Return the visible text of *html* as a single space-separated string.

    Text inside ``script``, ``style``, ``nav``, ``footer`` and ``head``
    elements is discarded. Skipped elements are tracked with a depth counter,
    so a closing tag nested inside another skipped element (``</script>``
    inside ``<head>``) does not re-enable text collection too early.

    Args:
        html: Markup to convert; need not be well formed.

    Returns:
        The whitespace-stripped text chunks joined with single spaces.
    """
    from html.parser import HTMLParser

    skipped_tags = frozenset({"script", "style", "nav", "footer", "head"})

    class _TextExtractor(HTMLParser):
        def __init__(self):
            super().__init__()
            self.parts = []
            self.skip_depth = 0

        def handle_starttag(self, tag, attrs):
            if tag in skipped_tags:
                self.skip_depth += 1

        def handle_endtag(self, tag):
            # Never go below zero: stray closing tags must not unbalance us.
            if tag in skipped_tags and self.skip_depth:
                self.skip_depth -= 1

        def handle_data(self, data):
            text = data.strip()
            if text and not self.skip_depth:
                self.parts.append(text)

    extractor = _TextExtractor()
    extractor.feed(html)
    # close() flushes text the parser is still buffering, e.g. trailing text
    # that ends in a bare "&" while it waits for a possible character reference.
    extractor.close()
    return " ".join(extractor.parts)
# ── agent ────────────────────────────────────────────────────────────────────
| class BasicAgent: | |
def __init__(self):
    """Set up the Anthropic client, the optional HF client and the API URL.

    The Anthropic key is read from ``ANTHROPIC_API_KEY`` (empty string when
    unset). ``hf_client`` is ``None`` when no Hugging Face token is found.
    """
    # Claude is the LLM, so no Hugging Face inference credits are needed.
    anthropic_key = os.getenv("ANTHROPIC_API_KEY", "")
    self.anthropic_client = anthropic.Anthropic(api_key=anthropic_key)
    self.model = "claude-sonnet-4-20250514"

    # The Hugging Face client is kept only for Whisper speech recognition.
    self.hf_token = self._get_hf_token()
    self.hf_client = (
        None if not self.hf_token else InferenceClient(token=self.hf_token)
    )

    self.api_url = DEFAULT_API_URL
    print(f"β Agent initialised with model: {self.model}")
def _get_hf_token(self):
    """Return the first non-blank Hugging Face token from the environment.

    ``HF_TOKEN``, ``HUGGING_FACE_HUB_TOKEN`` and ``HUGGINGFACE_HUB_TOKEN``
    are checked in that order, with surrounding whitespace stripped.
    Returns ``None`` when none of them holds a usable value.
    """
    env_names = ("HF_TOKEN", "HUGGING_FACE_HUB_TOKEN", "HUGGINGFACE_HUB_TOKEN")
    stripped = (os.getenv(name, "").strip() for name in env_names)
    return next((value for value in stripped if value), None)
# ── raw file fetch ───────────────────────────────────────────────────────────
def _fetch_file(self, task_id: str):
    """Download the file attached to *task_id* from the scoring API.

    Args:
        task_id: Identifier appended to ``{api_url}/files/``.

    Returns:
        ``(content_bytes, content_type)`` for a 200 response with a non-empty
        body, otherwise ``(None, "")``. Never raises: a failed request is
        logged and reported as "no file", so callers keep their simple
        ``if not fb`` check.
    """
    url = f"{self.api_url}/files/{task_id}"
    try:
        response = requests.get(url, timeout=15)
        if response.status_code == 200 and response.content:
            return response.content, response.headers.get("Content-Type", "")
    except Exception as exc:
        # Deliberately best effort, but no longer silent: the reason for a
        # failed download is printed so it can be diagnosed from the logs.
        print(f"  File fetch failed for task {task_id}: {exc}")
    return None, ""
# ── tool implementations ─────────────────────────────────────────────────────
def tool_check_file(self, task_id: str) -> str:
    """Report whether *task_id* has an attachment and which tool fits it.

    Returns ``"NO_FILE"`` when nothing could be fetched; otherwise a
    ``FILE_EXISTS`` line with the lower-cased MIME type (parameters such as
    ``charset`` removed), the size in bytes and a tool-routing hint.
    """
    payload, content_type = self._fetch_file(task_id)
    if not payload:
        return "NO_FILE"

    mime = content_type.partition(";")[0].strip().lower()
    summary = f"FILE_EXISTS type={mime} size={len(payload)}_bytes. "
    routing_hint = (
        "Use the right tool: imageβanalyse_image, pythonβrun_python_file, "
        "excel/xlsxβread_excel_file, audioβtranscribe_audio, "
        "text/pdfβread_text_file."
    )
    return summary + routing_hint
def tool_analyse_image(self, task_id: str, question: str) -> str:
    """Answer *question* about the task's image using Claude vision.

    The media type is taken from the ``Content-Type`` header when it names a
    supported format. Otherwise the leading bytes are inspected, so images
    served with a generic type such as ``application/octet-stream`` are still
    handled and an unrecognised ``image/*`` type is labelled by its real
    format where possible. An ``image/*`` type that cannot be identified falls
    back to ``image/jpeg``, as before.

    Args:
        task_id: Task whose attachment is analysed.
        question: Text prompt sent alongside the image.

    Returns:
        The text of the first response block, or a short diagnostic string
        when there is no file, the file is not an image, or the API call
        fails.
    """
    data, content_type = self._fetch_file(task_id)
    if not data:
        return "No image found."
    declared = content_type.split(";")[0].strip().lower()

    # Header values mapped to the media types used in the request.
    supported = {
        "image/jpeg": "image/jpeg",
        "image/jpg": "image/jpeg",
        "image/png": "image/png",
        "image/gif": "image/gif",
        "image/webp": "image/webp",
    }
    media_type = supported.get(declared)
    if media_type is None:
        # The header is missing or unhelpful: trust the file signature.
        if data.startswith(b"\x89PNG\r\n\x1a\n"):
            media_type = "image/png"
        elif data.startswith(b"\xff\xd8\xff"):
            media_type = "image/jpeg"
        elif data.startswith((b"GIF87a", b"GIF89a")):
            media_type = "image/gif"
        elif data[:4] == b"RIFF" and data[8:12] == b"WEBP":
            media_type = "image/webp"
        elif "image" in declared:
            media_type = "image/jpeg"  # previous behaviour for unknown image/*
        else:
            return f"File is not an image (type={declared})."

    encoded = base64.b64encode(data).decode()
    try:
        response = self.anthropic_client.messages.create(
            model=self.model,
            max_tokens=800,
            messages=[{
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": media_type,
                            "data": encoded,
                        },
                    },
                    {"type": "text", "text": question},
                ],
            }],
        )
        return response.content[0].text
    except Exception as e:
        return f"Vision error: {e}"
def tool_run_python_file(self, task_id: str) -> str:
    """Download the task's Python file, run it and return its output.

    The script is written to a temporary ``.py`` file (UTF-8), executed with
    the current interpreter under a 30 second timeout, and the temporary file
    is removed afterwards whether or not the run succeeded.

    Returns:
        ``"STDOUT:\\n<text>"`` when the script printed something, otherwise
        ``"STDERR:\\n<text>"``, otherwise ``"No output."``. Failures such as
        a timeout are returned as ``"Execution error: ..."``.
    """
    import sys

    payload, _ = self._fetch_file(task_id)
    if not payload:
        return "No file found."
    source = payload.decode("utf-8", errors="ignore")

    script_path = None
    try:
        with tempfile.NamedTemporaryFile(
            suffix=".py", delete=False, mode="w", encoding="utf-8"
        ) as handle:
            handle.write(source)
            script_path = handle.name
        # NOTE(security): this executes downloaded code with the privileges of
        # this process and no sandbox; only the timeout limits it.
        completed = subprocess.run(
            [sys.executable or "python3", script_path],
            capture_output=True, text=True, timeout=30,
        )
        stdout = completed.stdout.strip()
        stderr = completed.stderr.strip()
        if stdout:
            return f"STDOUT:\n{stdout}"
        if stderr:
            return f"STDERR:\n{stderr}"
        return "No output."
    except Exception as e:
        return f"Execution error: {e}"
    finally:
        if script_path is not None:
            try:
                os.unlink(script_path)
            except OSError:
                # Cleanup is best effort; the tool result matters more.
                pass
def tool_read_excel_file(self, task_id: str, question: str) -> str:
    """Render the task's spreadsheet as text, followed by *question*.

    The attachment is parsed as CSV when its content type mentions ``csv`` or
    ``text`` and as an Excel workbook otherwise. The preview is limited to
    80 rows and 20 columns.

    Returns:
        The preview plus the question, ``"No file found."`` when there is no
        attachment, or ``"Excel read error: ..."`` when parsing fails.
    """
    raw, content_type = self._fetch_file(task_id)
    if not raw:
        return "No file found."
    try:
        import io

        mime = content_type.split(";")[0].strip().lower()
        looks_like_csv = "csv" in mime or "text" in mime
        buffer = io.BytesIO(raw)
        if looks_like_csv:
            frame = pd.read_csv(buffer)
        else:
            frame = pd.read_excel(buffer)
        table = frame.to_string(max_rows=80, max_cols=20)
        header = f"SPREADSHEET DATA:\n{table}\n\n"
        prompt = f"Answer the following about this data: {question}"
        return header + prompt
    except Exception as exc:
        return f"Excel read error: {exc}"
def tool_transcribe_audio(self, task_id: str) -> str:
    """Transcribe the task's audio file with ``openai/whisper-large-v3``.

    The missing-token case is checked before anything is written to disk.
    The audio is staged in a temporary file whose suffix follows the content
    type (``.mp3`` when unknown), and that file is always removed afterwards.

    Returns:
        The transcript text, ``"No file found."``, the missing-token message,
        or ``"Transcription error: ..."`` when the request fails.
    """
    audio, content_type = self._fetch_file(task_id)
    if not audio:
        return "No file found."
    if not self.hf_client:
        return "No HF token available for audio transcription."

    mime = content_type.split(";")[0].strip().lower()
    ext_map = {
        "audio/mpeg": ".mp3", "audio/mp3": ".mp3",
        "audio/wav": ".wav", "audio/x-wav": ".wav",
        "audio/ogg": ".ogg", "audio/flac": ".flac",
        "audio/m4a": ".m4a", "audio/mp4": ".mp4",
    }
    audio_path = None
    try:
        with tempfile.NamedTemporaryFile(
            suffix=ext_map.get(mime, ".mp3"), delete=False
        ) as handle:
            handle.write(audio)
            audio_path = handle.name
        asr_client = InferenceClient(
            model="openai/whisper-large-v3",
            token=self.hf_token,
        )
        with open(audio_path, "rb") as audio_f:
            result = asr_client.automatic_speech_recognition(audio_f)
        return result.text if hasattr(result, "text") else str(result)
    except Exception as e:
        return f"Transcription error: {e}"
    finally:
        if audio_path is not None:
            try:
                os.unlink(audio_path)
            except OSError:
                # Cleanup is best effort; the transcript matters more.
                pass
def tool_read_text_file(self, task_id: str) -> str:
    """Return up to 6000 characters of the task's text or PDF attachment.

    PDFs are extracted with ``pdfminer`` when it can be imported; otherwise,
    and for every other content type, the bytes are decoded as UTF-8 with
    undecodable bytes ignored.

    Returns:
        The (truncated) text, ``"No file found."`` or ``"Read error: ..."``.
    """
    payload, content_type = self._fetch_file(task_id)
    if not payload:
        return "No file found."

    limit = 6000
    try:
        mime = content_type.split(";")[0].strip().lower()
        if "pdf" in mime:
            try:
                import io
                import pdfminer.high_level

                pdf_text = pdfminer.high_level.extract_text(io.BytesIO(payload))
                return pdf_text[:limit]
            except ImportError:
                # pdfminer is unavailable: fall through to the plain decode.
                pass
        decoded = payload.decode("utf-8", errors="ignore")
        return decoded[:limit]
    except Exception as exc:
        return f"Read error: {exc}"
def tool_search_web(self, query: str) -> str:
    """Search DuckDuckGo's HTML endpoint and return up to six snippets.

    A snippet is the text of an element whose ``class`` contains
    ``result__snippet``. It is only finished when that element's own closing
    tag arrives, so inline markup inside it (for example ``<b>`` around the
    query terms) no longer cuts the text short.

    Args:
        query: Search terms sent as the ``q`` parameter.

    Returns:
        Snippets separated by blank lines, ``"No results."`` when none were
        found, or ``"Search error: ..."`` when the request or parsing fails.
    """
    from html.parser import HTMLParser

    class _SnippetParser(HTMLParser):
        def __init__(self):
            super().__init__()
            self.results = []
            self._open_tag = None  # tag name of the snippet being read
            self._depth = 0        # nesting of that same tag name
            self._chunks = []

        def handle_starttag(self, tag, attrs):
            if self._open_tag is None:
                # A bare ``class`` attribute has the value None.
                css = dict(attrs).get("class") or ""
                if "result__snippet" in css:
                    self._open_tag = tag
                    self._depth = 1
                    self._chunks = []
            elif tag == self._open_tag:
                self._depth += 1

        def handle_data(self, data):
            if self._open_tag is not None:
                self._chunks.append(data)

        def handle_endtag(self, tag):
            if tag != self._open_tag:
                return  # closing tag of inline markup, or outside a snippet
            self._depth -= 1
            if self._depth == 0:
                text = "".join(self._chunks).strip()
                if text:
                    self.results.append(text)
                self._open_tag = None

    try:
        hdrs = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 Chrome/124.0 Safari/537.36"
            )
        }
        r = requests.get(
            "https://html.duckduckgo.com/html/",
            params={"q": query}, headers=hdrs, timeout=12,
        )
        parser = _SnippetParser()
        parser.feed(r.text)
        parser.close()
        return "\n\n".join(parser.results[:6]) or "No results."
    except Exception as e:
        return f"Search error: {e}"
def tool_fetch_webpage(self, url: str) -> str:
    """Fetch *url* and return the first 8000 characters of its visible text.

    Returns ``"Fetch error: ..."`` when the request fails or the server
    answers with an error status.
    """
    max_chars = 8000
    try:
        response = requests.get(
            url, headers={"User-Agent": "Mozilla/5.0 Chrome/124.0"}, timeout=18
        )
        response.raise_for_status()
        page_text = _strip_html(response.text)
        return page_text[:max_chars]
    except Exception as exc:
        return f"Fetch error: {exc}"
def tool_fetch_wikipedia(self, title: str) -> str:
    """Return text for the English Wikipedia article called *title*.

    The REST summary endpoint is tried first and its ``extract`` returned on
    a 200 response. On any other status the Action API is queried for the
    page extract (following redirects), which is stripped of HTML and cut to
    7000 characters.

    Args:
        title: Article title; spaces are converted to underscores.

    Returns:
        The article text, ``"Not found."`` when neither endpoint yields any,
        or ``"Wikipedia error: ..."`` when a request fails.
    """
    try:
        # safe="" also escapes "/", so a title such as "AC/DC" stays a single
        # path segment instead of being read as a sub-path.
        slug = requests.utils.quote(title.replace(" ", "_"), safe="")
        r = requests.get(
            f"https://en.wikipedia.org/api/rest_v1/page/summary/{slug}",
            timeout=12,
        )
        if r.status_code == 200:
            return r.json().get("extract", "Not found.")
        r2 = requests.get(
            "https://en.wikipedia.org/w/api.php",
            params={
                "action": "query", "prop": "extracts",
                "titles": title, "format": "json", "redirects": 1,
            },
            timeout=12,
        )
        pages = r2.json().get("query", {}).get("pages", {})
        for page in pages.values():
            text = _strip_html(page.get("extract", ""))
            if text:
                return text[:7000]
    except Exception as e:
        return f"Wikipedia error: {e}"
    return "Not found."
def tool_youtube_transcript(self, video_url: str) -> str:
    """Return up to 6000 characters of a YouTube video's transcript.

    The video id is taken from ``v=``, ``youtu.be/``, ``/shorts/``,
    ``/embed/`` or ``/live/`` URLs and the URL is validated before the
    transcript library is imported. Errors are classified as "blocked" only
    when the exception name or message mentions blocking, a cloud host, or
    "ip" as a whole word. A plain substring test for "ip" also matches the
    word "transcript" and would label nearly every failure as blocked.

    Args:
        video_url: Link to the video.

    Returns:
        The transcript text, ``"Bad URL."``, a ``BLOCKED: ...`` hint, or
        ``"Transcript error: ..."``.
    """
    match = re.search(
        r"(?:v=|youtu\.be/|/shorts/|/embed/|/live/)([^&?#/]+)",
        video_url or "",
    )
    if not match:
        return "Bad URL."
    video_id = match.group(1)

    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            entries = YouTubeTranscriptApi.get_transcript(video_id)
            pieces = [entry["text"] for entry in entries]
        else:
            # Newer releases of the library drop the static helper in favour
            # of an instance-level fetch() that yields snippet objects.
            fetched = YouTubeTranscriptApi().fetch(video_id)
            pieces = [snippet.text for snippet in fetched]
        return " ".join(pieces)[:6000]
    except Exception as e:
        err = str(e)
        haystack = f"{type(e).__name__} {err}".lower()
        blocked = (
            "blocked" in haystack
            or "cloud" in haystack
            or re.search(r"\bip\b", haystack) is not None
        )
        if blocked:
            return (
                "BLOCKED: YouTube blocks cloud IPs. "
                "Use search_web to find transcript or description of this video."
            )
        return f"Transcript error: {err}"
# ── Anthropic tool definitions ───────────────────────────────────────────────
# Tool schemas passed to the Anthropic Messages API. Each row below is
# (tool name, description, parameters); a parameter is (name, description or
# None). Every parameter is a required string.
TOOLS = [
    {
        "name": tool_name,
        "description": summary,
        "input_schema": {
            "type": "object",
            "properties": {
                param: (
                    {"type": "string"}
                    if hint is None
                    else {"type": "string", "description": hint}
                )
                for param, hint in params
            },
            "required": [param for param, _ in params],
        },
    }
    for tool_name, summary, params in (
        (
            "check_file",
            "ALWAYS call this first. Checks if a file is attached to the task. "
            "Returns NO_FILE or the file type and which tool to use next.",
            (("task_id", None),),
        ),
        (
            "analyse_image",
            "Analyse an image file attached to the task using vision. "
            "Use for chess boards, diagrams, photos, screenshots.",
            (
                ("task_id", None),
                ("question", "What to find or answer from the image."),
            ),
        ),
        (
            "run_python_file",
            "Execute the Python file attached to the task and return its output. "
            "The stdout IS the answer.",
            (("task_id", None),),
        ),
        (
            "read_excel_file",
            "Read an Excel or CSV file and answer a question about its data.",
            (("task_id", None), ("question", None)),
        ),
        (
            "transcribe_audio",
            "Transcribe an audio file using Whisper. "
            "Use for voice memos, recordings, audio questions.",
            (("task_id", None),),
        ),
        (
            "read_text_file",
            "Read a text or PDF file attached to the task.",
            (("task_id", None),),
        ),
        (
            "youtube_transcript",
            "Fetch YouTube video transcript. "
            "If cloud-blocked, use search_web instead.",
            (("video_url", None),),
        ),
        (
            "search_web",
            "Search the web via DuckDuckGo. Returns top result snippets.",
            (("query", None),),
        ),
        (
            "fetch_webpage",
            "Fetch and read the full text of any URL.",
            (("url", None),),
        ),
        (
            "fetch_wikipedia",
            "Fetch a Wikipedia article by exact title via REST API. "
            "Always prefer this over fetch_webpage for Wikipedia.",
            (("title", None),),
        ),
    )
]
def _dispatch(self, fn: str, args: dict, task_id: str, question: str) -> str:
    """Run the tool named *fn* with *args* and return its string result.

    ``task_id`` and ``question`` are used when the model left the
    corresponding argument out; the other arguments default to an empty
    string. An unrecognised name yields ``"Unknown tool."``.
    """
    def file_id():
        return args.get("task_id", task_id)

    def asked():
        return args.get("question", question)

    # Lambdas keep the lookup lazy: only the chosen tool method is touched.
    handlers = {
        "check_file": lambda: self.tool_check_file(file_id()),
        "analyse_image": lambda: self.tool_analyse_image(file_id(), asked()),
        "run_python_file": lambda: self.tool_run_python_file(file_id()),
        "read_excel_file": lambda: self.tool_read_excel_file(file_id(), asked()),
        "transcribe_audio": lambda: self.tool_transcribe_audio(file_id()),
        "read_text_file": lambda: self.tool_read_text_file(file_id()),
        "youtube_transcript": lambda: self.tool_youtube_transcript(
            args.get("video_url", "")
        ),
        "search_web": lambda: self.tool_search_web(args.get("query", "")),
        "fetch_webpage": lambda: self.tool_fetch_webpage(args.get("url", "")),
        "fetch_wikipedia": lambda: self.tool_fetch_wikipedia(args.get("title", "")),
    }
    handler = handlers.get(fn)
    return handler() if handler is not None else "Unknown tool."
# ── system prompt ────────────────────────────────────────────────────────────
# System prompt passed as ``system=`` on every Claude request made by __call__.
# It lays out a four-step workflow: check for an attachment and route it to the
# matching tool, gather information, retry searches, then answer with the bare
# value only.
# NOTE(review): the LibreTexts hint is labelled "1.E" but its URL points at the
# "2.E" exercises page — confirm which one is intended.
SYSTEM = """You are a precise research agent solving GAIA benchmark tasks.
MANDATORY WORKFLOW:
STEP 1 β Call check_file(task_id) first for every task.
β’ NO_FILE β go to STEP 2.
β’ image file β call analyse_image(task_id, question).
β’ python file β call run_python_file(task_id). Its output IS the answer.
β’ excel/csv file β call read_excel_file(task_id, question).
β’ audio file β call transcribe_audio(task_id), then answer from transcript.
β’ text/pdf file β call read_text_file(task_id), then answer from content.
NEVER return "NO_FILE" or tool status strings as your final answer.
STEP 2 β Gather information.
β’ YouTube URL β call youtube_transcript(url). If BLOCKED β search_web.
β’ Wikipedia question β fetch_wikipedia("Exact Article Title").
Discography β count ONLY solo studio albums (not collaborations/live/EP).
β’ LibreTexts 1.E β fetch_webpage:
https://chem.libretexts.org/Bookshelves/Introductory_Chemistry/Introductory_Chemistry_(LibreTexts)/02%3A_Measurement_and_Problem_Solving/2.E%3A_Measurement_and_Problem_Solving_(Exercises)
β’ Sports stats β search_web then fetch_webpage for exact numbers.
β’ Any other question β search_web, then fetch_webpage for details.
STEP 3 β Try at least 2-3 different search queries before concluding.
Never say "I was unable to find." Always use tools to find the answer.
STEP 4 β Final answer: ONLY the value. No explanation. No preamble.
Numbers: just digits. Names: just the name. Lists: comma-separated."""
# ── main call ────────────────────────────────────────────────────────────────
def __call__(self, question: str, task_id: str = "") -> str:
    """Answer *question* by letting Claude call tools for up to ten rounds.

    Each round sends the conversation to the model. Tool requests are
    executed through ``_dispatch`` and their results appended; a tool that
    raises is reported to the model as ``"Tool error: ..."`` instead of
    aborting the task. A text reply ends the loop unless it contains a
    refusal or status phrase, in which case the model is told to try again.
    When the rounds run out, one more request asks for the final value.

    Args:
        question: Task text.
        task_id: Task identifier, included in the first message so the model
            can pass it to the file tools.

    Returns:
        The stripped answer text, or ``"Error."`` when an API call fails.
    """
    print(f"βΆ Task {task_id[:8]}: {question[:80]}")
    messages = [
        {
            "role": "user",
            "content": f"task_id: {task_id}\n\nTask: {question}",
        },
    ]
    bad_phrases = (
        "no_file", "file_exists", "i was unable", "i couldn't",
        "i can't access", "please provide", "you might want",
        "i'm unable", "i cannot", "i am unable",
    )
    for _round in range(10):
        try:
            resp = self.anthropic_client.messages.create(
                model=self.model,
                max_tokens=1500,
                system=self.SYSTEM,
                tools=self.TOOLS,
                messages=messages,
            )
        except Exception as e:
            print(f" Anthropic API error: {e}")
            return "Error."

        tool_uses = [b for b in resp.content if b.type == "tool_use"]
        text_blocks = [b for b in resp.content if b.type == "text"]
        messages.append({"role": "assistant", "content": resp.content})

        if resp.stop_reason == "end_turn" or not tool_uses:
            answer = text_blocks[0].text.strip() if text_blocks else ""
            if any(b in answer.lower() for b in bad_phrases):
                # A refusal or a leaked tool status is not an answer.
                messages.append({
                    "role": "user",
                    "content": (
                        "That is not acceptable. Use your tools to find the "
                        "real answer. Return ONLY the final value."
                    ),
                })
                continue
            return answer

        tool_results = []
        for tb in tool_uses:
            fn = tb.name
            args = tb.input if isinstance(tb.input, dict) else {}
            try:
                result = self._dispatch(fn, args, task_id, question)
            except Exception as e:
                # Every tool_use needs a tool_result; report the failure to
                # the model so it can pick another approach.
                result = f"Tool error: {e}"
            print(f" {fn} β {str(result)[:80]}")
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tb.id,
                "content": result or "Empty result.",
            })
        messages.append({"role": "user", "content": tool_results})

    # Out of rounds: ask for the bare answer.
    try:
        messages.append({
            "role": "user",
            "content": "Final answer only β just the value, no explanation.",
        })
        resp = self.anthropic_client.messages.create(
            model=self.model,
            max_tokens=100,
            system=self.SYSTEM,
            # The history holds tool_use/tool_result blocks, so the tool
            # definitions must accompany the request; tool_choice "none"
            # keeps the model from starting another tool call.
            tools=self.TOOLS,
            tool_choice={"type": "none"},
            messages=messages,
        )
        text_blocks = [b for b in resp.content if b.type == "text"]
        return text_blocks[0].text.strip() if text_blocks else "Error."
    except Exception:
        return "Error."
# ── Gradio UI ────────────────────────────────────────────────────────────────
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on every question and submit the answers for scoring.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in.

    Returns:
        ``(status_text, results)`` where ``results`` is a DataFrame with one
        row per answered question, or ``None`` when the run stopped before
        any question was answered. Failures are reported in ``status_text``
        rather than raised.
    """
    if not profile:
        return "Please login to Hugging Face first.", None
    username = profile.username
    space_id = os.getenv("SPACE_ID", "")
    api_url = DEFAULT_API_URL

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Init failed: {e}", None

    try:
        qs = requests.get(f"{api_url}/questions", timeout=15)
        qs.raise_for_status()
        questions_data = qs.json()
    except Exception as e:
        return f"Error fetching questions: {e}", None
    if not isinstance(questions_data, list) or not questions_data:
        # Nothing to answer; do not send an empty submission.
        return "No questions were returned by the scoring API.", None

    results_log, answers_payload = [], []
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"  Skipping malformed question entry: {item!r}")
            continue
        task_id = item.get("task_id", "")
        question_text = item.get("question", "")
        if not task_id:
            # An answer without a task id cannot be matched to a question.
            print("  Skipping question without task_id.")
            continue
        try:
            answer = agent(question_text, task_id=task_id)
        except Exception as e:
            answer = f"Error: {e}"
        print(f" β {answer[:60]}")
        answers_payload.append({"task_id": task_id, "submitted_answer": answer})
        results_log.append({
            "Task ID": task_id,
            "Question": question_text[:120],
            "Answer": answer,
        })
    if not answers_payload:
        return "No answers were produced, nothing submitted.", pd.DataFrame(results_log)

    try:
        r = requests.post(
            f"{api_url}/submit",
            json={
                "username": username.strip(),
                "agent_code": f"https://huggingface.co/spaces/{space_id}/tree/main",
                "answers": answers_payload,
            },
            timeout=60,
        )
        r.raise_for_status()
        res = r.json()
        status = (
            f"β Submitted!\n"
            f"Score: {res.get('score')}% "
            f"({res.get('correct_count')}/{res.get('total_attempted')})\n"
            f"Message: {res.get('message')}"
        )
    except Exception as e:
        status = f"Submission failed: {e}"
    return status, pd.DataFrame(results_log)
# Gradio page: a title, a short description of the models in use, a login
# button, the run button and the two output widgets.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π€ GAIA Agent β Claude Sonnet")
    gr.Markdown(
        f"**LLM:** `claude-sonnet-4-20250514` (Anthropic API) \n"
        "**Vision:** Claude native vision \n"
        "**ASR:** `openai/whisper-large-v3` (HF)"
    )
    gr.LoginButton()
    run_button = gr.Button("π Run Evaluation & Submit", variant="primary")
    status_output = gr.Textbox(label="Status", lines=5)
    results_table = gr.DataFrame(label="Results")
    # No explicit inputs are wired; the handler's `profile` argument is
    # presumably supplied by Gradio from the login state — confirm.
    # The returned (status, table) pair fills the two outputs in order.
    run_button.click(fn=run_and_submit_all,
                     outputs=[status_output, results_table])
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()