import os
import sys
import json

# Load .env file if present (local development)
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

import re
import base64
from io import StringIO

import gradio as gr
import requests
import pandas as pd
from huggingface_hub import InferenceClient

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


# --- Tool Functions ---
def web_search(query: str, max_results: int = 5) -> str:
    """Search the web using DuckDuckGo.

    Returns a newline-separated list of result titles/URLs/snippets, or an
    error string on failure (tools never raise — the agent loop expects text).
    """
    try:
        from ddgs import DDGS
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
        if not results:
            return "No search results found."
        output = []
        for r in results:
            output.append(
                f"Title: {r.get('title', '')}\n"
                f"URL: {r.get('href', '')}\n"
                f"Snippet: {r.get('body', '')}"
            )
        return "\n\n".join(output)
    except Exception as e:
        return f"Search error: {e}"


def visit_webpage(url: str) -> str:
    """Fetch and return text content of a webpage.

    Uses BeautifulSoup when available (stripping script/style/nav chrome),
    otherwise falls back to a crude regex tag strip. Output is truncated to
    12k chars to keep the LLM context bounded.
    """
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response.text, "html.parser")
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()
            text = soup.get_text(separator=" ", strip=True)
        except ImportError:
            # Fallback when bs4 isn't installed: strip tags with a regex.
            text = re.sub(r"<[^>]+>", " ", response.text)
        text = re.sub(r"\s+", " ", text).strip()
        return text[:12000]
    except Exception as e:
        return f"Error visiting webpage: {e}"


def wikipedia_search(query: str) -> str:
    """Search Wikipedia for information about a topic.

    Tries the REST summary endpoint for an exact-title hit first; if that
    misses, falls back to the search API and summarizes the top result.
    """
    try:
        # Try direct page summary
        encoded = requests.utils.quote(query.replace(" ", "_"))
        url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{encoded}"
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200:
            data = resp.json()
            extract = data.get("extract", "")
            if extract:
                return f"{data.get('title', '')}: {extract}"
        # Fallback: use search API
        search_url = "https://en.wikipedia.org/w/api.php"
        params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
            "srlimit": 3,
            "srprop": "snippet",
        }
        resp = requests.get(search_url, params=params, timeout=10)
        if not resp.content:
            return "No Wikipedia results found."
        try:
            data = resp.json()
        except Exception:
            return "No Wikipedia results found."
        results = data.get("query", {}).get("search", [])
        if not results:
            return "No Wikipedia results found."
        # Get summary of first result
        title = results[0].get("title", "")
        encoded2 = requests.utils.quote(title.replace(" ", "_"))
        resp2 = requests.get(
            f"https://en.wikipedia.org/api/rest_v1/page/summary/{encoded2}", timeout=10
        )
        if resp2.status_code == 200 and resp2.content:
            try:
                d = resp2.json()
                return f"{d.get('title', '')}: {d.get('extract', '')}"
            except Exception:
                pass
        return "\n".join(r.get("snippet", "") for r in results)
    except Exception as e:
        return f"Wikipedia error: {e}"


def python_interpreter(code: str) -> str:
    """Execute Python code and return its printed output.

    stdout is captured via StringIO and always restored in ``finally``.
    SECURITY NOTE(review): this exec's model-generated code with no sandbox;
    acceptable only because the code comes from our own agent loop, not
    untrusted end users.
    """
    old_stdout = sys.stdout
    sys.stdout = buffer = StringIO()
    try:
        exec_globals: dict = {}
        exec(code, exec_globals)  # noqa: S102
        output = buffer.getvalue()
        return output if output else "Executed successfully (no output)."
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
    finally:
        sys.stdout = old_stdout


def download_task_file(task_id: str) -> str:
    """Download the file associated with a task and return its content.

    Dispatches on content-type / filename extension: images come back as an
    ``IMAGE:<media_type>:<base64>`` string the agent converts into vision
    content; CSV/Excel are summarized via pandas; audio is transcribed with
    Whisper; everything else is returned as (truncated) text.
    """
    try:
        url = f"{DEFAULT_API_URL}/files/{task_id}"
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        content_type = resp.headers.get("content-type", "")
        filename = ""
        if "content-disposition" in resp.headers:
            cd = resp.headers["content-disposition"]
            m = re.search(r'filename=["\']?([^"\';\n]+)', cd)
            if m:
                filename = m.group(1).strip()
        # Determine type by content-type or filename extension
        is_csv = "text/csv" in content_type or filename.endswith(".csv")
        is_excel = filename.endswith((".xlsx", ".xls")) or "spreadsheet" in content_type
        is_image = "image/" in content_type or filename.endswith(
            (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp")
        )
        is_python = filename.endswith(".py")
        if is_image:
            media_type = content_type.split(";")[0].strip() or "image/png"
            img_b64 = base64.b64encode(resp.content).decode()
            # Special prefix parsed by the agent to pass as vision content
            return f"IMAGE:{media_type}:{img_b64}"
        if is_csv:
            try:
                import io
                df = pd.read_csv(io.StringIO(resp.text))
                return (
                    f"CSV file: {len(df)} rows × {len(df.columns)} columns.\n"
                    f"Columns: {list(df.columns)}\n\n"
                    f"{df.head(20).to_string()}"
                )
            except Exception:
                return resp.text[:5000]
        if is_excel:
            try:
                import io
                df = pd.read_excel(io.BytesIO(resp.content))
                return (
                    f"Excel file: {len(df)} rows × {len(df.columns)} columns.\n"
                    f"Columns: {list(df.columns)}\n\n"
                    f"{df.head(20).to_string()}"
                )
            except Exception as e:
                return f"Excel file could not be parsed: {e}"
        is_audio = filename.endswith((".mp3", ".wav", ".ogg", ".flac", ".m4a")) or "audio/" in content_type
        if is_audio:
            try:
                asr_client = InferenceClient(api_key=os.environ["HF_TOKEN"])
                transcript = asr_client.automatic_speech_recognition(
                    audio=resp.content,
                    model="openai/whisper-large-v3",
                )
                text_result = transcript.text if hasattr(transcript, "text") else str(transcript)
                return f"Audio transcript:\n{text_result}"
            except Exception as e:
                return f"Audio file (transcription failed: {e}). File size: {len(resp.content)} bytes."
        if is_python:
            return f"Python file:\n```python\n{resp.text[:4000]}\n```"
        # Default: try to decode as text
        try:
            return resp.content.decode("utf-8")[:6000]
        except Exception:
            return f"Binary file ({len(resp.content)} bytes, type: {content_type})"
    except requests.exceptions.HTTPError as e:
        # FIX: e.response can be None (per requests docs) — guard before
        # dereferencing status_code.
        if e.response is not None and e.response.status_code == 404:
            return "No file associated with this task."
        return f"Error downloading file: {e}"
    except Exception as e:
        return f"Error: {e}"


# --- Agent Definition ---
class GAIAAgent:
    """
    ReAct-style agent using plain chat completions (no native tool-calling API).
    Works with any instruction-following model on HF's free serverless inference.
    """

    SYSTEM_PROMPT = """You are an expert AI assistant solving questions from the GAIA benchmark.

You have access to these tools:
- web_search(query): Search the web via DuckDuckGo for current facts, people, events, statistics.
- visit_webpage(url): Fetch and read the text content of a specific webpage.
- wikipedia_search(query): Search Wikipedia for background information on a topic.
- python_interpreter(code): Execute Python code. Always use print() to output results.
- download_task_file(task_id): Download the file attached to the current task (image, CSV, Excel, text, etc.).

Use this EXACT format for every step:

Thought: [your reasoning]
Action: [tool_name]
Action Input: {"key": "value"}

After receiving the Observation, continue with more Thought/Action steps.
When you have the final answer, write:

Thought: I now know the final answer.
Final Answer: [exact answer]

Important rules:
- "Final Answer:" must contain ONLY the bare answer — no explanation, no "FINAL ANSWER:" prefix.
- Numbers: exact format as requested (integer, decimal, etc.).
- Names: exact spelling as they appear in authoritative sources.
- Lists: comma-separated values unless another format is specified.
- Always use a tool to verify facts rather than relying on memory.
- YouTube URLs cannot be visited directly; use web_search to find information about YouTube video content instead."""

    MODEL = "moonshotai/Kimi-K2.5:cheapest"

    def __init__(self):
        # Requires HF_TOKEN in the environment; raises KeyError otherwise,
        # which run_and_submit_all surfaces as an init error.
        self.client = InferenceClient(
            api_key=os.environ["HF_TOKEN"],
        )
        print("GAIAAgent initialized.")

    @staticmethod
    def _strip_think(text: str) -> str:
        """Remove <think>...</think> reasoning blocks (DeepSeek-R1 / o1-style).

        BUG FIX: the original pattern was a bare lazy ``.*?`` which only ever
        matches the empty string, so ``re.sub`` removed nothing and reasoning
        blocks leaked into the ReAct parser. The pattern must anchor on the
        literal tags.
        """
        return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

    def _run_tool(self, name: str, tool_input: dict) -> str:
        """Execute a named tool and return its result as a string.

        Validates required parameters per tool, never raises (errors are
        returned as text so the agent can recover), and always logs timing.
        """
        import time
        t0 = time.time()
        try:
            if name == "web_search":
                query = tool_input.get("query", "")
                if not query:
                    return "Error: 'query' parameter is required."
                return web_search(query)
            if name == "visit_webpage":
                url = tool_input.get("url", "")
                if not url or not url.startswith("http"):
                    print(f" [TOOL ERROR] visit_webpage called with invalid url: {url!r}")
                    return "Error: valid 'url' parameter is required."
                return visit_webpage(url)
            if name == "wikipedia_search":
                query = tool_input.get("query", "")
                if not query:
                    return "Error: 'query' parameter is required."
                return wikipedia_search(query)
            if name == "python_interpreter":
                code = tool_input.get("code", "")
                if not code:
                    print(f" [TOOL ERROR] python_interpreter called with empty code. Full input: {tool_input!r}")
                    return "Error: 'code' parameter is required."
                return python_interpreter(code)
            if name == "download_task_file":
                return download_task_file(tool_input.get("task_id", ""))
            print(f" [TOOL ERROR] Unknown tool called: {name!r}")
            return f"Unknown tool: {name}"
        except Exception as e:
            print(f" [TOOL EXCEPTION] {name} raised {type(e).__name__}: {e}")
            return f"Tool error: {e}"
        finally:
            print(f" [TOOL TIMING] {name} completed in {time.time() - t0:.2f}s")

    @staticmethod
    def _extract_json(text: str, start: int) -> dict:
        """
        Extract a JSON object starting at `start` (which must be '{') by
        counting braces — handles nested dicts/code strings safely.
        Returns {} on any parse failure (logged, never raised).
        """
        depth = 0
        in_string = False
        escape = False
        for i in range(start, len(text)):
            ch = text[i]
            if escape:
                escape = False
                continue
            if ch == "\\" and in_string:
                escape = True
                continue
            if ch == '"':
                in_string = not in_string
                continue
            if in_string:
                continue
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    raw = text[start : i + 1]
                    try:
                        return json.loads(raw)
                    except json.JSONDecodeError as e:
                        print(f" [PARSE ERROR] JSON decode failed: {e} | raw={raw[:200]!r}")
                        return {}
        print(f" [PARSE ERROR] Unmatched braces — no closing '}}' found from pos {start}")
        return {}

    def _parse_action(self, text: str):
        """
        Return (tool_name, tool_input_dict) for the last Action block in text,
        or (None, None) if none is found.
        """
        action_matches = list(re.finditer(r"Action:\s*(\w+)", text))
        if not action_matches:
            return None, None
        tool_name = action_matches[-1].group(1).strip()
        tool_input: dict = {}
        ai_matches = list(re.finditer(r"Action Input:\s*", text))
        if not ai_matches:
            print(f" [PARSE WARN] Action '{tool_name}' found but no 'Action Input:' block.")
        else:
            pos = ai_matches[-1].end()
            if pos < len(text) and text[pos] == "{":
                tool_input = self._extract_json(text, pos)
                if not tool_input:
                    print(f" [PARSE WARN] Action Input for '{tool_name}' parsed as empty dict.")
            else:
                snippet = text[pos : pos + 80].replace("\n", "\\n")
                print(f" [PARSE WARN] Action Input for '{tool_name}' does not start with '{{': {snippet!r}")
        return tool_name, tool_input

    def __call__(self, question: str, task_id: str = None) -> str:
        """Run the ReAct loop on one question; return the final answer string.

        Retries transient LLM failures (429/5xx) up to 3 times, caps the loop
        at 20 iterations, and feeds tool observations (including base64 images
        as vision content) back into the conversation.
        """
        import time
        print(f"\nAgent processing task {task_id}: {question[:80]}...")
        user_content = f"Task ID: {task_id}\n\nQuestion: {question}" if task_id else question
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ]
        for iteration in range(20):
            t_llm = time.time()
            response = None
            for attempt in range(3):
                try:
                    response = self.client.chat.completions.create(
                        model=self.MODEL,
                        messages=messages,
                        max_tokens=4096,
                        temperature=0.1,
                    )
                    break
                except Exception as e:
                    is_retryable = any(code in str(e) for code in ("504", "502", "503", "429"))
                    print(f" [{iteration}] [LLM ERROR attempt {attempt+1}/3] {type(e).__name__}: {str(e)[:120]}")
                    if is_retryable and attempt < 2:
                        wait = 15 * (attempt + 1)
                        print(f" [{iteration}] Retrying in {wait}s...")
                        time.sleep(wait)
                    else:
                        raise
            if response is None:
                raise RuntimeError("LLM returned no response after retries")
            llm_elapsed = time.time() - t_llm
            raw_output = (response.choices[0].message.content or "").strip()
            think_stripped = len(raw_output) - len(self._strip_think(raw_output))
            output = self._strip_think(raw_output)
            usage = response.usage
            print(
                f" [{iteration}] LLM {llm_elapsed:.1f}s | "
                f"tokens in={getattr(usage, 'prompt_tokens', '?')} "
                f"out={getattr(usage, 'completion_tokens', '?')} | "
                f"think_stripped={think_stripped}chars"
            )
            print(f" [{iteration}] Model output: {output[:300]}{'...' if len(output) > 300 else ''}")
            # ── Final answer found (must be at line start, not inside code/JSON) ──
            fa_match = re.search(r"(?:^|\n)Final Answer:\s*(.+?)(?:\n|$)", output)
            if fa_match:
                answer = fa_match.group(1).strip()
                print(f" [{iteration}] => Final Answer: {answer!r}")
                return answer
            # ── Tool call found ──
            tool_name, tool_input = self._parse_action(output)
            if tool_name:
                print(f" [{iteration}] Tool call: {tool_name}({json.dumps(tool_input)[:200]})")
                result = self._run_tool(tool_name, tool_input)
                result_preview = result[:200].replace("\n", " ")
                print(f" [{iteration}] Tool result ({len(result)} chars): {result_preview}{'...' if len(result) > 200 else ''}")
                messages.append({"role": "assistant", "content": raw_output})
                if result.startswith("IMAGE:"):
                    parts = result.split(":", 2)
                    media_type, img_b64 = parts[1], parts[2]
                    print(f" [{iteration}] Image received: type={media_type}, size={len(img_b64)} b64 chars")
                    messages.append({
                        "role": "user",
                        "content": [
                            {"type": "text", "text": "Observation: Here is the downloaded image. Analyse it to answer the question."},
                            {"type": "image_url", "image_url": {"url": f"data:{media_type};base64,{img_b64}"}},
                        ],
                    })
                else:
                    messages.append({
                        "role": "user",
                        "content": f"Observation: {result[:6000]}",
                    })
            else:
                print(f" [{iteration}] No tool call and no Final Answer — prompting model to conclude.")
                messages.append({"role": "assistant", "content": raw_output})
                messages.append({
                    "role": "user",
                    "content": (
                        "You haven't provided a Final Answer yet. "
                        "Please conclude with:\nFinal Answer: [answer]"
                    ),
                })
        print(f" [MAX ITERATIONS] Reached iteration limit for task {task_id}.")
        return "Unable to determine answer."
# --- Gradio App ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the GAIAAgent on them, submits all answers,
    and displays the results.

    Returns a (status_message, results_dataframe) tuple for the Gradio
    outputs; the dataframe may be None before any work has started.
    """
    space_id = os.getenv("SPACE_ID")
    if profile:
        username = profile.username
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent
    try:
        agent = GAIAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text, task_id=task_id)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
            })
        except Exception as e:
            # One failing task must not abort the whole run; record and continue.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": f"AGENT ERROR: {e}",
            })

    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Submit
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        # FIX: per the requests docs, e.response may be None — guard before
        # dereferencing it, otherwise the handler itself raises AttributeError.
        if e.response is not None:
            error_detail = f"Server responded with status {e.response.status_code}."
            try:
                error_json = e.response.json()
                error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
            except Exception:
                error_detail += f" Response: {e.response.text[:500]}"
        else:
            error_detail = f"HTTP error with no server response: {e}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)


# --- Build Gradio Interface ---
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        f"""
        **Instructions:**

        1. Log in to your Hugging Face account using the button below.
        2. Click **Run Evaluation & Submit All Answers** to fetch questions, run the agent,
           submit answers, and see the score.

        ---
        **Notes:**
        - The agent uses models via HF InferenceClient (provider=auto) with a ReAct loop:
          web search, Wikipedia, Python interpreter, and file download tools.
        - Targets ≥30% on GAIA level-1 questions.
        - Submission can take several minutes while the agent processes each question.
        """
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    # No inputs listed: Gradio auto-injects gr.OAuthProfile from the session
    # based on the function's type annotation.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])


if __name__ == "__main__":
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    demo.launch(debug=True, share=False)