# Author: Gianluca Tessitore — commit "fix some issues" (ea66e30)
import os
import sys
import json
# Load .env file if present (local development)
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
import re
import base64
from io import StringIO
import gradio as gr
import requests
import pandas as pd
from huggingface_hub import InferenceClient
# --- Constants ---
# Scoring server for the HF Agents course (unit 4): serves /questions and
# /files/{task_id}, and accepts answer submissions at /submit.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Tool Functions ---
def web_search(query: str, max_results: int = 5) -> str:
    """Search the web using DuckDuckGo.

    Returns up to ``max_results`` hits formatted as Title/URL/Snippet
    blocks separated by blank lines, or an error/empty-result message.
    Never raises: all failures are reported as a string.
    """
    try:
        # Imported lazily so the module loads even without the ddgs package.
        from ddgs import DDGS
        with DDGS() as ddgs:
            hits = list(ddgs.text(query, max_results=max_results))
            if not hits:
                return "No search results found."
            blocks = [
                f"Title: {hit.get('title', '')}\n"
                f"URL: {hit.get('href', '')}\n"
                f"Snippet: {hit.get('body', '')}"
                for hit in hits
            ]
            return "\n\n".join(blocks)
    except Exception as e:
        return f"Search error: {e}"
def visit_webpage(url: str) -> str:
    """Fetch a webpage and return its visible text, truncated to 12k chars.

    Prefers BeautifulSoup extraction (scripts/nav/etc. removed); falls back
    to regex tag-stripping when bs4 is not installed. Never raises — any
    failure is returned as an error string.
    """
    try:
        ua = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        page = requests.get(url, headers=ua, timeout=15)
        page.raise_for_status()
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            # No bs4 available: crude tag removal + whitespace collapse.
            stripped = re.sub(r"<[^>]+>", " ", page.text)
            stripped = re.sub(r"\s+", " ", stripped).strip()
            return stripped[:12000]
        soup = BeautifulSoup(page.text, "html.parser")
        # Drop non-content elements before extracting text.
        for noise in soup(["script", "style", "nav", "footer", "header"]):
            noise.decompose()
        return soup.get_text(separator=" ", strip=True)[:12000]
    except Exception as e:
        return f"Error visiting webpage: {e}"
def wikipedia_search(query: str) -> str:
    """Look up a topic on Wikipedia and return a short summary string.

    Strategy: first try the REST summary endpoint with the query as an
    exact title; on a miss, fall back to the full-text search API and
    summarize the top hit (or return raw snippets as a last resort).
    Never raises — failures come back as an error string.
    """
    try:
        # Attempt 1: treat the query as a page title.
        slug = requests.utils.quote(query.replace(" ", "_"))
        summary_resp = requests.get(
            f"https://en.wikipedia.org/api/rest_v1/page/summary/{slug}", timeout=10
        )
        if summary_resp.status_code == 200:
            payload = summary_resp.json()
            if payload.get("extract", ""):
                return f"{payload.get('title', '')}: {payload.get('extract', '')}"
        # Attempt 2: full-text search.
        params = {
            "action": "query", "list": "search",
            "srsearch": query, "format": "json",
            "srlimit": 3, "srprop": "snippet",
        }
        search_resp = requests.get("https://en.wikipedia.org/w/api.php", params=params, timeout=10)
        if not search_resp.content:
            return "No Wikipedia results found."
        try:
            found = search_resp.json()
        except Exception:
            return "No Wikipedia results found."
        hits = found.get("query", {}).get("search", [])
        if not hits:
            return "No Wikipedia results found."
        # Summarize the best match via the REST endpoint.
        top_slug = requests.utils.quote(hits[0].get("title", "").replace(" ", "_"))
        top_resp = requests.get(
            f"https://en.wikipedia.org/api/rest_v1/page/summary/{top_slug}", timeout=10
        )
        if top_resp.status_code == 200 and top_resp.content:
            try:
                top = top_resp.json()
                return f"{top.get('title', '')}: {top.get('extract', '')}"
            except Exception:
                pass
        # Last resort: raw HTML snippets from the search results.
        return "\n".join(hit.get("snippet", "") for hit in hits)
    except Exception as e:
        return f"Wikipedia error: {e}"
def python_interpreter(code: str) -> str:
    """Execute Python code and return its printed output.

    Args:
        code: Python source to run via ``exec``. Results must be printed;
            the value of the last expression is discarded.

    Returns:
        Captured stdout, "Executed successfully (no output)." when nothing
        was printed, or "Error: <ExceptionName>: <message>" on failure.

    NOTE(security): ``exec`` runs arbitrary code in-process with no
    sandboxing. This is a deliberate agent tool — never expose it to
    untrusted external callers.
    """
    from contextlib import redirect_stdout

    buffer = StringIO()
    try:
        exec_globals: dict = {}
        # redirect_stdout guarantees sys.stdout is restored even if exec
        # raises, instead of hand-swapping the module-global sys.stdout.
        with redirect_stdout(buffer):
            exec(code, exec_globals)  # noqa: S102
        output = buffer.getvalue()
        return output if output else "Executed successfully (no output)."
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
def download_task_file(task_id: str) -> str:
    """Download the file attached to a task and render it for the LLM.

    Dispatches on content-type header / filename extension:
      - image  -> "IMAGE:<media_type>:<base64>" (special prefix the agent
                  loop converts into vision message content)
      - CSV / Excel -> shape, column list and a 20-row preview via pandas
      - audio  -> Whisper transcription through the HF Inference API
      - .py    -> source wrapped in a markdown code fence
      - other  -> best-effort UTF-8 text, else a binary-file notice

    Never raises: every failure path returns a descriptive string so the
    agent can keep reasoning.
    """
    try:
        url = f"{DEFAULT_API_URL}/files/{task_id}"
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        content_type = resp.headers.get("content-type", "")
        filename = ""
        # Prefer the server-supplied filename for extension sniffing.
        if "content-disposition" in resp.headers:
            cd = resp.headers["content-disposition"]
            m = re.search(r'filename=["\']?([^"\';\n]+)', cd)
            if m:
                filename = m.group(1).strip()
        # Determine type by content-type or filename extension
        is_csv = "text/csv" in content_type or filename.endswith(".csv")
        is_excel = filename.endswith((".xlsx", ".xls")) or "spreadsheet" in content_type
        is_image = "image/" in content_type or filename.endswith(
            (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp")
        )
        is_python = filename.endswith(".py")
        if is_image:
            # Drop any ";charset=..." suffix; default to PNG on empty header.
            media_type = content_type.split(";")[0].strip() or "image/png"
            img_b64 = base64.b64encode(resp.content).decode()
            # Special prefix parsed by the agent to pass as vision content
            return f"IMAGE:{media_type}:{img_b64}"
        if is_csv:
            try:
                import io
                df = pd.read_csv(io.StringIO(resp.text))
                return (
                    f"CSV file: {len(df)} rows × {len(df.columns)} columns.\n"
                    f"Columns: {list(df.columns)}\n\n"
                    f"{df.head(20).to_string()}"
                )
            except Exception:
                # Unparseable CSV: hand the raw text to the model instead.
                return resp.text[:5000]
        if is_excel:
            try:
                import io
                df = pd.read_excel(io.BytesIO(resp.content))
                return (
                    f"Excel file: {len(df)} rows × {len(df.columns)} columns.\n"
                    f"Columns: {list(df.columns)}\n\n"
                    f"{df.head(20).to_string()}"
                )
            except Exception as e:
                return f"Excel file could not be parsed: {e}"
        is_audio = filename.endswith((".mp3", ".wav", ".ogg", ".flac", ".m4a")) or "audio/" in content_type
        if is_audio:
            try:
                # Needs HF_TOKEN in the environment; a missing token raises
                # KeyError, which is caught below as a transcription failure.
                asr_client = InferenceClient(api_key=os.environ["HF_TOKEN"])
                transcript = asr_client.automatic_speech_recognition(
                    audio=resp.content,
                    model="openai/whisper-large-v3",
                )
                text_result = transcript.text if hasattr(transcript, "text") else str(transcript)
                return f"Audio transcript:\n{text_result}"
            except Exception as e:
                return f"Audio file (transcription failed: {e}). File size: {len(resp.content)} bytes."
        if is_python:
            return f"Python file:\n```python\n{resp.text[:4000]}\n```"
        # Default: try to decode as text
        try:
            return resp.content.decode("utf-8")[:6000]
        except Exception:
            return f"Binary file ({len(resp.content)} bytes, type: {content_type})"
    except requests.exceptions.HTTPError as e:
        # The scoring server returns 404 when a task has no attachment.
        if e.response.status_code == 404:
            return "No file associated with this task."
        return f"Error downloading file: {e}"
    except Exception as e:
        return f"Error: {e}"
# --- Agent Definition ---
class GAIAAgent:
    """
    ReAct-style agent using plain chat completions (no native tool-calling API).
    Works with any instruction-following model on HF's free serverless inference.
    """

    # Prompt defining the Thought / Action / Action Input / Final Answer
    # protocol that `_parse_action` and the `__call__` loop parse below.
    SYSTEM_PROMPT = """You are an expert AI assistant solving questions from the GAIA benchmark.
You have access to these tools:
- web_search(query): Search the web via DuckDuckGo for current facts, people, events, statistics.
- visit_webpage(url): Fetch and read the text content of a specific webpage.
- wikipedia_search(query): Search Wikipedia for background information on a topic.
- python_interpreter(code): Execute Python code. Always use print() to output results.
- download_task_file(task_id): Download the file attached to the current task (image, CSV, Excel, text, etc.).
Use this EXACT format for every step:
Thought: [your reasoning]
Action: [tool_name]
Action Input: {"key": "value"}
After receiving the Observation, continue with more Thought/Action steps.
When you have the final answer, write:
Thought: I now know the final answer.
Final Answer: [exact answer]
Important rules:
- "Final Answer:" must contain ONLY the bare answer — no explanation, no "FINAL ANSWER:" prefix.
- Numbers: exact format as requested (integer, decimal, etc.).
- Names: exact spelling as they appear in authoritative sources.
- Lists: comma-separated values unless another format is specified.
- Always use a tool to verify facts rather than relying on memory.
- YouTube URLs cannot be visited directly; use web_search to find information about YouTube video content instead."""

    # Model identifier passed to chat.completions.create; the ":cheapest"
    # suffix selects provider routing on the HF inference endpoint.
    MODEL = "moonshotai/Kimi-K2.5:cheapest"

    def __init__(self) -> None:
        # Requires HF_TOKEN in the environment; raises KeyError otherwise.
        self.client = InferenceClient(
            api_key=os.environ["HF_TOKEN"],
        )
        print("GAIAAgent initialized.")

    @staticmethod
    def _strip_think(text: str) -> str:
        """Remove <think>…</think> reasoning blocks (DeepSeek-R1 / o1-style)."""
        return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

    def _run_tool(self, name: str, tool_input: dict) -> str:
        """Execute a named tool and return its result as a string.

        Validates required parameters per tool, logs errors and timing,
        and never raises — errors come back as strings for the model.
        """
        import time
        t0 = time.time()
        try:
            if name == "web_search":
                query = tool_input.get("query", "")
                if not query:
                    return "Error: 'query' parameter is required."
                return web_search(query)
            if name == "visit_webpage":
                url = tool_input.get("url", "")
                if not url or not url.startswith("http"):
                    print(f" [TOOL ERROR] visit_webpage called with invalid url: {url!r}")
                    return "Error: valid 'url' parameter is required."
                return visit_webpage(url)
            if name == "wikipedia_search":
                query = tool_input.get("query", "")
                if not query:
                    return "Error: 'query' parameter is required."
                return wikipedia_search(query)
            if name == "python_interpreter":
                code = tool_input.get("code", "")
                if not code:
                    print(f" [TOOL ERROR] python_interpreter called with empty code. Full input: {tool_input!r}")
                    return "Error: 'code' parameter is required."
                return python_interpreter(code)
            if name == "download_task_file":
                return download_task_file(tool_input.get("task_id", ""))
            print(f" [TOOL ERROR] Unknown tool called: {name!r}")
            return f"Unknown tool: {name}"
        except Exception as e:
            print(f" [TOOL EXCEPTION] {name} raised {type(e).__name__}: {e}")
            return f"Tool error: {e}"
        finally:
            # Always log wall-clock duration, even on error paths.
            print(f" [TOOL TIMING] {name} completed in {time.time() - t0:.2f}s")

    @staticmethod
    def _extract_json(text: str, start: int) -> dict:
        """
        Extract a JSON object starting at `start` (which must be '{') by
        counting braces — handles nested dicts/code strings safely.

        Returns {} when braces never balance or the span is not valid JSON.
        """
        depth = 0
        in_string = False
        escape = False
        for i in range(start, len(text)):
            ch = text[i]
            if escape:
                # Previous char was a backslash inside a string: skip this one.
                escape = False
                continue
            if ch == "\\" and in_string:
                escape = True
                continue
            if ch == '"':
                in_string = not in_string
                continue
            if in_string:
                # Braces inside string literals don't affect nesting depth.
                continue
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    raw = text[start : i + 1]
                    try:
                        return json.loads(raw)
                    except json.JSONDecodeError as e:
                        print(f" [PARSE ERROR] JSON decode failed: {e} | raw={raw[:200]!r}")
                        return {}
        print(f" [PARSE ERROR] Unmatched braces — no closing '}}' found from pos {start}")
        return {}

    def _parse_action(self, text: str):
        """
        Return (tool_name, tool_input_dict) for the last Action block in text,
        or (None, None) if none is found.
        """
        action_matches = list(re.finditer(r"Action:\s*(\w+)", text))
        if not action_matches:
            return None, None
        # Use the LAST Action block: the model may restate earlier steps.
        tool_name = action_matches[-1].group(1).strip()
        tool_input: dict = {}
        ai_matches = list(re.finditer(r"Action Input:\s*", text))
        if not ai_matches:
            print(f" [PARSE WARN] Action '{tool_name}' found but no 'Action Input:' block.")
        else:
            pos = ai_matches[-1].end()
            if pos < len(text) and text[pos] == "{":
                tool_input = self._extract_json(text, pos)
                if not tool_input:
                    print(f" [PARSE WARN] Action Input for '{tool_name}' parsed as empty dict.")
            else:
                snippet = text[pos : pos + 80].replace("\n", "\\n")
                print(f" [PARSE WARN] Action Input for '{tool_name}' does not start with '{{': {snippet!r}")
        return tool_name, tool_input

    def __call__(self, question: str, task_id: str | None = None) -> str:
        """Run the ReAct loop on one question; returns the final answer text.

        Up to 20 Thought/Action iterations; each LLM call is retried up to
        3 times on transient HTTP errors (429/502/503/504). Non-retryable
        LLM errors propagate to the caller.
        """
        import time
        print(f"\nAgent processing task {task_id}: {question[:80]}...")
        user_content = f"Task ID: {task_id}\n\nQuestion: {question}" if task_id else question
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ]
        for iteration in range(20):
            t_llm = time.time()
            response = None
            # Retry loop for transient inference-endpoint failures.
            for attempt in range(3):
                try:
                    response = self.client.chat.completions.create(
                        model=self.MODEL,
                        messages=messages,
                        max_tokens=4096,
                        temperature=0.1,
                    )
                    break
                except Exception as e:
                    is_retryable = any(code in str(e) for code in ("504", "502", "503", "429"))
                    print(f" [{iteration}] [LLM ERROR attempt {attempt+1}/3] {type(e).__name__}: {str(e)[:120]}")
                    if is_retryable and attempt < 2:
                        # Linear backoff: 15s, then 30s.
                        wait = 15 * (attempt + 1)
                        print(f" [{iteration}] Retrying in {wait}s...")
                        time.sleep(wait)
                    else:
                        raise
            if response is None:
                raise RuntimeError("LLM returned no response after retries")
            llm_elapsed = time.time() - t_llm
            raw_output = (response.choices[0].message.content or "").strip()
            # Measure how much <think> content was removed, for logging.
            think_stripped = len(raw_output) - len(self._strip_think(raw_output))
            output = self._strip_think(raw_output)
            usage = response.usage
            print(
                f" [{iteration}] LLM {llm_elapsed:.1f}s | "
                f"tokens in={getattr(usage, 'prompt_tokens', '?')} "
                f"out={getattr(usage, 'completion_tokens', '?')} | "
                f"think_stripped={think_stripped}chars"
            )
            print(f" [{iteration}] Model output: {output[:300]}{'...' if len(output) > 300 else ''}")
            # ── Final answer found (must be at line start, not inside code/JSON) ──
            fa_match = re.search(r"(?:^|\n)Final Answer:\s*(.+?)(?:\n|$)", output)
            if fa_match:
                answer = fa_match.group(1).strip()
                print(f" [{iteration}] => Final Answer: {answer!r}")
                return answer
            # ── Tool call found ──
            tool_name, tool_input = self._parse_action(output)
            if tool_name:
                print(f" [{iteration}] Tool call: {tool_name}({json.dumps(tool_input)[:200]})")
                result = self._run_tool(tool_name, tool_input)
                result_preview = result[:200].replace("\n", " ")
                print(f" [{iteration}] Tool result ({len(result)} chars): {result_preview}{'...' if len(result) > 200 else ''}")
                # Keep the raw (un-stripped) assistant turn in history.
                messages.append({"role": "assistant", "content": raw_output})
                if result.startswith("IMAGE:"):
                    # "IMAGE:<media_type>:<b64>" from download_task_file:
                    # forward as vision content instead of plain text.
                    parts = result.split(":", 2)
                    media_type, img_b64 = parts[1], parts[2]
                    print(f" [{iteration}] Image received: type={media_type}, size={len(img_b64)} b64 chars")
                    messages.append({
                        "role": "user",
                        "content": [
                            {"type": "text", "text": "Observation: Here is the downloaded image. Analyse it to answer the question."},
                            {"type": "image_url", "image_url": {"url": f"data:{media_type};base64,{img_b64}"}},
                        ],
                    })
                else:
                    # Truncate observations to keep the context window bounded.
                    messages.append({
                        "role": "user",
                        "content": f"Observation: {result[:6000]}",
                    })
            else:
                # Neither a Final Answer nor a parseable Action: nudge the model.
                print(f" [{iteration}] No tool call and no Final Answer — prompting model to conclude.")
                messages.append({"role": "assistant", "content": raw_output})
                messages.append({
                    "role": "user",
                    "content": (
                        "You haven't provided a Final Answer yet. "
                        "Please conclude with:\nFinal Answer: [answer]"
                    ),
                })
        print(f" [MAX ITERATIONS] Reached iteration limit for task {task_id}.")
        return "Unable to determine answer."
# --- Gradio App ---
def run_and_submit_all(profile: gr.OAuthProfile | None) -> tuple[str, pd.DataFrame | None]:
    """
    Fetches all questions, runs the GAIAAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile injected by gr.LoginButton; None when the
            user is not logged in.

    Returns:
        (status message, results DataFrame or None) for the Gradio outputs.
    """
    space_id = os.getenv("SPACE_ID")
    if profile:
        username = profile.username
        print(f"User logged in: {username}")
    else:
        # Submission requires a username, so bail out early.
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    # 1. Instantiate Agent
    try:
        agent = GAIAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # Link to this Space's code, included in the submission payload.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)
    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text, task_id=task_id)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
            })
        except Exception as e:
            # One failing task must not abort the whole run; log it in the
            # results table but do not submit an answer for it.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": f"AGENT ERROR: {e}",
            })
    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # 4. Submit
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        # Surface the server's error detail when it provides JSON.
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except Exception:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
# --- Build Gradio Interface ---
# Layout: login button, run button, a status textbox and a results table.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        f"""
**Instructions:**
1. Log in to your Hugging Face account using the button below.
2. Click **Run Evaluation & Submit All Answers** to fetch questions, run the agent, submit answers, and see the score.
---
**Notes:**
- The agent uses models via HF InferenceClient (provider=auto) with a ReAct loop: web search, Wikipedia, Python interpreter, and file download tools.
- Targets ≥30% on GAIA level-1 questions.
- Submission can take several minutes while the agent processes each question.
"""
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    # No inputs declared: the OAuth profile is injected automatically by
    # Gradio into run_and_submit_all's gr.OAuthProfile parameter.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])
if __name__ == "__main__":
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)
    # SPACE_HOST / SPACE_ID are set automatically on HF Spaces; their
    # absence indicates a local run. Logged for debugging only.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    demo.launch(debug=True, share=False)