Final_Assignment_Template / evaluation_app.py
AheedTahir's picture
Enhance error handling and retry logic for fetching questions and submissions
cba8124
""" Basic Agent Evaluation Runner"""
import os
import inspect
import gradio as gr
import requests
import pandas as pd
import time
from langchain_core.messages import HumanMessage
from agent import build_graph
from typing import Optional, Dict, Any
import random
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Retry configuration
MAX_RETRIES = 5
INITIAL_RETRY_DELAY = 2 # seconds
MAX_RETRY_DELAY = 60 # seconds
# --- Helper Functions ---
def fetch_with_retry(url: str, max_retries: int = MAX_RETRIES, timeout: int = 15) -> Optional[Dict[Any, Any]]:
"""
Fetch data from URL with exponential backoff retry logic.
Args:
url: The URL to fetch from
max_retries: Maximum number of retry attempts
timeout: Request timeout in seconds
Returns:
JSON response data or None if all retries failed
Raises:
requests.exceptions.RequestException: If all retries are exhausted
"""
for attempt in range(max_retries):
try:
print(f"Attempt {attempt + 1}/{max_retries}: Fetching from {url}")
response = requests.get(url, timeout=timeout)
response.raise_for_status()
data = response.json()
print(f"✅ Successfully fetched data on attempt {attempt + 1}")
return data
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
# Rate limit error - use exponential backoff
if attempt < max_retries - 1:
# Calculate delay with exponential backoff + jitter
delay = min(INITIAL_RETRY_DELAY * (2 ** attempt), MAX_RETRY_DELAY)
jitter = random.uniform(0, delay * 0.1) # Add 10% jitter
total_delay = delay + jitter
print(f"⚠️ Rate limit hit (429). Waiting {total_delay:.1f}s before retry {attempt + 2}/{max_retries}...")
time.sleep(total_delay)
continue
else:
print(f"❌ Rate limit persists after {max_retries} attempts")
raise
else:
# Other HTTP error - raise immediately
print(f"❌ HTTP error {e.response.status_code}: {e}")
raise
except requests.exceptions.Timeout:
if attempt < max_retries - 1:
delay = INITIAL_RETRY_DELAY * (attempt + 1)
print(f"⚠️ Request timeout. Retrying in {delay}s...")
time.sleep(delay)
continue
else:
print(f"❌ Request timed out after {max_retries} attempts")
raise
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
delay = INITIAL_RETRY_DELAY * (attempt + 1)
print(f"⚠️ Network error: {e}. Retrying in {delay}s...")
time.sleep(delay)
continue
else:
print(f"❌ Network error persists after {max_retries} attempts")
raise
return None
# --- Basic Agent Definition ---
# ----- THIS IS WERE YOU CAN BUILD WHAT YOU WANT ------
class BasicAgent:
"""A langgraph agent."""
def __init__(self):
print("BasicAgent initialized.")
self.graph = build_graph()
def __call__(self, question: str) -> str:
print(f"Agent received question (first 50 chars): {question[:50]}...")
# Wrap the question in a HumanMessage from langchain_core
messages = [HumanMessage(content=question)]
config = {"configurable": {"thread_id": "evaluation"}}
result = self.graph.invoke({"messages": messages}, config=config)
answer = result['messages'][-1].content
# Extract final answer if it has "Final Answer:" prefix
if "Final Answer:" in answer:
answer = answer.split("Final Answer:")[-1].strip()
return answer
def run_and_submit_all( profile: gr.OAuthProfile | None):
"""
Fetches all questions, runs the BasicAgent on them, submits all answers,
and displays the results.
"""
# --- Determine HF Space Runtime URL and Repo URL ---
space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for sending link to the code
if profile:
username= f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
# 1. Instantiate Agent ( modify this part to create your agent)
try:
agent = BasicAgent()
except Exception as e:
print(f"Error instantiating agent: {e}")
return f"Error initializing agent: {e}", None
# In the case of an app running as a hugging Face space, this link points toward your codebase ( usefull for others so please keep it public)
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
print(agent_code)
# 2. Fetch Questions with Retry Logic
print(f"Fetching questions from: {questions_url}")
try:
questions_data = fetch_with_retry(questions_url, max_retries=MAX_RETRIES, timeout=15)
if questions_data is None:
error_msg = "Failed to fetch questions after multiple retries."
print(f"❌ {error_msg}")
return error_msg, None
if not questions_data:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None
print(f"✅ Successfully fetched {len(questions_data)} questions.")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
error_msg = (
f"Rate limit error (429): The scoring endpoint is receiving too many requests.\n\n"
f"**What to do:**\n"
f"1. Wait 5-10 minutes before trying again\n"
f"2. The endpoint may be experiencing high traffic from other students\n"
f"3. Try during off-peak hours (early morning or late evening UTC)\n\n"
f"Technical details: {e}"
)
else:
error_msg = f"HTTP error {e.response.status_code}: {e}"
print(f"❌ {error_msg}")
return error_msg, None
except requests.exceptions.Timeout:
error_msg = "Request timed out. The scoring endpoint may be slow or unavailable. Please try again later."
print(f"❌ {error_msg}")
return error_msg, None
except requests.exceptions.RequestException as e:
error_msg = f"Network error while fetching questions: {e}\n\nPlease check your internet connection and try again."
print(f"❌ {error_msg}")
return error_msg, None
except Exception as e:
error_msg = f"Unexpected error fetching questions: {e}"
print(f"❌ {error_msg}")
return error_msg, None
# 3. Run your Agent
results_log = []
answers_payload = []
print(f"Running agent on {len(questions_data)} questions...")
for item in questions_data:
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
print(f"Skipping item with missing task_id or question: {item}")
continue
time.sleep(30)
try:
submitted_answer = agent(question_text)
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
except Exception as e:
print(f"Error running agent on task {task_id}: {e}")
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
if not answers_payload:
print("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
# 4. Prepare Submission
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit with Retry Logic
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
# Try submission with retries
for attempt in range(MAX_RETRIES):
try:
print(f"Submission attempt {attempt + 1}/{MAX_RETRIES}")
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
final_status = (
f"✅ Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("✅ Submission successful!")
results_df = pd.DataFrame(results_log)
return final_status, results_df
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429 and attempt < MAX_RETRIES - 1:
# Rate limit on submission - retry with backoff
delay = min(INITIAL_RETRY_DELAY * (2 ** attempt), MAX_RETRY_DELAY)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
print(f"⚠️ Submission rate limited (429). Waiting {total_delay:.1f}s before retry...")
time.sleep(total_delay)
continue
else:
# Other HTTP error or final retry - return error
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except requests.exceptions.JSONDecodeError:
error_detail += f" Response: {e.response.text[:500]}"
if e.response.status_code == 429:
error_detail += (
f"\n\n⚠️ Rate limit persists after {MAX_RETRIES} attempts. "
f"Please wait 10-15 minutes and try again."
)
status_message = f"❌ Submission Failed: {error_detail}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.Timeout:
if attempt < MAX_RETRIES - 1:
delay = INITIAL_RETRY_DELAY * (attempt + 1)
print(f"⚠️ Submission timeout. Retrying in {delay}s...")
time.sleep(delay)
continue
else:
status_message = "❌ Submission Failed: The request timed out after multiple attempts."
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.RequestException as e:
if attempt < MAX_RETRIES - 1:
delay = INITIAL_RETRY_DELAY * (attempt + 1)
print(f"⚠️ Network error during submission: {e}. Retrying in {delay}s...")
time.sleep(delay)
continue
else:
status_message = f"❌ Submission Failed: Network error after {MAX_RETRIES} attempts - {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except Exception as e:
status_message = f"❌ An unexpected error occurred during submission: {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
# Should not reach here, but just in case
status_message = "❌ Submission failed after all retry attempts."
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
gr.Markdown("# Basic Agent Evaluation Runner")
gr.Markdown(
"""
**Instructions:**
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Disclaimers:**
Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
"""
)
gr.LoginButton()
run_button = gr.Button("Run Evaluation & Submit All Answers")
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
# Removed max_rows=10 from DataFrame constructor
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
run_button.click(
fn=run_and_submit_all,
outputs=[status_output, results_table]
)
if __name__ == "__main__":
print("\n" + "-"*30 + " App Starting " + "-"*30)
# Check for SPACE_HOST and SPACE_ID at startup for information
space_host_startup = os.getenv("SPACE_HOST")
space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup
if space_host_startup:
print(f"✅ SPACE_HOST found: {space_host_startup}")
print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
else:
print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
if space_id_startup: # Print repo URLs if SPACE_ID is found
print(f"✅ SPACE_ID found: {space_id_startup}")
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
else:
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
print("-"*(60 + len(" App Starting ")) + "\n")
print("Launching Gradio Interface for Basic Agent Evaluation...")
demo.launch(debug=True, share=False)