| import os |
| import gradio as gr |
| import requests |
| import inspect |
| import pandas as pd |
| import time |
| import mimetypes |
| from pathlib import Path |
|
|
| |
| |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
| |
| |
|
|
| from smolagents import CodeAgent, LiteLLMModel |
| from my_tools import my_tool_list |
|
|
| import mimetypes |
| from pathlib import Path |
|
|
def download_file_universal(task_id, save_dir="attachments"):
    """Download the attachment for *task_id*, auto-detecting name and type.

    The filename is resolved from the Content-Disposition header when the
    server provides one; otherwise an extension is derived from the
    Content-Type (falling back to ``.bin``) and the file is saved as
    ``<task_id><ext>``.

    Args:
        task_id: Task identifier used to build the download URL.
        save_dir: Directory the file is written into (created if missing).

    Returns:
        Tuple ``(save_path, filename)`` on success, ``(None, None)`` on any
        failure (HTTP status error, network error, write error).
    """
    os.makedirs(save_dir, exist_ok=True)

    url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"

    print(f"[DEBUG] Downloading from: {url}")

    try:
        headers = {
            'Accept': '*/*',
            'User-Agent': 'Mozilla/5.0 (compatible; Agent/1.0)'
        }

        # stream=True so large attachments are written in chunks, not buffered.
        resp = requests.get(url, headers=headers, timeout=30, stream=True)
        print(f"[DEBUG] HTTP {resp.status_code}")
        print(f"[DEBUG] Content-Type: {resp.headers.get('content-type', 'Unknown')}")
        print(f"[DEBUG] Content-Disposition: {resp.headers.get('content-disposition', 'Unknown')}")

        resp.raise_for_status()

        # Prefer the server-provided filename from Content-Disposition.
        filename = None
        content_disp = resp.headers.get('content-disposition', '')
        if 'filename=' in content_disp:
            # FIX: the header may carry trailing parameters
            # (e.g. 'attachment; filename="a.png"; size=123'); keep only the
            # filename token before stripping surrounding quotes.
            filename = content_disp.split('filename=')[1].split(';')[0].strip().strip('"\'')
            # FIX: never trust a server-supplied path — drop any directory
            # components so the file cannot escape save_dir (path traversal).
            filename = os.path.basename(filename)

        # Fall back to "<task_id><ext>" derived from the Content-Type.
        if not filename:
            content_type = resp.headers.get('content-type', '').lower()
            ext = mimetypes.guess_extension(content_type.split(';')[0])
            if not ext:
                # mimetypes misses / mis-maps some common types; map explicitly.
                type_map = {
                    'image/png': '.png',
                    'image/jpeg': '.jpg',
                    'image/gif': '.gif',
                    'video/mp4': '.mp4',
                    'video/avi': '.avi',
                    'video/mov': '.mov',
                    'audio/mp3': '.mp3',
                    'audio/wav': '.wav',
                    'audio/mpeg': '.mp3',
                    'application/pdf': '.pdf',
                    'text/plain': '.txt',
                    'application/json': '.json',
                    'text/csv': '.csv'
                }
                ext = type_map.get(content_type.split(';')[0], '.bin')
            filename = f"{task_id}{ext}"

        save_path = os.path.join(save_dir, filename)
        print(f"[DEBUG] Saving as: {save_path}")

        # Stream the body to disk in 8 KiB chunks.
        with open(save_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)

        file_size = os.path.getsize(save_path)
        # FIX: the original log printed a broken "(unknown)" placeholder here.
        print(f"[DEBUG] Successfully saved: {filename} ({file_size} bytes)")

        return save_path, filename

    except Exception as e:
        # Best-effort download: callers treat (None, None) as "no file".
        print(f"[DEBUG] Download error: {e}")
        return None, None
|
|
def download_task_files_on_demand(task_id, file_list, save_dir="attachments"):
    """Download a task's attachments lazily, with a simple on-disk cache.

    If *file_list* is empty, one direct download is still attempted (the
    server may host an attachment that was not listed). Otherwise each
    expected filename is either served from *save_dir* (cache hit) or
    downloaded and renamed to the expected name.

    Args:
        task_id: Task identifier whose files should be fetched.
        file_list: Filenames the task metadata says exist (may be empty).
        save_dir: Local directory used as the download cache.

    Returns:
        List of local paths that were downloaded or found in the cache.
    """
    os.makedirs(save_dir, exist_ok=True)
    downloaded_files = []

    if not file_list:
        print(f"[INFO] No files listed for task {task_id}, attempting direct download...")
        file_path, filename = download_file_universal(task_id, save_dir)
        if file_path:
            downloaded_files.append(file_path)
    else:
        print(f"[INFO] Task {task_id} has {len(file_list)} files to download")
        for expected_filename in file_list:
            # Cache hit: skip the network round-trip entirely.
            potential_path = os.path.join(save_dir, expected_filename)
            if os.path.exists(potential_path):
                print(f"[CACHE] File already exists: {expected_filename}")
                downloaded_files.append(potential_path)
                continue

            # NOTE(review): every expected file is fetched from the same
            # per-task endpoint, so multi-file tasks re-download one URL —
            # confirm the server really serves one attachment per task.
            file_path, actual_filename = download_file_universal(task_id, save_dir)
            if file_path:
                # Align the saved name with what the task metadata expects.
                if actual_filename != expected_filename:
                    new_path = os.path.join(save_dir, expected_filename)
                    try:
                        os.rename(file_path, new_path)
                        file_path = new_path
                        print(f"[INFO] Renamed {actual_filename} to {expected_filename}")
                    # FIX: was a bare `except:` which also swallowed
                    # KeyboardInterrupt/SystemExit; only filesystem errors
                    # are expected here, and we now log the reason.
                    except OSError as e:
                        print(f"[WARN] Could not rename file, keeping as {actual_filename}")
                        print(f"[WARN] Rename error: {e}")

                downloaded_files.append(file_path)
                print(f"[SUCCESS] Downloaded: {os.path.basename(file_path)}")
            else:
                print(f"[FAIL] Could not download: {expected_filename}")

            # Small pause to be polite to the scoring server.
            time.sleep(0.5)

    return downloaded_files
|
|
class BasicAgent:
    """Thin callable wrapper ("Celum") around a smolagents CodeAgent.

    Builds a GPT-4.1-backed CodeAgent with the project's tool list and
    exposes it as ``agent(question, files, idx, total) -> str``.
    """

    def __init__(self):
        # Fail fast at construction time rather than erroring mid-run.
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY not set in environment variables!")
        model = LiteLLMModel(
            model_id="gpt-4.1",
            api_key=api_key
        )

        self.agent_name = "Celum"
        # max_steps=3 keeps cost bounded; raise it for harder questions.
        self.agent = CodeAgent(
            model=model,
            tools=my_tool_list,
            max_steps=3,
        )

    def __call__(self, question: str, files=None, idx=None, total=None) -> str:
        """Answer *question*, optionally advertising attached *files*.

        Args:
            question: The task text to answer.
            files: Optional list of local file paths the tools may analyze.
            idx: Optional 0-based progress index (logging only).
            total: Optional total question count (logging only).

        Returns:
            The agent's answer, or ``"[<name> Error: ...]"`` on failure.

        Raises:
            Exception: re-raised when the failure looks like a rate limit,
                so ``safe_run_agent``'s backoff/retry loop can engage.
        """
        if idx is not None and total is not None:
            print(f"{self.agent_name} is answering NO. {idx+1}/{total} : {question[:80]}...")
        else:
            print(f"{self.agent_name} received question: {question[:80]}...")
        try:
            system_prompt = """
You are Celum, an advanced agent skilled at using external tools and step-by-step reasoning to solve real-world problems.
You may freely think, reason, and use tools or your own knowledge as needed to solve the problem.

Core principles:
- Use available tools when helpful, but don't over think
- Chess puzzles usually have forcing moves (checks, captures, threats)
- Math problems often have straightforward calculations
- Apply your knowledge and experience
- Don't be afraid to make educated guesses when you have partial information
- Try multiple approaches if the first one doesn't work
- When in doubt, try the most likely answer

When you have enough information to give a reasonable answer, go for it.
Only use "unknown" when you truly cannot make any reasonable attempt.

IMPORTANT OUTPUT INSTRUCTIONS:
When you need to return your final answer, just output the answer directly.

Answer format requirements:
- If the answer is a number, output only the number (no units, no commas)
- If the answer is a word or string, do not use articles or abbreviations, and write digits as plain numbers
- If the answer is a comma-separated list, apply the same rules to each item
- If you cannot answer, return the word 'unknown'
"""

            files_prompt = ""
            if files:
                files_prompt = f"\n[You have the following attached files available: {', '.join(files)}]\n"
                files_prompt += "Use your tools to analyze any files as needed.\n"

            full_question = system_prompt + files_prompt + "\n\n" + question
            return self.agent.run(full_question)
        except Exception as e:
            # FIX: previously *every* exception was swallowed into a string,
            # which made the rate-limit retry logic in safe_run_agent dead
            # code. Re-raise rate-limit errors so the caller can back off.
            if "RateLimitError" in str(e) or "rate limit" in str(e).lower():
                raise
            return f"[{self.agent_name} Error: {e}]"
| |
def safe_run_agent(agent, question, files, idx, total, max_retries=3):
    """Run *agent* once per attempt, backing off and retrying on rate limits.

    Args:
        agent: Callable ``agent(question, files, idx, total) -> str``.
        question: Question text forwarded to the agent.
        files: Local file paths forwarded to the agent.
        idx: 0-based question index (for the agent's progress logging).
        total: Total question count (for the agent's progress logging).
        max_retries: Maximum rate-limit retries before giving up.

    Returns:
        The agent's answer, or an ``"[Agent Error: ...]"`` string on failure.
    """
    def _rate_limited(text):
        # Matches both raised exception text and BasicAgent's wrapped strings.
        return "RateLimitError" in text or "rate limit" in text.lower()

    tries = 0
    while tries < max_retries:
        try:
            result = agent(question, files, idx, total)
        except Exception as e:
            if _rate_limited(str(e)):
                # Linear backoff: 30s, 40s, 50s, ...
                wait_time = 30 + tries * 10
                print(f"Rate limit hit, sleeping {wait_time}s before retry... (try {tries+1}/{max_retries})")
                time.sleep(wait_time)
                tries += 1
                continue
            return f"[Agent Error: {e}]"
        # FIX: BasicAgent.__call__ swallows exceptions and returns them as
        # "[... Error: ...]" strings, so a rate limit may never raise here.
        # Without this check the retry loop above was effectively dead code.
        if isinstance(result, str) and result.startswith("[") and result.endswith("]") and _rate_limited(result):
            wait_time = 30 + tries * 10
            print(f"Rate limit hit, sleeping {wait_time}s before retry... (try {tries+1}/{max_retries})")
            time.sleep(wait_time)
            tries += 1
            continue
        return result
    return "[Agent Error: Rate limit retries exceeded]"
|
|
def run_and_submit_all( profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile injected by Gradio's LoginButton, or None when
            the user is not logged in.

    Returns:
        Tuple ``(status_message, results_dataframe_or_None)`` bound to the
        Gradio status textbox and results table.
    """
    # SPACE_ID identifies this HF Space; used to link the agent's source code.
    space_id = os.getenv("SPACE_ID")

    # Submission requires a logged-in HF user; bail out early otherwise.
    if profile:
        username= f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the agent (raises if OPENAI_API_KEY is missing).
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # Public link to this Space's code tree, sent along with the submission.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch the full question list from the scoring server.
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run the agent on every question, collecting answers and a log.
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")

    for idx, item in enumerate(questions_data):
        task_id = item.get("task_id")
        question_text = item.get("question")
        file_list = item.get("files", [])

        print(f"\n{'='*60}")
        print(f"Processing Question {idx+1}/{len(questions_data)}")
        print(f"Task ID: {task_id}")
        print(f"Question: {question_text[:100]}...")
        print(f"Expected files: {file_list}")
        print(f"{'='*60}")

        # Download any attachments for this task before running the agent.
        local_files = []
        # NOTE(review): the "or True" forces a download attempt for every
        # task even when no files are listed — presumably because the server
        # may host an unlisted attachment. Confirm this is intended.
        if file_list or True:
            print(f"[DOWNLOAD] Starting download for task {task_id}...")
            local_files = download_task_files_on_demand(task_id, file_list)

            if local_files:
                print(f"[DOWNLOAD] Successfully got {len(local_files)} files:")
                for f in local_files:
                    size = os.path.getsize(f)
                    print(f" - {os.path.basename(f)} ({size} bytes)")
            else:
                print(f"[DOWNLOAD] No files downloaded for task {task_id}")

        # Run the agent (with rate-limit retries via safe_run_agent).
        print(f"[AGENT] Running Celum on question {idx+1}...")
        try:
            submitted_answer = safe_run_agent(agent, question_text, local_files, idx, len(questions_data))

            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Submitted Answer": submitted_answer,
                "Files": [os.path.basename(f) for f in local_files] if local_files else []
            })

            print(f"[AGENT] Answer: {submitted_answer}")

        except Exception as e:
            # Keep going on per-question failures; submit "unknown" so the
            # payload still covers every task.
            error_msg = f"AGENT ERROR: {e}"
            print(f"[ERROR] {error_msg}")
            answers_payload.append({"task_id": task_id, "submitted_answer": "unknown"})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Submitted Answer": error_msg,
                "Files": []
            })

        # Brief pause between questions to avoid hammering the APIs.
        if idx < len(questions_data) - 1:
            print(f"[WAIT] Waiting before next question...")
            time.sleep(2)

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Build the submission payload expected by the scoring endpoint.
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit all answers and report the score (or a detailed failure).
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"AI: Celum\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        # Surface the server's error detail when it responds with JSON.
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
|
|
|
|
| |
# --- Gradio UI: login, one run button, a status box and a results table ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    # FIX: user-facing text had an unbalanced quote ('"submit button') and the
    # typo "seperate"; wording lightly corrected, content unchanged.
    gr.Markdown(
        """
        **Instructions:**

        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

        ---
        **Disclaimers:**
        Once you click the "Run Evaluation & Submit All Answers" button, it can take quite some time (this is the time for the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, for the delay of the submit button, a solution could be to cache the answers and submit in a separate action, or even to answer the questions in async.
        """
    )

    # Supplies the gr.OAuthProfile that Gradio injects into run_and_submit_all.
    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Overall run status / submission result.
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # Per-question log of submitted answers.
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No explicit inputs: Gradio passes the OAuth profile automatically based
    # on run_and_submit_all's gr.OAuthProfile parameter annotation.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
|
|
if __name__ == "__main__":
    # Startup banner plus a quick sanity report of Space-related env vars,
    # then launch the Gradio app.
    print("\n" + "-"*30 + " App Starting " + "-"*30)

    host = os.getenv("SPACE_HOST")
    repo = os.getenv("SPACE_ID")

    # SPACE_HOST is set by the HF runtime; absent when running locally.
    if host:
        print(f"✅ SPACE_HOST found: {host}")
        print(f" Runtime URL should be: https://{host}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    # SPACE_ID lets us print links to the Space's repository.
    if repo:
        print(f"✅ SPACE_ID found: {repo}")
        print(f" Repo URL: https://huggingface.co/spaces/{repo}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{repo}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)