# GAIA Agent Evaluation Runner — Hugging Face Space entry point.
| import os | |
| import gradio as gr | |
| import requests | |
| import pandas as pd | |
| from smolagents import Tool, CodeAgent, Model | |
| # Import internal modules | |
| from config import ( | |
| DEFAULT_API_URL, | |
| USE_LLAMACPP, | |
| LLAMACPP_CONFIG | |
| ) | |
| from tools.tool_manager import ToolManager | |
| from utils.llama_cpp_model import LlamaCppModel | |
class GaiaToolCallingAgent:
    """Tool-calling agent specifically designed for the GAIA system.

    Routes a query through the tools exposed by ToolManager, collects their
    results, and — when a local model is available — synthesizes a final
    answer from the gathered context. Falls back to returning the raw tool
    output when no model can be used.
    """

    def __init__(self, local_model=None):
        """Create the agent.

        Args:
            local_model: Optional pre-built text-generation model. When None,
                a LlamaCppModel is constructed as a best-effort default; if
                that fails the agent degrades to returning raw tool output.
        """
        print("GaiaToolCallingAgent initialized.")
        self.tool_manager = ToolManager()
        # name/description are read by the managing agent framework.
        self.name = "tool_agent"
        self.description = "A specialized agent that uses various tools to answer questions"
        self.local_model = local_model
        if not self.local_model:
            try:
                # LlamaCppModel is already imported at module level;
                # construction may still fail (e.g. missing model file).
                self.local_model = LlamaCppModel(max_tokens=512)
            except Exception as e:
                print(f"Couldn't initialize local model in tool agent: {e}")
                self.local_model = None

    def run(self, query: str) -> str:
        """Answer *query* by consulting relevant tools, then summarizing.

        Returns the model-generated answer when a local model is available,
        otherwise the concatenated raw tool output (or an apology string
        when no tool produced anything).
        """
        print(f"Processing query: {query}")
        context_info = []
        for tool in self.tool_manager.get_tools():
            try:
                if self._should_use_tool(tool, query):
                    print(f"Using tool: {tool.name}")
                    result = tool.forward(query)
                    if result:
                        context_info.append(f"{tool.name} Results:\n{result}")
            except Exception as e:
                # A single failing tool must not abort the whole query.
                print(f"Error using {tool.name}: {e}")
        full_context = "\n\n".join(context_info) if context_info else ""
        if full_context and self.local_model:
            try:
                prompt = f"""
Based on the following information, please provide a comprehensive answer to the question: "{query}"
CONTEXT INFORMATION:
{full_context}
Answer:
"""
                response = self.local_model.generate(prompt)
                return response
            except Exception as e:
                # Generation failed: fall back to the raw tool context.
                print(f"Error generating response with local model: {e}")
                return full_context
        if not full_context:
            return "I couldn't find any relevant information to answer your question."
        return full_context

    def __call__(self, query: str) -> str:
        """Allow the agent to be invoked like a function."""
        print(f"Tool agent received query: {query}")
        return self.run(query)

    def _should_use_tool(self, tool: Tool, query: str) -> bool:
        """Heuristically decide whether *tool* is relevant to *query*.

        Tools without a registered keyword list are always used.
        """
        patterns = {
            "web_search": ["current", "latest", "recent", "who", "what", "when", "where", "how"],
            "web_content": ["content", "webpage", "website", "page"],
            "youtube_video": ["youtube.com", "youtu.be"],
            "wikipedia_search": ["wikipedia", "wiki", "article"],
            "gaia_retriever": ["gaia", "agent", "ai", "artificial intelligence"],
        }
        # Single lookup instead of membership test + .get().
        keywords = patterns.get(tool.name)
        if keywords is None:
            return True
        query_lower = query.lower()
        return any(keyword in query_lower for keyword in keywords)
def download_model_if_needed(model_path, model_url):
    """Download the model to *model_path* unless it already exists.

    Streams the download in chunks into a temporary ".part" file and renames
    it into place only on success, so an interrupted download never leaves a
    truncated file that the existence check would mistake for a complete
    model on the next run.

    Args:
        model_path: Destination path for the model file.
        model_url: HTTP(S) URL to fetch the model from.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    if os.path.exists(model_path):
        return
    print(f"Downloading model from {model_url}...")
    parent = os.path.dirname(model_path)
    if parent:
        # Guard: os.makedirs("") raises when model_path is a bare filename.
        os.makedirs(parent, exist_ok=True)
    tmp_path = model_path + ".part"
    with requests.get(model_url, stream=True) as response:
        response.raise_for_status()
        with open(tmp_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    # Atomic rename: only a fully-written file ever appears at model_path.
    os.replace(tmp_path, model_path)
    print("Download complete.")
def _select_model():
    """Pick the best available model, falling back gracefully on errors.

    Preference order: configured LlamaCpp model (auto-downloaded), StubModel
    when LlamaCpp is disabled, then default LlamaCpp and finally StubModel
    if setup fails.
    """
    try:
        from config import USE_LLAMACPP, LLAMACPP_CONFIG
        if not USE_LLAMACPP:
            from smolagents import StubModel
            stub = StubModel()
            print("Using StubModel as fallback")
            return stub
        # Use TheBloke's model with auto-download
        gguf_path = LLAMACPP_CONFIG.get("model_path") or "./models/llama-2-7b.Q4_0.gguf"
        gguf_url = "https://huggingface.co/TheBloke/Llama-2-7B-GGUF/resolve/main/llama-2-7b.Q4_0.gguf"
        download_model_if_needed(gguf_path, gguf_url)
        llama = LlamaCppModel(
            model_path=gguf_path,
            n_ctx=LLAMACPP_CONFIG.get("n_ctx", 2048),
            n_gpu_layers=LLAMACPP_CONFIG.get("n_gpu_layers", 0),
            temperature=LLAMACPP_CONFIG.get("temperature", 0.7),
        )
        print(f"Using LlamaCpp model from {gguf_path}")
        return llama
    except Exception as err:
        print(f"Error setting up model: {err}")
        try:
            fallback = LlamaCppModel()
            print("Using fallback LlamaCpp model configuration")
            return fallback
        except Exception as err2:
            from smolagents import StubModel
            stub = StubModel()
            print(f"Using StubModel due to error: {err2}")
            return stub


def create_manager_agent() -> CodeAgent:
    """Build the top-level CodeAgent that manages the GAIA tool agent.

    Returns:
        A configured CodeAgent whose single managed agent is a
        GaiaToolCallingAgent sharing the same local model.
    """
    model = _select_model()
    tool_agent = GaiaToolCallingAgent(local_model=model)
    manager_agent = CodeAgent(
        model=model,
        tools=[],
        managed_agents=[tool_agent],
        additional_authorized_imports=[
            "json", "pandas", "numpy", "re", "requests", "bs4"
        ],
        planning_interval=3,
        verbosity_level=2,
        max_steps=10,
    )
    print("Manager agent created with local model")
    return manager_agent
def create_agent():
    """Initialize the full GAIA agent system.

    Returns:
        The manager CodeAgent, or None if initialization fails for any
        reason (the error is printed, not raised).
    """
    try:
        print("Initializing GAIA agent system...")
        return create_manager_agent()
    except Exception as exc:
        print(f"Error creating GAIA agent: {exc}")
        return None
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all GAIA questions, run the agent on each, and submit answers.

    Args:
        profile: OAuth profile of the logged-in Hugging Face user, or None
            when nobody is logged in (the run is aborted in that case).

    Returns:
        A (status_message, results) tuple for the Gradio outputs; results is
        a pandas DataFrame of per-question answers, or None when the run
        aborts before any question is processed.
    """
    # SPACE_ID identifies this Space; used below to build the code link.
    space_id = os.getenv("SPACE_ID")
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    # --- Build the agent ---
    try:
        print("Initializing GAIA agent system...")
        agent = create_agent()
        if not agent:
            return "Error: Could not initialize agent.", None
        print("GAIA agent initialization complete.")
    except Exception as e:
        print(f"Error initializing agent: {e}")
        return f"Error initializing agent: {e}", None
    # --- Fetch the question list ---
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # --- Run the agent on every question ---
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            response = agent.run(f"Answer this question concisely: {question_text}")
            # The agent may return a dict with an "answer" key or a plain
            # value; normalize to a string either way.
            if isinstance(response, dict):
                submitted_answer = response.get("answer", str(response))
            else:
                submitted_answer = str(response)
            answers_payload.append({
                "task_id": task_id,
                "submitted_answer": submitted_answer
            })
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer
            })
        except Exception as e:
            # Record the failure but keep going with the remaining questions.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": f"AGENT ERROR: {e}"
            })
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # --- Submit all answers ---
    # NOTE(review): space_id may be None when SPACE_ID is unset (local run);
    # the link then reads ".../spaces/None/tree/main" — confirm intended.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }
    print(f"Submitting {len(answers_payload)} answers to API...")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        status_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return status_message, pd.DataFrame(results_log)
    except Exception as e:
        status_message = f"Submission Failed: {str(e)}"
        print(f"Error during submission: {e}")
        return status_message, pd.DataFrame(results_log)
# --- Gradio UI: login button plus a one-click evaluation runner. ---
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown("""
**Instructions:**
1. Log in to your Hugging Face account using the button below.
2. Click 'Run Evaluation & Submit All Answers' to fetch questions, run the agent, and see the score.
""")
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    # No inputs= listed: Gradio injects the OAuth profile into the
    # gr.OAuthProfile-annotated parameter of run_and_submit_all.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])
if __name__ == "__main__":
    # Print a startup banner, then launch the app (no public share link).
    banner = f"\n{'-' * 30} GAIA Agent Starting {'-' * 30}"
    print(banner)
    demo.launch(debug=True, share=False)