import os
import time

import gradio as gr
import requests
import pandas as pd
from smolagents import (
    CodeAgent,
    DuckDuckGoSearchTool,
    VisitWebpageTool,
    OpenAIModel,
    tool,
)

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Model served through the OpenAI-compatible endpoint configured in BasicAgent.
MODEL_ID = "gemma-4-31b-it"
MODEL_API_BASE = "https://generativelanguage.googleapis.com/v1beta/openai/"

# Upper bound on the number of characters any tool hands back to the agent.
MAX_TEXT_CHARS = 12000


# =============================================
# CUSTOM TOOLS
# =============================================
@tool
def download_file_from_api(task_id: str) -> str:
    """Downloads a file for a GAIA task. Use when question mentions a file/attachment.

    Args:
        task_id: The task_id string for the question.

    Returns:
        The extracted text (truncated), a marker with a temp file path, or an error message.
    """
    import io
    import tempfile

    def save_temp(payload: bytes, suffix: str) -> str:
        # delete=False on purpose: the path is handed back to the agent,
        # which reads the file in a later step.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle:
            handle.write(payload)
        return handle.name

    url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        content_type = resp.headers.get("Content-Type", "")
        # Media types are case-insensitive; match on a lowered copy.
        ct = content_type.lower()

        # Office documents are checked BEFORE the generic text check: their
        # MIME types ("...openxmlformats-officedocument...") contain "xml"
        # and would otherwise be returned as decoded binary garbage.
        if "spreadsheet" in ct or "excel" in ct:
            import openpyxl

            wb = openpyxl.load_workbook(io.BytesIO(resp.content))
            lines = []
            for sheet_name in wb.sheetnames:
                lines.append(f"--- Sheet: {sheet_name} ---")
                for row in wb[sheet_name].iter_rows(values_only=True):
                    # Only None means "empty cell"; 0 and False are real values.
                    lines.append(
                        "\t".join("" if c is None else str(c) for c in row)
                    )
            return "\n".join(lines)[:MAX_TEXT_CHARS]

        if "wordprocessingml" in ct or "msword" in ct:
            import docx

            doc = docx.Document(io.BytesIO(resp.content))
            return "\n".join(p.text for p in doc.paragraphs)[:MAX_TEXT_CHARS]

        if "pdf" in ct:
            import PyPDF2

            reader = PyPDF2.PdfReader(io.BytesIO(resp.content))
            text = "".join(page.extract_text() or "" for page in reader.pages)
            return text[:MAX_TEXT_CHARS] if text.strip() else "PDF: no text extracted."

        # Remaining OOXML containers (e.g. presentations) are zip archives,
        # not text, so they skip this branch and end up in the binary fallback.
        text_markers = ("text", "json", "csv", "xml", "html")
        if "openxmlformats" not in ct and any(m in ct for m in text_markers):
            return resp.text[:MAX_TEXT_CHARS]

        if "image" in ct:
            return f"IMAGE_FILE_SAVED:{save_temp(resp.content, '.png')}"

        if any(m in ct for m in ("audio", "mpeg", "wav", "mp3", "ogg")):
            return f"AUDIO_FILE_SAVED:{save_temp(resp.content, '.mp3')}"

        if "python" in ct:
            return resp.text[:MAX_TEXT_CHARS]

        path = save_temp(resp.content, ".bin")
        return f"File saved: {path} (type: {content_type}, {len(resp.content)} bytes)"
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising, so the
        # agent receives a readable message.
        return f"Error downloading: {e}"


@tool
def describe_image(image_path: str) -> str:
    """Describes an image using a vision model. Use after getting IMAGE_FILE_SAVED.

    Args:
        image_path: Path to the image file.

    Returns:
        "Image: <caption>" on success, or "Image error: <reason>" on failure.
    """
    try:
        from huggingface_hub import InferenceClient

        client = InferenceClient(token=os.getenv("HF_TOKEN"))
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        result = client.image_to_text(image=image_bytes, model="Salesforce/blip2-opt-2.7b")
        # The client may return a plain string or an object exposing
        # `generated_text`; fall back to str() for anything else.
        if isinstance(result, str):
            text = result
        else:
            text = getattr(result, "generated_text", str(result))
        return f"Image: {text}"
    except Exception as e:
        return f"Image error: {e}"


@tool
def transcribe_audio(audio_path: str) -> str:
    """Transcribes audio to text. Use after getting AUDIO_FILE_SAVED.

    Args:
        audio_path: Path to the audio file.

    Returns:
        "Transcription: <text>" on success, or "Audio error: <reason>" on failure.
    """
    try:
        from huggingface_hub import InferenceClient

        client = InferenceClient(token=os.getenv("HF_TOKEN"))
        with open(audio_path, "rb") as f:
            audio_bytes = f.read()
        result = client.automatic_speech_recognition(
            audio=audio_bytes, model="openai/whisper-large-v3-turbo"
        )
        # Accept either a plain string or an object exposing `text`.
        if isinstance(result, str):
            text = result
        else:
            text = getattr(result, "text", str(result))
        return f"Transcription: {text}"
    except Exception as e:
        return f"Audio error: {e}"


@tool
def read_local_file(file_path: str) -> str:
    """Reads a local text file.

    Args:
        file_path: Path to the file.

    Returns:
        The file content (truncated), or "Read error: <reason>" on failure.
    """
    try:
        # errors="ignore" silently drops bytes that are not valid UTF-8.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()[:MAX_TEXT_CHARS]
    except Exception as e:
        return f"Read error: {e}"


@tool
def execute_python_file(file_path: str) -> str:
    """Runs a Python script and returns output.

    Args:
        file_path: Path to the .py file.

    Returns:
        The script's stdout (plus stderr when present), or an error/timeout message.
    """
    import subprocess
    import sys

    # Use the interpreter running this app so the script sees the same
    # installed packages; fall back to "python3" if it is unknown.
    interpreter = sys.executable or "python3"
    try:
        r = subprocess.run(
            [interpreter, file_path], capture_output=True, text=True, timeout=30
        )
        out = r.stdout + (f"\nSTDERR: {r.stderr}" if r.stderr else "")
        return out.strip() or "No output."
    except subprocess.TimeoutExpired:
        return "Timeout after 30s."
    except Exception as e:
        return f"Exec error: {e}"


# =============================================
# AGENT
# =============================================
# Concise instructions to save tokens
INSTRUCTIONS = """You solve GAIA benchmark questions precisely.

ANSWER FORMAT:
- Return ONLY the final answer. No "The answer is", no explanations.
- Number → just the number (e.g. "42")
- Name → just the name (e.g. "Paris")
- List → comma-separated (e.g. "red, blue, green")

STRATEGY:
- Keep reasoning SHORT. Think step by step but briefly.
- Always verify facts with web_search. Don't rely on memory.
- If the answer isn't found directly, break the problem into parts and reason through them.
- For counting tasks: gather all items first, then count carefully.
- If a question mentions a file/attachment, FIRST call download_file_from_api with the task_id.
- If download returns IMAGE_FILE_SAVED → call describe_image with that path.
- If download returns AUDIO_FILE_SAVED → call transcribe_audio with that path.
- For reversed/encoded text, decode it before answering.
- If a question references a URL, use visit_webpage to read it.
"""


class BasicAgent:
    """Callable wrapper around a smolagents ``CodeAgent`` for GAIA questions.

    Calling an instance runs the agent on one question and returns a cleaned,
    answer-only string.
    """

    # Boilerplate lead-ins removed from the model's reply (case-insensitive).
    _ANSWER_PREFIXES = (
        "The answer is ",
        "The answer is: ",
        "Answer: ",
        "FINAL ANSWER: ",
        "Final answer: ",
        "The final answer is ",
        "The final answer is: ",
        "Result: ",
    )

    def __init__(self):
        """Build the model client and the tool-equipped agent.

        The API key is read from the ``GEMINI_API_KEY`` environment variable.
        """
        # Report the model that is actually configured.
        print(f"Initializing agent with {MODEL_ID}...")
        model = OpenAIModel(
            model_id=MODEL_ID,
            api_base=MODEL_API_BASE,
            api_key=os.getenv("GEMINI_API_KEY"),
            temperature=0.1,
            max_tokens=1500,
        )
        self.agent = CodeAgent(
            model=model,
            tools=[
                DuckDuckGoSearchTool(),
                VisitWebpageTool(),
                download_file_from_api,
                describe_image,
                transcribe_audio,
                read_local_file,
                execute_python_file,
            ],
            max_steps=7,
            verbosity_level=2,
            instructions=INSTRUCTIONS,
            # NOTE(review): "os" and "subprocess" let model-generated code
            # touch the host system — confirm this is acceptable for the
            # deployment environment.
            additional_authorized_imports=[
                "json", "re", "math", "datetime", "collections",
                "csv", "io", "os", "tempfile", "subprocess",
                "base64", "hashlib", "unicodedata", "string",
            ],
        )
        print("Agent ready!")

    @staticmethod
    def _clean_answer(raw) -> str:
        """Reduce a raw agent result to a bare answer string.

        Strips known lead-in phrases, one pair of matching surrounding quotes,
        and a trailing period on short answers (five words or fewer).
        """
        answer = str(raw).strip()

        # Clean prefixes; each one is tested against the current, already
        # shortened answer, in declaration order.
        for prefix in BasicAgent._ANSWER_PREFIXES:
            if answer[: len(prefix)].lower() == prefix.lower():
                answer = answer[len(prefix):].strip()

        # Remove quotes
        if len(answer) > 2 and answer[0] in "\"'" and answer[-1] == answer[0]:
            answer = answer[1:-1].strip()

        # Remove trailing period
        if answer.endswith(".") and len(answer.split()) <= 5:
            answer = answer[:-1].strip()

        return answer

    def __call__(self, question: str, task_id: str | None = None) -> str:
        """Answer one question, retrying once if the agent raises.

        Args:
            question: The question text.
            task_id: Optional task id; when given, the prompt tells the agent
                how to download the task's attachment.

        Returns:
            The cleaned answer, or "Unable to determine the answer." when both
            attempts fail.
        """
        print(f"Processing: {question[:80]}...")

        if task_id:
            prompt = f'If needed, download file with: download_file_from_api("{task_id}")\n\nQuestion: {question}\n\nAnswer with ONLY the final answer.'
        else:
            prompt = f"Question: {question}\n\nAnswer with ONLY the final answer."

        for attempt in range(2):
            try:
                result = self.agent.run(prompt)
                answer = self._clean_answer(result)
                print(f"Answer: {answer}")
                return answer
            except Exception as e:
                print(f"Error (attempt {attempt+1}): {e}")
                if attempt == 0:
                    # Short pause before the single retry.
                    time.sleep(3)

        return "Unable to determine the answer."
# =============================================
# SUBMISSION
# =============================================
def run_and_submit_all(profile: gr.OAuthProfile | None) -> tuple[str, pd.DataFrame | None]:
    """Run the agent on every fetched question and submit the answers.

    Args:
        profile: The logged-in Hugging Face profile, or None when the user
            has not logged in.

    Returns:
        A ``(status message, results table)`` pair. The table is None when the
        run stops before any question is processed (no login, agent
        initialisation failure, or question fetch failure).
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        return "Please Login to Hugging Face with the button.", None

    username = profile.username
    print(f"User: {username}")

    api_url = DEFAULT_API_URL

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    # NOTE(review): when SPACE_ID is unset this URL contains the literal
    # "None" — confirm that is acceptable for runs outside a Space.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    try:
        resp = requests.get(f"{api_url}/questions", timeout=15)
        resp.raise_for_status()
        questions = resp.json()
        if not questions:
            return "No questions fetched.", None
        print(f"Fetched {len(questions)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    results_log = []
    answers = []

    for i, item in enumerate(questions):
        task_id = item.get("task_id")
        question = item.get("question")
        # Skip malformed entries rather than aborting the whole run.
        if not task_id or question is None:
            continue

        print(f"\n{'='*60}")
        print(f" Q {i+1}/{len(questions)} — {task_id}")
        print(f" {question[:100]}...")
        print(f"{'='*60}")

        try:
            answer = agent(question, task_id=task_id)
            answers.append({"task_id": task_id, "submitted_answer": answer})
            results_log.append(
                {"Task ID": task_id, "Question": question, "Submitted Answer": answer}
            )
        except Exception as e:
            # A failed question is logged in the table but not submitted.
            print(f"Error on {task_id}: {e}")
            results_log.append(
                {"Task ID": task_id, "Question": question, "Submitted Answer": f"ERROR: {e}"}
            )

        # Brief pause between questions.
        time.sleep(1)

    if not answers:
        return "No answers produced.", pd.DataFrame(results_log)

    submission = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers,
    }

    try:
        resp = requests.post(f"{api_url}/submit", json=submission, timeout=120)
        resp.raise_for_status()
        data = resp.json()
        status = (
            f"Submission Successful!\n"
            f"User: {data.get('username')}\n"
            f"Score: {data.get('score', 'N/A')}% "
            f"({data.get('correct_count', '?')}/{data.get('total_attempted', '?')} correct)\n"
            f"Message: {data.get('message', '')}"
        )
        return status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        # A Response is falsy for 4xx/5xx status codes, so a plain truthiness
        # test would always discard the server's error body. Compare to None.
        detail = e.response.text[:500] if e.response is not None else str(e)
        return f"Submission Failed: {detail}", pd.DataFrame(results_log)
    except Exception as e:
        return f"Submission error: {e}", pd.DataFrame(results_log)


# --- Gradio UI ---
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 GAIA Agent — Final Assignment")
    gr.Markdown(
        """
        **Agent**: CodeAgent with Gemini 2.0 Flash (free)
        **Tools**: Web Search · Webpage Visitor · File Downloader · Image Describer · Audio Transcriber · Python Executor

        1. Log in with your HF account
        2. Click Run to start (takes ~15-20 min)
        """
    )
    gr.LoginButton()
    run_button = gr.Button("🚀 Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Status", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Results", wrap=True)
    # No explicit inputs: `profile` is not wired to any component here.
    # Presumably Gradio injects it from the login state via the type hint.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])

if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    print(f"SPACE_ID: {os.getenv('SPACE_ID', 'not set')}")
    print("-"*60)
    demo.launch(debug=True, share=False)