| import os |
| import gradio as gr |
| import requests |
| import inspect |
| import pandas as pd |
|
|
| |
| |
# Base URL of the scoring service. run_and_submit_all() appends "/questions"
# and "/submit" to this value to build the endpoints it talks to.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
|
|
| |
class BasicAgent:
    """Keyword-routed question answerer backed by a local Qwen model and helper tools.

    Calling an instance with a question string inspects the text for markers
    (file extensions, "youtube.com", "wikipedia", ...) and dispatches to the
    matching ``process_*`` / ``fetch_*`` helper. Every helper returns a plain
    string: either the model's answer or a short fallback message describing
    what went wrong. Progress is reported with ``print``.
    """

    def __init__(self):
        """Import the heavy dependencies and load the models and chess engine.

        A missing package is fatal (the ImportError is logged and re-raised).
        A failure while loading an individual model or the Stockfish binary is
        not fatal: the corresponding attribute is set to ``None`` and the
        helpers that need it return a fallback string.

        Raises:
            ImportError: if any of the required packages cannot be imported.
        """
        # Imported lazily so that importing this module does not pull in
        # transformers/stockfish until an agent is actually created.
        try:
            from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
            from stockfish import Stockfish
            import requests
            from bs4 import BeautifulSoup
            from youtube_transcript_api import YouTubeTranscriptApi
            import pandas as pd
            import re
            import os
        except ImportError as e:
            print(f"Import error: {e}. Ensure all dependencies are installed.")
            raise

        print("BasicAgent initialized.")

        # Text-generation model used by query_qwen().
        try:
            self.tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2-1.5B-Instruct")
            self.model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2-1.5B-Instruct")
            print("Qwen/Qwen2-1.5B-Instruct loaded successfully.")
        except Exception as e:
            print(f"Error initializing Qwen: {e}")
            self.tokenizer = None
            self.model = None

        # Speech-to-text pipeline used by process_audio().
        try:
            self.transcriber = pipeline("automatic-speech-recognition", model="openai/whisper-small")
            print("Whisper model loaded successfully.")
        except Exception as e:
            print(f"Error initializing Whisper: {e}")
            self.transcriber = None

        # Chess engine used by process_chess_image().
        try:
            stockfish_path = "/usr/games/stockfish"
            if os.path.exists(stockfish_path):
                self.stockfish = Stockfish(path=stockfish_path)
                print("Stockfish initialized successfully.")
            else:
                print("Stockfish binary not found at /usr/games/stockfish.")
                self.stockfish = None
        except Exception as e:
            print(f"Error initializing Stockfish: {e}")
            self.stockfish = None

        # Keep the lazily imported modules reachable from the other methods.
        self.requests = requests
        self.BeautifulSoup = BeautifulSoup
        self.YouTubeTranscriptApi = YouTubeTranscriptApi
        self.pd = pd
        self.re = re
        self.os = os

    def query_qwen(self, prompt, question):
        """Generate a completion for ``prompt`` and return its first line.

        Args:
            prompt: Full text fed to the model (truncated to 512 tokens).
            question: Original question; accepted for a uniform helper
                signature but not used here.

        Returns:
            The first non-blank line of the newly generated text, or a
            fallback message when the model is unavailable or generation fails.
        """
        print(f"Reasoning: Querying Qwen with prompt (first 100 chars): {prompt[:100]}...")
        if self.model is None or self.tokenizer is None:
            print("Reasoning: Qwen model unavailable.")
            return "Qwen model unavailable"
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512)
            outputs = self.model.generate(**inputs, max_new_tokens=100)
            # A causal LM returns the prompt tokens followed by the continuation.
            # Decode only the continuation; otherwise the "answer" would be the
            # first line of the prompt itself.
            prompt_length = inputs["input_ids"].shape[-1]
            generated_tokens = outputs[0][prompt_length:]
            answer = self.tokenizer.decode(generated_tokens, skip_special_tokens=True)
            answer = answer.strip().split("\n")[0].strip()
            print(f"Reasoning: Qwen returned answer: {answer}")
            return answer
        except Exception as e:
            print(f"Reasoning: Error querying Qwen: {e}")
            return "Unable to process with Qwen"

    def fetch_web(self, url, question):
        """Download ``url`` and ask the model to answer ``question`` from its text.

        The text of ``<div id="content">`` is used when present, otherwise the
        raw response body; either way only the first 4000 characters are kept.

        Returns:
            The model's answer, or "Unable to fetch web data" on any error.
        """
        print(f"Reasoning: Fetching web data from URL: {url}")
        try:
            response = self.requests.get(url, timeout=10)
            response.raise_for_status()
            soup = self.BeautifulSoup(response.text, 'html.parser')
            content_div = soup.find("div", id="content")
            context = content_div.text if content_div is not None else response.text
            context = context[:4000]
            print(f"Reasoning: Web context fetched (first 100 chars): {context[:100]}...")
            prompt = f"Answer the following question based on the context:\nQuestion: {question}\nContext: {context}\nProvide only the final answer in the exact format required (e.g., number, comma-separated list, or name)."
            return self.query_qwen(prompt, question)
        except Exception as e:
            print(f"Reasoning: Error fetching web data: {e}")
            return "Unable to fetch web data"

    def get_youtube_transcript(self, video_id, question):
        """Answer ``question`` from the transcript of the given YouTube video.

        Returns:
            The model's answer, or "Manual review needed" when the transcript
            cannot be fetched or processed.
        """
        print(f"Reasoning: Fetching YouTube transcript for video ID: {video_id}")
        try:
            transcript = self.YouTubeTranscriptApi.get_transcript(video_id)
            context = " ".join(entry['text'] for entry in transcript)
            print(f"Reasoning: Transcript fetched (first 100 chars): {context[:100]}...")
            prompt = f"Answer the following question based on the transcript:\nQuestion: {question}\nTranscript: {context[:4000]}\nProvide only the final answer."
            return self.query_qwen(prompt, question)
        except Exception as e:
            print(f"Reasoning: Error fetching YouTube transcript: {e}")
            return "Manual review needed"

    def process_audio(self, file_path, question):
        """Transcribe the audio file and answer ``question`` from the transcription.

        Returns:
            The model's answer, "File not found" if ``file_path`` does not
            exist, "Transcriber unavailable" if Whisper failed to load, or
            "Unable to process audio" on any other error.
        """
        print(f"Reasoning: Processing audio file: {file_path}")
        if not self.os.path.exists(file_path):
            print(f"Reasoning: Audio file not found: {file_path}")
            return "File not found"
        try:
            if self.transcriber:
                transcription = self.transcriber(file_path)
                text = transcription['text']
                print(f"Reasoning: Audio transcribed (first 100 chars): {text[:100]}...")
                prompt = f"Answer the following question based on the audio transcription:\nQuestion: {question}\nTranscription: {text}\nProvide only the final answer in the exact format required."
                return self.query_qwen(prompt, question)
            print("Reasoning: Transcriber unavailable.")
            return "Transcriber unavailable"
        except Exception as e:
            print(f"Reasoning: Error processing audio: {e}")
            return "Unable to process audio"

    def process_chess_image(self, image_path, question):
        """Return Stockfish's best move for a hard-coded placeholder position.

        NOTE: the image is only checked for existence; its contents are never
        read. The position analysed is always the placeholder FEN below, so the
        returned move does not depend on ``image_path`` or ``question``.

        Returns:
            The move reported by Stockfish, "Image not found",
            "Stockfish unavailable", or "Unable to process chess".
        """
        print(f"Reasoning: Processing chess image: {image_path}")
        if not self.os.path.exists(image_path):
            print(f"Reasoning: Chess image not found: {image_path}")
            return "Image not found"
        try:
            fen = "rnbqkbnr/pppp1ppp/5n2/4p3/4P3/5N2/PPPP1PPP/RNBQKBNR w KQkq - 0 1"
            print(f"Reasoning: Using placeholder FEN: {fen}")
            if self.stockfish:
                self.stockfish.set_fen_position(fen)
                move = self.stockfish.get_best_move()
                print(f"Reasoning: Stockfish returned move: {move}")
                return move
            print("Reasoning: Stockfish unavailable.")
            return "Stockfish unavailable"
        except Exception as e:
            print(f"Reasoning: Error processing chess image: {e}")
            return "Unable to process chess"

    def process_excel(self, file_path, question):
        """Answer ``question`` from the first five rows of an Excel workbook.

        Returns:
            The model's answer, "File not found", or "Unable to process Excel".
        """
        print(f"Reasoning: Processing Excel file: {file_path}")
        if not self.os.path.exists(file_path):
            print(f"Reasoning: Excel file not found: {file_path}")
            return "File not found"
        try:
            df = self.pd.read_excel(file_path)
            print(f"Reasoning: Excel data loaded (first 5 rows):\n{df.head().to_string()}")
            prompt = f"Analyze the following Excel data to answer the question:\nQuestion: {question}\nData (first 5 rows): {df.head().to_string()}\nProvide only the final answer in the exact format required (e.g., number with two decimal places)."
            return self.query_qwen(prompt, question)
        except Exception as e:
            print(f"Reasoning: Error processing Excel: {e}")
            return "Unable to process Excel"

    def _parse_markdown_table(self, table_text):
        """Extract the pipe-delimited table embedded in ``table_text``.

        Lines without a ``|`` (surrounding prose, blank lines) and separator
        rows such as ``|---|---|`` are ignored. The first remaining row is the
        header; the first cell of every row is its label.

        Raises:
            ValueError: if fewer than two table rows are found, or the rows
                cannot be assembled into a rectangular frame.
        """
        rows = []
        for raw_line in table_text.splitlines():
            line = raw_line.strip()
            if "|" not in line:
                continue
            # Drop exactly one outer pipe per side so empty corner cells survive.
            inner = line.removeprefix("|").removesuffix("|")
            cells = [cell.strip() for cell in inner.split("|")]
            if all(cell and set(cell) <= {"-", ":"} for cell in cells):
                continue
            rows.append(cells)
        if len(rows) < 2:
            raise ValueError("no table rows found in text")
        header, *body = rows
        return self.pd.DataFrame(
            [row[1:] for row in body],
            index=[row[0] for row in body],
            columns=header[1:],
        )

    def process_table(self, table_text, question):
        """Answer ``question`` from a pipe-delimited table contained in ``table_text``.

        Returns:
            The model's answer, or "Unable to process table" when no table can
            be parsed or any other error occurs.
        """
        print(f"Reasoning: Processing table data (first 100 chars): {table_text[:100]}...")
        try:
            df = self._parse_markdown_table(table_text)
            print(f"Reasoning: Table parsed:\n{df.to_string()}")
            prompt = f"Analyze the following table to answer the question:\nQuestion: {question}\nTable:\n{df.to_string()}\nProvide only the final answer in the exact format required (e.g., comma-separated list)."
            return self.query_qwen(prompt, question)
        except Exception as e:
            print(f"Reasoning: Error processing table: {e}")
            return "Unable to process table"

    def process_code(self, file_path, question):
        """Answer ``question`` by showing the model the contents of a source file.

        Returns:
            The model's answer, "File not found", or "Unable to process code".
        """
        print(f"Reasoning: Processing code file: {file_path}")
        if not self.os.path.exists(file_path):
            print(f"Reasoning: Code file not found: {file_path}")
            return "File not found"
        try:
            # Explicit encoding so the result does not depend on the host locale.
            with open(file_path, 'r', encoding="utf-8") as f:
                code = f.read()
            print(f"Reasoning: Code loaded (first 100 chars): {code[:100]}...")
            prompt = f"Analyze the following Python code to answer the question:\nQuestion: {question}\nCode:\n{code}\nProvide only the final answer in the exact format required (e.g., number)."
            return self.query_qwen(prompt, question)
        except Exception as e:
            print(f"Reasoning: Error processing code: {e}")
            return "Unable to process code"

    def _attachment_path(self, question, extension, default_name):
        """Return the ``/app/`` path of the first ``*.extension`` name in ``question``.

        Falls back to ``/app/<default_name>`` when the question names no such file.
        """
        match = self.re.search(rf'[\w\s]+\.{extension}', question, self.re.IGNORECASE)
        return f"/app/{match.group(0)}" if match else f"/app/{default_name}"

    def __call__(self, question: str) -> str:
        """Route ``question`` to the matching helper and return its answer.

        Branches are tested in order (audio, image/chess, Excel, code, table,
        YouTube, Wikipedia, general); the first match wins.
        """
        print(f"\n=== Processing New Question ===")
        print(f"Full Question: {question}")
        question_lower = question.lower()

        if ".mp3" in question_lower:
            print("Reasoning: Detected audio question.")
            file_path = self._attachment_path(question, "mp3", "audio.mp3")
            answer = self.process_audio(file_path, question)

        elif ".png" in question_lower or "image" in question_lower or "chess" in question_lower:
            print("Reasoning: Detected image/chess question.")
            file_path = self._attachment_path(question, "png", "image.png")
            answer = self.process_chess_image(file_path, question)

        elif ".xlsx" in question_lower or "excel" in question_lower:
            print("Reasoning: Detected Excel question.")
            file_path = self._attachment_path(question, "xlsx", "data.xlsx")
            answer = self.process_excel(file_path, question)

        elif ".py" in question_lower or "python code" in question_lower:
            print("Reasoning: Detected code question.")
            file_path = self._attachment_path(question, "py", "code.py")
            answer = self.process_code(file_path, question)

        elif "table" in question_lower or "|*|" in question_lower:
            print("Reasoning: Detected table question.")
            answer = self.process_table(question, question)

        # The ID regex below understands short youtu.be links, so route them here too.
        elif "youtube.com" in question_lower or "youtu.be" in question_lower:
            print("Reasoning: Detected YouTube question.")
            video_id = self.re.search(r'(?:v=|youtu\.be\/)([\w-]+)', question)
            if video_id:
                answer = self.get_youtube_transcript(video_id.group(1), question)
            else:
                answer = "Invalid YouTube URL"

        elif "wikipedia" in question_lower:
            print("Reasoning: Detected Wikipedia question.")
            answer = self.fetch_web("https://en.wikipedia.org/wiki/Main_Page", question)

        else:
            print("Reasoning: Detected general text/reasoning question.")
            prompt = f"Answer the following question:\nQuestion: {question}\nProvide only the final answer in the exact format required (e.g., number, comma-separated list, or name)."
            answer = self.query_qwen(prompt, question)

        print(f"Final Answer: {answer}")
        print("=== Question Processing Complete ===\n")
        return answer
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, run the BasicAgent on them, submit all answers,
    and report the result.

    Args:
        profile: The logged-in user's OAuth profile, or None when nobody is
            logged in. Only ``profile.username`` is read.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame with
        one row per attempted question ("Task ID", "Question",
        "Submitted Answer"), or None when the run stops before any question
        is attempted.
    """
    # Used only to build the link to this Space's code; None when SPACE_ID is unset.
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the agent.
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch the questions.
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError derives from RequestException, so it has to be tested
    # first or this more specific handler can never run.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run the agent on every question.
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # A failing question is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    # Every outcome from here on shows the same table.
    results_df = pd.DataFrame(results_log)

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", results_df

    # 4. Prepare the submission.
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit.
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Shared tail for every failure branch above.
    print(status_message)
    return status_message, results_df
|
|
|
|
| |
# Build the Gradio interface. Components are instantiated inside the Blocks
# context in the order they are written below.
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
**Instructions:**

1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

---
**Disclaimers:**
Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
"""
    )

    # Hugging Face login; run_and_submit_all refuses to run without a profile.
    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Receives the first element of run_and_submit_all's return tuple.
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)

    # Receives the second element (a DataFrame, or None).
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No `inputs` are declared: the `profile` argument is presumably injected
    # by Gradio from the function's OAuthProfile annotation (verify against the
    # Gradio OAuth documentation).
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
|
|
# Script entry point: print some diagnostics about the hosting environment,
# then start the Gradio server.
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)

    # Both variables are unset when the app is run outside a Hugging Face Space.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # SPACE_HOST may already be the full hostname ("name.hf.space");
        # only append the domain when it is missing so the URL is not doubled.
        runtime_host = (
            space_host_startup
            if space_host_startup.endswith(".hf.space")
            else f"{space_host_startup}.hf.space"
        )
        print(f" Runtime URL should be: https://{runtime_host}")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    # Closing rule, same width as the opening banner line.
    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)