Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import re | |
| import traceback | |
| import contextlib | |
| import uuid | |
| import time | |
| import ast | |
| from typing import List, Optional, TypedDict, Annotated, Dict | |
| from pathlib import Path | |
| from collections import Counter | |
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| from pydantic import BaseModel, Field | |
| # Multimodal & Web Tools | |
| from transformers import pipeline | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from bs4 import BeautifulSoup | |
| import requests | |
| # LangChain & LangGraph | |
| from langgraph.graph.message import add_messages | |
| from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, SystemMessage, AnyMessage, ToolCall | |
| from langchain_core.tools import tool | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.graph import START, END, StateGraph | |
| from langchain_groq import ChatGroq | |
| # RAG | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.tools import DuckDuckGoSearchRun | |
| from langchain_core.documents import Document | |
# =============================================================================
# CONFIGURATION
# =============================================================================
# Scoring endpoint of the HF Agents-course GAIA evaluation server.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MAX_TURNS = 25 # Increased for planning/reflection
MAX_MESSAGE_LENGTH = 8000  # hard cap applied when truncating tool outputs
REFLECT_EVERY_N_TURNS = 5  # NOTE(review): not referenced in this chunk — verify it is used elsewhere
# =============================================================================
# GLOBAL RAG COMPONENTS
# =============================================================================
# Lazily populated by initialize_rag_components(); both stay None until first use.
global_embeddings = None
global_text_splitter = None
def initialize_rag_components():
    """Lazily build the module-level embedding model and text splitter.

    Idempotent: already-initialized components are left untouched.
    Returns True when both components are ready, False when the embedding
    model could not be constructed.
    """
    global global_embeddings, global_text_splitter
    if global_embeddings is None:
        print("Initializing RAG embeddings...")
        try:
            embedder_kwargs = {'device': 'cpu'}
            global_embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2",
                model_kwargs=embedder_kwargs,
            )
            print("✅ Embeddings initialized.")
        except Exception as e:
            print(f"⚠️ Failed to initialize embeddings: {e}")
            return False
    if global_text_splitter is None:
        print("Initializing text splitter...")
        split_separators = ["\n\n", "\n", ". ", " ", ""]
        global_text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=split_separators,
        )
        print("✅ Text splitter initialized.")
    return True
# =============================================================================
# ASR INITIALIZATION
# =============================================================================
# Best-effort global ASR setup: the Whisper pipeline is loaded once at import
# time so audio_transcription_tool can reuse it; on any failure the tool is
# simply disabled (asr_pipeline stays None) instead of crashing the import.
asr_pipeline = None
try:
    print("Loading ASR (Whisper) pipeline globally...")
    # transformers convention: device 0 = first CUDA GPU, -1 = CPU.
    device = 0 if torch.cuda.is_available() else -1
    device_name = "cuda:0" if device == 0 else "cpu"
    print(f"Attempting to use device: {device_name} for ASR.")
    asr_pipeline = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-base",
        # fp16 only on GPU; CPU inference requires fp32.
        torch_dtype=torch.float16 if device == 0 else torch.float32,
        device=device
    )
    print("✅ ASR (Whisper) pipeline loaded successfully.")
except Exception as e:
    print(f"⚠️ Warning: Could not load ASR pipeline globally. Error: {e}")
    asr_pipeline = None
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def remove_fences_simple(text):
    """Strip a surrounding ``` fence (and an optional language tag) from text.

    Returns the inner content when the stripped text is wrapped in triple
    backticks; otherwise returns the input unchanged, including any original
    surrounding whitespace.
    """
    stripped = text.strip()
    if not (stripped.startswith("```") and stripped.endswith("```")):
        return text
    inner = stripped[3:-3].strip()
    if '\n' in inner:
        header, body = inner.split('\n', 1)
        tag = header.strip()
        # A short alphanumeric first line (e.g. "python", "json") is treated
        # as a language tag and dropped.
        if tag.replace('_', '').isalnum() and len(tag) < 15:
            inner = body.strip()
    return inner
def truncate_if_needed(content: str, max_length: int = MAX_MESSAGE_LENGTH) -> str:
    """Cap *content* at *max_length* characters, appending a truncation notice
    that reports the original total length."""
    if len(content) <= max_length:
        return content
    return content[:max_length] + f"\n...[truncated, {len(content)} total chars]"
def find_file(path: str) -> Optional[Path]:
    """Resolve *path* against several candidate locations.

    Tries, in order: the path relative to the current working directory,
    the path as given, and finally just the bare file name in the working
    directory. Returns the first existing candidate, or None.
    """
    base = Path.cwd()
    normalized = Path(path).as_posix()
    candidates = (
        base / normalized,
        Path(normalized),
        base / Path(path).name,
    )
    for candidate in candidates:
        if candidate.exists():
            return candidate
    return None
# =============================================================================
# PLANNING & REFLECTION TOOLS
# =============================================================================
class PlanInput(BaseModel):
    """Argument schema for the create_plan tool."""
    question: str = Field(description="The question to create a plan for")
def create_plan(question: str) -> str:
    """
    Creates a step-by-step plan for answering a question.
    CRITICAL: Call this FIRST for any multi-step or complex question.
    This helps you think through:
    1. What information do you need?
    2. In what order should you gather it?
    3. What tools will you use?
    After calling this, execute the plan step-by-step.
    """
    # Only the log line uses `question`; the returned text is a fixed prompt.
    print(f"📋 Planning phase initiated for: {question[:100]}...")
    plan_lines = [
        "✅ Plan Created. Now execute these steps methodically:",
        "PLANNING FRAMEWORK:",
        "1. GOAL: What exact answer format is needed?",
        "2. REQUIREMENTS: What data/information is required?",
        "3. STRATEGY: What's the most efficient path?",
        "4. EXECUTION: List concrete actions in order",
        "Now proceed with Step 1 of your plan.",
    ]
    return "\n".join(plan_lines)
class ReflectInput(BaseModel):
    """Argument schema for the reflect_on_progress tool."""
    current_situation: str = Field(description="Brief summary of what you've tried and where you are stuck")
def reflect_on_progress(current_situation: str) -> str:
    """
    Reflects on your progress and suggests what to do next.
    Call this when:
    - You feel stuck or uncertain
    - Tools keep failing
    - You're not making progress
    - You've taken 5+ steps without getting closer to the answer
    This helps you step back and reconsider your approach.
    """
    print(f"🤔 Reflection initiated: {current_situation[:100]}...")
    # Concatenation (not str.format) so braces in the caller's text are safe.
    checklist = "\n".join([
        "CRITICAL QUESTIONS TO ASK YOURSELF:",
        "1. Have I gathered the information I actually need?",
        "2. Am I using the right tools for this task?",
        "3. Am I going in circles (repeating similar actions)?",
        "4. Should I try a completely different approach?",
        "5. Do I have enough information to answer now?",
        "NEXT STEPS:",
        "- If stuck: Try a different tool or search query",
        "- If missing info: Identify exactly what's missing",
        "- If have info: Proceed to final_answer_tool",
        "- If uncertain: Break problem into smaller pieces",
        "Take a different approach now.",
    ])
    return "🔍 REFLECTION ANALYSIS:\nCurrent situation: " + current_situation + "\n" + checklist
class ValidateInput(BaseModel):
    """Argument schema for the validate_answer tool."""
    proposed_answer: str = Field(description="The answer you plan to submit")
    original_question: str = Field(description="The original question")
def validate_answer(proposed_answer: str, original_question: str) -> str:
    """
    Validates your proposed answer before submission.
    CRITICAL: ALWAYS call this before final_answer_tool.
    Checks:
    - Does the answer match what was asked?
    - Is it in the correct format?
    - Are there any obvious issues?
    If validation passes, then call final_answer_tool.
    If validation fails, gather more information or correct the format.
    """
    print(f"✓ Validating answer: '{proposed_answer[:50]}...'")
    issues, warnings = [], []
    answer_lc = proposed_answer.lower()
    question_lc = original_question.lower()
    # Hard failure: conversational filler wrapping the answer.
    fluff_phrases = ["the answer is", "based on", "according to", "i found that", "here is", "final answer"]
    if any(phrase in answer_lc for phrase in fluff_phrases):
        issues.append("❌ Remove conversational text. Provide ONLY the answer.")
    # Count/quantity questions should contain at least one digit.
    number_keywords = ["how many", "what number", "count", "total", "sum"]
    if any(kw in question_lc for kw in number_keywords) and not any(ch.isdigit() for ch in proposed_answer):
        warnings.append("⚠️ Question seems to ask for a number, but answer contains no digits.")
    # List questions usually want comma-separated values.
    if "list" in question_lc and "," not in proposed_answer:
        warnings.append("⚠️ Question asks for a list, consider comma-separated format.")
    # Yes/no style questions should get a bare yes/no.
    yes_no_starters = ("is ", "are ", "was ", "were ", "do ", "does ", "did ", "can ", "will ")
    if question_lc.strip().startswith(yes_no_starters) and answer_lc not in ["yes", "no", "true", "false"]:
        warnings.append("⚠️ This looks like a yes/no question. Consider simple yes/no answer.")
    # Markdown fences never belong in a final answer.
    if "```" in proposed_answer:
        issues.append("❌ Remove code fences (```) from the answer.")
    # Very long answers are usually explanations, not answers.
    if len(proposed_answer) > 500:
        warnings.append("⚠️ Answer is quite long. Are you sure this is just the answer and not an explanation?")
    if issues:
        return "🚫 VALIDATION FAILED:\n" + "\n".join(issues) + "\n\nFix these issues before calling final_answer_tool."
    if warnings:
        return "⚠️ VALIDATION WARNINGS:\n" + "\n".join(warnings) + "\n\nConsider these points, but you may proceed if confident."
    return "✅ VALIDATION PASSED: Answer looks good! Proceed with final_answer_tool now."
# =============================================================================
# CORE TOOLS
# =============================================================================
class SearchInput(BaseModel):
    """Argument schema for search_tool."""
    query: str = Field(description="The search query.")
def search_tool(query: str) -> str:
    """
    Searches the web using DuckDuckGo.
    Use for: recent information, facts, general web searches.
    Tips:
    - Keep queries concise and specific
    - Include year for time-sensitive queries (e.g., "GDP Brazil 2016")
    - Try different phrasings if first search doesn't help
    """
    if not isinstance(query, str) or not query.strip():
        return "Error: Invalid input. 'query' must be a non-empty string."
    print(f"🔍 Searching: {query}")
    try:
        raw = DuckDuckGoSearchRun().run(query)
        # Truncate oversized result sets; the notice reports the full length.
        if len(raw) > MAX_MESSAGE_LENGTH:
            return raw[:MAX_MESSAGE_LENGTH] + f"\n...[truncated, {len(raw)} total chars]"
        return raw
    except Exception as e:
        return f"Error running search for '{query}': {str(e)}"
class CalcInput(BaseModel):
    """Argument schema for the calculator tool."""
    expression: str = Field(description="Mathematical expression to evaluate (e.g., '2 + 2', 'sqrt(16)', '45 * 1.2')")
def calculator(expression: str) -> str:
    """
    Evaluates mathematical expressions.
    Use this for ANY calculations instead of code_interpreter.
    Supports: +, -, *, /, **, sqrt, sin, cos, tan, log, exp, pi, e, abs, round
    Examples:
    - calculator("127 * 83")
    - calculator("sqrt(144)")
    - calculator("(45 + 23) / 2")
    """
    if not isinstance(expression, str) or not expression.strip():
        return "Error: Invalid expression."
    print(f"🧮 Calculating: {expression}")
    try:
        import math
        # Builtins are disabled; only this whitelist is visible to eval.
        allowed_names = {
            'sqrt': math.sqrt, 'sin': math.sin, 'cos': math.cos, 'tan': math.tan,
            'log': math.log, 'log10': math.log10, 'exp': math.exp,
            'pi': math.pi, 'e': math.e, 'abs': abs, 'round': round,
            'pow': pow, 'sum': sum, 'min': min, 'max': max
        }
        value = eval(expression, {"__builtins__": {}}, allowed_names)
        return f"{value}"
    except Exception as e:
        return f"Error evaluating '{expression}': {str(e)}\nMake sure to use proper syntax (e.g., sqrt(16), not sqrt 16)"
class CodeInput(BaseModel):
    """Argument schema for the code_interpreter tool."""
    code: str = Field(description="Python code to execute. MUST include print() for output.")
def code_interpreter(code: str) -> str:
    """
    Executes Python code for complex data processing.
    WHEN TO USE:
    - Data analysis (CSV, Excel files)
    - Complex calculations with loops/conditionals
    - String manipulation
    - Date/time calculations
    WHEN NOT TO USE:
    - Simple math (use calculator instead)
    - Web searches (use search_tool)
    Available libraries: pandas as pd, numpy as np, json, re, datetime
    CRITICAL: Always use print() to output results!
    """
    if not isinstance(code, str):
        return "Error: Invalid input. 'code' must be a string."
    # Cheap textual blocklist. NOTE(review): this is not a real sandbox —
    # full __builtins__ are exposed below, so determined code can still
    # escape; treat this as a guard against accidents, not malice.
    dangerous_patterns = ['__import__', 'eval(', 'compile(', 'subprocess', 'os.system', 'exec(']
    code_lower = code.lower()
    for pattern in dangerous_patterns:
        if pattern in code_lower:
            return f"Error: Potentially dangerous operation '{pattern}' is not allowed."
    if 'open(' in code_lower and any(mode in code for mode in ["'w'", '"w"', "'a'", '"a"', "'wb'", '"wb"']):
        return "Error: Writing files is not allowed in code_interpreter. Use write_file tool instead."
    print(f"💻 Executing code...")
    output_stream = io.StringIO()
    error_stream = io.StringIO()
    try:
        with contextlib.redirect_stdout(output_stream), contextlib.redirect_stderr(error_stream):
            exec_env = {
                "pd": pd,
                "np": np,
                "json": json,
                "re": re,
                "__builtins__": __builtins__
            }
            # BUGFIX: pass the SAME dict as globals and locals. The previous
            # exec(code, safe_globals, {}) gave the code a separate locals
            # dict, so functions/comprehensions it defined could not see its
            # own top-level variables (name lookup inside a function body
            # goes to globals, not the exec locals), raising spurious
            # NameErrors.
            exec(code, exec_env, exec_env)
        stdout = output_stream.getvalue()
        stderr = error_stream.getvalue()
        if stderr:
            return f"Error in execution:\n{stderr}\n\nStdout (if any):\n{stdout}"
        if stdout:
            if len(stdout) > MAX_MESSAGE_LENGTH:
                stdout = stdout[:MAX_MESSAGE_LENGTH] + f"\n...[truncated, {len(stdout)} total chars]"
            return f"{stdout}"
        return "Code executed but produced no output. Remember to use print() to display results!"
    except Exception as e:
        tb_str = traceback.format_exc()
        return f"Execution failed:\n{tb_str}"
class ReadFileInput(BaseModel):
    """Argument schema for the read_file tool."""
    path: str = Field(description="Path to the file to read")
def read_file(path: str) -> str:
    """Reads a file from the filesystem."""
    if not isinstance(path, str) or not path.strip():
        return "Error: Invalid input. 'path' must be a non-empty string."
    print(f"📄 Reading file: {path}")
    file_path = find_file(path)
    if file_path is None:
        # Include a directory listing so the agent can self-correct the path.
        return (f"Error: File not found: '{path}'\n"
                f"Files in current directory: {os.listdir('.')}")
    try:
        return truncate_if_needed(file_path.read_text(encoding='utf-8'))
    except UnicodeDecodeError:
        # Not valid UTF-8: report size/extension instead of raw bytes.
        return (f"File appears to be binary ({file_path.stat().st_size} bytes). Cannot display as text.\n"
                f"File type: {file_path.suffix}\n"
                f"Consider using audio_transcription_tool for audio files.")
    except Exception as e:
        return f"Error reading file: {str(e)}"
class WriteFileInput(BaseModel):
    """Argument schema for the write_file tool."""
    path: str = Field(description="Path where file should be written")
    content: str = Field(description="Content to write to the file")
def write_file(path: str, content: str) -> str:
    """Writes content to a file."""
    if not isinstance(path, str) or not path.strip():
        return "Error: Invalid input. 'path' must be a non-empty string."
    if not isinstance(content, str):
        return "Error: Invalid input. 'content' must be a string."
    print(f"✍️ Writing file: {path}")
    try:
        # Paths are resolved relative to the CWD; parents are created as needed.
        target = Path.cwd() / path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding='utf-8')
        return f"Successfully wrote {len(content)} characters to '{path}'."
    except Exception as e:
        return f"Error writing file '{path}': {str(e)}"
class ListDirInput(BaseModel):
    """Argument schema for the list_directory tool."""
    path: str = Field(description="Directory path to list", default=".")
def list_directory(path: str = ".") -> str:
    """Lists files and directories in a path."""
    print(f"📁 Listing directory: {path}")
    try:
        dir_path = Path.cwd() if path == "." else Path.cwd() / path
        if not dir_path.is_dir():
            return f"Error: '{path}' is not a valid directory."
        entries = sorted(dir_path.iterdir())
        if not entries:
            return f"Directory '{path}' is empty."
        # Directories and files are grouped separately, each in sorted order.
        directories = [f"📁 {entry.name}/" for entry in entries if entry.is_dir()]
        files = [f"📄 {entry.name} ({entry.stat().st_size} bytes)"
                 for entry in entries if not entry.is_dir()]
        result = f"Contents of '{path}':\n\n"
        if directories:
            result += "Directories:\n" + "\n".join(directories) + "\n\n"
        if files:
            result += "Files:\n" + "\n".join(files)
        return result
    except Exception as e:
        return f"Error listing directory '{path}': {str(e)}"
class AudioInput(BaseModel):
    """Argument schema for the audio_transcription_tool."""
    file_path: str = Field(description="Path to audio file to transcribe")
def audio_transcription_tool(file_path: str) -> str:
    """Transcribes audio files to text using Whisper."""
    if not isinstance(file_path, str) or not file_path.strip():
        return "Error: Invalid input. 'file_path' must be a non-empty string."
    print(f"🎤 Transcribing audio: {file_path}")
    # The pipeline is loaded once at module import; None means it failed.
    if asr_pipeline is None:
        return "Error: ASR pipeline is not available."
    audio_path = find_file(file_path)
    if audio_path is None:
        return f"Error: Audio file not found: '{file_path}'"
    try:
        result = asr_pipeline(str(audio_path))
        text = result.get("text", "")
        if not text:
            return "Error: Transcription produced no text."
        return f"Transcription:\n{truncate_if_needed(text)}"
    except Exception as e:
        return f"Error transcribing '{file_path}': {str(e)}"
class YoutubeInput(BaseModel):
    """Argument schema for the get_youtube_transcript tool."""
    video_url: str = Field(description="YouTube video URL")
| def get_youtube_transcript(video_url: str) -> str: | |
| """Fetches transcript/captions from a YouTube video.""" | |
| if not isinstance(video_url, str) or not video_url.strip(): | |
| return "Error: Invalid input. 'video_url' must be a non-empty string." | |
| print(f"📺 Getting YouTube transcript: {video_url}") | |
| try: | |
| video_id = None | |
| if "watch?v=" in video_url: | |
| video_id = video_url.split("v=")[1].split("&")[0] | |
| elif "youtu.be/" in video_url: | |
| video_id = video_url.split("youtu.be/")[1].split("?")[0] | |
| if not video_id: | |
| return f"Error: Could not extract YouTube video ID from '{video_url}'." | |
| transcript_list = YouTubeTranscriptApi.get_transcript(video_id) | |
| if not transcript_list: | |
| return "Error: No transcript found for this video." | |
| full_transcript = " ".join([item["text"] for item in transcript_list]) | |
| return f"YouTube Transcript:\n{truncate_if_needed(full_transcript)}" | |
| except Exception as e: | |
| return f"Error getting transcript for '{video_url}': {str(e)}" | |
class ScrapeInput(BaseModel):
    """Argument schema for the scrape_and_retrieve tool."""
    url: str = Field(description="URL to scrape (must start with http:// or https://)")
    query: str = Field(description="Specific question or information to find on the page")
def scrape_and_retrieve(url: str, query: str) -> str:
    """
    Scrapes a webpage and uses RAG to find relevant information.
    Use when:
    - You need specific information from a known webpage
    - Search results give you a URL that contains the answer
    - You need to extract data from a specific website
    """
    if not (url.lower().startswith(('http://', 'https://'))):
        return f"Error: Invalid URL. Must start with http:// or https://. Got: '{url}'"
    if not query or not query.strip():
        return "Error: A query is required to search the page content."
    # Embeddings and splitter are built lazily on first use.
    if global_embeddings is None or global_text_splitter is None:
        if not initialize_rag_components():
            return "Error: RAG components could not be initialized."
    print(f"🌐 Scraping & retrieving from: {url}")
    try:
        # Browser-like User-Agent: many sites reject the default requests UA.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=20)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Strip boilerplate/navigation elements before text extraction.
        for tag in soup(["script", "style", "nav", "footer", "aside", "header", "iframe", "noscript"]):
            tag.extract()
        # Prefer semantic containers; fall back to a 'content'-ish div, then <body>.
        main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile('content|main|article', re.I)) or soup.body
        if not main_content:
            return "Error: Could not find main content on the page."
        text = main_content.get_text(separator='\n', strip=True)
        # Collapse blank lines left over from tag removal.
        lines = [line.strip() for line in text.splitlines()]
        text = '\n'.join(line for line in lines if line)
        if not text or len(text) < 50:
            return f"Error: Scraped content was too short or empty (length: {len(text)})."
        # Build a throwaway in-memory FAISS index over the page chunks and
        # retrieve the 5 chunks most similar to the query.
        chunks = global_text_splitter.split_text(text)
        if not chunks:
            return "Error: Text could not be split into chunks."
        docs = [Document(page_content=chunk, metadata={"source": url}) for chunk in chunks]
        db = FAISS.from_documents(docs, global_embeddings)
        retriever = db.as_retriever(search_kwargs={"k": 5})
        retrieved_docs = retriever.invoke(query)
        if not retrieved_docs:
            return f"No relevant information found on {url} for query: '{query}'\n\nThe page was successfully scraped but doesn't seem to contain information matching your query."
        context_parts = []
        for i, doc in enumerate(retrieved_docs, 1):
            context_parts.append(f"[Chunk {i}]\n{doc.page_content}")
        context = "\n\n---\n\n".join(context_parts)
        result = f"Relevant information from {url}:\n\n{context}"
        return truncate_if_needed(result)
    except requests.RequestException as e:
        return f"Error fetching URL {url}: {str(e)}\n\nThe website may be blocking requests or may be temporarily unavailable."
    except Exception as e:
        tb_str = traceback.format_exc()
        return f"Error processing {url}: {str(e)}\n\nDetails:\n{tb_str}"
class FinalAnswerInput(BaseModel):
    """Argument schema for the final_answer_tool."""
    answer: str = Field(description="The final answer - EXACTLY what was asked for, nothing more")
def final_answer_tool(answer: str) -> str:
    """
    Submit your final answer.
    CRITICAL RULES:
    1. ALWAYS call validate_answer() before this
    2. The answer must be EXACTLY what was asked for
    3. NO conversational text (no "The answer is...", etc.)
    4. NO explanations
    5. Match the requested format exactly
    Examples:
    - If asked for a number: "42" (not "The answer is 42")
    - If asked for a list: "red, blue, green" (not "The colors are: red, blue, green")
    - If asked yes/no: "yes" (not "Yes, it is true")
    """
    # Coerce non-string payloads (the LLM sometimes passes raw numbers).
    if not isinstance(answer, str):
        try:
            answer = str(answer)
        # BUGFIX: was a bare `except:`, which would also swallow
        # KeyboardInterrupt/SystemExit; only Exception subclasses are
        # expected from a failing str() conversion.
        except Exception:
            return "Error: Invalid input. 'answer' must be a string."
    print(f"✅ FINAL ANSWER SUBMITTED: {answer}")
    return answer
# =============================================================================
# DEFINED TOOLS LIST
# =============================================================================
# Ordered registry of every callable exposed to the LLM.
# NOTE(review): these appear to be plain functions, yet the agent later reads
# `tool.name` / `tool.args_schema` / `tool.description` from each entry —
# that only works on langchain tool objects (e.g. functions wrapped with the
# imported @tool decorator). Confirm the decorators were not lost from this
# copy; as written, the attribute access would raise AttributeError.
defined_tools = [
    # Planning & Reflection (use these first!)
    create_plan,
    reflect_on_progress,
    validate_answer,
    # Core tools
    search_tool,
    calculator,
    code_interpreter,
    # File operations
    read_file,
    write_file,
    list_directory,
    # Specialized tools
    audio_transcription_tool,
    get_youtube_transcript,
    scrape_and_retrieve,
    # Final answer
    final_answer_tool
]
# =============================================================================
# AGENT STATE
# =============================================================================
class AgentState(TypedDict):
    """LangGraph state dict threaded between the agent and tool nodes."""
    # Running conversation; `add_messages` merges new messages by appending.
    messages: Annotated[List[AnyMessage], add_messages]
    # Turns consumed so far; compared against MAX_TURNS in should_continue.
    turn: int
    # Presumably flipped once create_plan has run — confirm in the agent node
    # (not read or written anywhere in this chunk).
    has_plan: bool
    # NOTE(review): these two fields are never touched in this chunk — verify
    # the graph nodes actually maintain them.
    consecutive_errors: int
    tool_history: List[str]
# =============================================================================
# FALLBACK PARSER
# =============================================================================
def parse_tool_call_from_string(content: str, tools: List) -> "List[ToolCall]":
    """Fallback parser for malformed tool-call text emitted by the LLM.

    Looks for a `<function=name>{json args}`-style blob, extracts the tool
    name and its JSON (or Python-literal) argument dict, and returns a
    single-element list containing a synthesized ToolCall. Returns [] when
    nothing parseable is found or the named tool is not in *tools*.
    """
    print(f"Fallback parsing LLM content (first 500 chars):\n{content[:500]}")
    tool_name = None
    tool_input = None
    cleaned_str = None
    # Accepts '<function(name)...', '<function=name>...' and similar variants.
    func_match = re.search(
        r"<function[(=]\s*([^)]+)\s*[)>](.*)",
        content,
        re.DOTALL | re.IGNORECASE
    )
    if func_match:
        try:
            tool_name = func_match.group(1).strip().replace("'", "").replace('"', '')
            remaining_content = func_match.group(2)
            json_start_index = remaining_content.find('{')
            if json_start_index != -1:
                json_str = remaining_content[json_start_index:]
                # Strip non-printable junk the model sometimes interleaves,
                # plus a trailing comma that would break json.loads.
                cleaned_str = json_str.strip()
                cleaned_str = ''.join(c for c in cleaned_str if c.isprintable() or c in '\n\r\t')
                cleaned_str = cleaned_str.strip().rstrip(',')
                tool_input = json.loads(cleaned_str)
                print(f"🔧 Fallback: Parsed tool call for '{tool_name}'")
            else:
                print(f"⚠️ Fallback: Found <function> but no JSON blob.")
                tool_name = None
        except json.JSONDecodeError:
            # The blob may be a Python-literal dict (single quotes, etc.).
            print(f"⚠️ Fallback: json.loads failed, trying ast.literal_eval.")
            try:
                if cleaned_str:
                    potential_input = ast.literal_eval(cleaned_str)
                    if isinstance(potential_input, dict):
                        tool_input = potential_input
                        print(f"🔧 Fallback: Parsed with ast.literal_eval for '{tool_name}'")
                    else:
                        tool_name = None
                else:
                    tool_name = None
            # BUGFIX: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit; literal_eval failures surface as
            # Exception subclasses (ValueError/SyntaxError and, on
            # pathological input, MemoryError/RecursionError).
            except Exception:
                tool_name = None
    if tool_name and tool_input is not None:
        if any(t.name == tool_name for t in tools):
            tool_call = ToolCall(
                name=tool_name,
                args=tool_input,
                id=str(uuid.uuid4())
            )
            print(f"✅ Successfully created tool call: {tool_name}")
            return [tool_call]
        else:
            print(f"❌ Tool '{tool_name}' not found in available tools")
    print("❌ Failed to parse any valid tool call from content")
    return []
# =============================================================================
# CONDITIONAL EDGE FUNCTION
# =============================================================================
def should_continue(state: AgentState):
    """Routing function: decide whether to end, run tools, or loop back.

    Ends when final_answer_tool was just requested, when the turn budget is
    exhausted, or when two AI messages appear back-to-back (a sign the model
    is looping without calling tools); otherwise routes tool calls to the
    tools node and everything else back to the agent.
    """
    last_message = state['messages'][-1]
    current_turn = state.get('turn', 0)
    pending_calls = last_message.tool_calls if isinstance(last_message, AIMessage) else None
    # Submission short-circuits everything else.
    if pending_calls and any(call.get("name") == "final_answer_tool" for call in pending_calls):
        print("--- Condition: final_answer_tool called, ending. ---")
        return END
    if current_turn >= MAX_TURNS:
        print(f"--- Condition: Max turns ({MAX_TURNS}) reached. Ending. ---")
        return END
    if pending_calls:
        print("--- Condition: Tools called, routing to tools node. ---")
        return "tools"
    # Loop prevention: two consecutive AI messages with no tool activity.
    if len(state['messages']) > 2 and isinstance(last_message, AIMessage) and isinstance(state['messages'][-2], AIMessage):
        print(f"--- Condition: Detected 2+ consecutive AI messages (Turn {current_turn}). Ending to prevent loop. ---")
        return END
    print(f"--- Condition: No tool call (Turn {current_turn}). Continuing to agent. ---")
    return "agent"
| # ============================================================================= | |
| # ENHANCED AGENT CLASS WITH PLANNING & REFLECTION | |
| # ============================================================================= | |
| class PlanningReflectionAgent: | |
| def __init__(self): | |
| print("🧠 PlanningReflectionAgent initializing...") | |
| GROQ_API_KEY = os.getenv("GROQ_API_KEY") | |
| if not GROQ_API_KEY: | |
| raise ValueError("GROQ_API_KEY environment variable is not set!") | |
| self.tools = defined_tools | |
| # Initialize RAG Components | |
| if not initialize_rag_components(): | |
| print("⚠️ Warning: RAG components failed to initialize.") | |
| # Build tool descriptions | |
| tool_desc_list = [] | |
| for tool in self.tools: | |
| if tool.args_schema: | |
| schema = tool.args_schema.model_json_schema() | |
| args_desc = [] | |
| for prop, details in schema.get('properties', {}).items(): | |
| desc = details.get('description', '') | |
| args_desc.append(f" - {prop}: {desc}") | |
| args_str = "\n".join(args_desc) | |
| desc = f"- {tool.name}:\n {tool.description}\n Args:\n{args_str}" | |
| else: | |
| desc = f"- {tool.name}: {tool.description}" | |
| tool_desc_list.append(desc) | |
| tool_descriptions = "\n".join(tool_desc_list) | |
| # Enhanced System Prompt with Planning & Reflection | |
| self.system_prompt = f"""You are an elite AI agent designed for the GAIA benchmark - the most challenging question-answering tasks. | |
| 🎯 YOUR MISSION: Provide the EXACT answer in the EXACT format requested. | |
| ═══════════════════════════════════════════════════════════════ | |
| 📋 MANDATORY PROTOCOL - FOLLOW THIS RELIGIOUSLY: | |
| ═══════════════════════════════════════════════════════════════ | |
| **PHASE 1: PLANNING (For complex/multi-step questions)** | |
| ├─ 1. Call create_plan() to think through your approach | |
| ├─ 2. Identify what information you need | |
| └─ 3. Determine the sequence of steps | |
| **PHASE 2: EXECUTION (One step at a time)** | |
| ├─ 1. Take ONE action per turn | |
| ├─ 2. Use the RIGHT tool for each task: | |
| │ • Simple math → calculator() | |
| │ • Complex data → code_interpreter() | |
| │ • Web info → search_tool() | |
| │ • Specific page → scrape_and_retrieve() | |
| │ • Files → read_file() | |
| ├─ 3. After EACH tool, evaluate the result | |
| └─ 4. Ask: "Do I have enough to answer now?" | |
| **PHASE 3: REFLECTION (If stuck)** | |
| ├─ If no progress after 3-5 turns → call reflect_on_progress() | |
| ├─ If tools keep failing → try different approach | |
| └─ If going in circles → step back and reconsider | |
| **PHASE 4: VALIDATION & SUBMISSION** | |
| ├─ 1. When you have the answer → call validate_answer() | |
| ├─ 2. If validation passes → call final_answer_tool() | |
| └─ 3. If validation fails → fix the issue first | |
| ═══════════════════════════════════════════════════════════════ | |
| 🎓 EXAMPLES - LEARN FROM THESE: | |
| ═══════════════════════════════════════════════════════════════ | |
| **Example 1: Simple Math** | |
| Q: What is 127 × 83? | |
| Turn 1: calculator("127 * 83") → 10541 | |
| Turn 2: validate_answer("10541", "What is 127 × 83?") → ✅ Pass | |
| Turn 3: final_answer_tool("10541") | |
| **Example 2: Multi-step Research** | |
| Q: What was the population of Einstein's birthplace in 1900? | |
| Turn 1: create_plan("What was the population of Einstein's birthplace in 1900?") | |
| Turn 2: search_tool("Albert Einstein birthplace") → Ulm, Germany | |
| Turn 3: search_tool("Ulm Germany population 1900") → approximately 50,000 | |
| Turn 4: validate_answer("50000", "What was the population...") → ✅ Pass | |
| Turn 5: final_answer_tool("50000") | |
| **Example 3: File + Calculation** | |
| Q: What's the average of the 'score' column in data.csv? | |
| Turn 1: list_directory(".") → [files shown] | |
| Turn 2: read_file("data.csv") → [content] | |
| Turn 3: code_interpreter("import pandas as pd; df = pd.read_csv('data.csv'); print(df['score'].mean())") | |
| → 78.5 | |
| Turn 4: validate_answer("78.5", "What's the average...") → ✅ Pass | |
| Turn 5: final_answer_tool("78.5") | |
| **Example 4: Getting Unstuck** | |
| Q: What's the GDP of the 2016 Olympics host? | |
| Turn 1: search_tool("2016 Olympics") → [general info, no clear answer] | |
| Turn 2: search_tool("Olympics 2016 location") → [still unclear] | |
| Turn 3: reflect_on_progress("Tried searching but not getting clear host country") | |
| → Try: "2016 Summer Olympics host country" | |
| Turn 4: search_tool("2016 Summer Olympics host country") → Brazil | |
| Turn 5: search_tool("Brazil GDP 2016") → $1.796 trillion | |
| Turn 6: validate_answer("1.796 trillion", original_q) → ✅ Pass | |
| Turn 7: final_answer_tool("1.796 trillion") | |
| ═══════════════════════════════════════════════════════════════ | |
| ⚠️ CRITICAL RULES - NEVER VIOLATE THESE: | |
| ═══════════════════════════════════════════════════════════════ | |
| 1. **NO GUESSING**: Always use tools. Never use your own knowledge. | |
| 2. **ONE STEP AT A TIME**: Don't try to do multiple things in one turn. | |
| 3. **EXACT FORMAT**: Answer must be EXACTLY what was asked for. | |
| 4. **NO FLUFF**: Never add "The answer is" or explanations in final answer. | |
| 5. **ALWAYS VALIDATE**: Call validate_answer() before final_answer_tool(). | |
| 6. **PLAN COMPLEX TASKS**: Multi-step questions need create_plan() first. | |
| 7. **REFLECT WHEN STUCK**: If no progress after 5 turns, call reflect_on_progress(). | |
| ═══════════════════════════════════════════════════════════════ | |
| 📚 AVAILABLE TOOLS: | |
| ═══════════════════════════════════════════════════════════════ | |
| {tool_descriptions} | |
| ═══════════════════════════════════════════════════════════════ | |
| 🎯 REMEMBER: Quality over speed. Think carefully, plan ahead, execute methodically. | |
| ═══════════════════════════════════════════════════════════════ | |
| """ | |
# Initialize the Groq-hosted chat model and bind the tool schemas so the model
# can emit structured tool calls. tool_choice="auto" lets the model decide
# per-turn whether to call a tool or answer in plain text.
print("Initializing Groq LLM...")
try:
    self.llm_with_tools = ChatGroq(
        temperature=0,  # deterministic decoding for benchmark reproducibility
        groq_api_key=GROQ_API_KEY,
        model_name="llama-3.3-70b-versatile",
        max_tokens=4096,
        timeout=60  # seconds; guards against hung API calls
    ).bind_tools(self.tools, tool_choice="auto")
    print("✅ LLM initialized.")
except Exception as e:
    # A missing/invalid GROQ_API_KEY typically surfaces here; re-raise so the
    # module-level instantiation can mark the global agent as unavailable.
    print(f"❌ Error initializing Groq: {e}")
    raise
| # Agent Node with Enhanced Logic | |
| def agent_node(state: AgentState): | |
| current_turn = state.get('turn', 0) + 1 | |
| print(f"\n{'='*70}") | |
| print(f"🤖 AGENT TURN {current_turn}/{MAX_TURNS}") | |
| print('='*70) | |
| if current_turn > MAX_TURNS: | |
| return { | |
| "messages": [SystemMessage(content="Max turns reached. Submitting best available answer.")], | |
| "turn": current_turn | |
| } | |
| # Check if we should auto-trigger reflection | |
| should_reflect = False | |
| consecutive_errors = state.get('consecutive_errors', 0) | |
| if current_turn > 5 and current_turn % REFLECT_EVERY_N_TURNS == 0: | |
| should_reflect = True | |
| print("🤔 Auto-triggering reflection (periodic check)") | |
| if consecutive_errors >= 3: | |
| should_reflect = True | |
| print("🤔 Auto-triggering reflection (multiple errors)") | |
| # Add reflection hint if needed | |
| messages_to_send = state["messages"].copy() | |
| if should_reflect and not state.get('has_plan', False): | |
| hint = SystemMessage( | |
| content="⚠️ SYSTEM HINT: You've been working for several turns. Consider calling reflect_on_progress() to evaluate your approach." | |
| ) | |
| messages_to_send.append(hint) | |
| # Invoke LLM | |
| max_retries = 3 | |
| ai_message = None | |
| for attempt in range(max_retries): | |
| try: | |
| ai_message = self.llm_with_tools.invoke(messages_to_send) | |
| break | |
| except Exception as e: | |
| print(f"⚠️ LLM attempt {attempt+1}/{max_retries} failed: {e}") | |
| if attempt == max_retries - 1: | |
| ai_message = AIMessage( | |
| content=f"Error: LLM failed after {max_retries} attempts: {e}" | |
| ) | |
| time.sleep(2 ** attempt) | |
| # Fallback Parsing | |
| if not ai_message.tool_calls and isinstance(ai_message.content, str) and ai_message.content.strip(): | |
| parsed_tool_calls = parse_tool_call_from_string(ai_message.content, self.tools) | |
| if parsed_tool_calls: | |
| print("🔧 Fallback: Successfully rebuilt tool call") | |
| ai_message.tool_calls = parsed_tool_calls | |
| ai_message.content = "" | |
| # Track tool usage | |
| tool_history = state.get('tool_history', []) | |
| has_plan = state.get('has_plan', False) | |
| if ai_message.tool_calls: | |
| tool_name = ai_message.tool_calls[0]['name'] | |
| print(f"🔧 Tool Call: {tool_name}") | |
| tool_history.append(tool_name) | |
| if tool_name == "create_plan": | |
| has_plan = True | |
| else: | |
| print(f"💭 Reasoning: {ai_message.content[:200]}...") | |
| return { | |
| "messages": [ai_message], | |
| "turn": current_turn, | |
| "has_plan": has_plan, | |
| "tool_history": tool_history | |
| } | |
# Tool Node with Error Tracking
def tool_node_wrapper(state: AgentState):
    """Execute pending tool calls and maintain a consecutive-error streak.

    Delegates execution to LangGraph's ToolNode, then inspects the newest
    message: a ToolMessage whose content mentions "Error" increments the
    streak, any other result resets it to zero.
    """
    result = ToolNode(self.tools)(state)
    produced = result['messages']
    if produced:
        tail = produced[-1]
        failed = isinstance(tail, ToolMessage) and "Error" in tail.content
        if failed:
            result['consecutive_errors'] = state.get('consecutive_errors', 0) + 1
        else:
            result['consecutive_errors'] = 0
    return result
# Build Graph
# Topology: START -> agent -> (tools | agent | END); tools always loops back
# to agent so every tool result is re-evaluated by the LLM.
print("Building Planning & Reflection Agent graph...")
graph_builder = StateGraph(AgentState)
graph_builder.add_node("agent", agent_node)
graph_builder.add_node("tools", tool_node_wrapper)
graph_builder.add_edge(START, "agent")
graph_builder.add_conditional_edges(
    "agent",
    should_continue,  # routing predicate defined elsewhere in this file
    {
        "tools": "tools",
        "agent": "agent",
        END: END
    }
)
graph_builder.add_edge("tools", "agent")
self.graph = graph_builder.compile()
print("✅ Planning & Reflection Agent graph compiled successfully.")
def __call__(self, question: str) -> str:
    """Run the agent graph on one question and return the cleaned answer.

    Streams graph events, captures the argument of the first
    final_answer_tool call, then strips boilerplate prefixes, code fences
    and surrounding backticks. On any graph failure a diagnostic string is
    returned instead of raising, so the caller's batch loop keeps going.
    """
    print(f"\n--- Starting Agent Run for Question ---")
    print(f"Agent received question (first 100 chars): {question[:100]}...")
    graph_input = {
        "messages": [
            SystemMessage(content=self.system_prompt),
            HumanMessage(content=question)
        ],
        "turn": 0
    }
    final_answer = "AGENT FAILED TO PRODUCE ANSWER"
    try:
        # Small headroom above MAX_TURNS so the graph can reach END cleanly.
        config = {"recursion_limit": MAX_TURNS + 5}
        for event in self.graph.stream(graph_input, stream_mode="values", config=config):
            if event.get('messages'):  # Ensure messages exist
                last_message = event["messages"][-1]
            else:
                continue  # Skip if no messages yet
            # Check for final answer extraction; stop streaming on it.
            if isinstance(last_message, AIMessage) and last_message.tool_calls:
                if last_message.tool_calls[0].get("name") == "final_answer_tool":
                    final_answer_args = last_message.tool_calls[0].get('args', {})
                    if 'answer' in final_answer_args:
                        final_answer = final_answer_args['answer']
                        print(f"--- Final Answer Captured from tool call: '{final_answer}' ---")
                    else:
                        print(f"⚠️ Final Answer tool called without 'answer' argument: {final_answer_args}")
                        final_answer = "ERROR: FINAL_ANSWER_TOOL CALLED WITHOUT ANSWER"
                    break
            elif isinstance(last_message, ToolMessage):
                print(f"Tool Result ({last_message.tool_call_id}): {last_message.content[:500]}...")
            elif isinstance(last_message, AIMessage) and not last_message.tool_calls:
                print(f"AI Message (Reasoning): {last_message.content[:500]}...")
            elif isinstance(last_message, SystemMessage):
                print(f"System Message: {last_message.content[:500]}...")
        # --- Final Answer Cleaning ---
        cleaned_answer = str(final_answer).strip()
        # Strip at most one leading boilerplate prefix (longer variants first).
        # NOTE: removed a dead local ('original_cleaned') that was assigned
        # here but never read.
        prefixes_to_remove = ["The answer is:", "Here is the answer:", "Based on the information:", "Final Answer:", "Answer:"]
        for prefix in prefixes_to_remove:
            if cleaned_answer.lower().startswith(prefix.lower()):
                potential_answer = cleaned_answer[len(prefix):].strip()
                if potential_answer:
                    cleaned_answer = potential_answer
                break
        cleaned_answer = remove_fences_simple(cleaned_answer)
        if cleaned_answer.startswith("`") and cleaned_answer.endswith("`"):
            cleaned_answer = cleaned_answer[1:-1].strip()
        print(f"Agent returning final answer (cleaned): '{cleaned_answer}'")
        return cleaned_answer
    except Exception as e:
        print(f"Error running agent graph: {e}")
        print(traceback.format_exc())
        return f"AGENT GRAPH ERROR: {e}"
| # ============================================================================= | |
| # GLOBAL AGENT INSTANTIATION | |
| # ============================================================================= | |
| try: | |
| initialize_rag_components() | |
| agent = PlanningReflectionAgent() | |
| print("✅ Global PlanningReflectionAgent instantiated successfully.") | |
| if asr_pipeline is None: | |
| print("⚠️ Global ASR Pipeline failed to load.") | |
| except Exception as e: | |
| print(f"❌ FATAL: Could not instantiate global agent: {e}") | |
| traceback.print_exc() | |
| agent = None | |
| # ==================================================== | |
| # --- (Original Template Code - Mock Questions Version) --- | |
| def run_and_submit_all( profile: gr.OAuthProfile | None): # Corrected type hint | |
| """ | |
| Fetches MOCK questions, runs the BasicAgent on them, simulates submission prep, | |
| and displays the results. DOES NOT SUBMIT. | |
| """ | |
| space_id = os.getenv("SPACE_ID") | |
| username = profile.username if profile else "local_test_user" | |
| print(f"User: {username}{'' if profile else ' (dummy)'}") | |
| # Check if global agent initialized | |
| if not agent: | |
| return "FATAL ERROR: Global agent failed to initialize. Check logs.", None | |
| print("Using globally instantiated agent.") | |
| agent_code = f"httpsS://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local_run" # Corrected URL | |
| print(f"Agent code URL: {agent_code}") | |
| print("--- USING MOCK QUESTIONS ---") | |
| # --- MOCK QUESTIONS --- | |
| # | |
| # vvv PASTE YOUR FULL LIST OF 20 MOCK QUESTIONS HERE vvv | |
| # | |
| mock_questions_data = [ | |
| { | |
| "task_id": "mock_level1_001", | |
| "question": r"""Here's a fun riddle that I'd like you to try.\n\nAn adventurer exploring an ancient tomb came across a horde of gold coins, all neatly stacked in columns. As he reached to scoop them into his backpack, a mysterious voice filled the room. \"You have fallen for my trap adventurer,\" the voice began, and suddenly the doorway to the chamber was sealed by a heavy rolling disk of stone. The adventurer tried to move the stone disk but was unable to budge the heavy stone. Trapped, he was startled when the voice again spoke. \n\n\"If you solve my riddle, I will reward you with a portion of my riches, but if you are not clever, you will never leave this treasure chamber. Before you are 200 gold coins. I pose a challenge to you, adventurer. Within these stacks of coins, all but 30 are face-up. You must divide the coins into two piles, one is yours, and one is mine. You may place as many coins as you like in either pile. You may flip any coins over, but you may not balance any coins on their edges. For every face-down coin in your pile, you will be rewarded with two gold coins. But be warned, if both piles do not contain the same number of face-down coins, the door will remain sealed for all eternity!\"\n\nThe adventurer smiled, as this would be an easy task. All he had to do was flip over every coin so it was face down, and he would win the entire treasure! As he moved to the columns of coins, however, the light suddenly faded, and he was left in total darkness. The adventurer reached forward and picked up one of the coins, and was shocked when he realized that both sides felt almost the same. Without the light, he was unable to determine which side of the coin was heads and which side was tails. He carefully replaced the coin in its original orientation and tried to think of a way to solve the puzzle. Finally, out of desperation, the adventurer removed 30 coins to create his pile. 
He then carefully flipped over each coin in his pile, so its orientation was inverted from its original state.\n\n\"I've finished,\" he said, and the lights returned. Looking at the two piles, he noticed that the larger pile contained 14 face-down coins.\n\nWhat was the outcome for the adventurer? If he failed the challenge, please respond with \"The adventurer died.\" Otherwise, please provide the number of coins the adventurer won at the conclusion of the riddle. If the adventurer won any coins, provide your response as the number of coins, with no other text.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_002", | |
| "question": r"""If you use some of the letters in the given Letter Bank to spell out the sentence "I am a penguin halfway to the moon", which of the remaining unused letters would have to be changed to spell out, "The moon is made of cheese"? Return a comma-separated alphabetized list.\nLetter Bank: {OAMFETIMPECRFSHTDNIWANEPNOFAAIYOOMGUTNAHHLNEHCME}""" | |
| }, | |
| { | |
| "task_id": "mock_level1_003", | |
| "question": r"""A data annotator stayed up too late creating test questions to check that a system was working properly and submitted several questions with mathematical errors. On nights when they created 15 test questions, they made 1 error. On nights when they created fewer than 15 questions, they also corrected 3 errors. On nights they created 20 questions, they made 0 errors. On nights when they created 25 or more, they made 4 errors. Over the course of five nights, the worker produced a total of 6 errors. When asked how many nights they created 15 questions, they gave three possible numbers as responses. What are the three numbers, presented in the format x, y, z in ascending order?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_004", | |
| "question": r"""Please solve the following crossword:\n\n|1|2|3|4|5|\n|6| | | | |\n|7| | | | |\n|8| | | | |\n|X|9| | | |\n\nI have indicated by numbers where the hints start, so you should replace numbers and spaces by the answers.\nAnd X denotes a black square that isn\u2019t to fill.\n\nACROSS\n- 1 Wooden strips on a bed frame\n- 6 _ Minhaj, Peabody-winning comedian for "Patriot Act"\n- 7 Japanese city of 2.6+ million\n- 8 Stopwatch, e.g.\n- 9 Pain in the neck\n\nDOWN\n- 1 Quick drink of whiskey\n- 2 Eye procedure\n- 3 "Same here," in a three-word phrase\n- 4 Already occupied, as a seat\n- 5 Sarcastically critical commentary. Answer by concatenating the characters you choose to fill the crossword, in row-major order.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_005", | |
| "question": r"""I wanted to make another batch of cherry melomel. I remember liking the last recipe I tried, but I can't remember it off the top of my head. It was from the Reddit, r/mead. I remember that the user who made it had a really distinct name, I think it was StormBeforeDawn. Could you please look up the recipe for me? I'm not sure if it has been changed, so please make sure that the recipe you review wasn't updated after July 14, 2022. That's the last time I tried the recipe.\n\nWhat I want to know is how many cherries I'm supposed to use. I'm making a 10-gallon batch in two 5-gallon carboys. Please just respond with the integer number of pounds of whole cherries with pits that are supposed to be used for a 10-gallon batch.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_006", | |
| "question": r"""Verify each of the following ISBN 13 numbers:\n\n1. 9783518188156\n2. 9788476540746\n3. 9788415091004\n4. 9788256014590\n5. 9782046407331\n\nIf any are invalid, correct them by changing the final digit. Then, return the list, comma separated, in the same order as in the question.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_007", | |
| "question": r"""A porterhouse by any other name is centered around a letter. What does Three Dog Night think about the first natural number that starts with that letter? Give the first line from the lyrics that references it.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_008", | |
| "question": r"""Bob has genome type Aa, and Linda has genome type Aa. Assuming that a child of theirs also has a child with someone who also has genome type Aa, what is the probability that Bob and Linda's grandchild will have Genome type Aa? Write the answer as a percentage, rounding to the nearest integer if necessary.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_009", | |
| "question": r"""An array of candy is set out to choose from including gumballs, candy corn, gumdrops, banana taffy, chocolate chips, and gummy bears. There is one bag of each type of candy. The gumballs come in red, orange, yellow, green, blue, and brown. The candy corn is yellow, white, and orange. The gumdrops are red, green, purple, yellow, and orange. The banana taffy is yellow. The chocolate chips are brown and white. The gummy bears are red, green, yellow, and orange. Five people pass through and each selects one bag. The first selects one with only primary colors. The second selects one with no primary colors. The third selects one with all the primary colors. The fourth selects one that has neither the most nor the least colors of the remaining bags. The fifth selects the one with their favorite color, green. A second bag of the candy the first person chose is added to the remaining bag of candy. Which two candies are in the remaining bag after the addition? Give me them in a comma separated list, in alphabetical order""" | |
| }, | |
| { | |
| "task_id": "mock_level1_010", | |
| "question": r"""In the year 2020, where were koi fish found in the watershed with the id 02040203? Give only the name of the pond, lake, or stream where the fish were found, and not the name of the city or county.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_011", | |
| "question": r"""In Sonia Sanchez\u2019s poem \u201cfather\u2019s voice\u201d, what primary colour is evoked by the imagery in the beginning of the tenth stanza? Answer with a capitalized word.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_012", | |
| "question": r"""According to Papers with Code, what was the name of the first model to go beyond 70% of accuracy on ImageNet ?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_013", | |
| "question": r"""What is the dimension of the boundary of the tame twindragon rounded to two decimal places?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_014", | |
| "question": r"""In what year was the home village of the subject of British Museum item #Bb,11.118 founded?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_015", | |
| "question": r"""What is the ISSN of the journal that included G. Scott's potato article that mentioned both a fast food restaurant and a Chinese politician in the title in a 2012 issue?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_016", | |
| "question": r"""VNV Nation has a song that shares its title with the nickname of Louis XV. What album was it released with?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_017", | |
| "question": r"""If I combine a Beatle's first name and a type of beer, in what category and year of Nobel Prize do I have a winner? Answer using the format CATEGORY, YEAR.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_018", | |
| "question": r"""In the version of NumPy where the numpy.msort function was deprecated, which attribute was added to the numpy.polynomial package's polynomial classes?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_019", | |
| "question": r"""A word meaning dramatic or theatrical forms a species of duck when appended with two letters and then duplicated. What is that word?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_020", | |
| "question": r"""As of August 2023, how many in-text citations on the West African Vodun Wikipedia page reference a source that was cited using Scopus?""" | |
| } | |
| ] | |
| questions_data = mock_questions_data | |
| print(f"Using {len(questions_data)} mock questions.") | |
| results_log, answers_payload = [], [] | |
| print(f"Running agent on {len(questions_data)} mock questions...") | |
| for i, item in enumerate(questions_data): | |
| task_id, question_text = item.get("task_id"), item.get("question") | |
| if not task_id or question_text is None: print(f"Skipping mock item {i+1}"); continue | |
| print(f"\n--- Running Mock Task {i+1} (ID: {task_id}) ---") | |
| try: | |
| file_path = item.get("file_path") | |
| question_text_with_context = question_text | |
| if file_path: | |
| question_text_with_context = f"{question_text}\n\n[Attached File: {file_path}]" | |
| print(f"Q includes file: {file_path}") | |
| submitted_answer = agent(question_text_with_context) | |
| submitted_answer_str = str(submitted_answer) if submitted_answer is not None else "" | |
| answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer_str}) | |
| results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer_str}) | |
| print(f"--- Mock Task {task_id} Complete ---") | |
| except Exception as e: | |
| print(f"FATAL ERROR on mock task {task_id}: {e}") | |
| import traceback; traceback.print_exc() | |
| submitted_answer = f"AGENT CRASH: {e}" | |
| answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) | |
| results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) | |
| if not answers_payload: return "Agent produced no answers.", pd.DataFrame(results_log) | |
| status_update = f"Finished mock run. Processed {len(answers_payload)} answers for '{username}'." | |
| print(status_update); print("--- MOCK RUN - SUBMISSION SKIPPED ---") | |
| final_status = "--- Mock RUN COMPLETE ---\n" + status_update + "\nSubmission SKIPPED." # Corrected typo | |
| results_df = pd.DataFrame(results_log); results_df['Correct'] = 'N/A (Mock)' | |
| return final_status, results_df | |
# --- Build Gradio Interface ---
# A single button drives the whole mock run; gr.LoginButton lets Gradio inject
# the OAuthProfile argument of run_and_submit_all automatically on Spaces.
with gr.Blocks() as demo:
    # BUG FIX: the heading previously said "Llama3.1" although the executor
    # model configured below is llama-3.3-70b-versatile.
    gr.Markdown("# GAIA Agent - MOCK TEST (Groq Llama-3.3)")
    gr.Markdown("""
    **Instructions:** Click 'Run Mock Evaluation'.
    **Notes:** Uses Groq (Llama-3.3-70b Executor). Ensure `GROQ_API_KEY` secret/env var exists. **DOES NOT** fetch official Qs or submit. Check logs for details.
    """)
    gr.LoginButton()
    run_button = gr.Button("Run Mock Evaluation")
    status_output = gr.Textbox(label="Run Status / Mock Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Mock Qs, Agent Answers, Results", wrap=True)
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # BUG FIX: both variables previously read SPACE_ID; the runtime-URL banner
    # needs the SPACE_HOST environment variable that HF Spaces provides.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")
    if space_host_startup:
        print(f"✅ SPACE_HOST: {space_host_startup}\n Runtime URL: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ No SPACE_HOST (local?).")
    if space_id_startup:
        print(f"✅ SPACE_ID: {space_id_startup}\n Repo URL: https://huggingface.co/spaces/{space_id_startup}\n Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ No SPACE_ID (local?).")
    # __file__ is undefined in some embedded/exec contexts; fall back to CWD.
    try:
        script_dir = os.path.dirname(os.path.realpath(__file__))
    except NameError:
        script_dir = os.getcwd()
    print(f"Script directory: {script_dir}")
    print(f"CWD: {os.getcwd()}")
    try:
        print("Files in CWD:", os.listdir("."))
    except FileNotFoundError:
        print("Warning: CWD listing failed.")
    print("-"*(60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface...")
    demo.queue().launch(debug=True, share=False)