| | import os |
| | import re |
| | import gradio as gr |
| | import requests |
| | import pandas as pd |
| |
|
# Optional dependency: when python-dotenv is installed, call load_dotenv() so
# environment variables (such as HF_TOKEN, read below) can come from a local
# dotenv file. Only the import is guarded, so an error raised by load_dotenv()
# itself is not silently swallowed.
try:
    from dotenv import load_dotenv
except ImportError:
    pass
else:
    load_dotenv()
| |
|
| | |
| | |
# Base URL of the scoring service that serves the questions and accepts answers.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Hugging Face API token taken from the environment; empty string when unset.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Default upper bound on LLM/tool rounds the agent spends on one question.
REACT_MAX_STEPS = 10

# Default model identifier used to build the inference endpoint URL.
LLM_MODEL = "google/gemma-2-2b-it"
| |
|
| |
|
| | |
def tool_web_search(query: str, max_results: int = 5) -> str:
    """Search the web using DuckDuckGo. Input: search query string.

    Args:
        query: Search terms. ``None`` or a blank string is rejected up front.
        max_results: Maximum number of hits requested from DuckDuckGo.

    Returns:
        A numbered list of hits (title, URL, snippet) separated by blank
        lines, ``"No search results found."`` when there are no hits, or a
        string starting with ``"Web search error:"`` on any failure. This
        function never raises.
    """
    # The agent can invoke a tool without any input (None); fail fast with a
    # clear message instead of sending a meaningless query to the backend.
    if not isinstance(query, str) or not query.strip():
        return "Web search error: empty query."
    try:
        # Imported lazily so a missing optional dependency surfaces as a tool
        # error string rather than breaking module import.
        from duckduckgo_search import DDGS

        results = list(DDGS().text(query.strip(), max_results=max_results))
        if not results:
            return "No search results found."
        entries = [
            f"{i}. {r.get('title', '')}\n URL: {r.get('href', '')}\n {r.get('body', '')}"
            for i, r in enumerate(results, 1)
        ]
        return "\n\n".join(entries)
    except Exception as e:
        # Tool boundary: every failure is reported back to the agent as text.
        return f"Web search error: {e}"
| |
|
| |
|
def tool_web_page_view(url: str) -> str:
    """View the main text content of a web page. Input: full URL string.

    Args:
        url: Address of the page to fetch. ``None`` or a blank string is
            rejected up front.

    Returns:
        Up to 8000 characters of text extracted from the page,
        ``"No text content found."`` when nothing remains after stripping
        markup, or a string starting with ``"Web page view error:"`` on any
        failure. This function never raises.
    """
    # The agent can invoke a tool without any input (None); report that
    # explicitly instead of relying on the HTTP client to reject it.
    if not isinstance(url, str) or not url.strip():
        return "Web page view error: empty URL."
    try:
        headers = {"User-Agent": "Mozilla/5.0 (compatible; ReActAgent/1.0)"}
        r = requests.get(url.strip(), timeout=15, headers=headers)
        r.raise_for_status()
        html = r.text

        # BeautifulSoup is optional; only the import is guarded so the regex
        # fallback is used exactly when the package is missing.
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            BeautifulSoup = None

        if BeautifulSoup is not None:
            soup = BeautifulSoup(html, "html.parser")
            # Drop non-content elements before extracting text.
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()
            text = soup.get_text(separator="\n", strip=True)
        else:
            # Fallback: strip script/style bodies, then all tags, then
            # collapse whitespace.
            text = re.sub(r"<script[^>]*>.*?</script>", "", html, flags=re.DOTALL | re.IGNORECASE)
            text = re.sub(r"<style[^>]*>.*?</style>", "", text, flags=re.DOTALL | re.IGNORECASE)
            text = re.sub(r"<[^>]+>", " ", text)
            text = re.sub(r"\s+", " ", text).strip()

        # Slicing is a no-op for short text, so no length check is needed.
        return text[:8000] or "No text content found."
    except Exception as e:
        # Tool boundary: every failure is reported back to the agent as text.
        return f"Web page view error: {e}"
| |
|
| |
|
def tool_code_agent(code: str) -> str:
    """Run Python code to compute an answer. Input: a single Python expression or block (e.g. print(2+2)). No file or network access.

    A single expression is evaluated and its value printed when it is not
    ``None`` (like an interactive prompt); anything else is executed as
    statements. Only a small allow-list of builtins is exposed.

    Args:
        code: Python source text. ``None`` or blank input is rejected.

    Returns:
        The captured standard output (stripped),
        ``"Code ran (no printed output)."`` when nothing was printed, or a
        string starting with ``"Code error:"`` on any failure. This function
        never raises ordinary exceptions.
    """
    import builtins
    import contextlib
    import io

    # NOTE(security): the input comes from the LLM. Restricting __builtins__
    # reduces what the snippet can reach, but it is NOT a real sandbox
    # (object introspection can escape it) and there is no time limit.
    allowed_names = (
        "abs", "all", "any", "bin", "bool", "chr", "dict", "divmod",
        "enumerate", "filter", "float", "format", "hash", "int", "len",
        "list", "map", "max", "min", "next", "pow", "print", "range", "repr",
        "reversed", "round", "set", "sorted", "str", "sum", "tuple", "zip",
    )
    safe_builtins = {name: getattr(builtins, name) for name in allowed_names}

    source = code.strip() if isinstance(code, str) else ""
    if not source:
        return "Code error: no code provided."

    # One dict serves as both globals and locals. With separate dicts the
    # snippet runs like a class body, and functions defined in it cannot see
    # variables assigned at its top level.
    namespace = {"__builtins__": safe_builtins}
    buf = io.StringIO()
    try:
        try:
            # Expression mode first, so "2+2" reports its value ...
            compiled = compile(source, "<code_agent>", "eval")
            is_expression = True
        except SyntaxError:
            # ... otherwise treat the input as statements. Wrapping arbitrary
            # statements in print(...) would be a syntax error.
            compiled = compile(source, "<code_agent>", "exec")
            is_expression = False
        with contextlib.redirect_stdout(buf):
            if is_expression:
                value = eval(compiled, namespace)
                if value is not None:
                    print(value)
            else:
                exec(compiled, namespace)
    except Exception as e:
        # Tool boundary: every failure is reported back to the agent as text.
        return f"Code error: {e}"
    return buf.getvalue().strip() or "Code ran (no printed output)."
| |
|
| |
|
# Registry mapping the tool names the LLM may emit to their implementations.
TOOLS = dict(
    web_search=tool_web_search,
    web_page_view=tool_web_page_view,
    code_agent=tool_code_agent,
)
| |
|
# Human-readable tool catalogue that is appended to the agent's system prompt.
TOOL_DESCRIPTIONS = "\n".join(
    (
        "Available tools:",
        "- web_search: search the web with DuckDuckGo. Input: search query (string).",
        "- web_page_view: get main text from a web page. Input: URL (string).",
        "- code_agent: run Python code (math, string ops). Input: code (string).",
    )
)
| |
|
| |
|
| | |
class ReActAgent:
    """ReAct-style agent that alternates LLM calls with tool executions.

    Each round renders the conversation as a plain-text prompt, sends it to
    the inference endpoint for ``model``, and parses the reply for either a
    tool request (``Action`` / ``Action Input``) or a ``Final Answer``. Tool
    output is fed back as an ``Observation`` for at most ``max_steps`` rounds.
    """

    def __init__(self, token: str | None = None, model: str = LLM_MODEL, max_steps: int = REACT_MAX_STEPS):
        """Store the API token (argument, else module-level HF_TOKEN), model id and step limit."""
        self.token = (token or HF_TOKEN or "").strip()
        self.model = model
        self.max_steps = max_steps
        print("ReActAgent initialized (plan -> act -> observe -> reflect).")

    def _llm(self, messages: list[dict]) -> str:
        """Send the conversation to the inference endpoint and return the generated text.

        Never raises: a missing token or any request/decoding failure is
        returned as an ``"Error: ..."`` / ``"LLM error: ..."`` string, and an
        unrecognised response shape yields an empty string.
        """
        if not self.token:
            return "Error: HF_TOKEN not set. Add it in your Space: Settings → Variables and secrets → New secret (name: HF_TOKEN)."
        url = f"https://api-inference.huggingface.co/models/{self.model}"
        headers = {"Authorization": f"Bearer {self.token}", "Content-Type": "application/json"}
        payload = {
            "inputs": self._messages_to_prompt(messages),
            "parameters": {"max_new_tokens": 512, "return_full_text": False},
        }
        try:
            r = requests.post(url, json=payload, headers=headers, timeout=60)
            r.raise_for_status()
            data = r.json()
            # The response is accepted either as a list of generations or as
            # a single dict carrying "generated_text".
            if isinstance(data, list) and len(data) > 0:
                return (data[0].get("generated_text") or "").strip()
            if isinstance(data, dict) and "generated_text" in data:
                return (data["generated_text"] or "").strip()
            return ""
        except Exception as e:
            return f"LLM error: {e}"

    def _messages_to_prompt(self, messages: list[dict]) -> str:
        """Render chat messages as "System:/User:/Assistant:" paragraphs ending with an open "Assistant:" turn."""
        labels = {"system": "System", "user": "User"}
        out = []
        for m in messages:
            # A missing role counts as "user"; any other role is rendered as
            # the assistant.
            label = labels.get(m.get("role", "user"), "Assistant")
            out.append(f"{label}: {m.get('content', '')}")
        out.append("Assistant:")
        return "\n\n".join(out)

    def _parse_action(self, text: str) -> tuple[str | None, str | None, str | None]:
        """Returns (thought, action, action_input) or (None, None, final_answer).

        A "Final Answer:" anywhere in the text takes precedence over any tool
        request. When no action name is found, the action input is reported as
        ``None`` so that a third element with no thought and no action always
        denotes a final answer.
        """
        text = text.strip()
        final_match = re.search(r"Final Answer\s*:\s*(.+?)(?=\n\n|\Z)", text, re.DOTALL | re.IGNORECASE)
        if final_match:
            return None, None, final_match.group(1).strip()
        action_match = re.search(r"Action\s*:\s*(\w+)", text, re.IGNORECASE)
        input_match = re.search(r"Action Input\s*:\s*(.+?)(?=\n\n|\nThought:|\Z)", text, re.DOTALL | re.IGNORECASE)
        thought_match = re.search(r"Thought\s*:\s*(.+?)(?=\nAction:|\Z)", text, re.DOTALL | re.IGNORECASE)
        thought = thought_match.group(1).strip() if thought_match else None
        action = action_match.group(1).strip() if action_match else None
        action_input = input_match.group(1).strip() if input_match else None
        if action_input:
            # Models often wrap the input in quotes; drop them.
            action_input = action_input.strip().strip('"\'')
        if action is None:
            # An input without an action is not a usable tool request, and
            # returning it would make a bare "Action Input: x" reply look
            # like a final answer to the caller.
            action_input = None
        return thought, action, action_input

    @staticmethod
    def _lookup_tool(name: str | None):
        """Return the tool registered under ``name`` (exact, then lower-cased), or None."""
        if not name:
            return None
        return TOOLS.get(name) or TOOLS.get(name.lower())

    def __call__(self, question: str) -> str:
        """Answer ``question`` by running the ReAct loop.

        Returns the parsed final answer as soon as the model gives one. When
        the step limit is reached first, returns the first 500 characters of
        the model's last reply, or a fixed message if there was no reply.
        """
        print(f"ReAct agent received question (first 50 chars): {question[:50]}...")
        if not self.token:
            return "HF_TOKEN not set. In your Hugging Face Space go to Settings → Variables and secrets, add a secret named HF_TOKEN with your token."
        system = (
            "You are a ReAct agent. For each turn you must either:\n"
            "1. Output: Thought: <reasoning> then Action: <tool_name> then Action Input: <input>\n"
            "2. Or when you have the answer: Final Answer: <your answer>\n\n"
            + TOOL_DESCRIPTIONS
        )
        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": f"Question: {question}\n\nFirst, plan which tool(s) to use, then take action, then observe, then reflect. Give your final answer when done."},
        ]
        last_response = ""
        for _ in range(self.max_steps):
            response = self._llm(messages)
            thought, action, action_input = self._parse_action(response)
            if thought is None and action is None and action_input is not None:
                return action_input
            last_response = response
            messages.append({"role": "assistant", "content": response})

            tool = self._lookup_tool(action)
            if tool is None:
                messages.append({"role": "user", "content": "You must use one of the tools (Action: tool_name, Action Input: input) or give Final Answer: your answer. Try again."})
                continue

            if action_input is None:
                # Do not call a tool with None; tell the model what is missing.
                observation = "Tool error: missing Action Input."
            else:
                try:
                    observation = tool(action_input)
                except Exception as e:
                    observation = f"Tool error: {e}"
            # Cap the observation so one tool result cannot dominate the prompt.
            if len(observation) > 3000:
                observation = observation[:3000] + "..."
            messages.append({"role": "user", "content": f"Observation: {observation}\n\nReflect: does this answer the question? If yes, reply with Final Answer: <answer>. If not, use another tool (Thought / Action / Action Input)."})

        # Every reply kept in the history was already parsed above and was not
        # a final answer, so re-parsing it here could never succeed.
        return last_response[:500] if last_response else "ReAct agent reached max steps without a final answer."
| |
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the ReActAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a pandas
        DataFrame of the per-question log, or None when the run stops before
        any question is processed.
    """
    # None when SPACE_ID is unset; the code link below then contains "None".
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the agent.
    try:
        agent = ReActAgent(token=os.getenv("HF_TOKEN"), max_steps=REACT_MAX_STEPS)
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch the questions.
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        # The loop below needs a non-empty list of items.
        if not isinstance(questions_data, list) or not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Must come before RequestException: requests' JSONDecodeError derives
        # from it, so the broader handler would otherwise shadow this one.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run the agent on every question.
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        # Non-dict entries cannot carry a task id or question; skip them the
        # same way as entries with missing fields.
        task_id = item.get("task_id") if isinstance(item, dict) else None
        question_text = item.get("question") if isinstance(item, dict) else None
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # A failed question is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare the submission.
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit; every outcome yields a status message plus the results log.
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        status_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
    return status_message, pd.DataFrame(results_log)
| |
|
| |
|
| | |
# Gradio UI: a login button, a run button, a status box and a results table.
demo = gr.Blocks()
with demo:
    gr.Markdown("# ReAct Agent Evaluation Runner")
    gr.Markdown(
        """
        **Multi-step ReAct agent:** Plan → Act (tools) → Observe → Reflect. The agent has access to:
        **DuckDuckGo search**, **web page view**, and **code agent** (safe Python).
        **Token:** In this Space go to **Settings → Variables and secrets** (Secrets), add a secret named **HF_TOKEN** with your Hugging Face token so the LLM can run.
        1. Log in with the button below. 2. Click 'Run Evaluation & Submit All Answers'. Submission can take a while while the agent runs on all questions.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Read-only box that shows the status string returned by the run.
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    # Table that shows the per-question results returned by the run.
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No inputs are wired here; presumably Gradio supplies the OAuth profile
    # from the handler's type annotation (confirm against the Gradio docs).
    run_button.click(
        run_and_submit_all,
        outputs=[status_output, results_table],
    )
| |
|
if __name__ == "__main__":
    # Startup diagnostics: report the Space-related environment variables,
    # then launch the Gradio app.
    banner = " App Starting "
    print("\n" + "-" * 30 + banner + "-" * 30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # SPACE_HOST is documented as the full domain (already ending in
        # ".hf.space"), so append the suffix only when it is missing to avoid
        # printing a doubled "...hf.space.hf.space" URL.
        if space_host_startup.endswith(".hf.space"):
            runtime_host = space_host_startup
        else:
            runtime_host = f"{space_host_startup}.hf.space"
        print(f" Runtime URL should be: https://{runtime_host}")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    # Closing rule as wide as the opening banner line.
    print("-" * (60 + len(banner)) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)