| import os |
| import re |
| import base64 |
| import mimetypes |
| import subprocess |
| from pathlib import Path |
|
|
| import gradio as gr |
| import requests |
| import pandas as pd |
| from openai import OpenAI |
| from youtube_transcript_api import YouTubeTranscriptApi |
|
|
| print("BOOT: imports loaded", flush=True) |
|
|
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
| MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4.1-mini") |
| TRANSCRIBE_MODEL = os.getenv("TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe") |
| LLM_API_KEY = os.getenv("LLM_API_KEY", "") |
| TEST_MODE = os.getenv("TEST_MODE", "1") == "1" |
|
|
|
|
| def to_data_url(file_path: str) -> str: |
| mime, _ = mimetypes.guess_type(file_path) |
| if not mime: |
| mime = "application/octet-stream" |
| with open(file_path, "rb") as f: |
| encoded = base64.b64encode(f.read()).decode("utf-8") |
| return f"data:{mime};base64,{encoded}" |
|
|
|
|
| def clean_final_answer(text: str) -> str: |
| if not text: |
| return "" |
| text = text.strip() |
| text = re.sub(r"^\s*(final answer|answer)\s*[:\-]\s*", "", text, flags=re.I) |
| return text.strip().strip('"').strip("'") |
|
|
|
|
| def extract_youtube_id(text: str) -> str | None: |
| patterns = [ |
| r"youtube\.com/watch\?v=([A-Za-z0-9_-]{11})", |
| r"youtu\.be/([A-Za-z0-9_-]{11})", |
| ] |
| for pattern in patterns: |
| m = re.search(pattern, text) |
| if m: |
| return m.group(1) |
| return None |
|
|
|
|
| def answer_rules(question: str) -> str: |
| return ( |
| "Return ONLY the final answer.\n" |
| "Do not explain.\n" |
| "Do not include reasoning.\n" |
| "Do not say FINAL ANSWER.\n" |
| "Match the required format exactly.\n" |
| "If the question asks for a comma-separated list, return only that list.\n" |
| "If it asks for sorted/alphabetical output, obey exactly.\n" |
| f"\nQUESTION:\n{question}" |
| ) |
|
|
|
|
| class BasicAgent: |
| def __init__(self): |
| if not LLM_API_KEY: |
| raise ValueError("Missing LLM_API_KEY secret.") |
| self.client = OpenAI(api_key=LLM_API_KEY) |
| self.api_url = DEFAULT_API_URL |
| print(f"BOOT: agent initialized with model={MODEL_NAME}", flush=True) |
|
|
| def download_task_file(self, task_id: str, file_name: str) -> str | None: |
| if not file_name: |
| return None |
| url = f"{self.api_url}/files/{task_id}" |
| r = requests.get(url, timeout=60) |
| r.raise_for_status() |
| suffix = Path(file_name).suffix |
| local_path = f"/tmp/{task_id}{suffix}" |
| with open(local_path, "wb") as f: |
| f.write(r.content) |
| return local_path |
|
|
| def ask_plain(self, question: str, extra_context: str = "", image_path: str | None = None) -> str: |
| content = [{"type": "input_text", "text": answer_rules(question) + "\n\n" + extra_context}] |
| if image_path: |
| content.append({"type": "input_image", "image_url": to_data_url(image_path)}) |
|
|
| response = self.client.responses.create( |
| model=MODEL_NAME, |
| input=[{"role": "user", "content": content}], |
| ) |
| return clean_final_answer(response.output_text) |
|
|
| def ask_web(self, question: str, extra_context: str = "") -> str: |
| prompt = answer_rules(question) |
| if extra_context: |
| prompt += "\n\nCONTEXT:\n" + extra_context |
|
|
| response = self.client.responses.create( |
| model=MODEL_NAME, |
| tools=[{"type": "web_search"}], |
| input=prompt, |
| ) |
| return clean_final_answer(response.output_text) |
|
|
| def transcribe_audio(self, file_path: str) -> str: |
| with open(file_path, "rb") as audio_file: |
| transcript = self.client.audio.transcriptions.create( |
| model=TRANSCRIBE_MODEL, |
| file=audio_file, |
| ) |
| return getattr(transcript, "text", "") or "" |
|
|
| def get_youtube_transcript(self, question: str) -> str | None: |
| video_id = extract_youtube_id(question) |
| if not video_id: |
| return None |
| try: |
| transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=["en"]) |
| return " ".join(chunk["text"] for chunk in transcript) |
| except Exception as e: |
| print(f"YouTube transcript failed: {e}", flush=True) |
| return None |
|
|
| def summarize_excel(self, file_path: str) -> str: |
| blocks = [] |
| xls = pd.ExcelFile(file_path) |
| for sheet_name in xls.sheet_names[:5]: |
| df = pd.read_excel(file_path, sheet_name=sheet_name) |
| blocks.append(f"SHEET: {sheet_name}") |
| blocks.append("COLUMNS: " + ", ".join(map(str, df.columns.tolist()))) |
| blocks.append("ROWS:") |
| blocks.append(df.to_csv(index=False)) |
| blocks.append("") |
| return "\n".join(blocks)[:50000] |
|
|
| def execute_python_file(self, file_path: str) -> str: |
| result = subprocess.run( |
| ["python", file_path], |
| capture_output=True, |
| text=True, |
| timeout=30, |
| ) |
| return f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}" |
|
|
| def read_text_file(self, file_path: str) -> str: |
| with open(file_path, "r", encoding="utf-8", errors="ignore") as f: |
| return f.read() |
|
|
| def __call__(self, task: dict) -> str: |
| task_id = task.get("task_id", "") |
| question = task.get("question", "") |
| file_name = task.get("file_name", "") or "" |
|
|
| print(f"SOLVING task={task_id} file={file_name}", flush=True) |
|
|
| yt_transcript = self.get_youtube_transcript(question) |
| if yt_transcript: |
| return self.ask_plain( |
| question, |
| extra_context=f"YOUTUBE TRANSCRIPT:\n{yt_transcript[:40000]}", |
| ) |
|
|
| local_file = self.download_task_file(task_id, file_name) if file_name else None |
| if local_file: |
| ext = Path(local_file).suffix.lower() |
|
|
| if ext in {".mp3", ".wav", ".m4a", ".mpeg", ".mp4", ".webm"}: |
| transcript = self.transcribe_audio(local_file) |
| return self.ask_plain( |
| question, |
| extra_context=f"AUDIO TRANSCRIPT:\n{transcript[:30000]}", |
| ) |
|
|
| if ext in {".png", ".jpg", ".jpeg", ".webp"}: |
| return self.ask_plain(question, image_path=local_file) |
|
|
| if ext in {".xlsx", ".xls"}: |
| sheet_dump = self.summarize_excel(local_file) |
| return self.ask_plain( |
| question, |
| extra_context=f"SPREADSHEET CONTENT:\n{sheet_dump}", |
| ) |
|
|
| if ext == ".py": |
| code_text = self.read_text_file(local_file) |
| exec_text = self.execute_python_file(local_file) |
| return self.ask_plain( |
| question, |
| extra_context=f"PYTHON FILE:\n{code_text}\n\nEXECUTION RESULT:\n{exec_text}", |
| ) |
|
|
| text_data = self.read_text_file(local_file) |
| return self.ask_plain(question, extra_context=text_data[:40000]) |
|
|
| return self.ask_web(question) |
|
|
|
|
| def run_and_submit_all(profile: gr.OAuthProfile | None): |
| space_id = os.getenv("SPACE_ID", "") |
|
|
| if profile: |
| username = f"{profile.username}" |
| print(f"User logged in: {username}", flush=True) |
| else: |
| print("User not logged in.", flush=True) |
| return "Please login to Hugging Face first.", None |
|
|
| api_url = DEFAULT_API_URL |
| questions_url = f"{api_url}/questions" |
| submit_url = f"{api_url}/submit" |
|
|
| try: |
| agent = BasicAgent() |
| except Exception as e: |
| print(f"Agent init error: {e}", flush=True) |
| return f"Error initializing agent: {e}", None |
|
|
| agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" |
|
|
| try: |
| if TEST_MODE: |
| print("TEST_MODE=1 -> fetching /random-question", flush=True) |
| response = requests.get(f"{api_url}/random-question", timeout=30) |
| response.raise_for_status() |
| questions_data = [response.json()] |
| else: |
| print("TEST_MODE=0 -> fetching /questions", flush=True) |
| response = requests.get(questions_url, timeout=30) |
| response.raise_for_status() |
| questions_data = response.json() |
|
|
| if not questions_data: |
| return "No questions returned by API.", None |
|
|
| print(f"Fetched {len(questions_data)} questions.", flush=True) |
| except Exception as e: |
| print(f"Question fetch error: {e}", flush=True) |
| return f"Error fetching questions: {e}", None |
|
|
| results_log = [] |
| answers_payload = [] |
|
|
| for item in questions_data: |
| task_id = item.get("task_id") |
| question_text = item.get("question") |
| if not task_id or question_text is None: |
| continue |
|
|
| try: |
| submitted_answer = agent(item) |
| answers_payload.append({ |
| "task_id": task_id, |
| "submitted_answer": submitted_answer |
| }) |
| results_log.append({ |
| "Task ID": task_id, |
| "Question": question_text, |
| "File": item.get("file_name", ""), |
| "Submitted Answer": submitted_answer |
| }) |
| except Exception as e: |
| print(f"Task error {task_id}: {e}", flush=True) |
| results_log.append({ |
| "Task ID": task_id, |
| "Question": question_text, |
| "File": item.get("file_name", ""), |
| "Submitted Answer": f"AGENT ERROR: {e}" |
| }) |
|
|
| if TEST_MODE: |
| return "Test mode finished. Check the answer table below before running full evaluation.", pd.DataFrame(results_log) |
|
|
| if not answers_payload: |
| return "Agent produced no answers.", pd.DataFrame(results_log) |
|
|
| submission_data = { |
| "username": username.strip(), |
| "agent_code": agent_code, |
| "answers": answers_payload |
| } |
|
|
| try: |
| response = requests.post(submit_url, json=submission_data, timeout=120) |
| response.raise_for_status() |
| result_data = response.json() |
|
|
| final_status = ( |
| f"Submission Successful!\n" |
| f"User: {result_data.get('username')}\n" |
| f"Overall Score: {result_data.get('score', 'N/A')}% " |
| f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" |
| f"Message: {result_data.get('message', 'No message received.')}" |
| ) |
| return final_status, pd.DataFrame(results_log) |
|
|
| except requests.exceptions.HTTPError as e: |
| detail = f"Server responded with status {e.response.status_code}." |
| try: |
| detail += f" Detail: {e.response.json()}" |
| except Exception: |
| detail += f" Response: {e.response.text[:500]}" |
| return f"Submission failed: {detail}", pd.DataFrame(results_log) |
|
|
| except Exception as e: |
| return f"Submission failed: {e}", pd.DataFrame(results_log) |
|
|
|
|
| with gr.Blocks() as demo: |
| gr.Markdown("# Basic Agent Evaluation Runner") |
| gr.Markdown( |
| """ |
| 1. Login with Hugging Face. |
| 2. In TEST_MODE=1, this runs one random question only. |
| 3. Change TEST_MODE=0 for full evaluation and submission. |
| """ |
| ) |
|
|
| gr.LoginButton() |
| run_button = gr.Button("Run Evaluation & Submit All Answers") |
| status_output = gr.Textbox(label="Run Status / Submission Result", lines=6, interactive=False) |
| results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
| run_button.click( |
| fn=run_and_submit_all, |
| outputs=[status_output, results_table] |
| ) |
|
|
| print("BOOT: gradio blocks created", flush=True) |
|
|
| if __name__ == "__main__": |
| print("BOOT: launching gradio", flush=True) |
| port = int(os.environ.get("PORT", "7860")) |
| demo.launch(server_name="0.0.0.0", server_port=port, show_error=True) |