| import os |
| import re |
| import io |
| import base64 |
| import mimetypes |
| import tempfile |
| from pathlib import Path |
|
|
| import gradio as gr |
| import requests |
| import pandas as pd |
| from openai import OpenAI |
|
|
| |
# Base URL of the scoring service; the code below appends /questions,
# /files/<task_id> and /submit to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


# LLM settings, read once at import time from environment variables.
# Each entry falls back to the default shown here when the variable is unset;
# an empty LLM_API_KEY makes BasicAgent() raise at construction time.
LLM_API_KEY = os.environ.get("LLM_API_KEY", "")
LLM_BASE_URL = os.environ.get("LLM_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "gpt-4o-mini")
TRANSCRIBE_MODEL = os.environ.get("TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")
|
|
|
|
def to_data_url(file_path: str) -> str:
    """Encode the file at *file_path* as a base64 ``data:`` URL.

    The MIME type is guessed from the file name; when no type can be guessed,
    ``application/octet-stream`` is used instead.
    """
    guessed_type = mimetypes.guess_type(file_path)[0]
    mime = guessed_type if guessed_type else "application/octet-stream"

    with open(file_path, "rb") as handle:
        raw_bytes = handle.read()

    encoded = base64.b64encode(raw_bytes).decode("utf-8")
    return "data:" + mime + ";base64," + encoded
|
|
|
|
def clean_final_answer(text: str) -> str:
    """Reduce a raw model reply to the bare answer string.

    Surrounding whitespace is removed, then one leading "final answer" or
    "answer" label (case-insensitive, followed by ``:`` or ``-``), then
    surrounding quote characters. Falsy input yields an empty string.
    """
    if not text:
        return ""

    label_pattern = re.compile(r"^\s*(final answer|answer)\s*[:\-]\s*", re.I)
    without_label = label_pattern.sub("", text.strip())

    # Quotes are stripped in two separate passes on purpose: double quotes
    # first, then single quotes. Merging the passes would change the result
    # for mixed quoting such as '"x"' wrapped in single quotes.
    without_double_quotes = without_label.strip().strip('"')
    return without_double_quotes.strip("'")
|
|
|
|
def extract_urls(text: str):
    """Return the http(s) URLs found in *text*, in order of appearance.

    A URL runs until the first whitespace, ``)`` or ``]``. Trailing sentence
    punctuation (``. , ; : ! ?``), quotes and ``>`` are then trimmed, so a
    link written at the end of a sentence ("see https://example.com/page.")
    is returned without the full stop. ``None`` or empty input yields ``[]``.
    """
    candidates = re.findall(r"https?://[^\s)\]]+", text or "")
    urls = []
    for candidate in candidates:
        url = candidate.rstrip(".,;:!?'\">")
        # Skip matches that were nothing but punctuation after the scheme.
        if url.split("://", 1)[1]:
            urls.append(url)
    return urls
|
|
|
|
| class BasicAgent: |
| def __init__(self): |
| if not LLM_API_KEY: |
| raise ValueError("Missing LLM_API_KEY in Space Secrets.") |
| self.client = OpenAI(api_key=LLM_API_KEY, base_url=LLM_BASE_URL) |
| self.api_url = DEFAULT_API_URL |
| print("BasicAgent initialized.") |
|
|
| def download_task_file(self, task_id: str, file_name: str) -> str | None: |
| if not file_name: |
| return None |
|
|
| url = f"{self.api_url}/files/{task_id}" |
| print(f"Downloading attached file from {url}") |
|
|
| r = requests.get(url, timeout=60) |
| if r.status_code != 200: |
| print(f"Could not fetch file for task {task_id}: {r.status_code}") |
| return None |
|
|
| suffix = Path(file_name).suffix or "" |
| fd, tmp_path = tempfile.mkstemp(suffix=suffix) |
| os.close(fd) |
| with open(tmp_path, "wb") as f: |
| f.write(r.content) |
| return tmp_path |
|
|
| def read_text_like_file(self, file_path: str) -> str | None: |
| suffix = Path(file_path).suffix.lower() |
| if suffix not in {".txt", ".md", ".json", ".csv", ".py", ".html"}: |
| return None |
|
|
| try: |
| with open(file_path, "r", encoding="utf-8", errors="ignore") as f: |
| data = f.read() |
| return data[:15000] |
| except Exception as e: |
| return f"[Could not read text file: {e}]" |
|
|
| def summarize_spreadsheet(self, file_path: str) -> str: |
| try: |
| xls = pd.ExcelFile(file_path) |
| out = [] |
| for sheet_name in xls.sheet_names[:3]: |
| df = pd.read_excel(file_path, sheet_name=sheet_name) |
| out.append(f"Sheet: {sheet_name}") |
| out.append("Columns: " + ", ".join(map(str, df.columns.tolist()))) |
| out.append("Preview:") |
| out.append(df.head(20).to_csv(index=False)) |
| out.append("") |
| return "\n".join(out)[:15000] |
| except Exception as e: |
| return f"[Could not read spreadsheet: {e}]" |
|
|
| def transcribe_audio(self, file_path: str) -> str: |
| try: |
| with open(file_path, "rb") as audio_file: |
| transcript = self.client.audio.transcriptions.create( |
| model=TRANSCRIBE_MODEL, |
| file=audio_file, |
| ) |
| text = getattr(transcript, "text", "") or "" |
| return text[:12000] |
| except Exception as e: |
| return f"[Could not transcribe audio: {e}]" |
|
|
| def fetch_web_context(self, question: str) -> str: |
| urls = extract_urls(question) |
| if not urls: |
| return "" |
|
|
| chunks = [] |
| for url in urls[:2]: |
| try: |
| r = requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0"}) |
| content = r.text[:12000] |
| chunks.append(f"URL: {url}\nCONTENT:\n{content}\n") |
| except Exception as e: |
| chunks.append(f"URL: {url}\n[Could not fetch: {e}]") |
| return "\n\n".join(chunks) |
|
|
| def ask_model(self, question: str, extra_context: str = "", image_paths=None) -> str: |
| image_paths = image_paths or [] |
|
|
| system_prompt = ( |
| "You solve benchmark questions carefully.\n" |
| "Return ONLY the final answer.\n" |
| "Do not add explanations.\n" |
| "Do not add 'FINAL ANSWER'.\n" |
| "Keep formatting exactly as requested in the question.\n" |
| "If the question asks for alphabetical order, preserve it.\n" |
| "If it asks for comma-separated output, return only that comma-separated output.\n" |
| "If it asks for a name, return only the name requested.\n" |
| ) |
|
|
| user_parts = [] |
| user_parts.append({ |
| "type": "text", |
| "text": f"QUESTION:\n{question}\n\nEXTRA CONTEXT:\n{extra_context[:20000]}" |
| }) |
|
|
| for img in image_paths[:3]: |
| user_parts.append({ |
| "type": "image_url", |
| "image_url": {"url": to_data_url(img)} |
| }) |
|
|
| response = self.client.chat.completions.create( |
| model=MODEL_NAME, |
| temperature=0, |
| messages=[ |
| {"role": "system", "content": system_prompt}, |
| {"role": "user", "content": user_parts}, |
| ], |
| ) |
|
|
| answer = response.choices[0].message.content or "" |
| return clean_final_answer(answer) |
|
|
| def __call__(self, task: dict) -> str: |
| task_id = task.get("task_id", "") |
| question = task.get("question", "") |
| file_name = task.get("file_name", "") or "" |
|
|
| print(f"Task {task_id} | file_name={file_name}") |
|
|
| extra_context = [] |
| image_paths = [] |
|
|
| |
| web_context = self.fetch_web_context(question) |
| if web_context: |
| extra_context.append("WEB CONTEXT:\n" + web_context) |
|
|
| |
| local_file = None |
| if file_name: |
| local_file = self.download_task_file(task_id, file_name) |
|
|
| |
| if local_file: |
| suffix = Path(local_file).suffix.lower() |
|
|
| if suffix in {".png", ".jpg", ".jpeg", ".webp"}: |
| image_paths.append(local_file) |
|
|
| elif suffix in {".mp3", ".wav", ".m4a", ".mpeg"}: |
| transcript = self.transcribe_audio(local_file) |
| extra_context.append("AUDIO TRANSCRIPT:\n" + transcript) |
|
|
| elif suffix in {".xlsx", ".xls"}: |
| sheet_summary = self.summarize_spreadsheet(local_file) |
| extra_context.append("SPREADSHEET CONTENT:\n" + sheet_summary) |
|
|
| else: |
| text_data = self.read_text_like_file(local_file) |
| if text_data: |
| extra_context.append(f"ATTACHED FILE CONTENT ({file_name}):\n{text_data}") |
|
|
| |
| final_answer = self.ask_model( |
| question=question, |
| extra_context="\n\n".join(extra_context), |
| image_paths=image_paths, |
| ) |
|
|
| print(f"Final answer for task {task_id}: {final_answer}") |
| return final_answer |
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, run the BasicAgent on them, submit all answers,
    and report the results.

    Args:
        profile: Profile of the logged-in Hugging Face user, or ``None`` when
            nobody is logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame with
        one row per processed question, or ``None`` when the run stopped
        before any question was processed.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=30)
        response.raise_for_status()
        questions_data = response.json()
    except Exception as e:
        return f"Error fetching questions: {e}", None

    # The loop below needs a non-empty list; any other JSON shape (an error
    # object, for instance) is reported here rather than failing mid-run.
    if not isinstance(questions_data, list) or not questions_data:
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")

    results_log = []
    answers_payload = []

    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"Skipping invalid item: {item}")
            continue

        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping invalid item: {item}")
            continue

        try:
            submitted_answer = agent(item)
        except Exception as e:
            # A failed task is logged in the table but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            submitted_answer = f"AGENT ERROR: {e}"
        else:
            answers_payload.append({
                "task_id": task_id,
                "submitted_answer": submitted_answer
            })

        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "File": item.get("file_name", ""),
            "Submitted Answer": submitted_answer
        })

    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=120)
        response.raise_for_status()
        result_data = response.json()

        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )

        results_df = pd.DataFrame(results_log)
        return final_status, results_df

    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        # Prefer the structured "detail" field; fall back to the raw body when
        # the error response is not JSON.
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except Exception:
            error_detail += f" Response: {e.response.text[:500]}"
        return f"Submission Failed: {error_detail}", pd.DataFrame(results_log)

    except requests.exceptions.Timeout:
        return "Submission Failed: The request timed out.", pd.DataFrame(results_log)

    except requests.exceptions.RequestException as e:
        return f"Submission Failed: Network error - {e}", pd.DataFrame(results_log)

    except Exception as e:
        return f"An unexpected error occurred during submission: {e}", pd.DataFrame(results_log)
|
|
|
|
# Gradio UI: a login button, a run button, and two outputs (status text and
# a table of per-question answers).
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Edit this Space to define your agent's logic and tools.
        2. Log in to your Hugging Face account using the button below.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        """
    )

    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    results_table = gr.DataFrame(
        label="Questions and Agent Answers",
        wrap=True,
    )

    # No explicit inputs are wired; the two values returned by
    # run_and_submit_all go to the status box and the results table.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )


# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(debug=True, share=False)