# GAIATorch / app.py
# Author: ChrisSacrumCor — model: Gemini 1.5 — commit 20c6159 (verified)
import os
import gradio as gr
import requests
import inspect
import pandas as pd
from smolagents import (
CodeAgent,
tool,
DuckDuckGoSearchTool,
VisitWebpageTool
)
from smolagents.models import LiteLLMModel
#from smolagents.prompts import CODE_SYSTEM_PROMPT
from huggingface_hub import login
#from dotenv import load_dotenv --- don't need HF spaces
from bs4 import BeautifulSoup
import json
import time
from datetime import datetime, timedelta
from collections import deque
# Load environment variables and authenticate
#load_dotenv() --- see above. Environment variables are loaded in HF Space -> Settings -> Secrets
# --- Authentication (secrets come from HF Space -> Settings -> Secrets) ---
# Hub login is optional: without it some gated models/APIs may be unavailable.
hf_token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
if hf_token:
    login(token=hf_token)
    print("Successfully logged in to Hugging Face Hub")
else:
    print("No Hugging Face API token found. Some functionalities may be limited.")
# Alternative providers kept for reference; only Gemini is active below.
#api_key = os.environ.get("ANTHROPIC_API_KEY")
#if not api_key:
#    raise ValueError("ANTHROPIC_API_KEY environment variable is not set")
#api_key = os.environ.get("OPENAI_API_KEY")
#if not api_key:
#    raise ValueError("OPENAI_API_KEY environment variable is not set")
# The Gemini key is mandatory: FinalAgent's LiteLLM model cannot run without it.
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
    raise ValueError("GOOGLE_API_KEY environment variable is not set")
# --- Constants ---
# Base URL of the GAIA course (unit 4) scoring service: /questions and /submit.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# NOTE(review): CODE_SYSTEM_PROMPT is defined but never passed to the agent
# (the `system_prompt=` argument in FinalAgent is commented out) — confirm
# whether it should be wired in or removed.
CODE_SYSTEM_PROMPT = """
IMPORTANT GUIDELINES:
1. TOOL SELECTION: Choose the most appropriate tool for each task:
- Use search_agent for web searches and browsing
- Use math_calculation for calculations
- Use memory_tool to access previous information
- Use read_pdf_file for PDF documents
- Use process_csv for data analysis
2. EFFICIENT REASONING: Break complex problems into steps:
- First understand what the question is asking
- Identify what information you need
- Gather that information using appropriate tools
- Process and analyze the information
- Formulate a precise answer
3. ANSWER FORMAT: When you have the final answer, use:
final_answer("Your concise and precise answer")
Always prefer direct, efficient approaches. Avoid unnecessary steps.
"""
# --- Rate limiting ---
class TokenRateLimiter:
    """Sliding-window rate limiter for LLM API tokens with a per-minute quota.

    Usage is tracked as (count, timestamp) pairs; records older than one
    minute are discarded, so ``tokens_this_minute`` always reflects the
    trailing 60-second window.
    """

    def __init__(self, tokens_per_minute=40000, safety_factor=0.8):
        self.tokens_per_minute = tokens_per_minute
        # Only use a fraction of the hard limit by default, leaving headroom
        # because token counts here are estimates, not exact.
        self.safety_factor = safety_factor
        self.token_history = deque()  # (token_count, timestamp) pairs, oldest first
        self.tokens_this_minute = 0
        self.last_reset = datetime.now()  # kept for backward compatibility

    def _purge_expired(self, now):
        """Drop usage records older than one minute from the window."""
        cutoff = now - timedelta(minutes=1)
        while self.token_history and self.token_history[0][1] < cutoff:
            old_count, _ = self.token_history.popleft()
            self.tokens_this_minute -= old_count

    def add_tokens(self, token_count):
        """Record token usage without blocking."""
        self._purge_expired(datetime.now())
        self.token_history.append((token_count, datetime.now()))
        self.tokens_this_minute += token_count

    def check_and_wait(self, token_count):
        """Block until ``token_count`` tokens fit under the safe limit, then record them.

        Bug fix vs. the original implementation: the recursive retry recorded
        the tokens once inside the recursion and then again in the caller,
        double-counting every batch that triggered a wait. This loop records
        the usage exactly once.
        """
        safe_limit = self.tokens_per_minute * self.safety_factor
        while True:
            self._purge_expired(datetime.now())
            if self.tokens_this_minute + token_count <= safe_limit:
                break
            # Wait proportionally to the deficit relative to the full quota.
            tokens_needed = (self.tokens_this_minute + token_count) - safe_limit
            wait_time = 60 * (tokens_needed / self.tokens_per_minute)
            print(f"Rate limit approaching ({self.tokens_this_minute}/{safe_limit}). Waiting {wait_time:.2f} seconds...")
            time.sleep(wait_time)
        self.add_tokens(token_count)

    def estimate_tokens(self, text):
        """Rough token estimate (~4 characters per token for English text)."""
        return len(text) // 4
# --- Memory System ---
class AgentMemory:
    """Rolling short-term memory plus a free-form running summary.

    Keeps at most ``max_short_term`` recent (step, thought, action,
    observation) records; the oldest entries are evicted first.
    """

    def __init__(self):
        self.short_term = []   # recent interaction dicts, oldest first
        self.summary = ""      # running summary of everything older
        self.max_short_term = 10

    def add_interaction(self, step_number, thought, action, observation):
        """Append one interaction record, evicting the oldest when full."""
        record = {
            "step": step_number,
            "thought": thought,
            "action": action,
            "observation": observation,
        }
        self.short_term.append(record)
        # Enforce the rolling-window size (FIFO eviction).
        while len(self.short_term) > self.max_short_term:
            del self.short_term[0]

    def update_summary(self, new_summary):
        """Replace the running summary."""
        self.summary = new_summary

    def get_context(self):
        """Render the summary and recent steps as one prompt-ready string."""
        parts = [
            "SUMMARY OF PREVIOUS STEPS:\n",
            self.summary,
            "\n\n",
            "RECENT STEPS:\n",
        ]
        for entry in self.short_term:
            parts.append(
                f"Step {entry['step']}:\n"
                f"Thought: {entry['thought']}\n"
                f"Action: {entry['action']}\n"
                f"Observation: {entry['observation']}\n\n"
            )
        return "".join(parts)
# Create a global memory instance shared by the tools below
# (memory_tool reads/writes agent_memory directly).
agent_memory = AgentMemory()
# NOTE(review): current_step is declared but never incremented in this file —
# confirm whether step tracking was meant to be wired into the agent loop.
current_step = 0
# --- Tool Definitions ---
@tool
def read_pdf_file(file_path: str) -> str:
    """
    Reads a PDF file and returns its content as a string.
    Args:
        file_path (str): The path to the PDF file to read.
    Returns:
        str: The content of the PDF file as a string.
    """
    try:
        from pypdf import PdfReader

        reader = PdfReader(file_path)
        print(f"PDF has {len(reader.pages)} pages")
        # Cap extraction at 50 pages to bound prompt size and latency.
        page_limit = min(50, len(reader.pages))
        extracted = [reader.pages[idx].extract_text() + "\n" for idx in range(page_limit)]
        return "".join(extracted)
    except Exception as e:
        return f"Error processing PDF: {str(e)}"
@tool
def get_hugging_face_top_daily_paper() -> str:
    """
    Returns the most upvoted paper on Hugging Face daily papers.
    Returns:
        str: The title of the most upvoted paper (empty string if not found).
    """
    try:
        url = "https://huggingface.co/papers"
        # Timeout keeps the agent from hanging indefinitely on a slow host.
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")
        # Paper data is embedded as JSON inside 'data-props' attributes of
        # Svelte hydration containers rather than in plain HTML.
        containers = soup.find_all('div', class_='SVELTE_HYDRATER contents')
        top_paper = ""
        for container in containers:
            data_props = container.get('data-props', '')
            if not data_props:
                continue
            try:
                json_data = json.loads(data_props.replace('&quot;', '"'))
                if 'dailyPapers' in json_data:
                    top_paper = json_data['dailyPapers'][0]['title']
                    break  # first hit is the top-ranked paper; stop scanning
            except (json.JSONDecodeError, KeyError, IndexError, TypeError):
                # Bug fix: KeyError/IndexError/TypeError from an unexpected
                # payload shape previously escaped the function; skip instead.
                continue
        return top_paper
    except requests.exceptions.RequestException as e:
        return f"Failed to fetch top paper due to an error: {e}"
@tool
def math_calculation(expression: str) -> str:
    """
    Evaluates a mathematical expression using Python's built-in math capabilities.
    Args:
        expression (str): The mathematical expression to evaluate.
    Returns:
        str: The result of the calculation as a string.
    """
    try:
        import math

        # Explicit whitelist: constants plus a handful of safe builtins that
        # the math-module scan below does not cover.
        safe_dict = {
            'sqrt': math.sqrt,
            'sin': math.sin,
            'cos': math.cos,
            'tan': math.tan,
            'pi': math.pi,
            'e': math.e,
            'abs': abs,
            'pow': pow,
            'round': round,
            'floor': math.floor,
            'ceil': math.ceil,
            'log': math.log,
            'log10': math.log10,
            'degrees': math.degrees,
            'radians': math.radians,
        }
        # Also expose every public callable from the math module
        # (sinh, gamma, atan2, ...).
        safe_dict.update(
            (attr, obj)
            for attr, obj in vars(math).items()
            if callable(obj) and not attr.startswith('_')
        )
        # Builtins are stripped so the expression can only reach safe_dict names.
        result = eval(expression, {"__builtins__": {}}, safe_dict)
        return str(result)
    except Exception as e:
        return f"Error calculating: {str(e)}"
@tool
def memory_tool(operation: str, content: str = "") -> str:
    """
    Interact with the agent's memory system.
    Args:
        operation (str): The operation to perform ('recall' or 'summarize').
        content (str, optional): The content to search for or summarize. Defaults to "".
    Returns:
        str: The result of the memory operation.
    """
    global agent_memory
    if operation == "recall":
        if content.lower() == "all":
            # Full context: running summary plus all recent steps.
            return agent_memory.get_context()
        elif content.lower() == "summary":
            return f"Current summary: {agent_memory.summary}"
        else:
            # Case-insensitive substring search over the recent-step records.
            matches = [
                item for item in agent_memory.short_term
                if content.lower() in str(item).lower()
            ]
            if matches:
                result = "Relevant memories:\n"
                for match in matches:
                    result += f"Step {match['step']}: {match['thought'][:100]}...\n"
                return result
            return "No relevant memories found."
    elif operation == "summarize":
        # Bug fix: this branch was a bare `pass`, so 'summarize' fell through
        # to "Invalid memory operation". Store the provided summary instead.
        agent_memory.update_summary(content)
        return "Summary updated."
    return "Invalid memory operation"
@tool
def process_csv(file_path: str, operation: str = "summary") -> str:
    """
    Process CSV data files for various operations.
    USE THIS TOOL FOR: Analyzing tabular data, statistics, or any CSV file processing.
    Args:
        file_path: Path to the CSV file
        operation: The operation to perform (summary, count, mean, etc.)
    Returns:
        Results of the requested operation
    """
    try:
        import pandas as pd

        frame = pd.read_csv(file_path)
        # Column-specific average, requested as "mean:<column name>".
        if operation.startswith("mean:"):
            column = operation.split(":", 1)[1].strip()
            return str(frame[column].mean())
        # Fixed-name operations dispatch through a small table.
        handlers = {
            "summary": lambda: str(frame.describe()),
            "columns": lambda: str(frame.columns.tolist()),
            "count": lambda: str(len(frame)),
            "head": lambda: str(frame.head()),
        }
        handler = handlers.get(operation)
        if handler is not None:
            return handler()
        # Unknown operation: fall back to a basic shape report.
        return f"CSV loaded with {len(frame)} rows and {len(frame.columns)} columns"
    except Exception as e:
        return f"Error processing CSV: {str(e)}"
# --- Final Agent Implementation ---
class FinalAgent:
    """Wraps a smolagents CodeAgent (Gemini 1.5 Flash via LiteLLM) with the
    custom tool set defined in this module.

    Instances are callable: ``agent(question)`` returns the answer string.
    """

    def __init__(self):
        print("FinalAgent initialized.")
        # Enhanced system prompt.
        # Bug fix: this guidance used to be a bare, unassigned string literal
        # (dead code). It is now stored on the instance so the commented-out
        # `system_prompt=self.enhanced_prompt` below can simply be enabled.
        self.enhanced_prompt = """
IMPORTANT: Tool Selection and Memory Guidelines
When solving problems, carefully choose the most appropriate tool for each step and use memory to track information:
1. Use memory_tool to:
- Recall previous information with memory_tool("recall", "query")
- Summarize important findings with memory_tool("summarize", "your summary")
2. For mathematical calculations, equations, or scientific facts:
- Use the math_calculation tool for calculations and evaluations
- Examples: math_calculation("2 + 2"), math_calculation("sin(pi/4)"), math_calculation("sqrt(16)")
3. For web information:
- Use DuckDuckGoSearchTool for general web searches
- Use VisitWebpageTool when you need to view the content of a specific URL
4. For file operations:
- Use read_pdf_file for PDF documents
- Use process_csv for CSV files and data analysis
5. For research:
- Use get_hugging_face_top_daily_paper to find the top paper on Hugging Face
When you have completed the task, provide your final answer using:
final_answer("Your complete answer here")
Make sure your final answer is:
1. Directly addresses the original question
2. Is in the exact format requested (if specified)
3. Is concise and clear
4. Only includes the answer itself, not your reasoning (unless requested)
"""
        # Initialize model with appropriate parameters.
        self.model = LiteLLMModel(
            api_key=api_key,
            # model_id="gpt-4o",
            model_id="gemini/gemini-1.5-flash",
            # model_id="anthropic/claude-3-5-sonnet-20240620",
            temperature=0.5  # moderate sampling; stable enough for QA tasks
        )
        # Initialize agent with all configuration parameters.
        self.agent = CodeAgent(
            model=self.model,
            tools=[
                # Web tools
                DuckDuckGoSearchTool(),
                VisitWebpageTool(),
                # File tools
                read_pdf_file,
                process_csv,
                # Math tools
                math_calculation,
                # Research tools
                get_hugging_face_top_daily_paper,
                # Memory tools
                memory_tool
            ],
            #system_prompt=self.enhanced_prompt,
            verbosity_level=2,  # log each reasoning step
            max_steps=20,       # hard cap on agent iterations per question
            additional_authorized_imports=[  # modules the agent's code may import
                "os", "requests", "json", "bs4",
                "pandas", "numpy", "datetime", "math",
                "markdownify", "re", "urllib", "time",
                "collections", "itertools", "functools"
            ]
        )

    def __call__(self, question: str) -> str:
        """Run the agent on one question and return its answer (or an error message)."""
        print(f"FinalAgent received question (first 50 chars): {question[:50]}...")
        try:
            # Bug fix: agent.run may return a non-str result (e.g. a rich
            # output object); coerce so slicing below and the declared -> str
            # contract are always safe.
            response = str(self.agent.run(question))
            print(f"FinalAgent returning answer: {response[:100]}...")
            return response
        except Exception as e:
            error_msg = f"Error in FinalAgent: {str(e)}"
            print(error_msg)
            return error_msg
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the FinalAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile injected by Gradio's LoginButton; None when the
            user is not logged in.

    Returns:
        A (status message, results DataFrame or None) tuple matching the two
        Gradio output components.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        # Submission requires a username, so bail out early.
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    # 1. Instantiate Agent
    try:
        agent = FinalAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # Public link to this Space's code, included in the submission payload.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)
    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # Log the failure but keep going so one bad task doesn't sink the run.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        # Server rejected the submission: surface whatever detail it returned.
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
# Build the Gradio UI: login button, a run trigger, and two output widgets.
with gr.Blocks() as demo:
    gr.Markdown("# Final Agent Evaluation Runner")
    gr.Markdown(
        """
**Instructions:**
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Disclaimers:**
Once clicking on the "submit" button, it can take quite some time (this is the time for the agent to go through all the questions).
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution.
"""
    )
    # LoginButton supplies the gr.OAuthProfile that run_and_submit_all
    # receives implicitly via its type annotation.
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Report Space-specific env vars so local-vs-hosted runs are obvious in logs.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-"*(60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Final Agent Evaluation...")
    # debug=True surfaces tracebacks in the UI; share=False because HF Spaces
    # already exposes a public URL.
    demo.launch(debug=True, share=False)