|
|
|
|
|
|
|
|
import os |
|
|
import ast |
|
|
import operator |
|
|
import datetime |
|
|
import pytz |
|
|
import json |
|
|
import re |
|
|
import requests |
|
|
import pandas as pd |
|
|
import gradio as gr |
|
|
|
|
|
from smolagents import CodeAgent, TransformersModel, tool |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Whitelist of AST operator node types the safe evaluator will execute.
# Anything outside this mapping (names, calls, attributes, subscripts, ...)
# is rejected, which is what makes safe_calc safe vs. eval().
_allowed_ops = {
    ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
    ast.Div: operator.truediv, ast.Pow: operator.pow, ast.USub: operator.neg,
    ast.Mod: operator.mod,
}


def _eval_node(node):
    """Recursively evaluate a whitelisted arithmetic AST node.

    Supports numeric literals, unary minus, and the binary operators listed
    in ``_allowed_ops``.

    Raises:
        ValueError: for any node type or constant outside the whitelist.
    """
    if isinstance(node, ast.Constant):
        # Only accept genuine numbers. bool is an int subclass, so exclude it
        # explicitly; this also rejects strings, so inputs like "'a' * 3"
        # can no longer sneak through as string repetition.
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
        raise ValueError("Unsupported expression")
    # NOTE: the old ast.Num branch was removed — ast.parse has produced
    # ast.Constant since Python 3.8, and ast.Num is deprecated.
    if isinstance(node, ast.UnaryOp) and type(node.op) in _allowed_ops:
        return _allowed_ops[type(node.op)](_eval_node(node.operand))
    if isinstance(node, ast.BinOp) and type(node.op) in _allowed_ops:
        return _allowed_ops[type(node.op)](_eval_node(node.left), _eval_node(node.right))
    raise ValueError("Unsupported expression")


def safe_calc(expr: str):
    """Parse *expr* as a Python expression and evaluate it safely.

    Only arithmetic over numeric literals is permitted.

    Raises:
        SyntaxError: if *expr* is not a valid Python expression.
        ValueError: if *expr* uses any disallowed construct or constant.
    """
    tree = ast.parse(expr, mode='eval')
    return _eval_node(tree.body)
|
|
|
|
|
@tool
def calculator(expr: str) -> str:
    """
    Safely evaluate a mathematical expression.

    Args:
        expr: A string containing a math expression like "2 + 2 * 3".

    Returns:
        JSON string with {"expression": expr, "result": value} or {"error": "..."} on failure.
    """
    # Delegate to the AST-based safe evaluator; any parse/eval failure is
    # reported as a JSON error payload rather than raised to the agent.
    try:
        result = float(safe_calc(expr))
    except Exception as e:
        return json.dumps({"error": f"Calc error: {e}"})
    return json.dumps({"expression": expr, "result": result})
|
|
|
|
|
|
|
|
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """
    Get the current local time in a specified timezone.

    Args:
        timezone: A valid timezone string (e.g., "Europe/Paris").

    Returns:
        JSON string with {"timezone": timezone, "local_time": "..."} or {"error": "..."} on failure.
    """
    # pytz.timezone raises UnknownTimeZoneError for bad names; that (and any
    # other failure) is converted to a JSON error payload for the agent.
    try:
        now = datetime.datetime.now(pytz.timezone(timezone))
        return json.dumps({"timezone": timezone, "local_time": now.strftime("%Y-%m-%d %H:%M:%S")})
    except Exception as e:
        return json.dumps({"error": f"Timezone error: {e}"})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Best-effort load of optional prompt templates for the CodeAgent.
# If prompts.yaml is absent or unparseable, prompt_templates stays None and
# smolagents falls back to its built-in defaults.
prompt_templates = None
try:
    # Imported lazily so a missing PyYAML install is handled by the same
    # broad except below instead of crashing the app at import time.
    import yaml
    with open("prompts.yaml", "r") as fh:
        prompt_templates = yaml.safe_load(fh)
except Exception:
    # Broad catch is deliberate: covers FileNotFoundError, YAML parse
    # errors, and ImportError alike — the file is strictly optional.
    prompt_templates = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Local transformers-backed model for the agent.
# NOTE(review): SmolLM-135M is a very small model — presumably chosen so the
# Space runs on CPU; confirm it is adequate for GAIA-style questions.
model = TransformersModel(model_id="HuggingFaceTB/SmolLM-135M-Instruct")

# CodeAgent wired with the two deterministic tools defined above.
# max_steps bounds the reasoning loop; verbosity_level=0 silences
# intermediate step logging; prompt_templates may be None (defaults apply).
code_agent = CodeAgent(
    model=model,
    tools=[calculator, get_current_time_in_timezone],
    max_steps=6,
    verbosity_level=0,
    prompt_templates=prompt_templates
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GaiaAgentMinimal:
    """Minimal routing agent.

    Routes questions to deterministic tools (calculator, timezone lookup)
    when cheap heuristics match, and falls back to the LLM-backed
    CodeAgent for everything else. run() always returns a string.
    """

    def __init__(self, code_agent):
        # LLM-backed smolagents CodeAgent used as the fallback path.
        self.code_agent = code_agent

    def _is_calc(self, q: str) -> bool:
        """Return True only for explicit calculation requests, not 'how many' type questions."""
        if not q:
            return False
        ql = q.lower()
        calc_triggers = ["calculate", "compute", "evaluate", "what is", "what's"]
        # A trigger phrase plus at least one digit looks like a calc request.
        if any(tr in ql for tr in calc_triggers) and re.search(r"\d", ql):
            return True
        # A bare "3 + 4"-style infix expression also counts.
        if re.search(r"\d\s*[\+\-\*\/\%\^]\s*\d", q):
            return True
        return False

    def _is_time(self, q: str) -> bool:
        """Heuristic: does the question ask for the current time (EN or FR)?

        Uses word boundaries so words that merely contain "time"
        ("sometimes", "lifetime", "how many times") no longer misroute to
        the timezone tool — the previous substring check matched those.
        The old "what time" / "quelle heure" checks were redundant subsets
        of "time" / "heure" and are covered by the regex below.
        """
        ql = q.lower()
        return re.search(r"\b(?:time|heure)\b", ql) is not None

    def _call_llm(self, q: str) -> str:
        """Wrapper to call LLM and return result or proper error if token/API missing."""
        try:
            resp = self.code_agent.run(q)
        except Exception as e:
            msg = str(e)
            if "api_key" in msg.lower() or "auth" in msg.lower():
                return json.dumps({"error": "LLM error: missing HF API token. Set HF_API_TOKEN secret or login with HF."})
            return json.dumps({"error": f"LLM runtime error: {msg}"})
        if resp is None:
            return json.dumps({"error": "LLM returned no output"})
        if isinstance(resp, dict):
            # Pull the most answer-like field out of a structured response.
            for key in ("final_answer", "answer", "result", "output"):
                if key in resp:
                    return str(resp[key])
            return json.dumps(resp)
        if isinstance(resp, (int, float)):
            return str(resp)
        s = str(resp).strip()
        if s == "":
            return json.dumps({"error": "LLM returned empty string"})
        return s

    def run(self, question: str) -> str:
        """Route *question* to the calculator, the time tool, or the LLM.

        Never raises: any internal failure is returned as a JSON error string.
        """
        try:
            q = question.strip() if question else ""

            if self._is_calc(q):
                # Extract the first run of math-expression characters.
                m = re.search(r'([0-9\.\s\+\-\*\/\^\%\(\)]+)', q)
                expr = m.group(1).strip() if m else ""
                if not expr or not re.search(r'[\+\-\*\/\%\^]', expr):
                    # No usable operator found; defer to the LLM instead.
                    return self._call_llm(q)
                # Normalize caret exponent to Python's ** before validation.
                expr = expr.replace('^', '**').replace('\n', ' ').strip()
                if not re.fullmatch(r"[0-9\.\s\+\-\*\/\%\(\)\*]+", expr):
                    return json.dumps({"error": "Expression contains invalid characters or is not a simple math expression", "original": expr})
                return calculator(expr)

            if self._is_time(q):
                # Only Paris/France is special-cased; everything else gets UTC.
                tz = "Europe/Paris" if "paris" in q.lower() or "france" in q.lower() else "UTC"
                return get_current_time_in_timezone(tz)

            return self._call_llm(q)

        except Exception as e:
            return json.dumps({"error": f"Agent internal error: {str(e)}"})
|
|
|
|
|
|
|
|
|
|
|
# Single module-level agent instance shared by the evaluation runner.
gaia_agent = GaiaAgentMinimal(code_agent)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Scoring server for the HF Agents course (Unit 4) evaluation.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all evaluation questions, answer each with the agent, and submit.

    Args:
        profile: OAuth profile injected by Gradio's LoginButton, or None
            when the user is not logged in.

    Returns:
        A (status_message, results_dataframe_or_None) tuple for the UI.
    """
    space_id = os.getenv("SPACE_ID")

    # Guard clause: a login is required before anything else happens.
    if not profile:
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # Link to this Space's code so the scoring server can record provenance.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "unknown"

    # --- Fetch the question set ---
    try:
        resp = requests.get(questions_url, timeout=15)
        resp.raise_for_status()
        questions_data = resp.json()
        if not questions_data:
            return "Fetched questions list is empty or invalid format.", None
    except Exception as e:
        return f"Error fetching questions: {e}", None

    # --- Run the agent over every question ---
    results_log = []
    answers_payload = []
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        # Skip malformed entries rather than failing the whole run.
        if not task_id or question_text is None:
            continue
        try:
            submitted_answer = gaia_agent.run(question_text)
        except Exception as e:
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
            continue
        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})

    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}

    # --- Submit answers and report the score ---
    results_df = pd.DataFrame(results_log)
    try:
        resp = requests.post(submit_url, json=submission_data, timeout=60)
        resp.raise_for_status()
        result_data = resp.json()
    except Exception as e:
        return f"Submission failed: {e}", results_df
    final_status = (
        f"Submission Successful!\n"
        f"User: {result_data.get('username')}\n"
        f"Overall Score: {result_data.get('score', 'N/A')}% "
        f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
        f"Message: {result_data.get('message', 'No message received.')}"
    )
    return final_status, results_df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gradio UI: a login button plus one "run everything" action that fetches
# questions, runs the agent on each, and submits the answers for scoring.
with gr.Blocks() as demo:
    gr.Markdown("# Minimal GAIA Agent Runner")
    gr.Markdown(
        "Log in to Hugging Face, click 'Run Evaluation & Submit All Answers' to fetch questions, run the agent, and submit answers."
    )
    # LoginButton supplies the gr.OAuthProfile argument to run_and_submit_all.
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No explicit inputs: Gradio injects the OAuth profile automatically
    # based on the handler's gr.OAuthProfile annotation.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # debug=True surfaces tracebacks in the UI; share=False keeps the app
    # local (Spaces provides its own public URL).
    demo.launch(debug=True, share=False)
|
|
|