Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import requests | |
| import inspect | |
| import pandas as pd | |
| import io | |
| import contextlib | |
| import traceback | |
| from typing import TypedDict, Annotated, List | |
| import torch | |
| import json | |
| import re | |
| import uuid | |
| import time | |
| # --- Multimodal & Web Tool Imports --- | |
| from transformers import pipeline | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from bs4 import BeautifulSoup | |
| # --- LangChain & LangGraph Imports --- | |
| from langgraph.graph.message import add_messages | |
| from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage, ToolCall | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.graph import START, END, StateGraph | |
| from langchain_community.tools import DuckDuckGoSearchRun | |
| from langchain_core.tools import tool | |
| from langchain_groq import ChatGroq | |
| # --- Constants --- | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
| MAX_TURNS = 20 # Increased from 15 for complex questions | |
| MAX_MESSAGE_LENGTH = 8000 # Truncate long outputs | |
| # --- Initialize ASR Pipeline --- | |
| asr_pipeline = None | |
| try: | |
| print("Loading ASR (Whisper) pipeline globally...") | |
| device = 0 if torch.cuda.is_available() else -1 | |
| device_name = "cuda:0" if device == 0 else "cpu" | |
| print(f"Attempting to use device: {device_name} for ASR.") | |
| asr_pipeline = pipeline( | |
| "automatic-speech-recognition", | |
| model="openai/whisper-base", | |
| torch_dtype=torch.float16 if device == 0 else torch.float32, | |
| device=device | |
| ) | |
| print("✅ ASR (Whisper) pipeline loaded successfully.") | |
| except Exception as e: | |
| print(f"⚠️ Warning: Could not load ASR pipeline globally. Error: {e}") | |
| asr_pipeline = None | |
| # ==================================================== | |
| # --- Tool Definitions --- | |
| def search_tool(query: str) -> str: | |
| """Calls DuckDuckGo search and returns the results. Use this for recent information or general web searches.""" | |
| if not isinstance(query, str) or not query.strip(): | |
| return "Error: Invalid input. 'query' must be a non-empty string." | |
| print(f"--- Calling Search Tool with query: {query} ---") | |
| try: | |
| search = DuckDuckGoSearchRun() | |
| result = search.run(query) | |
| # Truncate if too long | |
| if len(result) > MAX_MESSAGE_LENGTH: | |
| result = result[:MAX_MESSAGE_LENGTH] + f"\n...[truncated, {len(result)} total chars]" | |
| return result | |
| except Exception as e: | |
| tb_str = traceback.format_exc() | |
| print(f"--- Search Tool FAILED ---\n{tb_str}\n---") | |
| return f"Error running search for '{query}': {str(e)}" | |
| def code_interpreter(code: str) -> str: | |
| """ | |
| Executes a string of Python code and returns its stdout, stderr, and any error. | |
| Use for calculations, data manipulation (pandas), logic puzzles, file processing. | |
| CRITICAL RULES: | |
| 1. ALWAYS use print() to output your final answer. | |
| 2. Write simple, focused code. One task per execution. | |
| 3. Add comments (#) to explain your logic. | |
| Available: pandas as pd, basic Python libraries. | |
| """ | |
| if not isinstance(code, str): | |
| return "Error: Invalid input. 'code' must be a string." | |
| # Basic safety checks | |
| dangerous_patterns = ['__import__', 'eval(', 'compile(', 'subprocess', 'os.system'] | |
| code_lower = code.lower() | |
| for pattern in dangerous_patterns: | |
| if pattern in code_lower: | |
| return f"Error: Potentially dangerous operation '{pattern}' is not allowed." | |
| # Check for file writing in code | |
| if 'open(' in code_lower and any(mode in code for mode in ["'w'", '"w"', "'a'", '"a"', "'wb'", '"wb"']): | |
| return "Error: Writing files is not allowed in code_interpreter. Use write_file tool instead." | |
| print(f"--- Calling Code Interpreter ---\nCode:\n{code}\n---") | |
| output_stream = io.StringIO() | |
| error_stream = io.StringIO() | |
| try: | |
| with contextlib.redirect_stdout(output_stream), contextlib.redirect_stderr(error_stream): | |
| safe_globals = { | |
| "pd": pd, | |
| "__builtins__": __builtins__ | |
| } | |
| exec(code, safe_globals, {}) | |
| stdout = output_stream.getvalue() | |
| stderr = error_stream.getvalue() | |
| if stderr: | |
| return f"Error in execution:\n{stderr}\n\nStdout (if any):\n{stdout}" | |
| if stdout: | |
| # Truncate if too long | |
| if len(stdout) > MAX_MESSAGE_LENGTH: | |
| stdout = stdout[:MAX_MESSAGE_LENGTH] + f"\n...[truncated, {len(stdout)} total chars]" | |
| return f"Success:\n{stdout}" | |
| return "Success: Code executed without error but produced no output.\n⚠️ Remember to use print() to output your results!" | |
| except Exception as e: | |
| tb_str = traceback.format_exc() | |
| print(f"--- Code Interpreter FAILED ---\n{tb_str}\n---") | |
| error_msg = f"Execution failed:\n{tb_str}\n\n💡 Hints:\n- Check your syntax\n- Ensure you're using print() for output\n- Verify variable names and types" | |
| return error_msg | |
| def read_file(path: str) -> str: | |
| """Reads the content of a file at the specified path. Use this to examine uploaded files or files you've created.""" | |
| if not isinstance(path, str) or not path.strip(): | |
| return "Error: Invalid input. 'path' must be a non-empty string." | |
| print(f"--- Calling Read File Tool: {path} ---") | |
| try: | |
| script_dir = os.getcwd() | |
| safe_path = os.path.normpath(path) | |
| # Try multiple path strategies | |
| paths_to_try = [ | |
| os.path.join(script_dir, safe_path), # Relative to CWD | |
| safe_path, # Direct/absolute | |
| os.path.join(os.getcwd(), os.path.basename(safe_path)) # Basename in CWD | |
| ] | |
| full_path = None | |
| for attempt_path in paths_to_try: | |
| if os.path.exists(attempt_path): | |
| full_path = attempt_path | |
| break | |
| if not full_path: | |
| try: | |
| cwd_files = os.listdir(".") | |
| except Exception: | |
| cwd_files = ["(could not list)"] | |
| return (f"Error: File not found: '{path}'\n" | |
| f"Tried paths:\n" + "\n".join(f" - {p}" for p in paths_to_try) + | |
| f"\n\nFiles in current directory: {cwd_files}") | |
| print(f"Reading file: {full_path}") | |
| # Try to detect file type | |
| _, ext = os.path.splitext(full_path) | |
| try: | |
| with open(full_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| # Truncate if too long | |
| if len(content) > MAX_MESSAGE_LENGTH: | |
| content = content[:MAX_MESSAGE_LENGTH] + f"\n...[truncated, {len(content)} total chars]" | |
| return content | |
| except UnicodeDecodeError: | |
| # Try binary read for non-text files | |
| try: | |
| with open(full_path, 'rb') as f: | |
| binary_content = f.read() | |
| return f"File appears to be binary ({len(binary_content)} bytes). Cannot display as text.\nFile type: {ext}\nConsider using audio_transcription_tool for audio files." | |
| except Exception as bin_e: | |
| return f"Error: Could not read file as text or binary: {str(bin_e)}" | |
| except PermissionError: | |
| return f"Error: Permission denied reading '{full_path}'." | |
| except IsADirectoryError: | |
| return f"Error: '{full_path}' is a directory, not a file. Use list_directory to see its contents." | |
| except Exception as read_e: | |
| tb_str = traceback.format_exc() | |
| return f"Error reading file: {str(read_e)}\n{tb_str}" | |
| except Exception as e: | |
| tb_str = traceback.format_exc() | |
| print(f"--- Read File Tool FAILED ---\n{tb_str}\n---") | |
| return f"Unexpected error accessing file '{path}': {str(e)}" | |
| def write_file(path: str, content: str) -> str: | |
| """Writes content to a file at the specified path. Creates directories if needed.""" | |
| if not isinstance(path, str) or not path.strip(): | |
| return "Error: Invalid input. 'path' must be a non-empty string." | |
| if not isinstance(content, str): | |
| return "Error: Invalid input. 'content' must be a string." | |
| print(f"--- Calling Write File Tool: {path} ---") | |
| try: | |
| base_dir = os.getcwd() | |
| full_path = os.path.join(base_dir, path) | |
| # Create directories if needed | |
| dir_path = os.path.dirname(full_path) | |
| if dir_path: | |
| os.makedirs(dir_path, exist_ok=True) | |
| with open(full_path, 'w', encoding='utf-8') as f: | |
| f.write(content) | |
| return f"Successfully wrote {len(content)} characters to '{path}'." | |
| except PermissionError: | |
| return f"Error: Permission denied writing to '{path}'." | |
| except Exception as e: | |
| tb_str = traceback.format_exc() | |
| return f"Error writing file '{path}': {str(e)}\n{tb_str}" | |
| def list_directory(path: str = ".") -> str: | |
| """Lists the contents of a directory. Useful for finding available files.""" | |
| if not isinstance(path, str): | |
| return "Error: Invalid input. 'path' must be a string." | |
| print(f"--- Calling List Directory Tool: {path} ---") | |
| try: | |
| base_dir = os.getcwd() | |
| full_path = os.path.join(base_dir, path) if path != "." else base_dir | |
| if not os.path.isdir(full_path): | |
| return f"Error: '{path}' is not a valid directory." | |
| items = os.listdir(full_path) | |
| if not items: | |
| return f"Directory '{path}' is empty." | |
| # Separate files and directories | |
| files = [] | |
| directories = [] | |
| for item in sorted(items): | |
| item_path = os.path.join(full_path, item) | |
| if os.path.isdir(item_path): | |
| directories.append(f"📁 {item}/") | |
| else: | |
| size = os.path.getsize(item_path) | |
| files.append(f"📄 {item} ({size} bytes)") | |
| result = f"Contents of '{path}':\n\n" | |
| if directories: | |
| result += "Directories:\n" + "\n".join(directories) + "\n\n" | |
| if files: | |
| result += "Files:\n" + "\n".join(files) | |
| return result | |
| except PermissionError: | |
| return f"Error: Permission denied listing directory '{path}'." | |
| except Exception as e: | |
| tb_str = traceback.format_exc() | |
| return f"Error listing directory '{path}': {str(e)}\n{tb_str}" | |
| def audio_transcription_tool(file_path: str) -> str: | |
| """Transcribes an audio file (mp3, wav, etc.) to text using Whisper.""" | |
| if not isinstance(file_path, str) or not file_path.strip(): | |
| return "Error: Invalid input. 'file_path' must be a non-empty string." | |
| print(f"--- Calling Audio Transcription: {file_path} ---") | |
| if asr_pipeline is None: | |
| return "Error: ASR pipeline is not available. Audio transcription cannot be performed." | |
| try: | |
| # Find file using same strategy as read_file | |
| script_dir = os.getcwd() | |
| safe_path = os.path.normpath(file_path) | |
| paths_to_try = [ | |
| os.path.join(script_dir, safe_path), | |
| safe_path, | |
| os.path.join(os.getcwd(), os.path.basename(safe_path)) | |
| ] | |
| full_path = None | |
| for attempt_path in paths_to_try: | |
| if os.path.exists(attempt_path): | |
| full_path = attempt_path | |
| break | |
| if not full_path: | |
| return f"Error: Audio file not found: '{file_path}'" | |
| print(f"Transcribing file: {full_path}") | |
| transcription = asr_pipeline(full_path) | |
| result_text = transcription.get("text", "") | |
| if not result_text: | |
| return "Error: Transcription produced no text. The audio file may be empty or corrupted." | |
| # Truncate if too long | |
| if len(result_text) > MAX_MESSAGE_LENGTH: | |
| result_text = result_text[:MAX_MESSAGE_LENGTH] + f"\n...[truncated, original length unknown]" | |
| return f"Transcription:\n{result_text}" | |
| except Exception as e: | |
| tb_str = traceback.format_exc() | |
| return f"Error transcribing '{file_path}': {str(e)}\n{tb_str}" | |
| def get_youtube_transcript(video_url: str) -> str: | |
| """Fetches the transcript/captions for a YouTube video.""" | |
| if not isinstance(video_url, str) or not video_url.strip(): | |
| return "Error: Invalid input. 'video_url' must be a non-empty string." | |
| print(f"--- Calling YouTube Transcript: {video_url} ---") | |
| try: | |
| # Extract video ID | |
| video_id = None | |
| if "watch?v=" in video_url: | |
| video_id = video_url.split("v=")[1].split("&")[0] | |
| elif "youtu.be/" in video_url: | |
| video_id = video_url.split("youtu.be/")[1].split("?")[0] | |
| elif len(video_url) == 11 and video_url.isalnum(): # Direct video ID | |
| video_id = video_url | |
| if not video_id: | |
| return f"Error: Could not extract YouTube video ID from '{video_url}'. Provide a valid YouTube URL." | |
| print(f"Fetching transcript for video ID: {video_id}") | |
| transcript_list = YouTubeTranscriptApi.get_transcript(video_id) | |
| if not transcript_list: | |
| return "Error: No transcript found for this video. It may not have captions available." | |
| full_transcript = " ".join([item["text"] for item in transcript_list]) | |
| # Truncate if too long | |
| if len(full_transcript) > MAX_MESSAGE_LENGTH: | |
| full_transcript = full_transcript[:MAX_MESSAGE_LENGTH] + f"\n...[truncated, {len(full_transcript)} total chars]" | |
| return f"YouTube Transcript:\n{full_transcript}" | |
| except Exception as e: | |
| tb_str = traceback.format_exc() | |
| return f"Error getting transcript for '{video_url}': {str(e)}\nThis video may not have transcripts available.\n{tb_str}" | |
| def scrape_web_page(url: str) -> str: | |
| """Fetches and extracts the main text content from a webpage.""" | |
| if not isinstance(url, str) or not url.strip(): | |
| return "Error: Invalid input. 'url' must be a non-empty string." | |
| if not url.lower().startswith(('http://', 'https://')): | |
| return f"Error: Invalid URL. Must start with http:// or https://. Got: '{url}'" | |
| print(f"--- Calling Web Scraper: {url} ---") | |
| try: | |
| headers = { | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' | |
| } | |
| response = requests.get(url, headers=headers, timeout=20) | |
| response.raise_for_status() | |
| content_type = response.headers.get('Content-Type', '').lower() | |
| if 'html' not in content_type: | |
| return f"Error: URL returned '{content_type}', not HTML. Cannot scrape non-HTML content." | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| # Remove unwanted elements | |
| for tag in soup(["script", "style", "nav", "footer", "aside", "header", | |
| "form", "button", "input", "img", "link", "meta"]): | |
| tag.extract() | |
| # Try to find main content area | |
| main_content = (soup.find('main') or | |
| soup.find('article') or | |
| soup.find('div', role='main') or | |
| soup.find('div', class_=lambda x: x and 'content' in x.lower()) or | |
| soup.body) | |
| if not main_content: | |
| return "Error: Could not find main content area on the page." | |
| text = main_content.get_text(separator='\n', strip=True) | |
| # Clean up whitespace | |
| lines = (line.strip() for line in text.splitlines()) | |
| chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
| text = '\n'.join(chunk for chunk in chunks if chunk) | |
| if not text: | |
| return "Error: Scraped content was empty after cleaning." | |
| # Truncate if too long | |
| if len(text) > MAX_MESSAGE_LENGTH: | |
| text = text[:MAX_MESSAGE_LENGTH] + f"\n...[truncated, {len(text)} total chars]" | |
| return f"Content from {url}:\n\n{text}" | |
| except requests.exceptions.Timeout: | |
| return f"Error: Request to {url} timed out after 20 seconds." | |
| except requests.exceptions.RequestException as req_e: | |
| return f"Error fetching URL {url}: {str(req_e)}" | |
| except Exception as e: | |
| tb_str = traceback.format_exc() | |
| return f"Error scraping {url}: {str(e)}\n{tb_str}" | |
| def final_answer_tool(answer: str) -> str: | |
| """ | |
| Call this tool ONLY when you have the final, definitive answer. | |
| The 'answer' must be EXACTLY what was asked for, with no extra text. | |
| Examples: | |
| - If asked for a number: "42" (not "The answer is 42") | |
| - If asked for a list: "apple, banana, cherry" | |
| - If asked for a name: "John Smith" | |
| """ | |
| if not isinstance(answer, str): | |
| try: | |
| answer = str(answer) | |
| except: | |
| return "Error: Invalid input. 'answer' must be a string." | |
| print(f"--- FINAL ANSWER TOOL CALLED ---") | |
| print(f"Answer: {answer}") | |
| return answer | |
| # --- Helper Function --- | |
| def remove_fences_simple(text): | |
| """Remove code fences from text.""" | |
| original_text = text | |
| text = text.strip() | |
| if text.startswith("```") and text.endswith("```"): | |
| text = text[3:-3].strip() | |
| if '\n' in text: | |
| first_line, rest = text.split('\n', 1) | |
| # Remove language identifier | |
| if first_line.strip().replace('_','').isalnum() and len(first_line.strip()) < 15: | |
| text = rest.strip() | |
| return text | |
| return original_text | |
| # List of all tools | |
| defined_tools = [ | |
| search_tool, | |
| code_interpreter, | |
| read_file, | |
| write_file, | |
| list_directory, | |
| audio_transcription_tool, | |
| get_youtube_transcript, | |
| scrape_web_page, | |
| final_answer_tool | |
| ] | |
| # --- LangGraph Agent State --- | |
| class AgentState(TypedDict): | |
| messages: Annotated[List[AnyMessage], add_messages] | |
| turn: int | |
| # --- Conditional Edge Function --- | |
| def should_continue(state: AgentState): | |
| """Decide whether to continue, call tools, or end.""" | |
| last_message = state['messages'][-1] | |
| current_turn = state.get('turn', 0) | |
| # 1. Check for final_answer_tool | |
| if isinstance(last_message, AIMessage) and last_message.tool_calls: | |
| for tool_call in last_message.tool_calls: | |
| if tool_call.get("name") == "final_answer_tool": | |
| print("--- Condition: final_answer_tool called, ending. ---") | |
| return END | |
| # 2. Check turn limit | |
| if current_turn >= MAX_TURNS: | |
| print(f"--- Condition: Max turns ({MAX_TURNS}) reached. Ending. ---") | |
| state['messages'].append( | |
| SystemMessage(content=f"SYSTEM: Maximum turn limit ({MAX_TURNS}) reached. Ending execution.") | |
| ) | |
| return END | |
| # 3. Route to tools if tool calls exist | |
| if isinstance(last_message, AIMessage) and last_message.tool_calls: | |
| print("--- Condition: Tools called, routing to tools node. ---") | |
| return "tools" | |
| # 4. NEW LOOP PREVENTION: | |
| # Check for consecutive AI messages without tool calls. | |
| # This catches "thinking" loops or raw answer dribbling (like "58"). | |
| if isinstance(last_message, AIMessage) and not last_message.tool_calls: | |
| # Check if the message *before* this one was ALSO an AIMessage. | |
| # We need at least 3 messages total (System, Human, AI-Turn1-Plan) | |
| # for this check to be valid, so we check len > 2. | |
| if len(state['messages']) > 2 and isinstance(state['messages'][-2], AIMessage): | |
| print(f"--- Condition: Detected 2+ consecutive AI messages (Turn {current_turn}). Ending to prevent loop. ---") | |
| state['messages'].append( | |
| SystemMessage(content=f"SYSTEM: Agent stuck in a loop (consecutive non-tool-call AI messages). Ending execution.") | |
| ) | |
| return END | |
| # 5. Default: Loop back to agent (e.g., after Turn 1 plan) | |
| print(f"--- Condition: No tool call (Turn {current_turn}). Continuing to agent. ---") | |
| return "agent" | |
| # ==================================================== | |
| # --- Basic Agent Class --- | |
| class BasicAgent: | |
| def __init__(self): | |
| print("BasicAgent (LangGraph) initializing...") | |
| GROQ_API_KEY = os.getenv("GROQ_API_KEY") | |
| if not GROQ_API_KEY: | |
| raise ValueError("GROQ_API_KEY environment variable is not set!") | |
| self.tools = defined_tools | |
| # Build tool descriptions | |
| tool_desc_list = [] | |
| for tool in self.tools: | |
| if tool.name == 'code_interpreter': | |
| desc = ( | |
| f"- {tool.name}: Executes Python code. Use for calculations, data analysis, logic puzzles.\n" | |
| f" **CRITICAL RULES:**\n" | |
| f" 1. ALWAYS use print() to output results\n" | |
| f" 2. Write simple, focused code (one task per execution)\n" | |
| f" 3. Add comments (#) to explain your logic\n" | |
| f" Available: pandas as pd" | |
| ) | |
| else: | |
| desc = f"- {tool.name}: {tool.description}" | |
| tool_desc_list.append(desc) | |
| tool_descriptions = "\n".join(tool_desc_list) | |
| # ==================== SYSTEM PROMPT V5 ==================== | |
| self.system_prompt = f"""You are a highly intelligent AI assistant for the GAIA benchmark. | |
| Your goal: Provide the EXACT answer in the EXACT format requested. | |
| **PROTOCOL:** | |
| 1. **ANALYZE QUESTION:** | |
| - What information is needed? | |
| - What format should the answer be? | |
| - Are there any files? | |
| 2. **FIRST TURN - MAKE A PLAN:** | |
| - Your FIRST response MUST be a brief plan (2-3 sentences). | |
| - DO NOT call tools on your first turn! Just state the plan. | |
| 3. **EXECUTE:** | |
| - Call ONE tool per turn. | |
| - Wait for the result before planning your next step. | |
| - For ANY calculation or logic: use code_interpreter with print() | |
| 4. **VERIFY RESULTS:** | |
| - Check if tool output contains errors. | |
| - If error: plan a different approach. | |
| - If success: decide if you need more info or have the answer. | |
| 5. **FINISH:** | |
| - When you have the answer from a tool output: | |
| - Call final_answer_tool immediately. | |
| - Provide ONLY the exact answer (no explanations!) | |
| **CRITICAL RULES:** | |
| ❌ NEVER guess or use training data. | |
| ❌ NEVER call multiple tools in one turn. | |
| ❌ NEVER add explanations to final_answer_tool. | |
| ✅ ALWAYS use code_interpreter for calculations/logic. | |
| ✅ ALWAYS match the requested answer format exactly. | |
| ✅ ALWAYS base your answer on tool outputs. | |
| **TOOL CALL FORMATTING (CRITICAL!):** | |
| When you call a tool, you MUST use the exact tool name and provide arguments as valid JSON. | |
| **Example for final_answer_tool:** | |
| {{ "name": "final_answer_tool", "arguments": {{"answer": "The Final Answer"}} }} | |
| **Example for code_interpreter (MUST have 'code' key):** | |
| {{ "name": "code_interpreter", "arguments": {{"code": "print(1 + 1)"}} }} | |
| **Example for search_tool (MUST have 'query' key):** | |
| {{ "name": "search_tool", "arguments": {{"query": "latest news"}} }} | |
| Failure to provide arguments in this exact JSON format will cause an error. | |
| **ANSWER FORMAT EXAMPLES:** | |
| - "What is 5+5?" → final_answer("10") | |
| - "List the colors" → final_answer("red, blue, green") | |
| - "Is it true?" → final_answer("Yes") or final_answer("No") | |
| - "What's the name?" → final_answer("John Smith") | |
| **TOOLS:** | |
| {tool_descriptions} | |
| **REMEMBER:** One tool per turn. Base everything on tool outputs. Match the format exactly. | |
| """ | |
| print("Initializing Groq LLM...") | |
| try: | |
| chat_llm = ChatGroq( | |
| temperature=0, # Maximum determinism | |
| groq_api_key=GROQ_API_KEY, | |
| model_name="openai/gpt-oss-120b", # Best reasoning model | |
| max_tokens=4096, | |
| timeout=60 | |
| ) | |
| print("✅ Groq LLM initialized with llama-3.3-70b-versatile") | |
| except Exception as e: | |
| print(f"❌ Error initializing Groq: {e}") | |
| raise | |
| self.llm_with_tools = chat_llm.bind_tools(self.tools) | |
| print("✅ Tools bound to LLM") | |
| # --- Agent Node --- | |
| def agent_node(state: AgentState): | |
| # --- Turn Counter Logic --- | |
| # We need to check if this is a retry of a failed turn (e.g., Turn 1 violation) | |
| # We identify a retry if the *last* message was our "Protocol Violation" message | |
| last_msg = state['messages'][-1] | |
| is_a_retry = False | |
| if isinstance(last_msg, SystemMessage) and "Protocol Violation" in last_msg.content: | |
| is_a_retry = True | |
| # Get the state's current turn number | |
| current_turn = state.get('turn', 0) | |
| # If this is NOT a retry, increment the turn. | |
| # If it IS a retry, we *stay on the same turn number* | |
| if not is_a_retry: | |
| current_turn += 1 | |
| # Handle the very first run (where state['turn'] is 0) | |
| if current_turn == 0: | |
| current_turn = 1 | |
| # --- End Turn Counter Logic --- | |
| print(f"\n{'='*60}") | |
| print(f"AGENT TURN {current_turn}/{MAX_TURNS}") | |
| if is_a_retry: | |
| print("--- (Re-trying after protocol violation) ---") | |
| print('='*60) | |
| messages_to_send = state["messages"] | |
| # Retry logic with exponential backoff | |
| max_retries = 3 | |
| ai_message = None | |
| for attempt in range(max_retries): | |
| try: | |
| ai_message = self.llm_with_tools.invoke(messages_to_send) | |
| break | |
| except Exception as e: | |
| print(f"⚠️ LLM attempt {attempt+1}/{max_retries} failed: {e}") | |
| if attempt == max_retries - 1: | |
| error_msg = AIMessage( | |
| content=f"Error: LLM failed after {max_retries} attempts: {str(e)}" | |
| ) | |
| return {"messages": [error_msg], "turn": current_turn} | |
| time.sleep(2 ** attempt) # Exponential backoff | |
| # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ | |
| # --- (FIX #1) RULE ENFORCEMENT BLOCK --- | |
| # | |
| # If it's Turn 1 AND the agent tried to call tools, we reject it | |
| # and force it to re-do Turn 1. | |
| if current_turn == 1 and ai_message.tool_calls: | |
| print("⚠️ AGENT VIOLATION: Tried to call tools on Turn 1. Forcing replan.") | |
| # Strip the illegal tool call | |
| ai_message.tool_calls = [] | |
| # Create the correction message that forces the plan | |
| correction_message = SystemMessage( | |
| content="SYSTEM: Protocol Violation. Your FIRST turn MUST be a plan with NO tool calls. " | |
| "You are not allowed to call any tools on your first turn. " | |
| "Re-read the protocol and provide your 2-3 sentence plan now." | |
| ) | |
| # Return the messages. | |
| # Critically, we set the state's turn counter back to 1. | |
| # This ensures the *next* run of this node is *still* Turn 1. | |
| return {"messages": [ai_message, correction_message], "turn": 1} | |
| # --- END OF RULE ENFORCEMENT BLOCK --- | |
| # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ | |
| # --- FIX #2: REPLACE THE FALLBACK PARSING BLOCK --- | |
| # | |
| # --- Fallback Parsing --- | |
| # Check if LLM failed to format tool call and put it in 'content' | |
| if not ai_message.tool_calls and isinstance(ai_message.content, str) and ai_message.content.strip(): | |
| content = ai_message.content | |
| tool_name = None | |
| tool_input = None | |
| # 1. Try to parse the new <function(tool_name)>{json}</function> format | |
| # Note: We look for </function> optionally, as it might be truncated | |
| func_match = re.search( | |
| r"<function\(([^)]+)\)>(\{.*?\})(?:</function>)?", | |
| content, | |
| re.DOTALL | re.IGNORECASE | |
| ) | |
| if func_match: | |
| try: | |
| tool_name = func_match.group(1).strip() | |
| json_str = func_match.group(2) | |
| tool_input = json.loads(json_str) | |
| print(f"🔧 Fallback (Format 1): Parsed tool call for '{tool_name}'") | |
| except json.JSONDecodeError as e: | |
| print(f"⚠️ Fallback (Format 1): Failed to parse JSON: {e}") | |
| tool_name = None # Reset | |
| # 2. If Format 1 failed, try to parse bare JSON (old fallback) | |
| if not tool_name: | |
| json_match = re.search( | |
| r"```(?:json)?\s*(\{.*?\})\s*```|(\{.*?\})", | |
| content, | |
| re.DOTALL | re.IGNORECASE | |
| ) | |
| if json_match: | |
| json_str = json_match.group(1) or json_match.group(2) | |
| try: | |
| parsed_json = json.loads(json_str) | |
| # This format is less structured; we guess tool from keys | |
| if isinstance(parsed_json, dict): | |
| if "tool" in parsed_json and "tool_input" in parsed_json: | |
| tool_name = parsed_json.get("tool") | |
| tool_input = parsed_json.get("tool_input", {}) | |
| elif "code" in parsed_json: # Guess code_interpreter | |
| tool_name = "code_interpreter" | |
| tool_input = parsed_json | |
| elif "answer" in parsed_json: # Guess final_answer | |
| tool_name = "final_answer_tool" | |
| tool_input = parsed_json | |
| if tool_name: | |
| print(f"🔧 Fallback (Format 2): Parsed tool call for '{tool_name}'") | |
| except json.JSONDecodeError as e: | |
| print(f"⚠️ Fallback (Format 2): Failed to parse JSON: {e}") | |
| # --- If any fallback parser succeeded, build the tool call --- | |
| if tool_name and tool_input is not None and any(t.name == tool_name for t in self.tools): | |
| print(f"🔧 Fallback SUCCESS: Rebuilding tool call for '{tool_name}'") | |
| tool_call = ToolCall( | |
| name=tool_name, | |
| args=tool_input, | |
| id=str(uuid.uuid4()) | |
| ) | |
| ai_message.tool_calls = [tool_call] | |
| ai_message.content = "" # Clear content field | |
| elif not tool_name: | |
| print(f"⚠️ Fallback FAILED: Could not parse any tool call from content:\n{content[:200]}...") | |
| # --- END OF REPLACEMENT BLOCK --- | |
| # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ | |
| # --- Logging --- | |
| if ai_message.tool_calls: | |
| for tc in ai_message.tool_calls: | |
| print(f"🔧 Tool Call: {tc.get('name')}") | |
| print(f" Args: {tc.get('args', {})}") | |
| elif ai_message.content: | |
| content_preview = ai_message.content[:300] | |
| if len(ai_message.content) > 300: | |
| content_preview += "..." | |
| print(f"💭 Agent Reasoning:\n{content_preview}") | |
| return {"messages": [ai_message], "turn": current_turn} | |
| # --- Tool Node --- | |
| tool_node = ToolNode(self.tools) | |
| # --- Build Graph --- | |
| print("Building agent graph...") | |
| graph_builder = StateGraph(AgentState) | |
| graph_builder.add_node("agent", agent_node) | |
| graph_builder.add_node("tools", tool_node) | |
| graph_builder.add_edge(START, "agent") | |
| graph_builder.add_edge("tools", "agent") | |
| graph_builder.add_conditional_edges( | |
| "agent", | |
| should_continue, | |
| { | |
| "tools": "tools", | |
| "agent": "agent", | |
| END: END | |
| } | |
| ) | |
| self.graph = graph_builder.compile() | |
| print("✅ Graph compiled successfully") | |
| def __call__(self, question: str) -> str: | |
| print(f"\n--- Starting Agent Run for Question ---") | |
| print(f"Agent received question (first 100 chars): {question[:100]}...") | |
| # Initialize graph input with turn counter | |
| graph_input = { | |
| "messages": [ | |
| SystemMessage(content=self.system_prompt), | |
| HumanMessage(content=question) | |
| ], | |
| "turn": 0 | |
| } | |
| final_answer = "AGENT FAILED TO PRODUCE ANSWER" | |
| try: | |
| # Add config for recursion limit (LangGraph default is 25, but our turn limit is softer) | |
| config = {"recursion_limit": MAX_TURNS + 5} # Allow slightly more graph steps than turns | |
| for event in self.graph.stream(graph_input, stream_mode="values", config=config): | |
| last_message = event["messages"][-1] | |
| # Check for final answer extraction | |
| if isinstance(last_message, AIMessage) and last_message.tool_calls: | |
| if last_message.tool_calls[0].get("name") == "final_answer_tool": | |
| final_answer = last_message.tool_calls[0]['args'].get('answer', "ERROR: FINAL_ANSWER_TOOL CALLED WITHOUT ANSWER") | |
| print(f"--- Final Answer Captured from tool call: '{final_answer}' ---") | |
| # We can break here since the graph condition should lead to END anyway | |
| break | |
| # Log other message types (optional but helpful) | |
| elif isinstance(last_message, ToolMessage): | |
| print(f"Tool Result ({last_message.tool_call_id}): {last_message.content[:500]}...") | |
| elif isinstance(last_message, AIMessage) and not last_message.tool_calls: | |
| # This is now expected (the "plan" or "think" step) | |
| print(f"AI Message (Plan/Thought): {last_message.content[:500]}...") | |
| # Don't set final_answer here anymore, only final_answer_tool counts | |
| # --- Cleaning step (Keep as is) --- | |
| cleaned_answer = str(final_answer).strip() | |
| # ... (keep existing prefix removal and fence removal logic) ... | |
| prefixes_to_remove = ["The answer is:", "Here is the answer:", "Based on the information:", "Final Answer:", "Answer:"] | |
| original_cleaned = cleaned_answer | |
| for prefix in prefixes_to_remove: | |
| if cleaned_answer.lower().startswith(prefix.lower()): | |
| potential_answer = cleaned_answer[len(prefix):].strip() | |
| if potential_answer: cleaned_answer = potential_answer; break | |
| if cleaned_answer == original_cleaned and any(cleaned_answer.lower().startswith(p.lower()) for p in prefixes_to_remove): | |
| print(f"Warning: Prefix found but not stripped: '{original_cleaned[:100]}...'") | |
| # Simple fence removal | |
| cleaned_answer = remove_fences_simple(cleaned_answer) | |
| if cleaned_answer.startswith("`") and cleaned_answer.endswith("`"): | |
| cleaned_answer = cleaned_answer[1:-1].strip() | |
| print(f"Agent returning final answer (cleaned): '{cleaned_answer}'") | |
| return cleaned_answer | |
| except Exception as e: | |
| print(f"Error running agent graph: {e}") | |
| tb_str = traceback.format_exc() | |
| print(tb_str) | |
| # Check if it was specifically our turn limit message | |
| if isinstance(e, SystemMessage) and f"maximum turn limit ({MAX_TURNS})" in str(e.content): | |
| return f"AGENT STOPPED: Reached maximum turn limit ({MAX_TURNS})." | |
| return f"AGENT GRAPH ERROR: {e}" | |
| # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ | |
| # --- (Original Template Code - Mock Questions Version) --- | |
| def run_and_submit_all( profile: gr.OAuthProfile | None): | |
| """ | |
| Fetches MOCK questions, runs the BasicAgent on them, simulates submission prep, | |
| and displays the results. DOES NOT SUBMIT. | |
| """ | |
| space_id = os.getenv("SPACE_ID") | |
| username = profile.username if profile else "local_test_user" | |
| print(f"User: {username}{'' if profile else ' (dummy)'}") | |
| submit_url = f"{DEFAULT_API_URL}/submit" | |
| print("Instantiating agent...") | |
| try: | |
| agent = BasicAgent() | |
| if asr_pipeline is None: print("⚠️ Global ASR Pipeline failed load.") | |
| except Exception as e: print(f"Error instantiating agent: {e}"); import traceback; traceback.print_exc(); return f"Error initializing agent: {e}", None | |
| print("Agent instantiated successfully.") | |
| agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local_run" | |
| print(f"Agent code URL: {agent_code}") | |
| print("--- USING MOCK QUESTIONS ---") | |
| mock_questions_data = [ | |
| { | |
| "task_id": "mock_level1_001", | |
| "question": r"""Here's a fun riddle that I'd like you to try.\n\nAn adventurer exploring an ancient tomb came across a horde of gold coins, all neatly stacked in columns. As he reached to scoop them into his backpack, a mysterious voice filled the room. \"You have fallen for my trap adventurer,\" the voice began, and suddenly the doorway to the chamber was sealed by a heavy rolling disk of stone. The adventurer tried to move the stone disk but was unable to budge the heavy stone. Trapped, he was startled when the voice again spoke. \n\n\"If you solve my riddle, I will reward you with a portion of my riches, but if you are not clever, you will never leave this treasure chamber. Before you are 200 gold coins. I pose a challenge to you, adventurer. Within these stacks of coins, all but 30 are face-up. You must divide the coins into two piles, one is yours, and one is mine. You may place as many coins as you like in either pile. You may flip any coins over, but you may not balance any coins on their edges. For every face-down coin in your pile, you will be rewarded with two gold coins. But be warned, if both piles do not contain the same number of face-down coins, the door will remain sealed for all eternity!\"\n\nThe adventurer smiled, as this would be an easy task. All he had to do was flip over every coin so it was face down, and he would win the entire treasure! As he moved to the columns of coins, however, the light suddenly faded, and he was left in total darkness. The adventurer reached forward and picked up one of the coins, and was shocked when he realized that both sides felt almost the same. Without the light, he was unable to determine which side of the coin was heads and which side was tails. He carefully replaced the coin in its original orientation and tried to think of a way to solve the puzzle. Finally, out of desperation, the adventurer removed 30 coins to create his pile. He then carefully flipped over each coin in his pile, so its orientation was inverted from its original state.\n\n\"I've finished,\" he said, and the lights returned. Looking at the two piles, he noticed that the larger pile contained 14 face-down coins.\n\nWhat was the outcome for the adventurer? If he failed the challenge, please respond with \"The adventurer died.\" Otherwise, please provide the number of coins the adventurer won at the conclusion of the riddle. If the adventurer won any coins, provide your response as the number of coins, with no other text.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_002", | |
| "question": r"""If you use some of the letters in the given Letter Bank to spell out the sentence "I am a penguin halfway to the moon", which of the remaining unused letters would have to be changed to spell out, "The moon is made of cheese"? Return a comma-separated alphabetized list.\nLetter Bank: {OAMFETIMPECRFSHTDNIWANEPNOFAAIYOOMGUTNAHHLNEHCME}""" | |
| }, | |
| { | |
| "task_id": "mock_level1_003", | |
| "question": r"""A data annotator stayed up too late creating test questions to check that a system was working properly and submitted several questions with mathematical errors. On nights when they created 15 test questions, they made 1 error. On nights when they created fewer than 15 questions, they also corrected 3 errors. On nights they created 20 questions, they made 0 errors. On nights when they created 25 or more, they made 4 errors. Over the course of five nights, the worker produced a total of 6 errors. When asked how many nights they created 15 questions, they gave three possible numbers as responses. What are the three numbers, presented in the format x, y, z in ascending order?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_004", | |
| "question": r"""Please solve the following crossword:\n\n|1|2|3|4|5|\n|6| | | | |\n|7| | | | |\n|8| | | | |\n|X|9| | | |\n\nI have indicated by numbers where the hints start, so you should replace numbers and spaces by the answers.\nAnd X denotes a black square that isn\u2019t to fill.\n\nACROSS\n- 1 Wooden strips on a bed frame\n- 6 _ Minhaj, Peabody-winning comedian for "Patriot Act"\n- 7 Japanese city of 2.6+ million\n- 8 Stopwatch, e.g.\n- 9 Pain in the neck\n\nDOWN\n- 1 Quick drink of whiskey\n- 2 Eye procedure\n- 3 "Same here," in a three-word phrase\n- 4 Already occupied, as a seat\n- 5 Sarcastically critical commentary. Answer by concatenating the characters you choose to fill the crossword, in row-major order.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_005", | |
| "question": r"""I wanted to make another batch of cherry melomel. I remember liking the last recipe I tried, but I can't remember it off the top of my head. It was from the Reddit, r/mead. I remember that the user who made it had a really distinct name, I think it was StormBeforeDawn. Could you please look up the recipe for me? I'm not sure if it has been changed, so please make sure that the recipe you review wasn't updated after July 14, 2022. That's the last time I tried the recipe.\n\nWhat I want to know is how many cherries I'm supposed to use. I'm making a 10-gallon batch in two 5-gallon carboys. Please just respond with the integer number of pounds of whole cherries with pits that are supposed to be used for a 10-gallon batch.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_006", | |
| "question": r"""Verify each of the following ISBN 13 numbers:\n\n1. 9783518188156\n2. 9788476540746\n3. 9788415091004\n4. 9788256014590\n5. 9782046407331\n\nIf any are invalid, correct them by changing the final digit. Then, return the list, comma separated, in the same order as in the question.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_007", | |
| "question": r"""A porterhouse by any other name is centered around a letter. What does Three Dog Night think about the first natural number that starts with that letter? Give the first line from the lyrics that references it.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_008", | |
| "question": r"""Bob has genome type Aa, and Linda has genome type Aa. Assuming that a child of theirs also has a child with someone who also has genome type Aa, what is the probability that Bob and Linda's grandchild will have Genome type Aa? Write the answer as a percentage, rounding to the nearest integer if necessary.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_009", | |
| "question": r"""An array of candy is set out to choose from including gumballs, candy corn, gumdrops, banana taffy, chocolate chips, and gummy bears. There is one bag of each type of candy. The gumballs come in red, orange, yellow, green, blue, and brown. The candy corn is yellow, white, and orange. The gumdrops are red, green, purple, yellow, and orange. The banana taffy is yellow. The chocolate chips are brown and white. The gummy bears are red, green, yellow, and orange. Five people pass through and each selects one bag. The first selects one with only primary colors. The second selects one with no primary colors. The third selects one with all the primary colors. The fourth selects one that has neither the most nor the least colors of the remaining bags. The fifth selects the one with their favorite color, green. A second bag of the candy the first person chose is added to the remaining bag of candy. Which two candies are in the remaining bag after the addition? Give me them in a comma separated list, in alphabetical order""" | |
| }, | |
| { | |
| "task_id": "mock_level1_010", | |
| "question": r"""In the year 2020, where were koi fish found in the watershed with the id 02040203? Give only the name of the pond, lake, or stream where the fish were found, and not the name of the city or county.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_011", | |
| "question": r"""In Sonia Sanchez\u2019s poem \u201cfather\u2019s voice\u201d, what primary colour is evoked by the imagery in the beginning of the tenth stanza? Answer with a capitalized word.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_012", | |
| "question": r"""According to Papers with Code, what was the name of the first model to go beyond 70% of accuracy on ImageNet ?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_013", | |
| "question": r"""What is the dimension of the boundary of the tame twindragon rounded to two decimal places?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_014", | |
| "question": r"""In what year was the home village of the subject of British Museum item #Bb,11.118 founded?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_015", | |
| "question": r"""What is the ISSN of the journal that included G. Scott's potato article that mentioned both a fast food restaurant and a Chinese politician in the title in a 2012 issue?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_016", | |
| "question": r"""VNV Nation has a song that shares its title with the nickname of Louis XV. What album was it released with?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_017", | |
| "question": r"""If I combine a Beatle's first name and a type of beer, in what category and year of Nobel Prize do I have a winner? Answer using the format CATEGORY, YEAR.""" | |
| }, | |
| { | |
| "task_id": "mock_level1_018", | |
| "question": r"""In the version of NumPy where the numpy.msort function was deprecated, which attribute was added to the numpy.polynomial package's polynomial classes?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_019", | |
| "question": r"""A word meaning dramatic or theatrical forms a species of duck when appended with two letters and then duplicated. What is that word?""" | |
| }, | |
| { | |
| "task_id": "mock_level1_020", | |
| "question": r"""As of August 2023, how many in-text citations on the West African Vodun Wikipedia page reference a source that was cited using Scopus?""" | |
| } | |
| ] | |
| questions_data = mock_questions_data | |
| print(f"Using {len(questions_data)} mock questions.") | |
| results_log, answers_payload = [], [] | |
| print(f"Running agent on {len(questions_data)} mock questions...") | |
| for i, item in enumerate(questions_data): | |
| task_id, question_text = item.get("task_id"), item.get("question") | |
| if not task_id or question_text is None: print(f"Skipping mock item {i+1}"); continue | |
| print(f"\n--- Running Mock Task {i+1} (ID: {task_id}) ---") | |
| try: | |
| file_path = item.get("file_path") | |
| question_text_with_context = question_text | |
| if file_path: | |
| base_dir = os.getcwd() | |
| potential_path = os.path.join(base_dir, file_path) | |
| file_context = f"[Attached File (provided): {file_path}]" | |
| if os.path.exists(potential_path): file_context = f"[Attached File (exists): {file_path}]" | |
| else: file_context = f"[Attached File (NOT FOUND): {file_path}]" | |
| question_text_with_context = f"{question_text}\n\n{file_context}" | |
| print(f"Q includes file: {file_path}") | |
| submitted_answer = agent(question_text_with_context) | |
| submitted_answer_str = str(submitted_answer) if submitted_answer is not None else "" | |
| answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer_str}) | |
| results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer_str}) | |
| print(f"--- Mock Task {task_id} Complete ---") | |
| except Exception as e: | |
| print(f"FATAL ERROR on mock task {task_id}: {e}") | |
| import traceback; traceback.print_exc() | |
| submitted_answer = f"AGENT CRASH: {e}" | |
| answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) | |
| results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) | |
| if not answers_payload: return "Agent produced no answers.", pd.DataFrame(results_log) | |
| status_update = f"Finished mock run. Processed {len(answers_payload)} answers for '{username}'." | |
| print(status_update); print("--- MOCK RUN - SUBMISSION SKIPPED ---") | |
| final_status = "--- MOCK RUN COMPLETE ---\n" + status_update + "\nSubmission SKIPPED." | |
| results_df = pd.DataFrame(results_log); results_df['Correct'] = 'N/A (Mock)' | |
| return final_status, results_df | |
| # --- Build Gradio Interface --- | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# GAIA Agent - MOCK TEST (Groq Llama3.1)") | |
| gr.Markdown(""" | |
| **Instructions:** Click 'Run Mock Evaluation'. | |
| **Notes:** Uses Groq (Llama 3.1 8B). Ensure `GROQ_API_KEY` secret/env var exists. **DOES NOT** fetch official Qs or submit. Check logs for details. | |
| """) | |
| gr.LoginButton() | |
| run_button = gr.Button("Run Mock Evaluation") | |
| status_output = gr.Textbox(label="Run Status / Mock Result", lines=5, interactive=False) | |
| results_table = gr.DataFrame(label="Mock Qs, Agent Answers, Results", wrap=True) | |
| run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table]) | |
| if __name__ == "__main__": | |
| print("\n" + "-"*30 + " App Starting " + "-"*30) | |
| space_host_startup = os.getenv("SPACE_HOST"); space_id_startup = os.getenv("SPACE_ID") | |
| if space_host_startup: print(f"✅ SPACE_HOST: {space_host_startup}\n Runtime URL: https://{space_host_startup}.hf.space") | |
| else: print("ℹ️ No SPACE_HOST (local?).") | |
| if space_id_startup: print(f"✅ SPACE_ID: {space_id_startup}\n Repo URL: https://huggingface.co/spaces/{space_id_startup}\n Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") | |
| else: print("ℹ️ No SPACE_ID (local?).") | |
| try: script_dir = os.path.dirname(os.path.realpath(__file__)) | |
| except NameError: script_dir = os.getcwd() | |
| print(f"Script directory: {script_dir}") | |
| print(f"CWD: {os.getcwd()}") | |
| try: print("Files in CWD:", os.listdir(".")) | |
| except FileNotFoundError: print("Warning: CWD listing failed.") | |
| print("-"*(60 + len(" App Starting ")) + "\n") | |
| print("Launching Gradio Interface...") | |
| demo.queue().launch(debug=True, share=False) |