| | import os |
| | import tempfile |
| | import urllib.parse |
| | from pathlib import Path |
| | import subprocess |
| | import gradio as gr |
| | import requests |
| | import inspect |
| | import pandas as pd |
| | from smolagents import CodeAgent, PythonInterpreterTool, WebSearchTool, VisitWebpageTool, WikipediaSearchTool, InferenceClientModel, tool |
| | import numpy as np |
| |
|
| | |
| | |
# Base URL of the agents-course scoring service: used to fetch questions,
# download per-task files (/files/{task_id}) and submit answers (/submit).
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
| |
|
| | |
| |
|
| |
|
| | @tool |
def download_youtube_video(video_url: str) -> str:
    """
    Downloads a YouTube video using yt-dlp.

    Args:
        video_url: The YouTube URL to download

    Returns:
        str: Path to the downloaded video file or error message
    """
    try:
        # Fresh temp directory per call; yt-dlp substitutes the extension.
        download_dir = Path(tempfile.mkdtemp())
        output_template = download_dir / "video.%(ext)s"

        proc = subprocess.run(
            [
                "yt-dlp",
                "--format", "best[height<=720]",
                "--output", str(output_template),
                video_url,
            ],
            capture_output=True,
            text=True,
            timeout=300,
        )

        if proc.returncode != 0:
            return f"Error downloading video: {proc.stderr}"

        # The real extension is only known after the download finishes,
        # so glob for whatever file yt-dlp actually wrote.
        matches = list(download_dir.glob("video.*"))
        if not matches:
            return "Error: Video file not found after download"
        return str(matches[0])

    except subprocess.TimeoutExpired:
        return "Error: Video download timed out"
    except Exception as e:
        return f"Error downloading YouTube video: {e}"
| |
|
| |
|
| | @tool |
def extract_video_frames(video_path: str, max_frames: int = 30) -> str:
    """
    Extracts frames from a video file.

    Args:
        video_path: Path to the video file
        max_frames: Maximum number of frames to extract

    Returns:
        str: Directory path containing extracted frames or error message
    """
    try:
        try:
            import cv2
        except ImportError:
            return "Error: OpenCV (cv2) not available for video processing"

        if not Path(video_path).exists():
            return f"Video file not found: {video_path}"

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return f"Error: Cannot open video file {video_path}"

        try:
            # Frames land in a "frames" dir next to the video file.
            frames_dir = Path(video_path).parent / "frames"
            frames_dir.mkdir(exist_ok=True)

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # Sample evenly across the whole clip; never skip less than 1.
            frame_interval = max(1, total_frames // max_frames)

            frame_count = 0
            extracted_count = 0

            while cap.isOpened() and extracted_count < max_frames:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % frame_interval == 0:
                    frame_path = frames_dir / f"frame_{extracted_count:04d}.jpg"
                    cv2.imwrite(str(frame_path), frame)
                    extracted_count += 1

                frame_count += 1
        finally:
            # Always release the capture handle, even if frame I/O fails.
            cap.release()

        return f"Extracted {extracted_count} frames to {frames_dir}"

    except Exception as e:
        return f"Error extracting video frames: {e}"
| |
|
| |
|
| | @tool |
def analyze_image_with_description(image_path: str) -> str:
    """
    Analyzes an image and provides a detailed description.

    Args:
        image_path: Path to the image file

    Returns:
        str: Image analysis description or error message
    """
    try:
        from PIL import Image

        if not Path(image_path).exists():
            return f"Image file not found: {image_path}"

        # Context manager closes the underlying file handle deterministically
        # (the original leaked it until garbage collection).
        with Image.open(image_path) as img:
            width, height = img.size
            mode = img.mode
            format_info = img.format or "Unknown"

            # Normalize to RGB so the per-channel statistics are 3-channel.
            if img.mode != 'RGB':
                img_rgb = img.convert('RGB')
            else:
                img_rgb = img

            pixels = np.array(img_rgb)
            avg_color = np.mean(pixels, axis=(0, 1))

        analysis = {
            "file_path": image_path,
            "dimensions": f"{width}x{height}",
            "mode": mode,
            "format": format_info,
            "average_color_rgb": avg_color.tolist(),
            "file_size_bytes": Path(image_path).stat().st_size
        }

        return str(analysis)

    except ImportError:
        return "Error: PIL not available for image processing"
    except Exception as e:
        return f"Error analyzing image: {e}"
| |
|
| |
|
| | @tool |
def process_chess_image(image_path: str) -> str:
    """
    Processes a chess board image to identify piece positions.

    Args:
        image_path: Path to the chess board image

    Returns:
        str: Chess position description or error message
    """
    try:
        from PIL import Image

        if not Path(image_path).exists():
            return f"Chess image file not found: {image_path}"

        # Only the dimensions are needed; the context manager releases the
        # file handle immediately (the original never closed the image).
        with Image.open(image_path) as img:
            width, height = img.size

        analysis = {
            "image_path": image_path,
            "dimensions": f"{width}x{height}",
            "analysis_note": "Chess position analysis requires specialized computer vision models. This tool provides basic image information. For detailed piece recognition, the vision agent should analyze the image directly using its vision capabilities.",
            "recommendation": "Pass this image directly to the vision agent for detailed chess position analysis."
        }

        return str(analysis)

    except ImportError:
        # Consistent with the other PIL-based tools in this module.
        return "Error: PIL not available for image processing"
    except Exception as e:
        return f"Error processing chess image: {e}"
| |
|
| |
|
| | @tool |
def download_task_file(task_id: str) -> str:
    """
    Downloads a file associated with a specific task ID from the evaluation system.

    Args:
        task_id: The task ID to download files for

    Returns:
        str: Path to the downloaded file or error message
    """
    try:
        file_url = f"{DEFAULT_API_URL}/files/{task_id}"
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()

        temp_dir = tempfile.mkdtemp()

        filename = f"task_{task_id}_file"
        if 'content-disposition' in response.headers:
            content_disposition = response.headers['content-disposition']
            if 'filename=' in content_disposition:
                # Keep only the value itself: cut any trailing header
                # parameters (e.g. '; size=...') before stripping quotes.
                # The previous strip('"') left that garbage in the name.
                raw_name = content_disposition.split('filename=')[1]
                raw_name = raw_name.split(';')[0].strip().strip('"')
                # Drop directory components so a hostile header value like
                # '../../x' cannot escape the temp directory.
                safe_name = Path(raw_name).name
                if safe_name:
                    filename = safe_name

        file_path = Path(temp_dir) / filename
        with open(file_path, 'wb') as f:
            f.write(response.content)

        return str(file_path)
    except requests.exceptions.RequestException as e:
        return f"Error downloading task file: {e}"
    except Exception as e:
        return f"Error saving task file: {e}"
| |
|
| |
|
| | @tool |
def download_file_from_url(url: str, filename: str = "") -> str:
    """
    Downloads a file from any given URL.

    Args:
        url: The URL to download the file from
        filename: Optional filename to save as (if not provided, will extract from URL)

    Returns:
        str: Path to the downloaded file or error message
    """
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        temp_dir = tempfile.mkdtemp()

        if not filename:
            # Derive a name from the URL path, with a generic fallback.
            parsed_url = urllib.parse.urlparse(url)
            filename = Path(parsed_url.path).name
            if not filename:
                filename = "downloaded_file"

        # Keep only the final path component so the file always lands inside
        # the temp directory (guards against '../' in a supplied filename).
        filename = Path(filename).name or "downloaded_file"

        file_path = Path(temp_dir) / filename
        with open(file_path, 'wb') as f:
            f.write(response.content)

        return str(file_path)
    except requests.exceptions.RequestException as e:
        return f"Error downloading file from URL: {e}"
    except Exception as e:
        return f"Error saving downloaded file: {e}"
| |
|
| |
|
| | @tool |
def read_downloaded_file(file_path: str, encoding: str = "utf-8") -> str:
    """
    Reads the content of a downloaded file.

    Args:
        file_path: Path to the file to read
        encoding: Text encoding to use (default: utf-8)

    Returns:
        str: File content or error message
    """
    try:
        target = Path(file_path)
        if not target.exists():
            return f"File not found: {file_path}"

        try:
            # Happy path: decode the whole file as text.
            return target.read_text(encoding=encoding)
        except UnicodeDecodeError:
            # Not decodable text -> report basic metadata instead of bytes.
            size = target.stat().st_size
            suffix = target.suffix
            return f"Binary file detected. File: {file_path}, Size: {size} bytes, Extension: {suffix}"
    except Exception as e:
        return f"Error reading file: {e}"
| |
|
| |
|
| | @tool |
def list_file_info(file_path: str) -> str:
    """
    Gets information about a downloaded file.

    Args:
        file_path: Path to the file

    Returns:
        str: File information including size, type, etc.
    """
    try:
        target = Path(file_path)
        if not target.exists():
            return f"File not found: {file_path}"

        # Extensions treated as "probably plain text" for the agents.
        text_suffixes = ['.txt', '.md', '.json', '.xml', '.csv', '.py', '.js', '.html', '.css']

        size_bytes = target.stat().st_size
        info = {
            "name": target.name,
            "path": str(target),
            "size_bytes": size_bytes,
            "size_kb": round(size_bytes / 1024, 2),
            "extension": target.suffix,
            "is_text": target.suffix.lower() in text_suffixes
        }

        return str(info)
    except Exception as e:
        return f"Error getting file info: {e}"
| |
|
| |
|
| | |
# Module-level vision-language model client; created once at import time and
# shared by every MultiAgentSystem's vision agent. temperature=0 keeps the
# model's answers deterministic for scoring.
visual_model = InferenceClientModel(
    model_id="Qwen/Qwen2.5-VL-72B-Instruct", provider="nebius", temperature=0
)
| |
|
| | |
| |
|
| |
|
class MultiAgentSystem:
    """Three cooperating smolagents CodeAgents: a vision specialist, a web
    researcher, and a manager that delegates to both."""

    # Answer-formatting contract required by the scoring system; appended to
    # every agent description and to every question.
    _ANSWER_RULES = (
        "IMPORTANT: Answer should be a number OR as few words as possible OR a "
        "comma separated list. For numbers, don't use commas or units unless "
        "specified. For strings, don't use articles or abbreviations."
    )

    def __init__(self):
        # Text-only LLM shared by the web and manager agents; the vision
        # agent uses the module-level visual_model.
        text_model = InferenceClientModel(
            model_id="Qwen/Qwen3-235B-A22B", provider="nebius", temperature=0
        )

        rules = self._ANSWER_RULES

        # Vision specialist: file/video/image tooling on top of the VLM.
        self.vision_agent = CodeAgent(
            model=visual_model,
            tools=[
                PythonInterpreterTool(),
                download_task_file,
                download_file_from_url,
                download_youtube_video,
                extract_video_frames,
                analyze_image_with_description,
                process_chess_image,
                read_downloaded_file,
                list_file_info,
            ],
            additional_authorized_imports=[
                "requests", "json", "pandas", "numpy", "PIL", "PIL.Image",
                "cv2", "base64", "io", "tempfile", "pathlib", "subprocess",
                "re", "os", "posixpath",
            ],
            max_steps=15,
            name="vision_agent",
            description=(
                "Processes images, videos, downloads files, analyzes visual "
                "content including chess positions and video frames. " + rules
            ),
            verbosity_level=0,
        )

        # Web researcher: search / page-visit / wiki tools plus downloads.
        self.web_agent = CodeAgent(
            model=text_model,
            tools=[
                PythonInterpreterTool(),
                WebSearchTool(),
                VisitWebpageTool(),
                WikipediaSearchTool(),
                download_task_file,
                download_file_from_url,
                download_youtube_video,
                read_downloaded_file,
                list_file_info,
            ],
            additional_authorized_imports=[
                "requests", "json", "pandas", "numpy", "re", "time",
                "datetime", "tempfile", "pathlib", "subprocess", "os",
                "posixpath",
            ],
            max_steps=15,
            name="web_agent",
            description=(
                "Searches web, downloads files and videos, gathers "
                "information from online sources. " + rules
            ),
            verbosity_level=0,
        )

        # Manager: coordinates the two specialists and can fetch files itself.
        self.manager_agent = CodeAgent(
            model=text_model,
            tools=[
                PythonInterpreterTool(),
                download_task_file,
                download_file_from_url,
                read_downloaded_file,
                list_file_info,
            ],
            managed_agents=[self.vision_agent, self.web_agent],
            additional_authorized_imports=[
                "requests", "json", "pandas", "numpy", "re", "time",
                "datetime", "os", "inspect", "random", "math", "tempfile",
                "pathlib", "posixpath",
            ],
            max_steps=25,
            name="manager_agent",
            description=(
                "Coordinates vision and web agents to solve complex tasks "
                "involving videos, images, and web research. " + rules
            ),
            verbosity_level=1,
        )

        print("Multi-Agent System initialized with enhanced Vision Agent, Web Agent, and Manager Agent.")

    def run(self, question: str, images=None, task_id=None) -> str:
        """
        Run the multi-agent system on a question.
        If images are provided, they will be available to the vision agent.
        If task_id is provided, agents can download task-specific files.
        """
        try:
            prompt = f"{question}\n\n{self._ANSWER_RULES}"

            if task_id:
                prompt += f"\n\nTask ID: {task_id} (use download_task_file tool if files are needed)"

            # Image questions go straight to the vision specialist;
            # everything else is routed through the manager.
            if images:
                outcome = self.vision_agent.run(prompt, images=images)
            else:
                outcome = self.manager_agent.run(prompt)

            return str(outcome)
        except Exception as e:
            return f"Error in multi-agent system: {str(e)}"
| |
|
| | |
| | |
| |
|
| |
|
class BasicAgent:
    """Callable facade over the multi-agent system, in the shape expected by
    the evaluation runner: agent(question, task_id) -> answer string."""

    def __init__(self):
        # Build the full multi-agent stack once; reused for every question.
        self.multi_agent_system = MultiAgentSystem()
        print("BasicAgent initialized with Enhanced Multi-Agent System.")

    def __call__(self, question: str, task_id: str = "") -> str:
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        answer = self._solve(question, task_id)
        print(f"Agent returning answer: {answer}")
        return answer

    def _solve(self, question: str, task_id: str) -> str:
        # Never propagate exceptions: the evaluation loop expects a string.
        try:
            return self.multi_agent_system.run(question, task_id=task_id)
        except Exception as e:
            print(f"Error running multi-agent system: {e}")
            return f"Error: {str(e)}"
| |
|
| |
|
def _fetch_questions(questions_url: str):
    """Fetch the question list from the scoring API.

    Returns:
        tuple: (questions_data, None) on success, or (None, error_message)
        on any failure, where error_message is the user-facing status string.
    """
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return None, "Fetched questions list is empty or invalid format."
        print(f"Fetched {len(questions_data)} questions.")
        return questions_data, None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return None, f"Error fetching questions: {e}"
    except requests.exceptions.JSONDecodeError as e:
        # response is bound here: .json() only runs after a successful get.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return None, f"Error decoding server response for questions: {e}"
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return None, f"An unexpected error occurred fetching questions: {e}"


def _run_agent_on_questions(agent, questions_data):
    """Run the agent over every question, collecting display rows and payload.

    Returns:
        tuple: (results_log, answers_payload). Items missing task_id or
        question are skipped; per-question failures are logged, not raised.
    """
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text, task_id)
            answers_payload.append(
                {"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append(
                {"Task ID": task_id, "Question": question_text,
                 "Submitted Answer": submitted_answer})
        except Exception as e:
            # One failing question must not abort the whole run.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append(
                {"Task ID": task_id, "Question": question_text,
                 "Submitted Answer": f"AGENT ERROR: {e}"})
    return results_log, answers_payload


def _submit_answers(submit_url, submission_data, results_log):
    """POST the answers and map the outcome to (status_text, DataFrame)."""
    print(f"Submitting {len(submission_data['answers'])} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # Instantiate the agent up front so a construction failure is reported
    # before any network traffic happens.
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # Link to this space's code, included in the submission for transparency.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    questions_data, fetch_error = _fetch_questions(questions_url)
    if fetch_error is not None:
        return fetch_error, None

    results_log, answers_payload = _run_agent_on_questions(agent, questions_data)

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(),
                       "agent_code": agent_code,
                       "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    return _submit_answers(submit_url, submission_data, results_log)
| |
|
| |
|
| | |
# --- Gradio UI: login button, single run trigger, status box, results table ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

        ---
        **Disclaimers:**
        Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
        """
    )

    # OAuth login; supplies the gr.OAuthProfile argument to run_and_submit_all.
    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # First return value of run_and_submit_all: free-text status message.
    status_output = gr.Textbox(
        label="Run Status / Submission Result", lines=5, interactive=False)
    # Second return value: per-question DataFrame of submitted answers.
    results_table = gr.DataFrame(
        label="Questions and Agent Answers", wrap=True)

    # No explicit inputs: Gradio injects the OAuth profile (or None) itself.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
| |
|
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Startup diagnostics: report the HF Spaces env vars so the logs show
    # where the app is running and where its code lives.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(
            f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️  SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(
            f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️  SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    # debug=True surfaces tracebacks in the console; share=False keeps the
    # app on the local/Space URL only.
    demo.launch(debug=True, share=False)
| |
|