# Hugging Face Space: Final Agent Evaluation Runner (Agents Course, Unit 4)
# Standard library
import inspect
import json
import os
import time
from collections import deque
from datetime import datetime, timedelta

# Third-party
import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
from huggingface_hub import login
from smolagents import (
    CodeAgent,
    tool,
    DuckDuckGoSearchTool,
    VisitWebpageTool
)
from smolagents.models import LiteLLMModel
#from smolagents.prompts import CODE_SYSTEM_PROMPT
#from dotenv import load_dotenv --- don't need HF spaces
# Authentication / environment setup.
# No dotenv needed on HF Spaces: secrets are injected into os.environ via
# Space -> Settings -> Secrets.
hf_token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
if hf_token:
    login(token=hf_token)
    print("Successfully logged in to Hugging Face Hub")
else:
    print("No Hugging Face API token found. Some functionalities may be limited.")

# LLM backend key. Earlier experiments used Anthropic/OpenAI keys; the Space
# currently runs on Google Gemini through LiteLLM, so only GOOGLE_API_KEY is
# required. Fail fast at import time if it is missing.
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
    raise ValueError("GOOGLE_API_KEY environment variable is not set")

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Extra guidance appended to the agent's behavior (tool choice, reasoning
# structure, final-answer format).
CODE_SYSTEM_PROMPT = """
IMPORTANT GUIDELINES:
1. TOOL SELECTION: Choose the most appropriate tool for each task:
   - Use search_agent for web searches and browsing
   - Use math_calculation for calculations
   - Use memory_tool to access previous information
   - Use read_pdf_file for PDF documents
   - Use process_csv for data analysis
2. EFFICIENT REASONING: Break complex problems into steps:
   - First understand what the question is asking
   - Identify what information you need
   - Gather that information using appropriate tools
   - Process and analyze the information
   - Formulate a precise answer
3. ANSWER FORMAT: When you have the final answer, use:
   final_answer("Your concise and precise answer")
Always prefer direct, efficient approaches. Avoid unnecessary steps.
"""
# --- Rate limiting ---
class TokenRateLimiter:
    """Sliding-window rate limiter for LLM token budgets.

    Tracks tokens consumed over the last 60 seconds and blocks (sleeps)
    when a request would push usage past a safety margin of the
    per-minute quota.
    """

    def __init__(self, tokens_per_minute=40000, safety_factor=0.8):
        """
        Args:
            tokens_per_minute: Provider quota (tokens allowed per minute).
            safety_factor: Fraction of the quota to actually use
                (0.8 -> stay at 80% of the hard limit).
        """
        self.tokens_per_minute = tokens_per_minute
        self.safety_factor = safety_factor
        self.token_history = deque()  # (token_count, timestamp) pairs, oldest first
        self.tokens_this_minute = 0   # running sum of counts in token_history
        self.last_reset = datetime.now()

    def _evict_expired(self, now):
        """Drop history entries older than one minute and update the running sum."""
        cutoff = now - timedelta(minutes=1)
        while self.token_history and self.token_history[0][1] < cutoff:
            old_count, _ = self.token_history.popleft()
            self.tokens_this_minute -= old_count

    def add_tokens(self, token_count):
        """Record token usage at the current time."""
        now = datetime.now()
        self._evict_expired(now)
        self.token_history.append((token_count, now))
        self.tokens_this_minute += token_count

    def check_and_wait(self, token_count):
        """Block until `token_count` tokens fit under the safe limit, then record them.

        Bug fix vs. the original implementation: the recursive retry recorded
        the tokens once per recursion level (the outer call fell through to
        add_tokens after the recursive call had already recorded them), and a
        single request larger than the safe limit recursed forever. The loop
        below records exactly once, and admits an oversized request once the
        window is empty (waiting longer cannot help at that point).
        """
        safe_limit = self.tokens_per_minute * self.safety_factor
        while True:
            self._evict_expired(datetime.now())
            if self.tokens_this_minute + token_count <= safe_limit:
                break
            if not self.token_history:
                # Window is empty yet the request alone exceeds the safe
                # limit -- waiting cannot reduce usage further, so admit it.
                break
            tokens_needed = (self.tokens_this_minute + token_count) - safe_limit
            # Wait proportionally to how many tokens must age out of the window.
            wait_time = 60 * (tokens_needed / self.tokens_per_minute)
            print(f"Rate limit approaching ({self.tokens_this_minute}/{safe_limit}). Waiting {wait_time:.2f} seconds...")
            time.sleep(wait_time)
        self.add_tokens(token_count)

    def estimate_tokens(self, text):
        """Rough token estimate: ~1 token per 4 characters of English text."""
        return len(text) // 4
# --- Memory System ---
class AgentMemory:
    """Rolling short-term memory of agent steps plus a free-form running summary."""

    def __init__(self):
        self.short_term = []      # most recent interactions, oldest first
        self.summary = ""         # condensed history, maintained externally
        self.max_short_term = 10  # cap on retained interactions

    def add_interaction(self, step_number, thought, action, observation):
        """Append one step; evict the oldest entry once the cap is exceeded."""
        entry = {
            "step": step_number,
            "thought": thought,
            "action": action,
            "observation": observation,
        }
        self.short_term.append(entry)
        while len(self.short_term) > self.max_short_term:
            del self.short_term[0]

    def update_summary(self, new_summary):
        """Replace the running summary."""
        self.summary = new_summary

    def get_context(self):
        """Render the summary plus recent steps as one prompt-ready string."""
        pieces = ["SUMMARY OF PREVIOUS STEPS:\n" + self.summary + "\n\n"]
        pieces.append("RECENT STEPS:\n")
        for entry in self.short_term:
            pieces.append(f"Step {entry['step']}:\n")
            pieces.append(f"Thought: {entry['thought']}\n")
            pieces.append(f"Action: {entry['action']}\n")
            pieces.append(f"Observation: {entry['observation']}\n\n")
        return "".join(pieces)


# Process-wide shared memory and step counter used by the tools below.
agent_memory = AgentMemory()
current_step = 0
# --- Tool Definitions ---
def read_pdf_file(file_path: str) -> str:
    """
    Reads a PDF file and returns its content as a string.

    Args:
        file_path (str): The path to the PDF file to read.
    Returns:
        str: The extracted text of the first 50 pages at most, or an
            error message string on any failure.
    """
    # NOTE(review): smolagents usually expects tools decorated with @tool;
    # confirm how plain callables are wired into CodeAgent here.
    try:
        from pypdf import PdfReader

        reader = PdfReader(file_path)
        print(f"PDF has {len(reader.pages)} pages")
        # Cap extraction at 50 pages to keep the returned text bounded.
        page_limit = min(50, len(reader.pages))
        pages = [reader.pages[idx].extract_text() + "\n" for idx in range(page_limit)]
        return "".join(pages)
    except Exception as e:
        return f"Error processing PDF: {str(e)}"
def get_hugging_face_top_daily_paper() -> str:
    """
    Returns the most upvoted paper on Hugging Face daily papers.

    Returns:
        str: The title of the most upvoted paper, "" if none could be
            parsed, or an error message on a network failure.
    """
    try:
        url = "https://huggingface.co/papers"
        # Timeout added so a hung request cannot stall the whole agent run.
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")
        # The paper list is embedded as JSON inside the data-props attribute
        # of the Svelte hydration containers.
        containers = soup.find_all('div', class_='SVELTE_HYDRATER contents')
        top_paper = ""
        for container in containers:
            data_props = container.get('data-props', '')
            if not data_props:
                continue
            try:
                # Fix: the original replacement was a no-op ('"' -> '"');
                # HTML-escaped quotes must be decoded before JSON parsing.
                json_data = json.loads(data_props.replace('&quot;', '"'))
                if 'dailyPapers' in json_data:
                    top_paper = json_data['dailyPapers'][0]['title']
            except json.JSONDecodeError:
                continue
        return top_paper
    except requests.exceptions.RequestException as e:
        return f"Failed to fetch top paper due to an error: {e}"
def math_calculation(expression: str) -> str:
    """
    Evaluates a mathematical expression using Python's built-in math capabilities.

    Args:
        expression (str): The mathematical expression to evaluate.
    Returns:
        str: The result of the calculation, or an error message string.
    """
    try:
        import math

        # Expose every public callable from the math module, plus the two
        # common constants and a few safe builtins. Builtins are otherwise
        # disabled in the eval environment.
        allowed = {
            name: fn
            for name, fn in vars(math).items()
            if callable(fn) and not name.startswith('_')
        }
        allowed.update({
            'pi': math.pi,
            'e': math.e,
            'abs': abs,
            'pow': pow,
            'round': round,
        })
        value = eval(expression, {"__builtins__": {}}, allowed)
        return str(value)
    except Exception as e:
        return f"Error calculating: {str(e)}"
def memory_tool(operation: str, content: str = "") -> str:
    """
    Interact with the agent's memory system.

    Args:
        operation (str): The operation to perform ('recall' or 'summarize').
        content (str, optional): For 'recall': "all", "summary", or a search
            string matched against stored steps. For 'summarize': the new
            summary text to store. Defaults to "".
    Returns:
        str: The result of the memory operation.
    """
    global agent_memory
    if operation == "recall":
        if content.lower() == "all":
            return agent_memory.get_context()
        if content.lower() == "summary":
            return f"Current summary: {agent_memory.summary}"
        # Substring search over the stringified step dicts.
        matches = [
            item for item in agent_memory.short_term
            if content.lower() in str(item).lower()
        ]
        if matches:
            result = "Relevant memories:\n"
            for match in matches:
                result += f"Step {match['step']}: {match['thought'][:100]}...\n"
            return result
        return "No relevant memories found."
    if operation == "summarize":
        # Fix: this branch was an unimplemented 'pass' that fell through to
        # the misleading "Invalid memory operation"; it now stores the summary.
        agent_memory.update_summary(content)
        return f"Summary updated: {content}"
    return "Invalid memory operation"
def process_csv(file_path: str, operation: str = "summary") -> str:
    """
    Process CSV data files for various operations.

    USE THIS TOOL FOR: Analyzing tabular data, statistics, or any CSV file processing.

    Args:
        file_path: Path to the CSV file
        operation: One of "summary", "columns", "count", "head", or
            "mean:<column>"; anything else returns a row/column overview.
    Returns:
        Results of the requested operation, or an error message string.
    """
    try:
        import pandas as pd

        frame = pd.read_csv(file_path)
        # Dispatch on the requested operation.
        if operation == "summary":
            output = str(frame.describe())
        elif operation == "columns":
            output = str(frame.columns.tolist())
        elif operation == "count":
            output = str(len(frame))
        elif operation == "head":
            output = str(frame.head())
        elif operation.startswith("mean:"):
            column_name = operation.split(":", 1)[1].strip()
            output = str(frame[column_name].mean())
        else:
            output = f"CSV loaded with {len(frame)} rows and {len(frame.columns)} columns"
        return output
    except Exception as e:
        return f"Error processing CSV: {str(e)}"
# --- Final Agent Implementation ---
class FinalAgent:
    """GAIA evaluation agent: a smolagents CodeAgent with web-search, file,
    math, research and memory tools, backed by a Gemini model via LiteLLM."""

    def __init__(self):
        print("FinalAgent initialized.")
        # Fix: this guidance text was previously a dangling no-op string
        # expression; it is now stored on the instance so the commented
        # system_prompt wiring below can actually use it.
        self.enhanced_prompt = """
        IMPORTANT: Tool Selection and Memory Guidelines
        When solving problems, carefully choose the most appropriate tool for each step and use memory to track information:
        1. Use memory_tool to:
        - Recall previous information with memory_tool("recall", "query")
        - Summarize important findings with memory_tool("summarize", "your summary")
        2. For mathematical calculations, equations, or scientific facts:
        - Use the math_calculation tool for calculations and evaluations
        - Examples: math_calculation("2 + 2"), math_calculation("sin(pi/4)"), math_calculation("sqrt(16)")
        3. For web information:
        - Use DuckDuckGoSearchTool for general web searches
        - Use VisitWebpageTool when you need to view the content of a specific URL
        4. For file operations:
        - Use read_pdf_file for PDF documents
        - Use process_csv for CSV files and data analysis
        5. For research:
        - Use get_hugging_face_top_daily_paper to find the top paper on Hugging Face
        When you have completed the task, provide your final answer using:
        final_answer("Your complete answer here")
        Make sure your final answer is:
        1. Directly addresses the original question
        2. Is in the exact format requested (if specified)
        3. Is concise and clear
        4. Only includes the answer itself, not your reasoning (unless requested)
        """
        # LLM backend via LiteLLM. GPT-4o / Claude model ids are kept for
        # reference; the Space runs on Gemini (matches the GOOGLE_API_KEY check).
        self.model = LiteLLMModel(
            api_key=api_key,
            # model_id="gpt-4o",
            model_id="gemini/gemini-1.5-flash",
            # model_id="anthropic/claude-3-5-sonnet-20240620",
            temperature=0.5,  # mild determinism for evaluation answers
        )
        # Agent wiring: tool set, step budget, and imports authorized inside
        # the code-execution sandbox.
        self.agent = CodeAgent(
            model=self.model,
            tools=[
                # Web tools
                DuckDuckGoSearchTool(),
                VisitWebpageTool(),
                # File tools
                read_pdf_file,
                process_csv,
                # Math tools
                math_calculation,
                # Research tools
                get_hugging_face_top_daily_paper,
                # Memory tools
                memory_tool,
            ],
            # system_prompt=self.enhanced_prompt,  # NOTE(review): enable once the
            # installed smolagents version supports overriding the system prompt.
            verbosity_level=2,
            max_steps=20,
            additional_authorized_imports=[
                "os", "requests", "json", "bs4",
                "pandas", "numpy", "datetime", "math",
                "markdownify", "re", "urllib", "time",
                "collections", "itertools", "functools",
            ],
        )

    def __call__(self, question: str) -> str:
        """Run the agent on one question and return its answer as a string.

        Args:
            question (str): The task/question text to solve.
        Returns:
            str: The agent's answer, or an error message on failure.
        """
        print(f"FinalAgent received question (first 50 chars): {question[:50]}...")
        try:
            # Fix: agent.run may return a non-str result (e.g. a number);
            # coerce before slicing/returning so logging and submission
            # never break and the declared return type holds.
            response = str(self.agent.run(question))
            print(f"FinalAgent returning answer: {response[:100]}...")
            return response
        except Exception as e:
            error_msg = f"Error in FinalAgent: {str(e)}"
            print(error_msg)
            return error_msg
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the FinalAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: The OAuth profile injected by Gradio's login button, or
            None when the user is not logged in.
    Returns:
        A (status message, results DataFrame-or-None) pair for the UI.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")

    # Require a logged-in user; the username keys the leaderboard submission.
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    base_url = DEFAULT_API_URL
    questions_url = f"{base_url}/questions"
    submit_url = f"{base_url}/submit"

    # 1. Instantiate the agent.
    try:
        agent = FinalAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # Link to this Space's code, sent along with the submission.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch the question set.
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run the agent over every question, building the submission payload
    #    and a human-readable log in parallel.
    log_rows = []
    payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            answer = agent(question_text)
            payload.append({"task_id": task_id, "submitted_answer": answer})
            log_rows.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            log_rows.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(log_rows)

    # 4. Prepare the submission body.
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": payload}
    status_update = f"Agent finished. Submitting {len(payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit, translating every failure mode into a UI status message.
    print(f"Submitting {len(payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(log_rows)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        return status_message, pd.DataFrame(log_rows)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        return status_message, pd.DataFrame(log_rows)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        return status_message, pd.DataFrame(log_rows)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        return status_message, pd.DataFrame(log_rows)
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Final Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        ---
        **Disclaimers:**
        Once clicking on the "submit" button, it can take quite some time (this is the time for the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No explicit inputs: Gradio injects the OAuthProfile from the login
    # state based on run_and_submit_all's type annotation.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    # Startup banner: surface the Space host/id so the runtime and repo URLs
    # are easy to find in the logs (both env vars are absent when run locally).
    banner = " App Starting "
    print("\n" + "-" * 30 + banner + "-" * 30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-" * (60 + len(banner)) + "\n")

    print("Launching Gradio Interface for Final Agent Evaluation...")
    demo.launch(debug=True, share=False)