Spaces:
Sleeping
Sleeping
import ast
import base64
import io
import json
import math
import os
import pathlib
import re
import statistics
import tempfile
import time
import urllib.parse
import urllib.request

import gradio as gr
import pandas as pd
import requests
from huggingface_hub import InferenceClient
from smolagents import CodeAgent, InferenceClientModel, tool
from smolagents import DuckDuckGoSearchTool, VisitWebpageTool
# --- Custom tool: safe arithmetic calculator ---
def calculator(expression: str) -> str:
    """
    Evaluate a safe arithmetic or mathematical expression.
    Use this for numeric computations: arithmetic, trig, sqrt, logarithms, etc.

    The expression is parsed into an AST and rejected unless every node type is
    on a small whitelist and every name refers to a known math helper. Failures
    are reported as an "ERROR: ..." string instead of raising.

    Args:
        expression: A Python-style math expression, e.g. "sqrt(144) + 2**10" or "mean([3,5,7])"

    Returns:
        The result converted with str(), or "ERROR: calculator failed: <reason>".
    """
    try:
        # ast.Constant covers every literal; the legacy ast.Num alias is deprecated
        # and missing on newer Pythons, so it must not be referenced here.
        allowed_nodes = frozenset({
            ast.Expression, ast.BinOp, ast.UnaryOp, ast.Constant,
            ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow, ast.Mod, ast.USub, ast.UAdd,
            ast.FloorDiv, ast.Load, ast.Compare, ast.Eq, ast.NotEq, ast.Lt, ast.LtE,
            ast.Gt, ast.GtE, ast.Call, ast.Name, ast.Tuple, ast.List,
        })
        # Every public name of the math module plus a few aggregate helpers.
        names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
        names.update({
            "mean": statistics.mean, "median": statistics.median,
            "sum": sum, "min": min, "max": max, "round": round, "abs": abs,
        })

        def _check(node):
            # Exact type match on purpose: subclasses of whitelisted nodes are not trusted.
            if type(node) not in allowed_nodes:
                raise ValueError(f"Disallowed expression: {type(node).__name__}")
            if isinstance(node, ast.Name) and node.id not in names:
                raise ValueError(f"Unknown name: {node.id}")
            for child in ast.iter_child_nodes(node):
                _check(child)

        tree = ast.parse(expression, mode="eval")
        _check(tree)
        # SECURITY: eval() runs only after the whitelist check and with builtins
        # removed. Attribute access, subscripts, lambdas and keyword arguments are
        # all rejected above. Very large powers (e.g. 9**9**9) can still be slow.
        val = eval(compile(tree, "<calc>", "eval"), {"__builtins__": {}}, names)
        return str(val)
    except Exception as e:
        return f"ERROR: calculator failed: {e}"
# --- Multimodal tool: image OCR via FireRed-OCR ---
def ocr_image(image_source: str) -> str:
    """
    Extract all text visible in an image using FireRed-OCR (a VLM-based OCR model).
    Accepts an HTTP/HTTPS image URL or a local file path.

    Local files are embedded in the request as a base64 data URL. Any failure is
    reported as an "ERROR: ..." string instead of raising.

    Args:
        image_source: HTTP URL or absolute local file path of the image to process.

    Returns:
        The model's reply, "(no text detected)" when the reply is empty, or
        "ERROR: ocr_image failed: <reason>".
    """
    try:
        client = InferenceClient(
            "FireRedTeam/FireRed-OCR",
            token=os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"),
        )
        # Match full schemes so a local path that merely starts with "http"
        # is still read from disk.
        if image_source.startswith(("http://", "https://")):
            image_url = image_source
        else:
            with open(image_source, "rb") as f:
                b64 = base64.b64encode(f.read()).decode("ascii")
            ext = pathlib.Path(image_source).suffix.lstrip(".").lower() or "png"
            # File extensions are not always valid MIME subtypes (".jpg" is "image/jpeg").
            subtype = {"jpg": "jpeg", "tif": "tiff"}.get(ext, ext)
            image_url = f"data:image/{subtype};base64,{b64}"
        messages = [{
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_url}},
                {"type": "text", "text": "Extract and return ALL text visible in this image. Output only the extracted text, and a full description of the image."},
            ],
        }]
        resp = client.chat_completion(messages=messages, max_tokens=1024)
        # The message content may be None; treat that like an empty reply.
        content = resp.choices[0].message.content or ""
        return content.strip() or "(no text detected)"
    except Exception as e:
        return f"ERROR: ocr_image failed: {e}"
# --- Multimodal tool: video understanding via LLaVA-Video-7B-Qwen2 ---
def analyze_video(video_url: str, question: str = "Describe this video in detail.") -> str:
    """
    Analyze a video and answer a question about it using LLaVA-Video-7B-Qwen2.

    YouTube watch URLs are rejected up front with a message pointing to
    get_youtube_transcript. Any failure is reported as an "ERROR: ..." string
    instead of raising.

    Args:
        video_url: Direct HTTP/HTTPS URL to the video file (mp4, avi, webm, mov, etc.).
        question: The question to ask about the video content.

    Returns:
        The model's answer, or an "ERROR: ..." string describing the failure.
    """
    # LLaVA-Video does not accept YouTube watch URLs — only direct video file URLs.
    # Check this before creating the client so the hint to fall back is returned
    # even when no token or endpoint is available.
    if "youtube.com/watch" in video_url or "youtu.be/" in video_url:
        return (
            "ERROR: analyze_video does not support YouTube watch URLs. "
            "Call get_youtube_transcript(url) instead to get the spoken content, "
            "or use DuckDuckGoSearchTool to search for information about the video."
        )
    try:
        client = InferenceClient(
            "lmms-lab/LLaVA-Video-7B-Qwen2",
            token=os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"),
        )
        messages = [{
            "role": "user",
            "content": [
                {"type": "video_url", "video_url": {"url": video_url}},
                {"type": "text", "text": question},
            ],
        }]
        resp = client.chat_completion(messages=messages, max_tokens=768)
        answer = (resp.choices[0].message.content or "").strip()
        if not answer:
            # Surface an empty reply explicitly rather than returning "".
            return "ERROR: analyze_video failed: model returned empty response"
        return answer
    except Exception as e:
        # Some exceptions carry no message; fall back to the exception type name.
        return f"ERROR: analyze_video failed: {str(e) or type(e).__name__}"
# --- YouTube transcript via youtube-transcript-api ---
def get_youtube_transcript(url: str) -> str:
    """
    Retrieve the spoken transcript of a YouTube video.
    Works with standard youtube.com/watch?v=... and youtu.be/... URLs.
    Use this for any question about what is said or shown in a YouTube video.

    English transcripts are preferred; if none can be fetched, whatever language
    is available is used. The result is truncated to 8000 characters. Any failure
    is reported as an "ERROR: ..." string instead of raising.

    Args:
        url: Full YouTube video URL, e.g. 'https://www.youtube.com/watch?v=abcd1234'

    Returns:
        The transcript text (at most 8000 characters) or an "ERROR: ..." string.
    """
    # Validate the URL first so a malformed link is reported as such, regardless
    # of whether the transcript library is installed.
    match = re.search(r"(?:v=|youtu\.be/)([A-Za-z0-9_-]{11})", url)
    if not match:
        return f"ERROR: could not extract YouTube video ID from URL: {url}"
    video_id = match.group(1)
    preferred = ["en", "en-US", "en-GB"]
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        # Newer releases expose an instance API (fetch/list); older ones only the
        # static get_transcript/list_transcripts helpers. Support both.
        if hasattr(YouTubeTranscriptApi, "fetch"):
            api = YouTubeTranscriptApi()
            try:
                entries = api.fetch(video_id, languages=preferred)
            except Exception:
                # Fetch whatever language is available
                available = api.list(video_id)
                entries = available.find_transcript(
                    [t.language_code for t in available]
                ).fetch()
        else:
            try:
                entries = YouTubeTranscriptApi.get_transcript(video_id, languages=preferred)
            except Exception:
                # Fetch whatever language is available
                available = YouTubeTranscriptApi.list_transcripts(video_id)
                entries = available.find_transcript(
                    [t.language_code for t in available]
                ).fetch()
        # Entries are dicts in the legacy API and snippet objects in the newer one.
        text = " ".join(
            entry["text"] if isinstance(entry, dict) else entry.text
            for entry in entries
        )
        return text[:8000]
    except Exception as e:
        return f"ERROR: get_youtube_transcript failed: {e}"
# --- Audio transcription via Whisper ---
def transcribe_audio(audio_source: str) -> str:
    """
    Transcribe speech in an audio file to text using openai/whisper-large-v3.
    Accepts an HTTP/HTTPS URL or a local file path.

    Failures are reported as an "ERROR: ..." string instead of raising.

    Args:
        audio_source: HTTP URL or local path to an audio file (mp3, wav, flac, ogg, m4a).
    """
    try:
        whisper = InferenceClient(
            "openai/whisper-large-v3",
            token=os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"),
            provider="hf-inference",  # avoids paid fal-ai routing
        )
        output = whisper.automatic_speech_recognition(audio_source)
        # Prefer the .text attribute when the result has one; otherwise stringify it.
        if hasattr(output, "text"):
            return output.text
        return str(output)
    except Exception as e:
        return f"ERROR: transcribe_audio failed: {e}"
# --- File interpretation: PDF, CSV, Excel, text, image, audio, video ---
def _process_via_temp_file(data: bytes, suffix: str, handler) -> str:
    """Write *data* to a temporary file, run *handler* on its path, then delete the file."""
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp.write(data)
        tmp_path = tmp.name
    try:
        return handler(tmp_path)
    finally:
        os.unlink(tmp_path)


def read_task_file(task_id: str, file_name: str, file_path: str = "") -> str:
    """
    Download and parse the file attached to a GAIA task question.
    Automatically handles: PDF (text extraction), CSV/Excel (table as text),
    plain text/JSON/HTML, images (OCR), audio (transcription), video (analysis).

    Download or parsing failures are reported as an "ERROR: ..." string instead
    of raising.

    Args:
        task_id: The GAIA task ID whose attached file should be read.
        file_name: The original file name including extension (e.g. 'data.csv', 'chart.png').
        file_path: Optional relative file path from the task metadata (e.g. '2023/test/uuid.jpg').
            When provided this is tried first as the download URL.

    Returns:
        A text rendering of the file contents (truncated for long files), a short
        placeholder for undecodable binary files, or an "ERROR: ..." string.
    """
    hf_token = os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN")
    base_headers = {"User-Agent": "HF-AgentsCourse/1.0"}
    # Build candidate (url, headers) pairs in priority order:
    #   1. HuggingFace dataset repo (actual storage location, needs auth)
    #   2. GAIA scoring API /files/{task_id} (fallback)
    # The bearer token is attached only to the huggingface.co request; it must not
    # be sent to the scoring Space, which is a separate application.
    candidates = []
    if file_path:
        hub_headers = dict(base_headers)
        if hf_token:
            hub_headers["Authorization"] = f"Bearer {hf_token}"
        candidates.append((
            f"https://huggingface.co/datasets/gaia-benchmark/GAIA/resolve/main/{file_path}",
            hub_headers,
        ))
    candidates.append((
        f"https://agents-course-unit4-scoring.hf.space/files/{task_id}",
        base_headers,
    ))

    data = None
    last_err = ""
    successful_url = ""
    for url, headers in candidates:
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = resp.read()
            successful_url = url
            break  # success
        except Exception as e:
            last_err = str(e)
    if data is None:
        return f"ERROR: could not download file for task '{task_id}': {last_err}"

    ext = pathlib.Path(file_name).suffix.lower()
    try:
        if ext == ".pdf":
            import pypdf
            reader = pypdf.PdfReader(io.BytesIO(data))
            pages = [p.extract_text() or "" for p in reader.pages]
            text = "\n\n--- Page Break ---\n\n".join(pages).strip()
            return text[:8000] if text else "(no text extracted from PDF)"
        if ext == ".csv":
            df = pd.read_csv(io.BytesIO(data))
            return df.to_string(max_rows=200, index=False)
        if ext in (".xlsx", ".xls"):
            df = pd.read_excel(io.BytesIO(data))
            return df.to_string(max_rows=200, index=False)
        if ext in (".txt", ".md", ".json", ".xml", ".html", ".htm", ".py", ".tsv"):
            return data.decode("utf-8", errors="replace")[:8000]
        if ext in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff"):
            return _process_via_temp_file(data, ext, ocr_image)
        if ext in (".mp3", ".wav", ".flac", ".ogg", ".m4a"):
            # Transcribe the bytes already downloaded instead of handing over the
            # URL: the dataset URL needs the auth header used above, and a second
            # fetch is wasted work.
            return _process_via_temp_file(data, ext, transcribe_audio)
        if ext in (".mp4", ".avi", ".mov", ".mkv", ".webm"):
            # analyze_video only accepts a URL, so the downloaded bytes cannot be reused.
            return analyze_video(successful_url)
        # Unknown extension: try strict UTF-8, fall back to size info for binary data.
        try:
            return data.decode("utf-8")[:4000]
        except UnicodeDecodeError:
            return f"[binary file, {len(data)} bytes, extension='{ext}']"
    except Exception as e:
        return f"ERROR: read_task_file parsing failed (ext='{ext}'): {e}"
# --- Wikipedia search via official API (avoids 403 from VisitWebpageTool) ---
def wikipedia_search(query: str, sentences: int = 10) -> str:
    """
    Search Wikipedia and return a plain-text summary of the most relevant article.
    Preferred over VisitWebpageTool for Wikipedia questions because it goes through
    the MediaWiki API rather than fetching the HTML page.
    Only the leading sentences of the article are returned; for full sections
    (e.g. discography, revision history) fall back to VisitWebpageTool with the
    article URL.

    Any failure is reported as an "ERROR: ..." string instead of raising.

    Args:
        query: Search term or exact Wikipedia article title.
        sentences: How many sentences of summary to return (default 10, clamped to 1-10).

    Returns:
        The article extract (at most 6000 characters), a "No ..." message when
        nothing is found, or "ERROR: wikipedia_search failed: <reason>".
    """
    api_url = "https://en.wikipedia.org/w/api.php"
    headers = {"User-Agent": "HF-AgentsCourse/1.0 (research bot)"}

    def _get_json(params):
        # urlencode escapes every parameter, so titles with '&', '#', '+' are safe.
        req = urllib.request.Request(
            f"{api_url}?{urllib.parse.urlencode(params)}", headers=headers
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read())

    try:
        # The extracts API accepts between 1 and 10 sentences.
        sentence_count = max(1, min(int(sentences), 10))

        # Step 1: search for the best matching title
        results = _get_json(
            {"action": "opensearch", "search": query, "limit": 1, "format": "json"}
        )
        titles = results[1] if len(results) > 1 else []
        if not titles:
            return f"No Wikipedia article found for query: '{query}'"
        title = titles[0]

        # Step 2: fetch plain-text extract. redirects=1 makes the API follow a
        # redirect title to its target article instead of returning an empty page.
        data = _get_json({
            "action": "query",
            "titles": title,
            "redirects": 1,
            "prop": "extracts",
            "explaintext": "true",
            "exsentences": sentence_count,
            "format": "json",
        })
        pages = data.get("query", {}).get("pages", {})
        page = next(iter(pages.values()), {})
        extract = (page.get("extract") or "").strip()
        return extract[:6000] if extract else f"No content found for article: '{title}'"
    except Exception as e:
        return f"ERROR: wikipedia_search failed: {e}"
# --- Constants ---
# Base URL of the scoring service; the /questions and /submit endpoints are
# derived from it in run_and_submit_all.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# ReAct-style instructions appended to each task. CodeAgent implements the
# Thought → Code → Observation → … → final_answer() ReAct loop natively.
# The final_answer() value must follow the GAIA submission format below.
# NOTE: this text is sent to the model verbatim, so keep edits deliberate. The
# first paragraph asks for a "FINAL ANSWER:" template while the last rule asks
# for a bare value in final_answer(); _extract_final_answer accepts both forms.
REACT_INSTRUCTIONS = (
    "\n\nYou are a general AI assistant. I will ask you a question. "
    "Report your thoughts, and finish your answer with the following template: "
    "FINAL ANSWER: [YOUR FINAL ANSWER].\n"
    "YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma "
    "separated list of numbers and/or strings.\n"
    "If you are asked for a number, don't use comma to write your number neither use "
    "units such as $ or percent sign unless specified otherwise.\n"
    "If you are asked for a string, don't use articles, neither abbreviations "
    "(e.g. for cities), and write the digits in plain text unless specified otherwise.\n"
    "If you are asked for a comma separated list, apply the above rules depending of "
    "whether the element to be put in the list is a number or a string.\n\n"
    "Additional execution rules:\n"
    "- Reason step-by-step in code comments before calling tools.\n"
    "- Use DuckDuckGoSearchTool / VisitWebpageTool to look up facts.\n"
    "- Use calculator for any arithmetic; never compute in your head.\n"
    "- If the question mentions an attached file, call read_task_file first.\n"
    "- For images call ocr_image, for audio call transcribe_audio, "
    "for video call analyze_video.\n"
    "- For YouTube video questions, call get_youtube_transcript(url) — "
    "analyze_video does NOT work with YouTube URLs.\n"
    "- For Wikipedia questions, prefer wikipedia_search over VisitWebpageTool "
    "(it uses the API and never gets 403).\n"
    "- When you are confident, call final_answer() with ONLY the bare answer value "
    "(no 'FINAL ANSWER:' prefix — the prefix is for your reasoning trace only)."
)
| def _extract_final_answer(raw: str) -> str: | |
| """ | |
| Pull the answer out of the agent's output. | |
| Handles both: | |
| - CodeAgent returning a plain string from final_answer() | |
| - A string containing 'FINAL ANSWER: ...' anywhere in it | |
| """ | |
| if not isinstance(raw, str): | |
| raw = str(raw) | |
| # Look for the canonical submission marker | |
| marker = "FINAL ANSWER:" | |
| idx = raw.upper().rfind(marker) # rfind → take the last occurrence | |
| if idx != -1: | |
| answer = raw[idx + len(marker):].strip() | |
| # Strip trailing punctuation that may have been added | |
| answer = answer.rstrip(".") | |
| return answer | |
| # No marker found — the CodeAgent returned the bare value directly | |
| return raw.strip() | |
def build_agent() -> CodeAgent:
    """
    Assemble the ReAct CodeAgent (Thought → Code → Observation loop) that answers
    the tasks, backed by Qwen2.5-72B-Instruct and equipped with:
    - DuckDuckGoSearchTool   : web search (up to 5 results)
    - VisitWebpageTool       : fetch and read a web page
    - calculator             : safe AST-based arithmetic / math
    - wikipedia_search       : Wikipedia summary via the MediaWiki API
    - get_youtube_transcript : spoken transcript of a YouTube video
    - ocr_image              : image text extraction (FireRedTeam/FireRed-OCR)
    - analyze_video          : video understanding (lmms-lab/LLaVA-Video-7B-Qwen2)
    - transcribe_audio       : speech-to-text (openai/whisper-large-v3)
    - read_task_file         : download & parse task attachments
                               (PDF, CSV, Excel, text, image, audio, video)
    """
    # HF Spaces exposes the token as HF_TOKEN; fall back to HF_API_TOKEN for
    # local / custom secret names.
    # NOTE(review): an earlier comment here said provider="novita" is required
    # because Qwen2.5-72B-Instruct is not hosted on hf-inference (404), yet no
    # provider is passed — confirm that the default provider routing works.
    api_token = os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN")
    llm = InferenceClientModel(
        model_id="Qwen/Qwen2.5-72B-Instruct",
        token=api_token,
    )

    # NOTE(review): the custom tools below appear as plain functions in this file;
    # confirm they are wrapped as smolagents tools (e.g. with @tool) before use.
    toolbox = [
        DuckDuckGoSearchTool(max_results=5),
        VisitWebpageTool(),
        calculator,
        wikipedia_search,
        get_youtube_transcript,
        ocr_image,
        analyze_video,
        transcribe_audio,
        read_task_file,
    ]
    # Modules the agent's generated code may import in addition to the defaults.
    extra_imports = [
        "math", "statistics", "json", "re",
        "datetime", "collections", "itertools",
        "pandas", "io", "base64", "pathlib",
        "urllib", "urllib.request", "urllib.parse",
    ]
    return CodeAgent(
        tools=toolbox,
        model=llm,
        max_steps=10,
        additional_authorized_imports=extra_imports,
    )
| def _run_with_retry(agent: CodeAgent, task_input: str, max_retries: int = 2) -> str: | |
| """ | |
| Run the agent with automatic retry on transient server errors (504, 503, 502). | |
| Returns the raw answer string, or raises on non-transient errors. | |
| """ | |
| transient_codes = ("504", "503", "502", "timeout", "Timeout", "timed out") | |
| for attempt in range(max_retries + 1): | |
| try: | |
| result = agent.run(task_input) | |
| # Guard against None / truly empty results | |
| if result is None or (isinstance(result, str) and not result.strip()): | |
| return "I could not determine the answer." | |
| return result | |
| except Exception as e: | |
| err = str(e) | |
| is_transient = any(code in err for code in transient_codes) | |
| if is_transient and attempt < max_retries: | |
| wait = 15 * (attempt + 1) # 15 s, 30 s | |
| print(f"Transient error on attempt {attempt + 1}, retrying in {wait}s: {err[:120]}") | |
| time.sleep(wait) | |
| continue | |
| raise # re-raise non-transient or exhausted retries | |
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, run the agent on each of them, submit the answers,
    and report the outcome.

    Args:
        profile: The logged-in user's OAuth profile, or None when nobody is
            logged in. Only profile.username is used.

    Returns:
        A (status_message, results) tuple. results is a pandas DataFrame with the
        columns "Task ID", "Question" and "Submitted Answer", or None when the run
        stopped before any question was processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent
    try:
        agent = build_agent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When running as a Hugging Face Space this link points at the code base
    # (useful for others, so please keep it public).
    # NOTE: when SPACE_ID is unset the link contains the literal text "None".
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError must be handled before RequestException: in requests it is
    # a RequestException subclass, so the reverse order makes this branch dead.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        # API returns 'Question' (capital Q); guard against both casings
        question_text = item.get("Question") or item.get("question")
        file_name = item.get("file_name", "")
        file_path = item.get("file_path", "")
        if not task_id or not question_text:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        # Build the task input: append file hint and ReAct instructions.
        # repr() quoting keeps the suggested call well-formed even when a value
        # contains a quote character; for ordinary names the text is unchanged.
        task_input = question_text
        if file_name:
            fp_arg = f", file_path={file_path!r}" if file_path else ""
            task_input += (
                f"\n\n[Attached file: {file_name!r}. "
                f"Call read_task_file(task_id={task_id!r}, file_name={file_name!r}{fp_arg}) "
                f"to download and read its contents before answering.]"
            )
        task_input += REACT_INSTRUCTIONS

        try:
            raw_answer = _run_with_retry(agent, task_input)
            submitted_answer = _extract_final_answer(raw_answer)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # A failed task is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    results_df = pd.DataFrame(results_log)
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", results_df

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
    # Every failure path reports the same way: log the message and return the table.
    print(status_message)
    return status_message, results_df
# --- Build Gradio Interface using Blocks ---
# Layout: instructions, login button, run button, status box and results table.
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
**Instructions:**
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Disclaimers:**
Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
"""
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # No max_rows limit is set on the table.
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    # No explicit inputs are wired; presumably Gradio supplies the `profile`
    # argument from the login session based on its type hint — confirm.
    # The two return values map to the status box and the results table.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
# Script entry point: print where the app is running, then launch the UI.
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # Get SPACE_ID at startup
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # SPACE_HOST may already be a full host name ending in ".hf.space";
        # append the suffix only when it is missing so it is never doubled.
        runtime_host = (
            space_host_startup
            if space_host_startup.endswith(".hf.space")
            else f"{space_host_startup}.hf.space"
        )
        print(f" Runtime URL should be: https://{runtime_host}")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-"*(60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)