# maytemuma's picture
# Update app.py
# 32bfb75 verified
import os
import time
import gradio as gr
import requests
import pandas as pd
from smolagents import (
CodeAgent,
DuckDuckGoSearchTool,
VisitWebpageTool,
OpenAIModel,
tool,
)
# --- Constants ---
# Base URL of the scoring service. run_and_submit_all appends "/questions" and
# "/submit" to it when fetching tasks and posting answers.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# =============================================
# CUSTOM TOOLS
# =============================================
@tool
def download_file_from_api(task_id: str) -> str:
    """Downloads a file for a GAIA task. Use when question mentions a file/attachment.

    Args:
        task_id: The task_id string for the question.

    Returns:
        Text content truncated to 12000 characters for text, spreadsheet, PDF and Word
        files, an IMAGE_FILE_SAVED or AUDIO_FILE_SAVED marker followed by a temp-file
        path for media, a "File saved" note for any other type, or an
        "Error downloading" message when the request or the parsing fails.
    """
    import io
    import tempfile

    max_chars = 12000  # cap on returned text so the tool output stays small
    url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
    try:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        ct = resp.headers.get("Content-Type", "")
        kind = ct.lower()  # header values are case-insensitive; match on a normalized copy

        # Office documents must be recognised BEFORE the generic text check:
        # their media types ("application/vnd.openxmlformats-officedocument...")
        # contain the substring "xml" and would otherwise be returned as raw
        # zip bytes decoded as text.
        if "wordprocessingml" in kind or "msword" in kind:
            import docx

            doc = docx.Document(io.BytesIO(resp.content))
            return "\n".join(p.text for p in doc.paragraphs)[:max_chars]

        # "spreadsheetml" (xlsx) contains "spreadsheet", so this covers it too.
        if "spreadsheet" in kind or "excel" in kind:
            import openpyxl

            wb = openpyxl.load_workbook(io.BytesIO(resp.content))
            lines = []
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                lines.append(f"--- Sheet: {sheet_name} ---")
                for row in ws.iter_rows(values_only=True):
                    # Only empty cells (None) become ""; 0 and False are real values.
                    lines.append("\t".join("" if c is None else str(c) for c in row))
            return "\n".join(lines)[:max_chars]

        # Remaining OOXML types (e.g. presentations) are binary: keep them out of
        # the text branch so they fall through to the generic "File saved" path.
        is_ooxml = "openxmlformats" in kind
        if not is_ooxml and any(t in kind for t in ("text", "json", "csv", "xml", "html")):
            return resp.text[:max_chars]

        if "pdf" in kind:
            import PyPDF2

            reader = PyPDF2.PdfReader(io.BytesIO(resp.content))
            text = "".join(p.extract_text() or "" for p in reader.pages)
            return text[:max_chars] if text.strip() else "PDF: no text extracted."

        if "image" in kind:
            # delete=False: the path is handed back to the agent for describe_image.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as f:
                f.write(resp.content)
            return f"IMAGE_FILE_SAVED:{f.name}"

        if any(t in kind for t in ("audio", "mpeg", "wav", "mp3", "ogg")):
            # delete=False: the path is handed back to the agent for transcribe_audio.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as f:
                f.write(resp.content)
            return f"AUDIO_FILE_SAVED:{f.name}"

        if "python" in kind:
            return resp.text[:max_chars]

        # Unknown type: persist the raw bytes and report where they went.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
            f.write(resp.content)
        return f"File saved: {f.name} (type: {ct}, {len(resp.content)} bytes)"
    except Exception as e:
        # Tool boundary: report the failure as text so the agent can react to it.
        return f"Error downloading: {e}"
@tool
def describe_image(image_path: str) -> str:
    """Describes an image using a vision model. Use after getting IMAGE_FILE_SAVED.
    Args:
        image_path: Path to the image file.
    """
    # Any failure (import, missing file, inference call) is reported as an
    # "Image error: ..." string instead of being raised.
    try:
        from huggingface_hub import InferenceClient

        vision_client = InferenceClient(token=os.environ.get("HF_TOKEN"))
        with open(image_path, "rb") as image_file:
            image_bytes = image_file.read()
        response = vision_client.image_to_text(
            image=image_bytes,
            model="Salesforce/blip2-opt-2.7b",
        )
        # The client may hand back a plain string or an object carrying
        # `generated_text`; fall back to the object's string form otherwise.
        if isinstance(response, str):
            caption = response
        else:
            caption = getattr(response, "generated_text", str(response))
        return f"Image: {caption}"
    except Exception as err:
        return f"Image error: {err}"
@tool
def transcribe_audio(audio_path: str) -> str:
    """Transcribes audio to text. Use after getting AUDIO_FILE_SAVED.
    Args:
        audio_path: Path to the audio file.
    """
    # Any failure (import, missing file, inference call) is reported as an
    # "Audio error: ..." string instead of being raised.
    try:
        from huggingface_hub import InferenceClient

        asr_client = InferenceClient(token=os.environ.get("HF_TOKEN"))
        with open(audio_path, "rb") as audio_file:
            audio_bytes = audio_file.read()
        response = asr_client.automatic_speech_recognition(
            audio=audio_bytes,
            model="openai/whisper-large-v3-turbo",
        )
        # The client may hand back a plain string or an object carrying `text`;
        # fall back to the object's string form otherwise.
        if isinstance(response, str):
            transcript = response
        else:
            transcript = getattr(response, "text", str(response))
        return f"Transcription: {transcript}"
    except Exception as err:
        return f"Audio error: {err}"
@tool
def read_local_file(file_path: str) -> str:
    """Reads a local text file.
    Args:
        file_path: Path to the file.
    """
    # Undecodable bytes are skipped (errors="ignore") and the result is capped
    # at 12000 characters; failures come back as a "Read error: ..." string.
    char_limit = 12000
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            contents = handle.read()
        return contents[:char_limit]
    except Exception as err:
        return f"Read error: {err}"
@tool
def execute_python_file(file_path: str) -> str:
    """Runs a Python script and returns output.

    Args:
        file_path: Path to the .py file.

    Returns:
        The script's stdout, followed by a STDERR section when the script wrote to
        stderr, "No output." when both streams are empty, or a timeout / error message.
    """
    import subprocess
    import sys

    # NOTE(security): this executes arbitrary (typically downloaded) code with the
    # app's privileges; the 30 s timeout is the only guard applied here.
    #
    # Use the interpreter running this app rather than whatever "python3" resolves
    # to on PATH, so the script sees the same installed packages. Fall back to
    # "python3" only if the interpreter path cannot be determined.
    interpreter = sys.executable or "python3"
    try:
        r = subprocess.run(
            [interpreter, file_path],
            capture_output=True,
            text=True,
            timeout=30,
        )
        out = r.stdout + (f"\nSTDERR: {r.stderr}" if r.stderr else "")
        return out.strip() or "No output."
    except subprocess.TimeoutExpired:
        return "Timeout after 30s."
    except Exception as e:
        # Tool boundary: report the failure as text so the agent can react to it.
        return f"Exec error: {e}"
# =============================================
# AGENT
# =============================================
# Prompt text handed to CodeAgent through its `instructions` argument (see BasicAgent).
# Kept concise to save tokens. It refers to the custom tools defined in this file
# (download_file_from_api, describe_image, transcribe_audio) and to the
# IMAGE_FILE_SAVED / AUDIO_FILE_SAVED markers that download_file_from_api returns.
# NOTE(review): `web_search` and `visit_webpage` are presumably the tool names exposed by
# DuckDuckGoSearchTool and VisitWebpageTool - confirm against the installed smolagents version.
INSTRUCTIONS = """You solve GAIA benchmark questions precisely.
ANSWER FORMAT:
- Return ONLY the final answer. No "The answer is", no explanations.
- Number → just the number (e.g. "42")
- Name → just the name (e.g. "Paris")
- List → comma-separated (e.g. "red, blue, green")
STRATEGY:
- Keep reasoning SHORT. Think step by step but briefly.
- Always verify facts with web_search. Don't rely on memory.
- If the answer isn't found directly, break the problem into parts and reason through them.
- For counting tasks: gather all items first, then count carefully.
- If a question mentions a file/attachment, FIRST call download_file_from_api with the task_id.
- If download returns IMAGE_FILE_SAVED → call describe_image with that path.
- If download returns AUDIO_FILE_SAVED → call transcribe_audio with that path.
- For reversed/encoded text, decode it before answering.
- If a question references a URL, use visit_webpage to read it.
"""
class BasicAgent:
def __init__(self):
print("Initializing agent with Gemini 2.0 Flash...")
model = OpenAIModel(
model_id="gemma-4-31b-it",
api_base="https://generativelanguage.googleapis.com/v1beta/openai/",
api_key=os.getenv("GEMINI_API_KEY"),
temperature=0.1,
max_tokens=1500,
)
self.agent = CodeAgent(
model=model,
tools=[
DuckDuckGoSearchTool(),
VisitWebpageTool(),
download_file_from_api,
describe_image,
transcribe_audio,
read_local_file,
execute_python_file,
],
max_steps=7,
verbosity_level=2,
instructions=INSTRUCTIONS,
additional_authorized_imports=[
"json", "re", "math", "datetime", "collections",
"csv", "io", "os", "tempfile", "subprocess",
"base64", "hashlib", "unicodedata", "string",
],
)
print("Agent ready!")
def __call__(self, question: str, task_id: str = None) -> str:
print(f"Processing: {question[:80]}...")
if task_id:
prompt = f'If needed, download file with: download_file_from_api("{task_id}")\n\nQuestion: {question}\n\nAnswer with ONLY the final answer.'
else:
prompt = f"Question: {question}\n\nAnswer with ONLY the final answer."
for attempt in range(2):
try:
result = self.agent.run(prompt)
answer = str(result).strip()
# Clean prefixes
for p in ["The answer is ", "The answer is: ", "Answer: ",
"FINAL ANSWER: ", "Final answer: ", "The final answer is ",
"The final answer is: ", "Result: "]:
if answer.lower().startswith(p.lower()):
answer = answer[len(p):].strip()
# Remove quotes
if len(answer) > 2 and answer[0] in '"\'':
if answer[-1] == answer[0]:
answer = answer[1:-1].strip()
# Remove trailing period
if answer.endswith(".") and len(answer.split()) <= 5:
answer = answer[:-1].strip()
print(f"Answer: {answer}")
return answer
except Exception as e:
print(f"Error (attempt {attempt+1}): {e}")
if attempt == 0:
time.sleep(3)
return "Unable to determine the answer."
# =============================================
# SUBMISSION
# =============================================
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on every question from the scoring API and submit the answers.

    Args:
        profile: The logged-in Hugging Face user, or None when nobody is logged in.

    Returns:
        A (status message, results) pair. `results` is a pandas DataFrame with one
        row per processed question, or None when the run stops before any
        question is attempted (no login, agent init failure, fetch failure).
    """
    space_id = os.getenv("SPACE_ID")
    if not profile:
        return "Please Login to Hugging Face with the button.", None
    username = profile.username
    print(f"User: {username}")
    api_url = DEFAULT_API_URL

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    # Link to this Space's code, sent along with the answers.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    try:
        resp = requests.get(f"{api_url}/questions", timeout=15)
        resp.raise_for_status()
        questions = resp.json()
        if not questions:
            return "No questions fetched.", None
        print(f"Fetched {len(questions)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    results_log = []  # rows for the results table shown in the UI
    answers = []  # payload entries for the submit endpoint
    for i, item in enumerate(questions):
        task_id = item.get("task_id")
        question = item.get("question")
        if not task_id or question is None:
            continue
        print(f"\n{'='*60}")
        # Separator added so the counter and the task id do not run together.
        print(f" Q {i+1}/{len(questions)} - {task_id}")
        print(f" {question[:100]}...")
        print(f"{'='*60}")
        try:
            answer = agent(question, task_id=task_id)
            answers.append({"task_id": task_id, "submitted_answer": answer})
            results_log.append({"Task ID": task_id, "Question": question, "Submitted Answer": answer})
        except Exception as e:
            # Failed questions are logged in the table but not submitted.
            print(f"Error on {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question, "Submitted Answer": f"ERROR: {e}"})
        time.sleep(1)

    if not answers:
        return "No answers produced.", pd.DataFrame(results_log)

    submission = {"username": username.strip(), "agent_code": agent_code, "answers": answers}
    try:
        resp = requests.post(f"{api_url}/submit", json=submission, timeout=120)
        resp.raise_for_status()
        data = resp.json()
        status = (
            f"Submission Successful!\n"
            f"User: {data.get('username')}\n"
            f"Score: {data.get('score', 'N/A')}% "
            f"({data.get('correct_count', '?')}/{data.get('total_attempted', '?')} correct)\n"
            f"Message: {data.get('message', '')}"
        )
        return status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        # A requests.Response is falsy for 4xx/5xx statuses, so test for None
        # explicitly; a truthiness test would always discard the server's body.
        detail = e.response.text[:500] if e.response is not None else str(e)
        return f"Submission Failed: {detail}", pd.DataFrame(results_log)
    except Exception as e:
        return f"Submission error: {e}", pd.DataFrame(results_log)
# --- Gradio UI ---
# Page layout: title, short description, login button, a run button, and two
# outputs (status text and results table) filled by run_and_submit_all.
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 GAIA Agent — Final Assignment")
    # The model named here matches the model_id configured in BasicAgent.
    # Blank lines keep the Agent and Tools lines from merging into one paragraph.
    gr.Markdown(
        """
        **Agent**: CodeAgent with gemma-4-31b-it via the Gemini OpenAI-compatible API (free)

        **Tools**: Web Search · Webpage Visitor · File Downloader · Image Describer · Audio Transcriber · Python Executor

        1. Log in with your HF account
        2. Click Run to start (takes ~15-20 min)
        """
    )
    gr.LoginButton()
    run_button = gr.Button("🚀 Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Status", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Results", wrap=True)
    # No explicit inputs: the `profile` argument is presumably injected by Gradio
    # from the login session based on its gr.OAuthProfile annotation.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])
if __name__ == "__main__":
    # Print a short startup banner with the Space id, then start the Gradio server.
    rule = "-" * 30
    print(f"\n{rule} App Starting {rule}")
    space_id = os.getenv("SPACE_ID", "not set")
    print(f"SPACE_ID: {space_id}")
    print("-" * 60)
    demo.launch(debug=True, share=False)