import os import gradio as gr import requests import pandas as pd import logging import json import time from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed # Stage 1: Import GAIAAgent (LangGraph-based agent) from src.agent import GAIAAgent # Import ground truth comparison from src.utils.ground_truth import get_ground_truth # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Suppress noisy third-party logs (only show WARNING+) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("huggingface_hub").setLevel(logging.WARNING) logging.getLogger("gradio").setLevel(logging.WARNING) # (Keep Constants as is) # --- Constants --- DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" # --- Helper Functions --- def check_api_keys(): """Check which API keys are configured.""" keys_status = { "GOOGLE_API_KEY (Gemini)": "✓ SET" if os.getenv("GOOGLE_API_KEY") else "✗ MISSING", "HF_TOKEN (HuggingFace)": "✓ SET" if os.getenv("HF_TOKEN") else "✗ MISSING", "ANTHROPIC_API_KEY (Claude)": "✓ SET" if os.getenv("ANTHROPIC_API_KEY") else "✗ MISSING", "TAVILY_API_KEY (Search)": "✓ SET" if os.getenv("TAVILY_API_KEY") else "✗ MISSING", "EXA_API_KEY (Search)": "✓ SET" if os.getenv("EXA_API_KEY") else "✗ MISSING", } return "\n".join([f"{k}: {v}" for k, v in keys_status.items()]) def _build_export_data( results_log: list, submission_status: str, execution_time: float = None, submission_response: dict = None, ) -> dict: """Build canonical export data structure. Single source of truth for both JSON and HTML exports. Returns dict with metadata and results arrays. 
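    Illustrative shape of the returned dict (values below are made-up examples;
    execution time, score, and ground-truth fields appear only when available):

        {
            "metadata": {
                "generated": "2025-01-01 12:00:00",
                "timestamp": "20250101_120000",
                "total_questions": 20,
                "execution_time_seconds": 314.2,
                "execution_time_formatted": "5m 14s",
                "score_percent": 55,
                "correct_count": 11,
                "total_attempted": 20
            },
            "submission_status": "Submission Successful! ...",
            "results": [
                {
                    "task_id": "...",
                    "question": "...",
                    "system_error": "no",
                    "submitted_answer": "Paris",
                    "correct": true,
                    "ground_truth_answer": "Paris",
                    "annotator_metadata": {...}
                }
            ]
        }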
Args: results_log: List of question results (source of truth) submission_status: Status message from submission execution_time: Total execution time in seconds submission_response: Response from GAIA API with correctness info Returns: Dict with {metadata: {...}, submission_status: str, results: [...]} """ from datetime import datetime # Build metadata metadata = { "generated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), "total_questions": len(results_log), } if execution_time is not None: metadata["execution_time_seconds"] = round(execution_time, 2) metadata["execution_time_formatted"] = ( f"{int(execution_time // 60)}m {int(execution_time % 60)}s" ) if submission_response: metadata["score_percent"] = submission_response.get("score") metadata["correct_count"] = submission_response.get("correct_count") metadata["total_attempted"] = submission_response.get("total_attempted") # Build results array with all fields from results_log results_array = [] for result in results_log: result_dict = { "task_id": result.get("Task ID", "N/A"), "question": result.get("Question", "N/A"), "system_error": result.get("System Error", "no"), "submitted_answer": result.get("Submitted Answer", "N/A"), } if result.get("System Error") == "yes" and result.get("Error Log"): result_dict["error_log"] = result.get("Error Log") if result.get("Correct?"): result_dict["correct"] = ( True if result.get("Correct?") == "✅ Yes" else False ) if result.get("Ground Truth Answer"): result_dict["ground_truth_answer"] = result.get("Ground Truth Answer") if result.get("annotator_metadata"): result_dict["annotator_metadata"] = result.get("annotator_metadata") results_array.append(result_dict) return { "metadata": metadata, "submission_status": submission_status, "results": results_array, } def export_results_to_json( results_log: list, submission_status: str, execution_time: float = None, submission_response: dict = None, ) -> str: """Export evaluation results to JSON file. - Saves to ./_cache/gaia_results_TIMESTAMP.json - Uses canonical data builder for consistency with HTML export - Single source of truth: _build_export_data() Args: results_log: List of question results (single source of truth) submission_status: Status message from submission execution_time: Total execution time in seconds submission_response: Response from GAIA API with correctness info Returns: File path to JSON file """ from datetime import datetime # Get canonical data structure export_data = _build_export_data( results_log, submission_status, execution_time, submission_response ) # Generate filename timestamp = export_data["metadata"]["timestamp"] filename = f"gaia_results_{timestamp}.json" cache_dir = os.path.join(os.getcwd(), "_cache") os.makedirs(cache_dir, exist_ok=True) filepath = os.path.join(cache_dir, filename) # Write JSON file with open(filepath, "w", encoding="utf-8") as f: json.dump(export_data, f, indent=2, ensure_ascii=False) logger.info(f"JSON exported to: {filepath}") return filepath def export_results_to_html( results_log: list, submission_status: str, execution_time: float = None, submission_response: dict = None, ) -> str: """Export evaluation results to HTML file. 
    - Saves to ./_cache/gaia_results_TIMESTAMP.html
    - Uses canonical data builder for consistency with JSON export
    - Single source of truth: _build_export_data()

    Args:
        results_log: List of question results (single source of truth)
        submission_status: Status message from submission
        execution_time: Total execution time in seconds
        submission_response: Response from GAIA API with correctness info

    Returns:
        File path to HTML file
    """
    from datetime import datetime
    import html as html_escape

    # Get canonical data structure (same source as JSON)
    export_data = _build_export_data(
        results_log, submission_status, execution_time, submission_response
    )
    metadata = export_data.get("metadata", {})
    results_array = export_data.get("results", [])

    # Generate filename
    timestamp = metadata["timestamp"]
    filename = f"gaia_results_{timestamp}.html"
    cache_dir = os.path.join(os.getcwd(), "_cache")
    os.makedirs(cache_dir, exist_ok=True)
    filepath = os.path.join(cache_dir, filename)

    def escape(text):
        """Escape HTML special characters."""
        if text is None:
            return ""
        return html_escape.escape(str(text))

    # Build HTML content (plain, minimally styled markup mirroring the JSON structure)
    html_parts = []
    html_parts.append("""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>GAIA Agent Evaluation Results</title>
</head>
<body>
<h1>GAIA Agent Evaluation Results</h1>
<h2>Metadata</h2>
<p>Generated: """ + escape(metadata.get("generated", "N/A")) + """</p>
<p>Total Questions: """ + str(metadata.get("total_questions", len(results_array))) + """</p>
""")

    if "execution_time_formatted" in metadata:
        html_parts.append(f"""<p>Execution Time: {escape(metadata["execution_time_formatted"])}</p>
""")

    if "score_percent" in metadata:
        html_parts.append(f"""<p>Score: {escape(metadata["score_percent"])}%</p>
<p>Correct: {escape(metadata["correct_count"])}/{escape(metadata["total_attempted"])}</p>
""")

    html_parts.append(f"""<p>Status: {escape(export_data.get("submission_status", "N/A"))}</p>
<h2>Results (matching JSON structure)</h2>
<table border="1">
<tr>
<th>#</th><th>task_id</th><th>question</th><th>submitted_answer</th>
<th>correct</th><th>system_error</th><th>error_log</th><th>ground_truth_answer</th>
</tr>
""")

    for idx, result in enumerate(results_array, 1):
        task_id = escape(result.get("task_id", "N/A"))
        question = escape(result.get("question", "N/A"))
        submitted_answer = escape(result.get("submitted_answer", "N/A"))
        correct = result.get("correct")  # boolean or null
        system_error = escape(result.get("system_error", "no"))
        error_log = escape(result.get("error_log", ""))
        ground_truth = escape(result.get("ground_truth_answer", "N/A"))

        # Format correct status (boolean from JSON)
        if correct is True:
            correct_display = "true"
        elif correct is False:
            correct_display = "false"
        else:
            correct_display = "null"

        # Format system_error
        error_display = "yes" if system_error == "yes" else system_error

        html_parts.append(f"""<tr>
<td>{idx}</td>
<td>{task_id}</td>
<td>{question}</td>
<td>{submitted_answer}</td>
<td>{correct_display}</td>
<td>{error_display}</td>
<td>{error_log if error_log else '-'}</td>
<td>{ground_truth}</td>
</tr>
""")

    html_parts.append("""</table>
</body>
</html>
""") # Write HTML file with open(filepath, "w", encoding="utf-8") as f: f.write("\n".join(html_parts)) logger.info(f"HTML exported to: {filepath}") return filepath def format_diagnostics(final_state: dict) -> str: """Format agent state for diagnostic display.""" diagnostics = [] # Question diagnostics.append(f"**Question:** {final_state.get('question', 'N/A')}\n") # Plan plan = final_state.get("plan", "No plan generated") diagnostics.append(f"**Plan:**\n{plan}\n") # Tool calls tool_calls = final_state.get("tool_calls", []) if tool_calls: diagnostics.append(f"**Tools Selected:** {len(tool_calls)} tool(s)") for idx, tc in enumerate(tool_calls, 1): tool_name = tc.get("tool", "unknown") params = tc.get("params", {}) diagnostics.append(f" {idx}. {tool_name}({params})") diagnostics.append("") else: diagnostics.append("**Tools Selected:** None\n") # Tool results tool_results = final_state.get("tool_results", []) if tool_results: diagnostics.append(f"**Tool Execution Results:** {len(tool_results)} result(s)") for idx, tr in enumerate(tool_results, 1): tool_name = tr.get("tool", "unknown") status = tr.get("status", "unknown") if status == "success": result_preview = ( str(tr.get("result", ""))[:100] + "..." if len(str(tr.get("result", ""))) > 100 else str(tr.get("result", "")) ) diagnostics.append(f" {idx}. {tool_name}: ✓ SUCCESS") diagnostics.append(f" Result: {result_preview}") else: error = tr.get("error", "Unknown error") diagnostics.append(f" {idx}. {tool_name}: ✗ FAILED - {error}") diagnostics.append("") # Evidence evidence = final_state.get("evidence", []) if evidence: diagnostics.append(f"**Evidence Collected:** {len(evidence)} item(s)") for idx, ev in enumerate(evidence, 1): ev_preview = ev[:150] + "..." if len(ev) > 150 else ev diagnostics.append(f" {idx}. {ev_preview}") diagnostics.append("") else: diagnostics.append("**Evidence Collected:** None\n") # Errors errors = final_state.get("errors", []) if errors: diagnostics.append(f"**Errors:** {len(errors)} error(s)") for idx, err in enumerate(errors, 1): diagnostics.append(f" {idx}. {err}") diagnostics.append("") # Answer answer = final_state.get("answer", "No answer generated") diagnostics.append(f"**Final Answer:** {answer}") return "\n".join(diagnostics) def download_task_file( task_id: str, file_name: str, save_dir: str = "_cache/gaia_files/" ): """Download file attached to a GAIA question from the GAIA dataset on HuggingFace. The evaluation API's /files/{task_id} endpoint returns 404 because files are not hosted there. Files must be downloaded from the official GAIA dataset instead. Files are cached in _cache/ directory (runtime cache, not in git). 
Args: task_id: Question's task_id (used for logging) file_name: Original file name from API (e.g., "task_id.png") save_dir: Directory to save file (created if not exists) Returns: File path if downloaded successfully, None if download failed """ import shutil from huggingface_hub import hf_hub_download import tempfile # GAIA dataset file structure: 2023/validation/{task_id}.{ext} # Extract file extension from file_name _, ext = os.path.splitext(file_name) ext = ext.lower() # Try validation set first (most questions are from validation) repo_id = "gaia-benchmark/GAIA" possible_paths = [ f"2023/validation/{task_id}{ext}", f"2023/test/{task_id}{ext}", ] # Create save directory if not exists (relative to script location) # Use script's directory as base to ensure paths work in all environments (local, HF Space) script_dir = Path(__file__).parent.absolute() cache_dir = script_dir / save_dir cache_dir.mkdir(exist_ok=True, parents=True) target_path = str(cache_dir / file_name) # Check if file already exists in cache (use absolute path for check) if os.path.exists(target_path): logger.info(f"Using cached file for {task_id}: {target_path}") return target_path # Try each possible path for dataset_path in possible_paths: try: logger.info(f"Attempting to download {dataset_path} from GAIA dataset...") # Download to temp dir first to get the file with tempfile.TemporaryDirectory() as temp_dir: downloaded_path = hf_hub_download( repo_id=repo_id, filename=dataset_path, repo_type="dataset", local_dir=temp_dir, ) # Copy file to target location (flat structure in cache) shutil.copy(downloaded_path, target_path) logger.info(f"Downloaded file for {task_id}: {target_path}") return target_path except Exception as e: logger.debug(f"Path {dataset_path} not found: {e}") continue logger.warning(f"File not found in GAIA dataset for task {task_id}") return None def test_single_question(question: str, llm_provider: str): """Test agent with a single question and return diagnostics.""" if not question or not question.strip(): return "Please enter a question.", "", check_api_keys() try: # Set LLM provider from UI selection (overrides .env) os.environ["LLM_PROVIDER"] = llm_provider.lower() logger.info(f"UI Config: LLM_PROVIDER={llm_provider}") # Initialize agent agent = GAIAAgent() # Run agent (this stores final_state in agent.last_state) answer = agent(question) # Get final state from agent final_state = agent.last_state or {} # Format diagnostics with LLM provider info provider_info = f"**LLM Provider:** {llm_provider}\n\n" diagnostics = provider_info + format_diagnostics(final_state) api_status = check_api_keys() return answer, diagnostics, api_status except Exception as e: logger.error(f"Error in test_single_question: {e}", exc_info=True) return f"ERROR: {str(e)}", f"Exception occurred: {str(e)}", check_api_keys() # --- GAIA Agent (Replaced BasicAgent) --- # LangGraph-based agent with sequential workflow # Stage 1: Placeholder nodes, returns fixed answer # Stage 2: Tool integration # Stage 3: Planning and reasoning logic # Stage 4: Error handling and robustness # Stage 5: Performance optimization # Stage 6: Async processing with ThreadPoolExecutor def a_determine_status(answer: str) -> tuple[bool, str | None]: """Determine if response is system error or AI answer. 
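    Examples (derived from the checks below):

        a_determine_status("")                        -> (True, "Empty answer")
        a_determine_status("ERROR: tool timeout")     -> (True, "ERROR: tool timeout")
        a_determine_status("No evidence collected.")  -> (True, "No evidence collected.")
        a_determine_status("Paris")                   -> (False, None)
        a_determine_status("Unable to answer")        -> (False, None)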
Returns: (is_system_error, error_log) - is_system_error: True if system error, False if AI answer - error_log: Full error message if system error, None otherwise """ if not answer: return True, "Empty answer" answer_lower = answer.lower().strip() # System/technical errors from our code if answer_lower.startswith("error:") or "no evidence collected" in answer_lower: return True, answer # Full error message as log # Everything else is an AI response (including "Unable to answer") return False, None def process_single_question(agent, item, index, total): """Process single question with agent, return result with error handling. Supports file attachments - downloads files before processing. Args: agent: GAIAAgent instance item: Question item dict with task_id, question, and optional file_name index: Question index (0-based) total: Total number of questions Returns: dict: Result containing task_id, question, answer, and error flag """ task_id = item.get("task_id") question_text = item.get("question") file_name = item.get("file_name") if not task_id or question_text is None: answer = "ERROR: Missing task_id or question" is_error, error_log = a_determine_status(answer) return { "task_id": task_id, "question": question_text, "answer": answer, "system_error": "yes" if is_error else "no", "error_log": error_log, "error": True, } # Download file if question has attachment file_path = None if file_name: file_path = download_task_file(task_id, file_name) if file_path: logger.info(f"[{index + 1}/{total}] File downloaded: {file_path}") else: logger.warning(f"[{index + 1}/{total}] File expected but not downloaded") try: logger.info(f"[{index + 1}/{total}] Processing {task_id[:8]}...") # Pass file_path to agent if available submitted_answer = agent(question_text, file_path=file_path) logger.info(f"[{index + 1}/{total}] Completed {task_id[:8]}") is_error, error_log = a_determine_status(submitted_answer) return { "task_id": task_id, "question": question_text, "answer": submitted_answer, "system_error": "yes" if is_error else "no", "error_log": error_log, "error": False, } except Exception as e: logger.error(f"[{index + 1}/{total}] Error {task_id[:8]}: {e}") answer = f"ERROR: {str(e)}" is_error, error_log = a_determine_status(answer) return { "task_id": task_id, "question": question_text, "answer": answer, "system_error": "yes" if is_error else "no", "error_log": error_log, "error": True, } def run_and_submit_all( llm_provider: str, video_mode: str = "Transcript", question_limit: int = 0, task_ids: str = "", profile: gr.OAuthProfile | None = None, ): """ Fetches all questions, runs the BasicAgent on them, submits all answers, and displays the results. 
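    Returns a 3-tuple of (status_message, json_export_path, html_export_path),
    matching the three Gradio outputs wired to the "Run Evaluation & Submit All
    Answers" button. Answers are posted to the scoring API's /submit endpoint as:

        {
            "username": "<hf_username>",
            "agent_code": "https://huggingface.co/spaces/<SPACE_ID>/tree/main",
            "answers": [{"task_id": "...", "submitted_answer": "..."}, ...]
        }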
Args: llm_provider: LLM provider to use video_mode: YouTube processing mode ("Transcript" or "Frames") question_limit: Limit number of questions (0 = process all) task_ids: Comma-separated task IDs to target (overrides question_limit) profile: OAuth profile for HF login """ # Start execution timer start_time = time.time() # --- Determine HF Space Runtime URL and Repo URL --- space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for sending link to the code if profile: username = f"{profile.username}" print(f"User logged in: {username}") else: print("User not logged in.") return "Please Login to Hugging Face with the button.", "", "" api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" submit_url = f"{api_url}/submit" # Set LLM provider from UI selection (overrides .env) os.environ["LLM_PROVIDER"] = llm_provider.lower() logger.info(f"UI Config for Full Evaluation: LLM_PROVIDER={llm_provider}") # Set YouTube video processing mode from UI selection os.environ["YOUTUBE_MODE"] = video_mode.lower() logger.info(f"UI Config for Full Evaluation: YOUTUBE_MODE={video_mode}") # 1. Instantiate Agent (Stage 1: GAIAAgent with LangGraph) try: logger.info("Initializing GAIAAgent...") agent = GAIAAgent() logger.info("GAIAAgent initialized successfully") except Exception as e: logger.error(f"Error instantiating agent: {e}") print(f"Error instantiating agent: {e}") return f"Error initializing agent: {e}", "", "" # In the case of an app running as a hugging Face space, this link points toward your codebase ( usefull for others so please keep it public) agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" print(agent_code) # 2. Fetch Questions print(f"Fetching questions from: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: print("Fetched questions list is empty.") return "Fetched questions list is empty or invalid format.", None, "" # Apply question limit if configured (from UI or .env) limit = ( int(question_limit) if question_limit > 0 else int(os.getenv("DEBUG_QUESTION_LIMIT", "0")) ) if limit > 0: questions_data = questions_data[:limit] logger.warning(f"DEBUG MODE: Limited to first {limit} questions") print( f"DEBUG MODE: Processing only {limit} questions (set to 0 to process all)" ) # Filter by specific task IDs if provided (overrides question limit) if task_ids and task_ids.strip(): target_ids = [tid.strip() for tid in task_ids.split(",")] original_count = len(questions_data) questions_data = [ q for q in questions_data if q.get("task_id") in target_ids ] found_ids = [q.get("task_id") for q in questions_data] missing_ids = set(target_ids) - set(found_ids) if missing_ids: logger.warning(f"Task IDs not found: {missing_ids}") logger.warning( f"DEBUG MODE: Targeted {len(questions_data)}/{original_count} questions by task_id" ) print( f"DEBUG MODE: Processing {len(questions_data)} targeted questions " f"({len(missing_ids)} IDs not found: {missing_ids})" ) print(f"Processing {len(questions_data)} questions.") except requests.exceptions.RequestException as e: print(f"Error fetching questions: {e}") return f"Error fetching questions: {e}", None, "" except requests.exceptions.JSONDecodeError as e: print(f"Error decoding JSON response from questions endpoint: {e}") print(f"Response text: {response.text[:500]}") return f"Error decoding server response for questions: {e}", None, "" except Exception as e: print(f"An unexpected error occurred fetching questions: {e}") return f"An 
unexpected error occurred fetching questions: {e}", None, "" # 2.5. Load ground truth for local comparison (validation set only) ground_truth = get_ground_truth() if ground_truth.load_validation_set(): logger.info("Ground truth loaded - per-question correctness will be available") else: logger.warning("Ground truth not loaded - per-question correctness unavailable") # 3. Run your Agent (Stage 6: Concurrent processing) max_workers = int(os.getenv("MAX_CONCURRENT_WORKERS", "5")) results_log = [] answers_payload = [] logger.info( f"Running agent on {len(questions_data)} questions with {max_workers} workers..." ) with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all questions for concurrent processing future_to_index = { executor.submit( process_single_question, agent, item, idx, len(questions_data) ): idx for idx, item in enumerate(questions_data) } # Collect results as they complete for future in as_completed(future_to_index): result = future.result() # Compare with ground truth if available is_correct = ground_truth.compare_answer( result["task_id"], result["answer"] ) # Get ground truth answer and metadata (fetch once) gt_answer = ground_truth.get_answer(result["task_id"]) metadata_item = ground_truth.metadata.get(result["task_id"], {}) annotator_metadata = metadata_item.get("Annotator Metadata", {}) # Add to results log result_entry = { "Task ID": result["task_id"], "Question": result["question"], "System Error": result.get("system_error", "no"), "Submitted Answer": "" if result.get("system_error") == "yes" else result["answer"], } # Add error log if system error if result.get("system_error") == "yes" and result.get("error_log"): result_entry["Error Log"] = result["error_log"] # Add ground truth data if available if is_correct is not None: result_entry["Correct?"] = "✅ Yes" if is_correct else "❌ No" result_entry["Ground Truth Answer"] = gt_answer # Store metadata (both UI and JSON show identical data) result_entry["annotator_metadata"] = annotator_metadata results_log.append(result_entry) # Add to submission payload if no system error if result.get("system_error") == "no": answers_payload.append( {"task_id": result["task_id"], "submitted_answer": result["answer"]} ) # Log progress logger.info( f"Progress: {len(results_log)}/{len(questions_data)} questions processed" ) if not answers_payload: print("Agent did not produce any answers to submit.") status_message = "Agent did not produce any answers to submit." execution_time = time.time() - start_time json_path = export_results_to_json( results_log, status_message, execution_time, None ) html_path = export_results_to_html( results_log, status_message, execution_time, None ) return status_message, json_path, html_path # 4. Prepare Submission submission_data = { "username": username.strip(), "agent_code": agent_code, "answers": answers_payload, } status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." print(status_update) # 5. 
Submit print(f"Submitting {len(answers_payload)} answers to: {submit_url}") try: response = requests.post(submit_url, json=submission_data, timeout=60) response.raise_for_status() result_data = response.json() final_status = ( f"Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Overall Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" f"Message: {result_data.get('message', 'No message received.')}" ) print("Submission successful.") execution_time = time.time() - start_time logger.info( f"Total execution time: {execution_time:.2f} seconds ({int(execution_time // 60)}m {int(execution_time % 60)}s)" ) # LIMITATION: GAIA API does NOT provide per-question correctness data # API response structure: {username, score, correct_count, total_attempted, message, timestamp} # No "results" array exists - we only get summary stats, not which specific questions are correct # Therefore: UI table has no "Correct?" column, JSON export shows "correct": null for all questions # Export to JSON with execution time and submission response json_path = export_results_to_json( results_log, final_status, execution_time, result_data ) html_path = export_results_to_html( results_log, final_status, execution_time, result_data ) return final_status, json_path, html_path except requests.exceptions.HTTPError as e: error_detail = f"Server responded with status {e.response.status_code}." try: error_json = e.response.json() error_detail += f" Detail: {error_json.get('detail', e.response.text)}" except requests.exceptions.JSONDecodeError: error_detail += f" Response: {e.response.text[:500]}" status_message = f"Submission Failed: {error_detail}" print(status_message) execution_time = time.time() - start_time json_path = export_results_to_json( results_log, status_message, execution_time, None ) html_path = export_results_to_html( results_log, status_message, execution_time, None ) return status_message, json_path, html_path except requests.exceptions.Timeout: status_message = "Submission Failed: The request timed out." print(status_message) execution_time = time.time() - start_time json_path = export_results_to_json( results_log, status_message, execution_time, None ) html_path = export_results_to_html( results_log, status_message, execution_time, None ) return status_message, json_path, html_path except requests.exceptions.RequestException as e: status_message = f"Submission Failed: Network error - {e}" print(status_message) execution_time = time.time() - start_time json_path = export_results_to_json( results_log, status_message, execution_time, None ) html_path = export_results_to_html( results_log, status_message, execution_time, None ) return status_message, json_path, html_path except Exception as e: status_message = f"An unexpected error occurred during submission: {e}" print(status_message) execution_time = time.time() - start_time json_path = export_results_to_json( results_log, status_message, execution_time, None ) html_path = export_results_to_html( results_log, status_message, execution_time, None ) return status_message, json_path, html_path # --- Build Gradio Interface using Blocks --- with gr.Blocks() as demo: gr.Markdown("# GAIA Agent Evaluation Runner") gr.Markdown( """ **Stage 4 Progress:** Adding diagnostics, error handling, and fallback mechanisms. """ ) with gr.Tabs(): # Tab 1: Full Evaluation (primary functionality) with gr.Tab("📊 Full Evaluation"): gr.Markdown( """ **Quick Start:** 1. 
**Log in** to your Hugging Face account (uses your username for leaderboard submission) 2. **Select LLM Provider** (Gemini/HuggingFace/Groq/Claude) 3. **Click "Run Evaluation & Submit All Answers"** **What happens:** - Fetches GAIA benchmark questions - Runs your agent on each question using selected LLM - Submits answers to official leaderboard - Returns downloadable results (JSON + HTML) **Expectations:** - Full evaluation takes time (agent processes all questions sequentially) - Download files appear below when complete """ ) gr.LoginButton() with gr.Row(): eval_llm_provider_dropdown = gr.Dropdown( label="LLM Provider for Evaluation", choices=["Gemini", "HuggingFace", "Groq", "Claude"], value="HuggingFace", info="Select which LLM to use for all questions", ) eval_video_mode = gr.Radio( label="YouTube Processing Mode", choices=["Transcript", "Frames"], value="Transcript", info="Transcript: Audio/subtitle extraction (fast) | Frames: Visual analysis with vision models (slower)", ) eval_question_limit = gr.Number( label="Question Limit (Debug)", value=0, precision=0, minimum=0, maximum=165, info="Limit questions for testing (0 = process all)", ) with gr.Row(): eval_task_ids = gr.Textbox( label="Target Task IDs (Debug)", value="", placeholder="task_id1, task_id2, ...", info="Comma-separated task IDs to run (overrides question limit)", lines=1, ) run_button = gr.Button("Run Evaluation & Submit All Answers") status_output = gr.Textbox( label="Run Status / Submission Result", lines=5, interactive=False ) # Export buttons - JSON and HTML json_export = gr.File(label="Download JSON Results", type="filepath") html_export = gr.File(label="Download HTML Results", type="filepath") run_button.click( fn=run_and_submit_all, inputs=[ eval_llm_provider_dropdown, eval_video_mode, eval_question_limit, eval_task_ids, ], outputs=[status_output, json_export, html_export], ) # Tab 2: Test Single Question (debugging/diagnostics) with gr.Tab("🔍 Test & Debug"): gr.Markdown(""" **Test Mode:** Run the agent on a single question and see detailed diagnostics. 
This mode shows: - API key status - Execution plan - Tools selected and executed - Evidence collected - Errors encountered - Final answer """) test_question_input = gr.Textbox( label="Enter Test Question", placeholder="e.g., What is the capital of France?", lines=3, ) with gr.Row(): llm_provider_dropdown = gr.Dropdown( label="LLM Provider", choices=["Gemini", "HuggingFace", "Groq", "Claude"], value="HuggingFace", info="Select which LLM to use for this test", ) test_button = gr.Button("Run Test", variant="primary") with gr.Row(): with gr.Column(scale=1): test_answer_output = gr.Textbox( label="Answer", lines=3, interactive=False ) test_api_status = gr.Textbox( label="API Keys Status", lines=5, interactive=False ) with gr.Column(scale=2): test_diagnostics_output = gr.Textbox( label="Execution Diagnostics", lines=20, interactive=False ) test_button.click( fn=test_single_question, inputs=[ test_question_input, llm_provider_dropdown, ], outputs=[test_answer_output, test_diagnostics_output, test_api_status], ) if __name__ == "__main__": print("\n" + "-" * 30 + " App Starting " + "-" * 30) # Check for SPACE_HOST and SPACE_ID at startup for information space_host_startup = os.getenv("SPACE_HOST") space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup if space_host_startup: print(f"✅ SPACE_HOST found: {space_host_startup}") print(f" Runtime URL should be: https://{space_host_startup}.hf.space") else: print("ℹ️ SPACE_HOST environment variable not found (running locally?).") if space_id_startup: # Print repo URLs if SPACE_ID is found print(f"✅ SPACE_ID found: {space_id_startup}") print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") print( f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main" ) else: print( "ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined." ) print("-" * (60 + len(" App Starting ")) + "\n") print("Launching Gradio Interface for Basic Agent Evaluation...") demo.launch(debug=True, share=False)
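
# Example of a quick local sanity check (not run by the Space itself; assumes the
# relevant API keys are already set in the environment):
#
#   answer, diagnostics, api_status = test_single_question(
#       "What is the capital of France?", "Gemini"
#   )
#   print(answer)
#   print(diagnostics)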