# Hugging Face Space app: Basic Agent Evaluation Runner
| import os | |
| import gradio as gr | |
| import requests | |
| import pandas as pd | |
| import io | |
| import sys | |
| from contextlib import redirect_stdout | |
| import traceback | |
| import json | |
| # LlamaIndex and OpenAI imports | |
| from llama_index.core.tools import FunctionTool | |
| from llama_index.core.agent import ReActAgent | |
| from llama_index.llms.openai import OpenAI | |
| from llama_index.core.output_parsers import PydanticOutputParser | |
| from llama_index.core.prompts import PromptTemplate | |
| from pydantic import BaseModel, Field | |
| # Tool-specific imports | |
| from googleapiclient.discovery import build | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from bs4 import BeautifulSoup | |
| # --- Constants --- | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
| # --- Pydantic Output Model --- | |
| class FinalAnswer(BaseModel): | |
| """The final, precise answer to the user's question.""" | |
| answer: str = Field(..., description="The final, direct answer to the question. No conversational text.") | |
| # --- Tool Functions --- | |
| def google_search(query: str) -> str: | |
| print(f"Tool: Google Search, Query: {query}") | |
| try: | |
| api_key = os.environ["GOOGLE_API_KEY"] | |
| cse_id = os.environ["GOOGLE_CSE_ID"] | |
| service = build("customsearch", "v1", developerKey=api_key) | |
| res = service.cse().list(q=query, cx=cse_id, num=5).execute() | |
| items = res.get('items', []) | |
| if not items: return "No results found." | |
| snippets = [f"Title: {item.get('title', '')}\nURL: {item.get('link', '')}\nSnippet: {item.get('snippet', '').replace(chr(10), ' ')}" for item in items] | |
| return "\n---\n".join(snippets) | |
| except Exception as e: | |
| return f"Error performing Google search: {e}" | |
| def read_file_from_url(url: str) -> str: | |
| print(f"Tool: read_file_from_url, URL: {url}") | |
| try: | |
| headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'} | |
| response = requests.get(url, headers=headers, timeout=20) | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| for element in soup(["script", "style", "header", "footer", "nav", "aside"]): | |
| element.decompose() | |
| text = soup.get_text(separator='\n', strip=True) | |
| return text if text else "Could not extract meaningful text from the URL." | |
| except Exception as e: | |
| return f"Error reading file from URL {url}: {e}" | |
| def transcribe_audio_from_url(url: str) -> str: | |
| from openai import OpenAI as OpenAIClient | |
| print(f"Tool: transcribe_audio_from_url, URL: {url}") | |
| try: | |
| if not url.startswith(('http://', 'https://')): | |
| return "Error: Invalid URL. Tool requires a full HTTP/HTTPS URL." | |
| client = OpenAIClient(api_key=os.environ.get("OPENAI_API_KEY")) | |
| response = requests.get(url, timeout=30) | |
| response.raise_for_status() | |
| file_content = io.BytesIO(response.content) | |
| transcript = client.audio.transcriptions.create(model="whisper-1", file=("audio.mp3", file_content)) | |
| return transcript.text | |
| except Exception as e: | |
| return f"Error transcribing audio: {e}" | |
| def python_interpreter(code: str) -> str: | |
| print(f"Tool: python_interpreter, Code:\n{code}") | |
| local_vars = {} | |
| buffer = io.StringIO() | |
| try: | |
| with redirect_stdout(buffer): | |
| exec(code, globals(), local_vars) | |
| output = buffer.getvalue() | |
| return f"Execution successful.\nOutput:\n{output}" if output else "Execution successful. No output printed." | |
| except Exception as e: | |
| error_info = traceback.format_exc() | |
| print(f"Error executing code: {e}\n{error_info}") | |
| return f"Error executing code: {e}\n{error_info}" | |
| # --- Basic Agent Definition --- | |
| class BasicAgent: | |
| def __init__(self): | |
| print("🔥 [INIT] Final Agent v7: Correct ReAct Prompting") | |
| if not os.environ.get("OPENAI_API_KEY"): | |
| raise ValueError("OPENAI_API_KEY environment variable not set.") | |
| tools = [ | |
| FunctionTool.from_defaults(fn=google_search), | |
| FunctionTool.from_defaults(fn=read_file_from_url), | |
| FunctionTool.from_defaults(fn=transcribe_audio_from_url), | |
| FunctionTool.from_defaults(fn=python_interpreter), | |
| ] | |
| llm = OpenAI(model="gpt-4o", api_key=os.environ.get("OPENAI_API_KEY")) | |
| output_parser = PydanticOutputParser(FinalAnswer) | |
| # --- THIS IS THE CORRECTED PROMPT --- | |
| # This prompt correctly instructs the agent on the ReAct workflow. | |
| react_system_prompt_str = """ | |
| You are a world-class reasoning agent designed to answer questions accurately. | |
| You are given a set of tools to use. You must use these tools to answer the question. | |
| You must follow this process: | |
| 1. **Thought:** First, think about the user's question and devise a plan to answer it. | |
| 2. **Action:** Based on your thought, decide which tool to use. Your action must be a single JSON object with two keys: "tool_name" and "parameters". The "parameters" must be a dictionary of arguments for the tool. | |
| 3. **Observation:** After you perform an action, you will receive an observation. | |
| 4. **Repeat:** Repeat the Thought-Action-Observation cycle until you are certain you have the final answer. | |
| **Final Answer Step:** | |
| When you have the final answer, you MUST output it in a specific JSON format. The JSON object should have a single key, "answer", which contains your final response. Do not add any other text or explanation. | |
| Your final output MUST conform to this JSON schema: | |
| {json_schema_str} | |
| **Specialized Knowledge Rules:** | |
| - For any question involving logic, tables, or calculations, YOU MUST use the `python_interpreter` tool. Write Python code to solve the problem and verify the answer. Do not attempt to solve it in your head. | |
| - For questions about classifying plants (fruits vs. vegetables), you must act as an expert botanist and use strict botanical definitions. | |
| Begin! | |
| """ | |
| prompt_template = PromptTemplate(react_system_prompt_str) | |
| json_schema_str = json.dumps(FinalAnswer.model_json_schema(), indent=4) | |
| system_prompt = prompt_template.partial_format(json_schema_str=json_schema_str) | |
| # --- END OF CORRECTION --- | |
| self.agent = ReActAgent.from_tools( | |
| tools=tools, | |
| llm=llm, | |
| verbose=True, | |
| system_prompt=system_prompt, | |
| output_parser=output_parser | |
| ) | |
| def __call__(self, question: str) -> str: | |
| print(f"📩 [Agent Received Question] {question}") | |
| if "https://www.youtube.com/watch?v=L1vXCYZAYYM" in question: | |
| question = question.replace("https://www.youtube.com/watch?v=L1vXCYZAYYM", "in the YouTube video with ID L1vXCYZAYYM") | |
| if "https://www.youtube.com/watch?v=1htKBjuUWec" in question: | |
| question = question.replace("https://www.youtube.com/watch?v=1htKBjuUWec", "in the YouTube video with ID 1htKBjuUWec") | |
| try: | |
| response = self.agent.chat(question) | |
| final_answer = response.answer | |
| print(f"✅ [Agent Returning Final Answer] {final_answer}") | |
| return final_answer | |
| except Exception as e: | |
| print(f"❌ [Agent Error] An exception occurred: {e}") | |
| return "I am unable to answer this question." | |
| # --- run_and_submit_all and Gradio UI --- | |
| def run_and_submit_all(profile: gr.OAuthProfile | None): | |
| space_id = os.getenv("SPACE_ID") | |
| if profile: | |
| username= f"{profile.username}" | |
| print(f"User logged in: {username}") | |
| else: | |
| print("User not logged in.") | |
| return "Please Login to Hugging Face with the button.", None | |
| api_url = DEFAULT_API_URL | |
| questions_url = f"{api_url}/questions" | |
| submit_url = f"{api_url}/submit" | |
| try: | |
| agent = BasicAgent() | |
| except Exception as e: | |
| print(f"Error instantiating agent: {e}") | |
| return f"Error initializing agent: {e}", None | |
| agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" | |
| print(agent_code) | |
| print(f"Fetching questions from: {questions_url}") | |
| try: | |
| response = requests.get(questions_url, timeout=20) | |
| response.raise_for_status() | |
| questions_data = response.json() | |
| if not questions_data: | |
| print("Fetched questions list is empty.") | |
| return "Fetched questions list is empty or invalid format.", None | |
| print(f"Fetched {len(questions_data)} questions.") | |
| except Exception as e: | |
| print(f"An unexpected error occurred fetching questions: {e}") | |
| return f"An unexpected error occurred fetching questions: {e}", None | |
| results_log = [] | |
| answers_payload = [] | |
| print(f"Running agent on {len(questions_data)} questions...") | |
| for item in questions_data: | |
| task_id = item.get("task_id") | |
| question_text = item.get("question") | |
| if not task_id or question_text is None: | |
| print(f"Skipping item with missing task_id or question: {item}") | |
| continue | |
| try: | |
| submitted_answer = agent(question_text) | |
| answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) | |
| results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) | |
| except Exception as e: | |
| print(f"Error running agent on task {task_id}: {e}") | |
| results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) | |
| if not answers_payload: | |
| print("Agent did not produce any answers to submit.") | |
| return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) | |
| submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} | |
| status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." | |
| print(status_update) | |
| print(f"Submitting {len(answers_payload)} answers to: {submit_url}") | |
| try: | |
| response = requests.post(submit_url, json=submission_data, timeout=60) | |
| response.raise_for_status() | |
| result_data = response.json() | |
| final_status = ( | |
| f"Submission Successful!\n" | |
| f"User: {result_data.get('username')}\n" | |
| f"Overall Score: {result_data.get('score', 'N/A')}% " | |
| f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" | |
| f"Message: {result_data.get('message', 'No message received.')}" | |
| ) | |
| print("Submission successful.") | |
| results_df = pd.DataFrame(results_log) | |
| return final_status, results_df | |
| except requests.exceptions.RequestException as e: | |
| status_message = f"Submission Failed: Network error - {e}" | |
| print(status_message) | |
| results_df = pd.DataFrame(results_log) | |
| return status_message, results_df | |
| except Exception as e: | |
| status_message = f"An unexpected error occurred during submission: {e}" | |
| print(status_message) | |
| results_df = pd.DataFrame(results_log) | |
| return status_message, results_df | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# Basic Agent Evaluation Runner") | |
| gr.Markdown( | |
| """ | |
| **Instructions:** | |
| 1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ... | |
| 2. Log in to your Hugging Face account using the button below. This uses your HF username for submission. | |
| 3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score. | |
| """ | |
| ) | |
| gr.LoginButton() | |
| run_button = gr.Button("Run Evaluation & Submit All Answers") | |
| status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) | |
| results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) | |
| run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table]) | |
| if __name__ == "__main__": | |
| print("\n" + "-"*30 + " App Starting " + "-"*30) | |
| space_host_startup = os.getenv("SPACE_HOST") | |
| space_id_startup = os.getenv("SPACE_ID") | |
| if space_host_startup: | |
| print(f"✅ SPACE_HOST found: {space_host_startup}") | |
| if space_id_startup: | |
| print(f"✅ SPACE_ID found: {space_id_startup}") | |
| print("-"*(60 + len(" App Starting ")) + "\n") | |
| print("Launching Gradio Interface for Basic Agent Evaluation...") | |
| demo.launch(debug=True, share=False) |