# Hugging Face Space: Gemini-powered agent for the GAIA benchmark (Agents Course Unit 4).
import inspect
import json
import os
import time
from typing import Any, Dict, List, Optional

import gradio as gr
import pandas as pd
import requests
from duckduckgo_search import DDGS
from litellm import completion
# --- Constants ---
# Base URL of the Agents Course Unit 4 scoring service (questions + submission).
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Tool Implementations ---
class DuckDuckGoSearchTool:
    """Web-search tool backed by DuckDuckGo's text search endpoint."""

    def __init__(self):
        # Identifier and human-readable summary used by the agent.
        self.name = "duckduckgo_search"
        self.description = "Search the web using DuckDuckGo"

    def search(self, query: str, max_results: int = 5) -> List[Dict[str, str]]:
        """Run a DuckDuckGo text search.

        Args:
            query: The search query.
            max_results: Maximum number of results to return.

        Returns:
            A list of result dicts (title/body/href). On any failure a single
            placeholder entry describing the error is returned instead of
            raising, so the agent can keep going without search data.
        """
        try:
            with DDGS() as ddgs:
                return list(ddgs.text(query, max_results=max_results))
        except Exception as exc:
            print(f"DuckDuckGo search error: {exc}")
            return [{"title": f"Search error: {exc}", "body": "", "href": ""}]

    def __call__(self, query: str, max_results: int = 5) -> Dict[str, Any]:
        """Execute the search and wrap the results with timing metadata.

        Args:
            query: The search query.
            max_results: Maximum number of results to return.

        Returns:
            A dict with the tool name, echoed query, result list, result
            count, and wall-clock time taken by the search.
        """
        started = time.time()
        hits = self.search(query, max_results)
        elapsed = time.time() - started
        return {
            "tool_name": self.name,
            "query": query,
            "results": hits,
            "result_count": len(hits),
            "time_taken": elapsed,
        }
# --- LiteLLM Model Wrapper ---
class LiteLLMModel:
    """Thin wrapper around litellm's completion() bound to one model + key."""

    def __init__(self, model_id: str, api_key: str):
        # litellm routing string (e.g. "gemini/gemini-2.0-flash-lite") and
        # the provider API key used for every request.
        self.model_id = model_id
        self.api_key = api_key
        print(f"Initialized LiteLLM with model: {model_id}")

    def generate(self, prompt: str, system_prompt: str = None) -> str:
        """Generate a completion for *prompt*.

        Args:
            prompt: The user prompt.
            system_prompt: Optional system prompt prepended to the chat.

        Returns:
            The model's text response, or an "Error generating response: ..."
            string if the call fails (errors are swallowed, not raised).
        """
        try:
            conversation = []
            if system_prompt:
                conversation.append({"role": "system", "content": system_prompt})
            conversation.append({"role": "user", "content": prompt})
            reply = completion(
                model=self.model_id,
                messages=conversation,
                api_key=self.api_key,
            )
            return reply.choices[0].message.content
        except Exception as exc:
            print(f"LiteLLM generation error: {exc}")
            return f"Error generating response: {str(exc)}"
# --- Advanced Agent Implementation ---
class CodeAgent:
    """Question-answering agent: optional web search, then a single LLM call."""

    def __init__(self, tools: List[Any], model: LiteLLMModel):
        self.tools = tools
        self.model = model
        # Keep a direct handle on the search tool, if one was supplied.
        self.search_tool = next(
            (t for t in tools if isinstance(t, DuckDuckGoSearchTool)), None
        )
        print(f"CodeAgent initialized with {len(tools)} tools and model {model.model_id}")

    def format_search_results(self, results: List[Dict[str, str]]) -> str:
        """Render search-result dicts as a numbered, human-readable listing."""
        parts = ["Search Results:\n"]
        for idx, hit in enumerate(results, 1):
            parts.append(f"{idx}. {hit.get('title', 'No title')}\n")
            # Bodies are truncated to 200 chars to keep the prompt small.
            parts.append(f" {hit.get('body', 'No description')[:200]}...\n")
            parts.append(f" URL: {hit.get('href', 'No URL')}\n\n")
        return "".join(parts)

    def create_prompt(self, question: str, search_results: Optional[List[Dict[str, str]]] = None) -> str:
        """Assemble the user prompt, optionally embedding formatted search results."""
        sections = [f"Question: {question}\n\n"]
        if search_results:
            sections.append(self.format_search_results(search_results))
        sections.append(
            "\nPlease provide a concise, factual answer to the question. "
            "Your answer should be direct and to the point, without any explanations or reasoning. "
            "For example, if asked 'What is the capital of France?', just answer 'Paris'. "
            "If asked for a numerical value, provide only the number. "
            "If asked for a list, provide comma-separated values without numbering. "
            "If you don't know the answer, respond with 'Unknown' rather than speculating.\n\n"
            "Answer: "
        )
        return "".join(sections)

    def create_system_prompt(self) -> str:
        """Return the fixed system prompt enforcing terse, factual answers."""
        return (
            "You are a helpful AI assistant specialized in answering factual questions. "
            "You always provide direct, concise answers without explanations or reasoning. "
            "Your answers are factual, accurate, and to the point. "
            "For questions requiring specific formats, you follow those formats exactly. "
            "You never include phrases like 'the answer is' or 'I believe' in your responses."
        )

    def __call__(self, question: str) -> str:
        """Answer *question*: search the web when it looks factual, ask the LLM,
        then normalize the response into a bare answer string."""
        print(f"Agent received question: {question[:100]}...")
        # Heuristic: only factual-lookup phrasings warrant a web search.
        lowered = question.lower()
        triggers = ("what is", "who is", "when", "where", "how many", "which")
        should_search = any(t in lowered for t in triggers)

        search_results = None
        if should_search and self.search_tool:
            print(f"Searching for information about: {question}")
            search_response = self.search_tool(question, max_results=3)
            search_results = search_response.get("results", [])
            print(f"Found {len(search_results)} search results")

        # Create prompt and generate response.
        print("Generating response with LLM...")
        response = self.model.generate(
            self.create_prompt(question, search_results),
            self.create_system_prompt(),
        )

        # Normalize: strip whitespace, boilerplate prefixes, wrapping quotes.
        answer = response.strip()
        for prefix in (
            "Answer:", "The answer is:", "I believe", "I think",
            "Based on", "According to", "The answer would be",
        ):
            if answer.startswith(prefix):
                answer = answer[len(prefix):].strip()
        for quote in ('"', "'"):
            if answer.startswith(quote) and answer.endswith(quote):
                answer = answer[1:-1].strip()
                break
        print(f"Final answer: {answer[:100]}...")
        return answer
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the Agent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile injected by Gradio when the user is logged in,
            or None when not authenticated.

    Returns:
        A (status_message, results_dataframe) tuple for the Gradio outputs;
        the dataframe is None when the run aborts before producing answers.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Used to link the scoring server to this codebase
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent with Gemini model and DuckDuckGo search
    try:
        # Get API key from environment variable
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            return "Error: GEMINI_API_KEY environment variable not found. Please set it in your Space settings.", None
        model = LiteLLMModel(model_id="gemini/gemini-2.0-flash-lite", api_key=api_key)
        agent = CodeAgent(tools=[DuckDuckGoSearchTool()], model=model)
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # In the case of an app running as a hugging Face space, this link points toward your codebase
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            print(f"Processing task {task_id}: {question_text[:50]}...")
            submitted_answer = agent(question_text)
            # Important: the scoring API expects the key "model_answer",
            # not "submitted_answer".
            answers_payload.append({"task_id": task_id, "model_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
            print(f"Answer for task {task_id}: {submitted_answer[:50]}...")
        except Exception as e:
            # One failing question should not abort the whole run; record it.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Submit answers
    # BUG FIX: the /submit endpoint expects a JSON object with "username",
    # "agent_code" and "answers" keys; posting the bare answers list is
    # rejected by the scoring server.
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Gemini Agent for GAIA Benchmark")
    gr.Markdown(
        """
        **Instructions:**
        1. Make sure you have set the GEMINI_API_KEY environment variable in your Space settings.
        2. Log in to your Hugging Face account using the button below.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run the agent, and submit answers.
        This agent uses:
        - Gemini 2.0 Flash Lite model for reasoning
        - DuckDuckGo search for retrieving information
        """
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # Single-question smoke-test UI (no submission to the scoring server).
    gr.Markdown("## Test Single Question")
    with gr.Row():
        question_in = gr.Textbox(label="Question", lines=3)
        answer_out = gr.Textbox(label="Answer", lines=3, interactive=False)
    test_btn = gr.Button("Test Question", variant="secondary")

    def test_single_question(question):
        """Run the agent on one question and return its answer (or an error)."""
        try:
            api_key = os.getenv("GEMINI_API_KEY")
            if not api_key:
                return "Error: GEMINI_API_KEY environment variable not found"
            # SECURITY FIX: the original passed a hard-coded (leaked) Google
            # API key literal here — a credential leak and, since it was a
            # bare identifier, a NameError. Always use the env-provided key.
            model = LiteLLMModel(model_id="gemini/gemini-2.0-flash-lite", api_key=api_key)
            agent = CodeAgent(tools=[DuckDuckGoSearchTool()], model=model)
            return agent(question)
        except Exception as e:
            return f"Error: {str(e)}"

    # BUG FIX: run_button was wired twice — once with the invalid
    # `inputs=gr.OAuthProfile()` and once referencing an undefined
    # `login_button` name. Gradio injects the OAuth profile automatically
    # when the handler's signature annotates a gr.OAuthProfile parameter,
    # so no explicit inputs are needed.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )
    # BUG FIX: test_btn was never connected to its handler.
    test_btn.click(
        fn=test_single_question,
        inputs=question_in,
        outputs=answer_out,
    )
if __name__ == "__main__":
    banner = " App Starting "
    print("\n" + "-" * 30 + banner + "-" * 30)
    # Log Space metadata at startup so the runtime and repo URLs are easy
    # to find in the container logs.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-" * (60 + len(banner)) + "\n")
    print("Launching Gradio Interface for Gemini Agent Evaluation...")
    demo.launch(debug=True, share=False)