MasterOfHugs's picture
Update app.py
d717247 verified
#!/usr/bin/env python3
import os
import ast
import operator
import datetime
import pytz
import json
import re
import requests
import pandas as pd
import gradio as gr
from smolagents import CodeAgent, TransformersModel, tool
# -------------------------
# Minimal tools
# -------------------------
# Whitelist mapping AST operator node types to the functions implementing
# them. Any operator that is not listed here is rejected by _eval_node.
_allowed_ops = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.Mod: operator.mod,
    ast.USub: operator.neg,
    ast.UAdd: operator.pos,
}

# Upper bound on the estimated bit length of an integer power, so that inputs
# such as "9 ** 9 ** 9" are refused instead of exhausting CPU and memory.
_MAX_POW_RESULT_BITS = 100_000


def _eval_node(node):
    """
    Recursively evaluate a whitelisted arithmetic AST node.

    Args:
        node: An ``ast`` node from an expression parsed in 'eval' mode.

    Returns:
        The numeric value of the sub-expression.

    Raises:
        ValueError: If the node is not an int/float literal or a whitelisted
            unary/binary operation, or if an integer power would be too large.
    """
    if isinstance(node, ast.Constant):
        value = node.value
        # bool is a subclass of int, so it has to be excluded explicitly;
        # str, bytes, None, complex and Ellipsis are rejected as well.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError(f"Unsupported constant: {value!r}")
        return value
    if isinstance(node, ast.UnaryOp) and type(node.op) in _allowed_ops:
        return _allowed_ops[type(node.op)](_eval_node(node.operand))
    if isinstance(node, ast.BinOp) and type(node.op) in _allowed_ops:
        left = _eval_node(node.left)
        right = _eval_node(node.right)
        if (
            isinstance(node.op, ast.Pow)
            and isinstance(left, int)
            and isinstance(right, int)
            and abs(left) > 1
            and right > 0
            # bit_length(base) * exponent over-estimates the result size
            and left.bit_length() * right > _MAX_POW_RESULT_BITS
        ):
            raise ValueError("Exponent too large")
        return _allowed_ops[type(node.op)](left, right)
    raise ValueError("Unsupported expression")


def safe_calc(expr: str):
    """
    Evaluate an arithmetic expression without calling eval().

    Only int/float literals combined with +, -, *, /, **, % and unary +/-
    are accepted.

    Args:
        expr: The expression text, e.g. "2 + 2 * 3".

    Returns:
        The numeric result of the expression.

    Raises:
        SyntaxError: If ``expr`` is not a syntactically valid expression.
        ValueError: If the expression uses anything outside the whitelist.
        ZeroDivisionError: On division or modulo by zero.
    """
    # Surrounding whitespace is stripped so padded input parses cleanly.
    tree = ast.parse(expr.strip(), mode='eval')
    return _eval_node(tree.body)
@tool
def calculator(expr: str) -> str:
    """
    Safely evaluate a mathematical expression.
    Args:
        expr: A string containing a math expression like "2 + 2 * 3".
    Returns:
        JSON string with {"expression": expr, "result": value} or {"error": "..."} on failure.
    """
    # NOTE: smolagents' @tool builds the tool description from the docstring
    # above, so its Args section must stay in sync with the signature.
    try:
        # The result is always reported as a float, even for integer inputs.
        payload = {"expression": expr, "result": float(safe_calc(expr))}
    except Exception as exc:
        # Any parsing or evaluation failure is reported to the caller as JSON.
        payload = {"error": f"Calc error: {exc}"}
    return json.dumps(payload)
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """
    Get the current local time in a specified timezone.
    Args:
        timezone: A valid timezone string (e.g., "Europe/Paris").
    Returns:
        JSON string with {"timezone": timezone, "local_time": "..."} or {"error": "..."} on failure.
    """
    # NOTE: smolagents' @tool builds the tool description from the docstring
    # above, so its Args section must stay in sync with the signature.
    try:
        zone = pytz.timezone(timezone)
        now_in_zone = datetime.datetime.now(zone)
        payload = {
            "timezone": timezone,
            "local_time": now_in_zone.strftime("%Y-%m-%d %H:%M:%S"),
        }
    except Exception as exc:
        # Unknown zone names (and any other failure) are reported as JSON.
        payload = {"error": f"Timezone error: {exc}"}
    return json.dumps(payload)
# -------------------------
# Load prompts.yaml if exists
# -------------------------
# Optional prompt overrides. prompts.yaml is read when it is present; if
# PyYAML is not installed, or the file is missing, unreadable or malformed,
# prompt_templates stays None.
prompt_templates = None
try:
    import yaml

    # Explicit encoding so the file decodes the same way on every platform
    # instead of depending on the locale's default.
    with open("prompts.yaml", "r", encoding="utf-8") as fh:
        prompt_templates = yaml.safe_load(fh)
except Exception:
    # Deliberately best-effort: any failure falls back to None.
    prompt_templates = None
# -------------------------
# TransformersModel + CodeAgent minimal
# -------------------------
# Model handed to the agent, loaded through smolagents' TransformersModel.
model = TransformersModel(model_id="HuggingFaceTB/SmolLM-135M-Instruct")

# Agent equipped with the two tools defined in this file. prompt_templates is
# None unless prompts.yaml was loaded successfully.
code_agent = CodeAgent(
    tools=[calculator, get_current_time_in_timezone],
    model=model,
    prompt_templates=prompt_templates,
    max_steps=6,
    verbosity_level=0,
)
# -------------------------
# GAIA Agent wrapper
# -------------------------
class GaiaAgentMinimal:
    """
    Route a question to the calculator tool, the time tool, or the LLM agent.

    Routing is purely keyword/regex based: explicit arithmetic requests go to
    ``calculator``, questions asking for the current time go to
    ``get_current_time_in_timezone``, and everything else is handed to the
    wrapped code agent.
    """

    # Phrases that signal an explicit request to compute something.
    _CALC_TRIGGERS = ("calculate", "compute", "evaluate", "what is", "what's")
    # digit, operator, digit -- e.g. "3*4" or "10 - 2".
    _INLINE_ARITHMETIC = re.compile(r"\d\s*[\+\-\*\/\%\^]\s*\d")
    # Runs of characters that may belong to an arithmetic expression.
    _EXPR_CANDIDATE = re.compile(r"[0-9\.\s\+\-\*\/\^\%\(\)]+")
    _OPERATOR = re.compile(r"[\+\-\*\/\%\^]")
    # Whole phrases (English and French) asking for the current time. Bare
    # "time"/"heure" would also match "times", "sometimes" or "heureux".
    _TIME_QUESTION = re.compile(
        r"\b(?:what time|current time|local time|time is it"
        r"|what(?:'s| is) the time|quelle heure|heure est[- ]il)\b"
    )
    # Credential-related failures; a bare "auth" would also match "author".
    _AUTH_ERROR = re.compile(r"api[_ ]?key|authenticat|authoriz")

    def __init__(self, code_agent):
        """
        Args:
            code_agent: Object exposing ``run(question)``; used as the
                fallback for questions no tool handles.
        """
        self.code_agent = code_agent

    def _is_calc(self, q: str) -> bool:
        """Return True only for explicit calculation requests, not 'how many' type questions."""
        if not q:
            return False
        ql = q.lower()
        # A trigger phrase only counts when the question contains a digit.
        if any(tr in ql for tr in self._CALC_TRIGGERS) and re.search(r"\d", ql):
            return True
        return bool(self._INLINE_ARITHMETIC.search(q))

    def _is_time(self, q: str) -> bool:
        """Return True when the question asks for the current time."""
        if not q:
            return False
        return bool(self._TIME_QUESTION.search(q.lower()))

    def _extract_expression(self, q: str) -> str:
        """
        Return the first arithmetic-looking fragment of ``q``, or "" if none.

        Every run of digits, whitespace, operators and parentheses is a
        candidate. Runs that are only whitespace or punctuation (such as the
        space between two words, or a hyphen inside a word) are skipped, so
        the expression may appear anywhere in the question.
        """
        for candidate in self._EXPR_CANDIDATE.findall(q):
            fragment = candidate.strip()
            if re.search(r"\d", fragment) and self._OPERATOR.search(fragment):
                return fragment
        return ""

    def _call_llm(self, q: str) -> str:
        """
        Run the wrapped agent on ``q`` and normalise its output to a string.

        Returns:
            The agent's answer as text, or a JSON string {"error": "..."} when
            the agent raises, returns None, or returns only whitespace.
        """
        try:
            resp = self.code_agent.run(q)
        except Exception as e:
            msg = str(e)
            if self._AUTH_ERROR.search(msg.lower()):
                return json.dumps({"error": "LLM error: missing HF API token. Set HF_API_TOKEN secret or login with HF."})
            return json.dumps({"error": f"LLM runtime error: {msg}"})
        if resp is None:
            return json.dumps({"error": "LLM returned no output"})
        if isinstance(resp, dict):
            # Prefer the first recognised answer key; otherwise dump the dict.
            for key in ("final_answer", "answer", "result", "output"):
                if key in resp:
                    return str(resp[key])
            return json.dumps(resp)
        s = str(resp).strip()
        if s == "":
            return json.dumps({"error": "LLM returned empty string"})
        return s

    def run(self, question: str) -> str:
        """
        Answer ``question`` with the first route that applies.

        Args:
            question: The question text; None or "" is treated as empty.

        Returns:
            The answer as a string. Tool answers and all error reports are
            JSON strings; this method does not raise.
        """
        try:
            q = question.strip() if question else ""
            # 1) Calculator
            if self._is_calc(q):
                expr = self._extract_expression(q)
                if not expr:
                    # Looked like a calculation but nothing evaluable found.
                    return self._call_llm(q)
                # "^" is read as exponentiation; newlines are flattened.
                expr = expr.replace('^', '**').replace('\n', ' ').strip()
                return calculator(expr)
            # 2) Time
            if self._is_time(q):
                ql = q.lower()
                tz = "Europe/Paris" if "paris" in ql or "france" in ql else "UTC"
                return get_current_time_in_timezone(tz)
            # 3) fallback LLM
            return self._call_llm(q)
        except Exception as e:
            return json.dumps({"error": f"Agent internal error: {str(e)}"})
# Module-level agent instance used by the evaluation runner below.
gaia_agent = GaiaAgentMinimal(code_agent=code_agent)
# -------------------------
# GAIA runner
# -------------------------
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch the questions, answer each one with ``gaia_agent`` and submit the answers.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame with
        one row per processed question, or None when the run stops before any
        question is processed (not logged in, fetch failure, bad payload).
    """
    if not profile:
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"

    space_id = os.getenv("SPACE_ID")
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "unknown"

    # Fetch questions
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
    except Exception as e:
        return f"Error fetching questions: {e}", None
    # The loop below calls .get() on every item, so anything other than a
    # non-empty list is reported instead of crashing later.
    if not isinstance(questions_data, list) or not questions_data:
        return "Fetched questions list is empty or invalid format.", None

    # Run agent
    results_log = []
    answers_payload = []
    for item in questions_data:
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue
        try:
            submitted_answer = gaia_agent.run(question_text)
        except Exception as e:
            # Failed questions are logged for display but not submitted.
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
            continue
        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})

    results_df = pd.DataFrame(results_log)
    if not answers_payload:
        return "Agent did not produce any answers to submit.", results_df
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}

    # Submit
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        # Built inside the try so an unexpected response shape is reported as
        # a failed submission rather than raising.
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        return final_status, results_df
    except Exception as e:
        return f"Submission failed: {e}", results_df
# -------------------------
# Gradio UI
# -------------------------
# Components are created in display order: heading, instructions, login
# button, run button, status box, results table.
with gr.Blocks() as demo:
    gr.Markdown("# Minimal GAIA Agent Runner")
    gr.Markdown(
        "Log in to Hugging Face, click 'Run Evaluation & Submit All Answers' to fetch questions, run the agent, and submit answers."
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(
        interactive=False,
        lines=5,
        label="Run Status / Submission Result",
    )
    results_table = gr.DataFrame(wrap=True, label="Questions and Agent Answers")
    # No explicit inputs: the handler's only parameter is the OAuth profile.
    run_button.click(run_and_submit_all, outputs=[status_output, results_table])

if __name__ == "__main__":
    demo.launch(share=False, debug=True)