# Hugging Face Spaces page header captured with this file (Space status: Sleeping); not part of the program.
import os
import time

import gradio as gr
import pandas as pd
import requests
from smolagents import (
    CodeAgent,
    DuckDuckGoSearchTool,
    OpenAIModel,
    Tool,
    VisitWebpageTool,
    tool,
)
# --- Constants ---
# Base URL of the scoring API; the "/questions" and "/submit" endpoints used
# by run_and_submit_all are built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# =============================================
# CUSTOM TOOLS
# =============================================
def download_file_from_api(task_id: str) -> str:
    """Downloads a file for a GAIA task. Use when question mentions a file/attachment.

    Text, spreadsheet, PDF and Word content is returned as text (first 12000
    characters). Images and audio are saved to a temporary file and reported as
    IMAGE_FILE_SAVED:PATH or AUDIO_FILE_SAVED:PATH. Any other type is saved to a
    temporary .bin file. Failures are returned as an "Error downloading: ..."
    string instead of raising.

    Args:
        task_id: The task_id string for the question.
    """
    import io
    import tempfile

    max_chars = 12000

    def save_temp(data, suffix):
        # delete=False: the returned path is opened later by another tool, so
        # the file has to outlive this call.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle:
            handle.write(data)
        return handle.name

    url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
    try:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        ct = resp.headers.get("Content-Type", "")
        # Match on the bare media type, ignoring parameters such as charset.
        mime = ct.split(";", 1)[0].strip().lower()

        # Office formats are tested BEFORE the text check: OOXML media types
        # contain "xml" (…openxmlformats…), so a substring test for "xml"
        # would return their zipped bytes as if they were text.
        if "spreadsheet" in mime or "excel" in mime:
            import openpyxl

            wb = openpyxl.load_workbook(io.BytesIO(resp.content))
            lines = []
            for sn in wb.sheetnames:
                lines.append(f"--- Sheet: {sn} ---")
                for row in wb[sn].iter_rows(values_only=True):
                    # Only None marks an empty cell; 0 and False are real values.
                    lines.append("\t".join("" if c is None else str(c) for c in row))
            return "\n".join(lines)[:max_chars]

        if "wordprocessingml" in mime or "msword" in mime:
            import docx

            doc = docx.Document(io.BytesIO(resp.content))
            return "\n".join(p.text for p in doc.paragraphs)[:max_chars]

        if "pdf" in mime:
            import PyPDF2

            reader = PyPDF2.PdfReader(io.BytesIO(resp.content))
            text = "".join(p.extract_text() or "" for p in reader.pages)
            return text[:max_chars] if text.strip() else "PDF: no text extracted."

        # Text-like payloads. "xml" is only accepted as a suffix (text/xml,
        # application/xml, image/svg+xml) so other OOXML containers fall
        # through to the binary fallback below.
        is_text = (
            mime.startswith("text/")
            or "json" in mime
            or mime.endswith("xml")
            or any(t in mime for t in ("csv", "html", "python"))
        )
        if is_text:
            return resp.text[:max_chars]

        if "image" in mime:
            return f"IMAGE_FILE_SAVED:{save_temp(resp.content, '.png')}"

        if any(t in mime for t in ("audio", "mpeg", "wav", "mp3", "ogg")):
            return f"AUDIO_FILE_SAVED:{save_temp(resp.content, '.mp3')}"

        # Unknown type: keep the bytes on disk and describe what was saved.
        path = save_temp(resp.content, ".bin")
        return f"File saved: {path} (type: {ct}, {len(resp.content)} bytes)"
    except Exception as e:
        # Deliberately broad: the result is read by the agent, which needs an
        # error string rather than a raised exception.
        return f"Error downloading: {e}"
def describe_image(image_path: str) -> str:
    """Describes an image using a vision model. Use after getting IMAGE_FILE_SAVED.

    Args:
        image_path: Path to the image file.
    """
    try:
        # Imported inside the try so a missing huggingface_hub package is also
        # reported as an "Image error" string.
        from huggingface_hub import InferenceClient

        hf_client = InferenceClient(token=os.getenv("HF_TOKEN"))
        with open(image_path, "rb") as image_file:
            image_bytes = image_file.read()
        response = hf_client.image_to_text(
            image=image_bytes,
            model="Salesforce/blip2-opt-2.7b",
        )
        # Accept either a plain string or an object exposing `generated_text`;
        # anything else is stringified.
        if isinstance(response, str):
            caption = response
        else:
            caption = getattr(response, "generated_text", str(response))
        return f"Image: {caption}"
    except Exception as err:
        return f"Image error: {err}"
def transcribe_audio(audio_path: str) -> str:
    """Transcribes audio to text. Use after getting AUDIO_FILE_SAVED.

    Args:
        audio_path: Path to the audio file.
    """
    try:
        # Imported inside the try so a missing huggingface_hub package is also
        # reported as an "Audio error" string.
        from huggingface_hub import InferenceClient

        hf_client = InferenceClient(token=os.getenv("HF_TOKEN"))
        with open(audio_path, "rb") as audio_file:
            audio_bytes = audio_file.read()
        response = hf_client.automatic_speech_recognition(
            audio=audio_bytes,
            model="openai/whisper-large-v3-turbo",
        )
        # Accept either a plain string or an object exposing `text`; anything
        # else is stringified.
        if isinstance(response, str):
            transcript = response
        else:
            transcript = getattr(response, "text", str(response))
        return f"Transcription: {transcript}"
    except Exception as err:
        return f"Audio error: {err}"
def read_local_file(file_path: str) -> str:
    """Reads a local text file.

    Returns at most the first 12000 characters, decoded as UTF-8 with
    undecodable bytes skipped. Failures are returned as a "Read error: ..."
    string instead of raising.

    Args:
        file_path: Path to the file.
    """
    max_chars = 12000
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            # Bounded read: only the returned prefix is loaded, so a very
            # large file is never pulled into memory in full.
            return f.read(max_chars)
    except Exception as e:
        return f"Read error: {e}"
def execute_python_file(file_path: str) -> str:
    """Runs a Python script and returns output.

    Standard output is returned, followed by a "STDERR: ..." section when the
    script wrote to standard error; the result is capped at 12000 characters.
    "No output." is returned when the script printed nothing, and
    "Timeout after 30s." when it did not finish in time.

    Args:
        file_path: Path to the .py file.
    """
    import subprocess
    import sys

    # NOTE(security): this runs whatever is stored at file_path with the
    # privileges of this process. Only point it at files you are prepared to
    # execute.
    # Run with the interpreter hosting this app so the script sees the same
    # environment; "python3" on PATH may be absent or a different install.
    interpreter = sys.executable or "python3"
    try:
        r = subprocess.run(
            [interpreter, file_path],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return "Timeout after 30s."
    except Exception as e:
        return f"Exec error: {e}"

    out = r.stdout + (f"\nSTDERR: {r.stderr}" if r.stderr else "")
    out = out.strip()
    # Same 12000-character cap the other tools in this file apply.
    return out[:12000] if out else "No output."
# =============================================
# AGENT
# =============================================
# Concise instructions to save tokens.
# The text names tools the agent is expected to call: download_file_from_api,
# describe_image and transcribe_audio are defined in this file; web_search and
# visit_webpage are presumably the names of the smolagents search / webpage
# tools — verify against the installed smolagents version.
INSTRUCTIONS = """You solve GAIA benchmark questions precisely.
ANSWER FORMAT:
- Return ONLY the final answer. No "The answer is", no explanations.
- Number → just the number (e.g. "42")
- Name → just the name (e.g. "Paris")
- List → comma-separated (e.g. "red, blue, green")
STRATEGY:
- Keep reasoning SHORT. Think step by step but briefly.
- Always verify facts with web_search. Don't rely on memory.
- If the answer isn't found directly, break the problem into parts and reason through them.
- For counting tasks: gather all items first, then count carefully.
- If a question mentions a file/attachment, FIRST call download_file_from_api with the task_id.
- If download returns IMAGE_FILE_SAVED → call describe_image with that path.
- If download returns AUDIO_FILE_SAVED → call transcribe_audio with that path.
- For reversed/encoded text, decode it before answering.
- If a question references a URL, use visit_webpage to read it.
"""
| class BasicAgent: | |
| def __init__(self): | |
| print("Initializing agent with Gemini 2.0 Flash...") | |
| model = OpenAIModel( | |
| model_id="gemma-4-31b-it", | |
| api_base="https://generativelanguage.googleapis.com/v1beta/openai/", | |
| api_key=os.getenv("GEMINI_API_KEY"), | |
| temperature=0.1, | |
| max_tokens=1500, | |
| ) | |
| self.agent = CodeAgent( | |
| model=model, | |
| tools=[ | |
| DuckDuckGoSearchTool(), | |
| VisitWebpageTool(), | |
| download_file_from_api, | |
| describe_image, | |
| transcribe_audio, | |
| read_local_file, | |
| execute_python_file, | |
| ], | |
| max_steps=7, | |
| verbosity_level=2, | |
| instructions=INSTRUCTIONS, | |
| additional_authorized_imports=[ | |
| "json", "re", "math", "datetime", "collections", | |
| "csv", "io", "os", "tempfile", "subprocess", | |
| "base64", "hashlib", "unicodedata", "string", | |
| ], | |
| ) | |
| print("Agent ready!") | |
| def __call__(self, question: str, task_id: str = None) -> str: | |
| print(f"Processing: {question[:80]}...") | |
| if task_id: | |
| prompt = f'If needed, download file with: download_file_from_api("{task_id}")\n\nQuestion: {question}\n\nAnswer with ONLY the final answer.' | |
| else: | |
| prompt = f"Question: {question}\n\nAnswer with ONLY the final answer." | |
| for attempt in range(2): | |
| try: | |
| result = self.agent.run(prompt) | |
| answer = str(result).strip() | |
| # Clean prefixes | |
| for p in ["The answer is ", "The answer is: ", "Answer: ", | |
| "FINAL ANSWER: ", "Final answer: ", "The final answer is ", | |
| "The final answer is: ", "Result: "]: | |
| if answer.lower().startswith(p.lower()): | |
| answer = answer[len(p):].strip() | |
| # Remove quotes | |
| if len(answer) > 2 and answer[0] in '"\'': | |
| if answer[-1] == answer[0]: | |
| answer = answer[1:-1].strip() | |
| # Remove trailing period | |
| if answer.endswith(".") and len(answer.split()) <= 5: | |
| answer = answer[:-1].strip() | |
| print(f"Answer: {answer}") | |
| return answer | |
| except Exception as e: | |
| print(f"Error (attempt {attempt+1}): {e}") | |
| if attempt == 0: | |
| time.sleep(3) | |
| return "Unable to determine the answer." | |
# =============================================
# SUBMISSION
# =============================================
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, answer each with the agent, and submit the answers.

    Args:
        profile: The logged-in user's profile; when it is missing, a login
            prompt is returned and nothing else happens.

    Returns:
        A ``(status_message, results)`` pair. ``results`` is a DataFrame with
        one row per processed question, or None when the run stopped before
        any question was processed.
    """
    space_id = os.getenv("SPACE_ID")
    if not profile:
        return "Please Login to Hugging Face with the button.", None
    username = profile.username
    print(f"User: {username}")
    api_url = DEFAULT_API_URL

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None
    # Link to this Space's code, sent along with the submission.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    try:
        resp = requests.get(f"{api_url}/questions", timeout=15)
        resp.raise_for_status()
        questions = resp.json()
        if not questions:
            return "No questions fetched.", None
        print(f"Fetched {len(questions)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    results_log = []
    answers = []
    for i, item in enumerate(questions):
        task_id = item.get("task_id")
        question = item.get("question")
        # Skip malformed entries.
        if not task_id or question is None:
            continue
        print(f"\n{'='*60}")
        print(f" Q {i+1}/{len(questions)} — {task_id}")
        print(f" {question[:100]}...")
        print(f"{'='*60}")
        try:
            answer = agent(question, task_id=task_id)
            answers.append({"task_id": task_id, "submitted_answer": answer})
            results_log.append({"Task ID": task_id, "Question": question, "Submitted Answer": answer})
        except Exception as e:
            # A failed question is logged for display but not submitted.
            print(f"Error on {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question, "Submitted Answer": f"ERROR: {e}"})
        time.sleep(1)

    if not answers:
        return "No answers produced.", pd.DataFrame(results_log)

    submission = {"username": username.strip(), "agent_code": agent_code, "answers": answers}
    try:
        resp = requests.post(f"{api_url}/submit", json=submission, timeout=120)
        resp.raise_for_status()
        data = resp.json()
        status = (
            f"Submission Successful!\n"
            f"User: {data.get('username')}\n"
            f"Score: {data.get('score', 'N/A')}% "
            f"({data.get('correct_count', '?')}/{data.get('total_attempted', '?')} correct)\n"
            f"Message: {data.get('message', '')}"
        )
        return status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        # A Response is falsy for 4xx/5xx status codes, so a plain truth test
        # would always discard the server's error body; compare with None.
        detail = e.response.text[:500] if e.response is not None else str(e)
        return f"Submission Failed: {detail}", pd.DataFrame(results_log)
    except Exception as e:
        return f"Submission error: {e}", pd.DataFrame(results_log)
# --- Gradio UI ---
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 GAIA Agent — Final Assignment")
    # Keep the model named here in sync with the model_id configured in
    # BasicAgent (currently "gemma-4-31b-it" on the Gemini OpenAI-compatible
    # endpoint).
    gr.Markdown(
        """
        **Agent**: CodeAgent with gemma-4-31b-it via the Gemini API (free)

        **Tools**: Web Search · Webpage Visitor · File Downloader · Image Describer · Audio Transcriber · Python Executor

        1. Log in with your HF account
        2. Click Run to start (takes ~15-20 min)
        """
    )
    gr.LoginButton()
    run_button = gr.Button("🚀 Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Status", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Results", wrap=True)
    # The handler's two return values fill the status box and the results table.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])
if __name__ == "__main__":
    # Startup banner, then launch the Gradio app defined above.
    rule = "-" * 30
    print(f"\n{rule} App Starting {rule}")
    space_name = os.getenv("SPACE_ID", "not set")
    print(f"SPACE_ID: {space_name}")
    print(rule * 2)
    demo.launch(debug=True, share=False)