Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import json | |
| # Load .env file if present (local development) | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| except ImportError: | |
| pass | |
| import re | |
| import base64 | |
| from io import StringIO | |
| import gradio as gr | |
| import requests | |
| import pandas as pd | |
| from huggingface_hub import InferenceClient | |
# --- Constants ---
# Base URL of the scoring service; this file appends /questions, /submit and
# /files/<task_id> to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
| # --- Tool Functions --- | |
def web_search(query: str, max_results: int = 5) -> str:
    """Run a DuckDuckGo text search and format the hits as plain text.

    Args:
        query: Search terms passed to ``DDGS.text``.
        max_results: Upper bound on the number of hits requested.

    Returns:
        One "Title / URL / Snippet" paragraph per hit separated by blank
        lines, ``"No search results found."`` when there are no hits, or
        ``"Search error: ..."`` if anything fails (including a missing
        ``ddgs`` package).
    """
    try:
        # Imported lazily so a missing optional dependency becomes an error string.
        from ddgs import DDGS

        with DDGS() as client:
            hits = list(client.text(query, max_results=max_results))
        if not hits:
            return "No search results found."
        paragraphs = [
            "\n".join(
                (
                    f"Title: {hit.get('title', '')}",
                    f"URL: {hit.get('href', '')}",
                    f"Snippet: {hit.get('body', '')}",
                )
            )
            for hit in hits
        ]
        return "\n\n".join(paragraphs)
    except Exception as e:
        return f"Search error: {e}"
def visit_webpage(url: str) -> str:
    """Download a web page and return its visible text.

    Args:
        url: Address of the page to fetch.

    Returns:
        At most the first 12000 characters of the page text with whitespace
        collapsed, or ``"Error visiting webpage: ..."`` on any failure.
    """
    browser_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    try:
        page = requests.get(url, headers=browser_headers, timeout=15)
        page.raise_for_status()
        try:
            from bs4 import BeautifulSoup

            document = BeautifulSoup(page.text, "html.parser")
            # Drop non-content elements before extracting text.
            for element in document(["script", "style", "nav", "footer", "header"]):
                element.decompose()
            extracted = document.get_text(separator=" ", strip=True)
        except ImportError:
            # BeautifulSoup is optional: fall back to crude tag stripping.
            extracted = re.sub(r"<[^>]+>", " ", page.text)
        collapsed = re.sub(r"\s+", " ", extracted).strip()
        return collapsed[:12000]
    except Exception as e:
        return f"Error visiting webpage: {e}"
def _wikipedia_summary(title: str) -> str:
    """Return "<title>: <extract>" from the REST summary endpoint, or "" if unavailable."""
    # safe="" also percent-encodes "/", which would otherwise be read as a
    # path separator for titles such as "AC/DC".
    encoded = requests.utils.quote(title.replace(" ", "_"), safe="")
    resp = requests.get(
        f"https://en.wikipedia.org/api/rest_v1/page/summary/{encoded}", timeout=10
    )
    if resp.status_code != 200 or not resp.content:
        return ""
    try:
        data = resp.json()
    except ValueError:
        return ""
    extract = data.get("extract", "")
    return f"{data.get('title', '')}: {extract}" if extract else ""


def wikipedia_search(query: str) -> str:
    """Search Wikipedia for information about a topic.

    The query is first tried as an exact page title against the REST summary
    endpoint. If that yields nothing, the MediaWiki search API is queried and
    the summary of the top hit is returned; if that summary is unavailable
    too, the search snippets (with HTML markup removed) are returned.

    Args:
        query: Page title or free-text search terms.

    Returns:
        ``"<title>: <summary>"``, newline-joined search snippets,
        ``"No Wikipedia results found."``, or ``"Wikipedia error: ..."`` on an
        unexpected failure.
    """
    import html

    if not query or not query.strip():
        return "No Wikipedia results found."
    try:
        # Try direct page summary
        summary = _wikipedia_summary(query)
        if summary:
            return summary

        # Fallback: use search API
        params = {
            "action": "query", "list": "search",
            "srsearch": query, "format": "json",
            "srlimit": 3, "srprop": "snippet",
        }
        resp = requests.get("https://en.wikipedia.org/w/api.php", params=params, timeout=10)
        if not resp.content:
            return "No Wikipedia results found."
        try:
            data = resp.json()
        except ValueError:
            return "No Wikipedia results found."
        results = data.get("query", {}).get("search", [])
        if not results:
            return "No Wikipedia results found."

        # Get summary of first result
        summary = _wikipedia_summary(results[0].get("title", ""))
        if summary:
            return summary

        # Snippets arrive as HTML (<span class="searchmatch">…</span> plus
        # entities); return them as plain text.
        return "\n".join(
            html.unescape(re.sub(r"<[^>]+>", "", r.get("snippet", ""))) for r in results
        )
    except Exception as e:
        return f"Wikipedia error: {e}"
def python_interpreter(code: str) -> str:
    """Execute Python code and return its printed output.

    The code runs with ``exec`` in a fresh, empty globals dict while
    ``sys.stdout`` is redirected into an in-memory buffer.

    Args:
        code: Python source to execute.

    Returns:
        Everything the code printed; ``"Executed successfully (no output)."``
        if it printed nothing; or ``"Error: <Type>: <message>"`` if it raised,
        preceded by any output printed before the failure. A ``SystemExit``
        raised by the code (``exit()`` / ``sys.exit()``) just ends the run.
    """
    import contextlib

    buffer = StringIO()
    error = ""
    try:
        # redirect_stdout restores sys.stdout on every exit path.
        with contextlib.redirect_stdout(buffer):
            # NOTE(security): this executes model-generated code in-process with
            # full interpreter privileges; run only in a disposable environment.
            exec_globals: dict = {}
            exec(code, exec_globals)  # noqa: S102
    except SystemExit:
        # SystemExit is not an Exception subclass; without this branch a
        # snippet calling sys.exit() would propagate out of the tool.
        pass
    except Exception as e:
        error = f"Error: {type(e).__name__}: {e}"

    output = buffer.getvalue()
    if error:
        # Keep whatever was printed before the failure; it is often the clue
        # needed to fix the snippet.
        return (output.rstrip("\n") + "\n" + error) if output else error
    return output if output else "Executed successfully (no output)."
def download_task_file(task_id: str) -> str:
    """Download the file associated with a task and return its content as text.

    The file is fetched from ``{DEFAULT_API_URL}/files/{task_id}`` and rendered
    according to its type, detected case-insensitively from the Content-Type
    header or from the filename in Content-Disposition:

    * image -> ``"IMAGE:<media type>:<base64 data>"`` (prefix parsed by the agent)
    * CSV / Excel -> shape, column names and the first 20 rows
    * audio -> transcript produced by the ``openai/whisper-large-v3`` model
    * ``.py`` -> the first 4000 characters inside a fenced code block
    * anything else -> first 6000 characters of UTF-8 text, or a
      "Binary file" note when the bytes are not valid UTF-8

    Args:
        task_id: Identifier of the task whose attachment should be fetched.

    Returns:
        The rendered content, or a human-readable error message. Failures are
        reported in the returned string rather than raised.
    """
    import io
    import mimetypes

    if not task_id:
        # Without an id the request would hit "/files/" and produce a
        # misleading answer.
        return "Error: 'task_id' parameter is required."
    try:
        url = f"{DEFAULT_API_URL}/files/{task_id}"
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()

        content_type = resp.headers.get("content-type", "")
        filename = ""
        m = re.search(
            r'filename=["\']?([^"\';\n]+)',
            resp.headers.get("content-disposition", ""),
        )
        if m:
            filename = m.group(1).strip()

        # Compare case-insensitively so "REPORT.XLSX" or "Image/PNG" still match.
        ctype = content_type.lower()
        fname = filename.lower()

        # Determine type by content-type or filename extension
        is_csv = "text/csv" in ctype or fname.endswith(".csv")
        is_excel = fname.endswith((".xlsx", ".xls")) or "spreadsheet" in ctype
        is_image = "image/" in ctype or fname.endswith(
            (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp")
        )
        is_audio = fname.endswith((".mp3", ".wav", ".ogg", ".flac", ".m4a")) or "audio/" in ctype
        is_python = fname.endswith(".py")

        if is_image:
            media_type = ctype.split(";")[0].strip()
            if not media_type.startswith("image/"):
                # Detected by extension only (e.g. served as
                # application/octet-stream): derive the media type from the
                # filename so the resulting data URL is a valid image URL.
                media_type = mimetypes.guess_type(fname)[0] or "image/png"
            img_b64 = base64.b64encode(resp.content).decode()
            # Special prefix parsed by the agent to pass as vision content
            return f"IMAGE:{media_type}:{img_b64}"

        if is_csv:
            try:
                df = pd.read_csv(io.StringIO(resp.text))
                return (
                    f"CSV file: {len(df)} rows × {len(df.columns)} columns.\n"
                    f"Columns: {list(df.columns)}\n\n"
                    f"{df.head(20).to_string()}"
                )
            except Exception:
                # Unparseable CSV: hand back the raw text instead.
                return resp.text[:5000]

        if is_excel:
            try:
                df = pd.read_excel(io.BytesIO(resp.content))
                return (
                    f"Excel file: {len(df)} rows × {len(df.columns)} columns.\n"
                    f"Columns: {list(df.columns)}\n\n"
                    f"{df.head(20).to_string()}"
                )
            except Exception as e:
                return f"Excel file could not be parsed: {e}"

        if is_audio:
            try:
                asr_client = InferenceClient(api_key=os.environ["HF_TOKEN"])
                transcript = asr_client.automatic_speech_recognition(
                    audio=resp.content,
                    model="openai/whisper-large-v3",
                )
                text_result = transcript.text if hasattr(transcript, "text") else str(transcript)
                return f"Audio transcript:\n{text_result}"
            except Exception as e:
                return f"Audio file (transcription failed: {e}). File size: {len(resp.content)} bytes."

        if is_python:
            return f"Python file:\n```python\n{resp.text[:4000]}\n```"

        # Default: try to decode as text
        try:
            return resp.content.decode("utf-8")[:6000]
        except Exception:
            return f"Binary file ({len(resp.content)} bytes, type: {content_type})"
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 404:
            return "No file associated with this task."
        return f"Error downloading file: {e}"
    except Exception as e:
        return f"Error: {e}"
| # --- Agent Definition --- | |
| class GAIAAgent: | |
| """ | |
| ReAct-style agent using plain chat completions (no native tool-calling API). | |
| Works with any instruction-following model on HF's free serverless inference. | |
| """ | |
| SYSTEM_PROMPT = """You are an expert AI assistant solving questions from the GAIA benchmark. | |
| You have access to these tools: | |
| - web_search(query): Search the web via DuckDuckGo for current facts, people, events, statistics. | |
| - visit_webpage(url): Fetch and read the text content of a specific webpage. | |
| - wikipedia_search(query): Search Wikipedia for background information on a topic. | |
| - python_interpreter(code): Execute Python code. Always use print() to output results. | |
| - download_task_file(task_id): Download the file attached to the current task (image, CSV, Excel, text, etc.). | |
| Use this EXACT format for every step: | |
| Thought: [your reasoning] | |
| Action: [tool_name] | |
| Action Input: {"key": "value"} | |
| After receiving the Observation, continue with more Thought/Action steps. | |
| When you have the final answer, write: | |
| Thought: I now know the final answer. | |
| Final Answer: [exact answer] | |
| Important rules: | |
| - "Final Answer:" must contain ONLY the bare answer — no explanation, no "FINAL ANSWER:" prefix. | |
| - Numbers: exact format as requested (integer, decimal, etc.). | |
| - Names: exact spelling as they appear in authoritative sources. | |
| - Lists: comma-separated values unless another format is specified. | |
| - Always use a tool to verify facts rather than relying on memory. | |
| - YouTube URLs cannot be visited directly; use web_search to find information about YouTube video content instead.""" | |
| MODEL = "moonshotai/Kimi-K2.5:cheapest" | |
| def __init__(self): | |
| self.client = InferenceClient( | |
| api_key=os.environ["HF_TOKEN"], | |
| ) | |
| print("GAIAAgent initialized.") | |
| def _strip_think(text: str) -> str: | |
| """Remove <think>…</think> reasoning blocks (DeepSeek-R1 / o1-style).""" | |
| return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() | |
| def _run_tool(self, name: str, tool_input: dict) -> str: | |
| """Execute a named tool and return its result as a string.""" | |
| import time | |
| t0 = time.time() | |
| try: | |
| if name == "web_search": | |
| query = tool_input.get("query", "") | |
| if not query: | |
| return "Error: 'query' parameter is required." | |
| return web_search(query) | |
| if name == "visit_webpage": | |
| url = tool_input.get("url", "") | |
| if not url or not url.startswith("http"): | |
| print(f" [TOOL ERROR] visit_webpage called with invalid url: {url!r}") | |
| return "Error: valid 'url' parameter is required." | |
| return visit_webpage(url) | |
| if name == "wikipedia_search": | |
| query = tool_input.get("query", "") | |
| if not query: | |
| return "Error: 'query' parameter is required." | |
| return wikipedia_search(query) | |
| if name == "python_interpreter": | |
| code = tool_input.get("code", "") | |
| if not code: | |
| print(f" [TOOL ERROR] python_interpreter called with empty code. Full input: {tool_input!r}") | |
| return "Error: 'code' parameter is required." | |
| return python_interpreter(code) | |
| if name == "download_task_file": | |
| return download_task_file(tool_input.get("task_id", "")) | |
| print(f" [TOOL ERROR] Unknown tool called: {name!r}") | |
| return f"Unknown tool: {name}" | |
| except Exception as e: | |
| print(f" [TOOL EXCEPTION] {name} raised {type(e).__name__}: {e}") | |
| return f"Tool error: {e}" | |
| finally: | |
| print(f" [TOOL TIMING] {name} completed in {time.time() - t0:.2f}s") | |
| def _extract_json(text: str, start: int) -> dict: | |
| """ | |
| Extract a JSON object starting at `start` (which must be '{') by | |
| counting braces — handles nested dicts/code strings safely. | |
| """ | |
| depth = 0 | |
| in_string = False | |
| escape = False | |
| for i in range(start, len(text)): | |
| ch = text[i] | |
| if escape: | |
| escape = False | |
| continue | |
| if ch == "\\" and in_string: | |
| escape = True | |
| continue | |
| if ch == '"': | |
| in_string = not in_string | |
| continue | |
| if in_string: | |
| continue | |
| if ch == "{": | |
| depth += 1 | |
| elif ch == "}": | |
| depth -= 1 | |
| if depth == 0: | |
| raw = text[start : i + 1] | |
| try: | |
| return json.loads(raw) | |
| except json.JSONDecodeError as e: | |
| print(f" [PARSE ERROR] JSON decode failed: {e} | raw={raw[:200]!r}") | |
| return {} | |
| print(f" [PARSE ERROR] Unmatched braces — no closing '}}' found from pos {start}") | |
| return {} | |
| def _parse_action(self, text: str): | |
| """ | |
| Return (tool_name, tool_input_dict) for the last Action block in text, | |
| or (None, None) if none is found. | |
| """ | |
| action_matches = list(re.finditer(r"Action:\s*(\w+)", text)) | |
| if not action_matches: | |
| return None, None | |
| tool_name = action_matches[-1].group(1).strip() | |
| tool_input: dict = {} | |
| ai_matches = list(re.finditer(r"Action Input:\s*", text)) | |
| if not ai_matches: | |
| print(f" [PARSE WARN] Action '{tool_name}' found but no 'Action Input:' block.") | |
| else: | |
| pos = ai_matches[-1].end() | |
| if pos < len(text) and text[pos] == "{": | |
| tool_input = self._extract_json(text, pos) | |
| if not tool_input: | |
| print(f" [PARSE WARN] Action Input for '{tool_name}' parsed as empty dict.") | |
| else: | |
| snippet = text[pos : pos + 80].replace("\n", "\\n") | |
| print(f" [PARSE WARN] Action Input for '{tool_name}' does not start with '{{': {snippet!r}") | |
| return tool_name, tool_input | |
| def __call__(self, question: str, task_id: str = None) -> str: | |
| import time | |
| print(f"\nAgent processing task {task_id}: {question[:80]}...") | |
| user_content = f"Task ID: {task_id}\n\nQuestion: {question}" if task_id else question | |
| messages = [ | |
| {"role": "system", "content": self.SYSTEM_PROMPT}, | |
| {"role": "user", "content": user_content}, | |
| ] | |
| for iteration in range(20): | |
| t_llm = time.time() | |
| response = None | |
| for attempt in range(3): | |
| try: | |
| response = self.client.chat.completions.create( | |
| model=self.MODEL, | |
| messages=messages, | |
| max_tokens=4096, | |
| temperature=0.1, | |
| ) | |
| break | |
| except Exception as e: | |
| is_retryable = any(code in str(e) for code in ("504", "502", "503", "429")) | |
| print(f" [{iteration}] [LLM ERROR attempt {attempt+1}/3] {type(e).__name__}: {str(e)[:120]}") | |
| if is_retryable and attempt < 2: | |
| wait = 15 * (attempt + 1) | |
| print(f" [{iteration}] Retrying in {wait}s...") | |
| time.sleep(wait) | |
| else: | |
| raise | |
| if response is None: | |
| raise RuntimeError("LLM returned no response after retries") | |
| llm_elapsed = time.time() - t_llm | |
| raw_output = (response.choices[0].message.content or "").strip() | |
| think_stripped = len(raw_output) - len(self._strip_think(raw_output)) | |
| output = self._strip_think(raw_output) | |
| usage = response.usage | |
| print( | |
| f" [{iteration}] LLM {llm_elapsed:.1f}s | " | |
| f"tokens in={getattr(usage, 'prompt_tokens', '?')} " | |
| f"out={getattr(usage, 'completion_tokens', '?')} | " | |
| f"think_stripped={think_stripped}chars" | |
| ) | |
| print(f" [{iteration}] Model output: {output[:300]}{'...' if len(output) > 300 else ''}") | |
| # ── Final answer found (must be at line start, not inside code/JSON) ── | |
| fa_match = re.search(r"(?:^|\n)Final Answer:\s*(.+?)(?:\n|$)", output) | |
| if fa_match: | |
| answer = fa_match.group(1).strip() | |
| print(f" [{iteration}] => Final Answer: {answer!r}") | |
| return answer | |
| # ── Tool call found ── | |
| tool_name, tool_input = self._parse_action(output) | |
| if tool_name: | |
| print(f" [{iteration}] Tool call: {tool_name}({json.dumps(tool_input)[:200]})") | |
| result = self._run_tool(tool_name, tool_input) | |
| result_preview = result[:200].replace("\n", " ") | |
| print(f" [{iteration}] Tool result ({len(result)} chars): {result_preview}{'...' if len(result) > 200 else ''}") | |
| messages.append({"role": "assistant", "content": raw_output}) | |
| if result.startswith("IMAGE:"): | |
| parts = result.split(":", 2) | |
| media_type, img_b64 = parts[1], parts[2] | |
| print(f" [{iteration}] Image received: type={media_type}, size={len(img_b64)} b64 chars") | |
| messages.append({ | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": "Observation: Here is the downloaded image. Analyse it to answer the question."}, | |
| {"type": "image_url", "image_url": {"url": f"data:{media_type};base64,{img_b64}"}}, | |
| ], | |
| }) | |
| else: | |
| messages.append({ | |
| "role": "user", | |
| "content": f"Observation: {result[:6000]}", | |
| }) | |
| else: | |
| print(f" [{iteration}] No tool call and no Final Answer — prompting model to conclude.") | |
| messages.append({"role": "assistant", "content": raw_output}) | |
| messages.append({ | |
| "role": "user", | |
| "content": ( | |
| "You haven't provided a Final Answer yet. " | |
| "Please conclude with:\nFinal Answer: [answer]" | |
| ), | |
| }) | |
| print(f" [MAX ITERATIONS] Reached iteration limit for task {task_id}.") | |
| return "Unable to determine answer." | |
| # --- Gradio App --- | |
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch every question, answer each one with GAIAAgent, submit the answers
    and report the outcome.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        A ``(status_message, results)`` pair. ``results`` is a DataFrame with
        one row per processed question, or None when the run stops before
        any question is processed.
    """
    # Guard: nothing can be submitted without a logged-in user.
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = profile.username
    print(f"User logged in: {username}")

    space_id = os.getenv("SPACE_ID")
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"

    # 1. Instantiate Agent
    try:
        agent = GAIAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text, task_id=task_id)
        except Exception as e:
            # A failed task is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            shown_answer = f"AGENT ERROR: {e}"
        else:
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            shown_answer = submitted_answer
        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "Submitted Answer": shown_answer,
        })

    results_table = pd.DataFrame(results_log)
    if not answers_payload:
        return "Agent did not produce any answers to submit.", results_table

    # 4. Submit
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            "Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_table
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except Exception:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Every failure path reports the same way.
    print(status_message)
    return status_message, results_table
| # --- Build Gradio Interface --- | |
with gr.Blocks() as demo:
    # Page header and usage notes.
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        """
**Instructions:**
1. Log in to your Hugging Face account using the button below.
2. Click **Run Evaluation & Submit All Answers** to fetch questions, run the agent, submit answers, and see the score.
---
**Notes:**
- The agent uses models via HF InferenceClient (provider=auto) with a ReAct loop: web search, Wikipedia, Python interpreter, and file download tools.
- Targets ≥30% on GAIA level-1 questions.
- Submission can take several minutes while the agent processes each question.
"""
    )

    # Controls: login, trigger button, and the two output widgets.
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # The handler's two return values fill the status box and the table.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    # Startup banner, then a report of the Space-related environment variables.
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}")

    if not space_id_startup:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    else:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")

    # Closing rule has the same width as the opening banner.
    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    demo.launch(debug=True, share=False)