| import os |
| import gradio as gr |
| import requests |
| import inspect |
| import asyncio |
| from typing import Dict, List, AsyncGenerator, Union, Tuple, Optional |
|
|
| |
| from langchain_core.messages import HumanMessage, AIMessage, BaseMessage |
| from langchain_core.tools import tool |
| from langchain_openai import ChatOpenAI |
| from langgraph.checkpoint.memory import MemorySaver |
| from langgraph.prebuilt import create_react_agent |
|
|
| |
| DEFAULT_API_URL = "http://127.0.0.1:8000" |
|
|
| |
@tool
def get_lat_lng(location_description: str) -> dict[str, float]:
    """Get the latitude and longitude of a location.

    Args:
        location_description: Free-text description of the place.

    Returns:
        Dict with "lat" and "lng" keys. Falls back to London coordinates
        when no known city substring is found (placeholder behavior).
    """
    print(f"Tool: Getting lat/lng for {location_description}")

    # Lowercase once instead of once per branch; the dict preserves the
    # original match precedence (tokyo, paris, new york).
    desc = location_description.lower()
    known_cities = {
        "tokyo": {"lat": 35.6895, "lng": 139.6917},
        "paris": {"lat": 48.8566, "lng": 2.3522},
        "new york": {"lat": 40.7128, "lng": -74.0060},
    }
    for city, coords in known_cities.items():
        if city in desc:
            return coords
    # Default: London.
    return {"lat": 51.5072, "lng": -0.1276}
|
|
@tool
def get_weather(lat: float, lng: float) -> dict[str, str]:
    """Get the weather at a location."""
    print(f"Tool: Getting weather for lat={lat}, lng={lng}")

    # Pick a canned forecast band by latitude; lng is accepted but unused
    # by this placeholder implementation.
    if lat > 45:
        temperature, description = "15°C", "Cloudy"
    elif lat > 30:
        temperature, description = "25°C", "Sunny"
    else:
        temperature, description = "30°C", "Very Sunny"
    return {"temperature": temperature, "description": description}
|
|
| |
class MyLangChainAgent:
    """
    A sample LangChain agent class designed for interaction and submission.
    NOTE: The current tools (weather/location) are placeholders and WILL NOT
          correctly answer GAIA benchmark questions. This class structure
          demonstrates how to integrate an agent with the submission API.
          Replace LLM, tools, and potentially the agent type for actual GAIA tasks.
    """

    def __init__(self, model_name="gpt-4", temperature=0):
        """Build the LLM, tool list, and a memory-backed ReAct agent.

        Args:
            model_name: OpenAI chat model identifier.
            temperature: Sampling temperature for the LLM.

        Raises:
            ValueError: If OPENAI_API_KEY is not set in the environment.
        """
        if not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OPENAI_API_KEY environment variable not set.")

        self.llm = ChatOpenAI(temperature=temperature, model=model_name)
        self.tools = [get_lat_lng, get_weather]
        # MemorySaver checkpointer lets the agent keep per-thread conversation state.
        self.memory = MemorySaver()
        self.agent_executor = create_react_agent(self.llm, self.tools, checkpointer=self.memory)
        print("MyLangChainAgent initialized.")

    async def __call__(self, question: str, thread_id: str) -> AsyncGenerator[str, None]:
        """
        Runs the agent asynchronously, yielding intermediate steps and,
        last of all, the final answer.

        Args:
            question: The input question string.
            thread_id: A unique identifier for the conversation thread.

        Yields:
            Intermediate tool-call/result messages as strings, followed by
            the final AI answer as the LAST yielded string.

        Note:
            An async generator cannot `return` a value (PEP 525) — the
            previous `return final_answer` was a SyntaxError — so the final
            answer is yielded as the last item instead.
        """
        print(f"Agent executing for thread_id: {thread_id} on question: {question[:50]}...")
        lc_messages: List[BaseMessage] = [HumanMessage(content=question)]
        full_response_content = ""

        async for chunk in self.agent_executor.astream_events(
            {"messages": lc_messages},
            config={"configurable": {"thread_id": thread_id}},
            version="v1"
        ):
            event = chunk["event"]
            data = chunk["data"]
            # astream_events places the runnable's name at the TOP LEVEL of
            # the event dict, not inside "data" — data["name"] would KeyError.
            name = chunk.get("name", "")

            if event == "on_chat_model_stream":
                # Accumulate streamed LLM tokens into the final answer text.
                content = data["chunk"].content
                if content:
                    full_response_content += content

            elif event == "on_tool_start":
                tool_input_str = str(data.get('input', ''))
                yield f"🛠️ Using tool: **{name}** with input: `{tool_input_str}`"

            elif event == "on_tool_end":
                tool_output_str = str(data.get('output', ''))
                yield f"✅ Tool **{name}** finished.\nResult: `{tool_output_str}`"

        final_answer = full_response_content.strip()
        print(f"Agent execution finished. Final Answer: {final_answer[:100]}...")
        # Yield (not return) the final answer — see the docstring note.
        yield final_answer

    def __repr__(self) -> str:
        """
        Return the source code required to reconstruct this agent, including
        the class definition, tool functions, and necessary imports.
        """
        imports = [
            "import os",
            "from typing import Dict, List, AsyncGenerator, Union, Tuple, Optional",
            "from langchain_core.messages import HumanMessage, AIMessage, BaseMessage",
            "from langchain_core.tools import tool",
            "from langchain_openai import ChatOpenAI",
            "from langgraph.checkpoint.memory import MemorySaver",
            "from langgraph.prebuilt import create_react_agent",
            "import inspect",
            "import asyncio",
            "\n"
        ]

        tool_sources = []
        for t in self.tools:
            # @tool wraps the function in a StructuredTool: inspect needs the
            # underlying callable (t.func), and tools expose .name rather than
            # __name__ (the old t.__name__ raised AttributeError inside the
            # except handler).
            target = getattr(t, "func", t)
            tool_name = getattr(t, "name", repr(t))
            try:
                tool_sources.append(inspect.getsource(target))
            except (TypeError, OSError) as e:
                print(f"Warning: Could not get source for tool {tool_name}: {e}")
                tool_sources.append(f"# Could not automatically get source for tool: {tool_name}\n")

        class_source = inspect.getsource(MyLangChainAgent)

        full_source = "\n".join(imports) + "\n\n" + \
                      "\n\n".join(tool_sources) + "\n\n" + \
                      class_source
        return full_source
|
|
|
|
| |
|
|
| |
| |
# Instantiate the agent once at import time so all Gradio callbacks share it.
# If OPENAI_API_KEY is missing, __init__ raises ValueError; fall back to None
# and let each callback report the failure instead of crashing the app.
try:
    agent_instance = MyLangChainAgent()
except ValueError as e:
    print(f"ERROR initializing agent: {e}")
    agent_instance = None
|
|
def format_chat_history(history: List[List[Optional[str]]]) -> List[Tuple[Optional[str], Optional[str]]]:
    """Helper to format Gradio history for display.

    Converts each mutable [user, assistant] pair into an immutable
    (user, assistant) tuple, preserving order.
    """
    return [tuple(turn) for turn in history]
|
|
|
|
async def fetch_and_display_question(api_url: str):
    """Calls the backend to get a random question.

    Args:
        api_url: Base URL of the FastAPI evaluation backend.

    Returns:
        A 5-tuple matching the Gradio outputs: (status message, task_id,
        question text, cleared final-answer box, cleared chatbot history).
    """
    if not api_url:
        # Same output shape as every other branch: clear answer box and chat.
        return "Please enter the API URL.", "", "", "", []

    question_url = f"{api_url.strip('/')}/random-question"
    print(f"Fetching question from: {question_url}")
    try:
        # requests is blocking; run it in a worker thread so the asyncio
        # event loop (and the Gradio UI) stays responsive.
        response = await asyncio.to_thread(requests.get, question_url, timeout=10)
        response.raise_for_status()
        data = response.json()
        task_id = data.get("task_id")
        question_text = data.get("question")
        if task_id and question_text:
            print(f"Fetched Task ID: {task_id}")
            return "Question fetched successfully!", task_id, question_text, "", []
        return "Error: Invalid data format received from API.", "", "", "", []
    except requests.exceptions.RequestException as e:
        print(f"Error fetching question: {e}")
        return f"Error fetching question: {e}", "", "", "", []
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return f"An unexpected error occurred: {e}", "", "", "", []
|
|
|
|
async def run_agent_interaction(
    message: str,
    history: List[List[Optional[str]]],
    current_task_id: str,
):
    """Handles the chat interaction, runs the agent, yields steps, updates final answer state.

    Args:
        message: The user's chat message.
        history: Gradio chatbot history as [user, assistant] pairs.
        current_task_id: Task id of the currently fetched question.

    Yields:
        (chat_history, final_answer) 2-tuples — one value per wired Gradio
        output (chatbot, current_agent_answer). The original code yielded a
        single value on most paths, which Gradio rejects as too few outputs.
    """
    # Guard clauses: surface setup problems in the chat itself so the yield
    # arity always matches the two outputs.
    if agent_instance is None:
        history.append([message, "Agent not initialized. Please check API keys and restart."])
        yield format_chat_history(history), ""
        return

    if not current_task_id:
        history.append([message, "Please fetch a question first using the button above."])
        yield format_chat_history(history), ""
        return

    # Fresh random suffix per run so the agent's checkpointer thread starts clean.
    thread_id = f"gaia_task_{current_task_id}_{os.urandom(4).hex()}"

    print(f"Running agent for user message: {message[:50]}...")
    history.append([message, None])

    final_agent_answer = None
    full_yielded_response = ""

    # The agent yields intermediate step strings; the last string it yields
    # is the final answer.
    async for step in agent_instance(message, thread_id=thread_id):
        if isinstance(step, str):
            history[-1][1] = step
            # Keep the answer state in sync with the chat display on every step.
            yield format_chat_history(history), step
            full_yielded_response = step
            final_agent_answer = step
            print(f"Agent final answer received: {final_agent_answer[:100]}...")

    if final_agent_answer:
        history[-1][1] = final_agent_answer
    elif full_yielded_response:
        history[-1][1] = full_yielded_response
        final_agent_answer = full_yielded_response
    else:
        history[-1][1] = "Agent did not produce a final answer."
        final_agent_answer = ""

    yield format_chat_history(history), final_agent_answer
|
|
|
|
def submit_to_leaderboard(
    api_url: str,
    username: str,
    task_id: str,
    agent_answer: str,
):
    """Submits the agent's answer and code to the FastAPI backend.

    Args:
        api_url: Base URL of the FastAPI evaluation backend.
        username: Hugging Face username for the leaderboard entry.
        task_id: Task id of the question being answered.
        agent_answer: The agent's final answer text.

    Returns:
        A human-readable status string describing success or failure.
    """
    if agent_instance is None:
        return "Agent not initialized. Cannot submit."
    if not api_url:
        return "Please enter the API URL."
    if not username:
        return "Please enter your Hugging Face username."
    if not task_id:
        return "No task ID available. Please fetch a question first."
    if agent_answer is None or agent_answer.strip() == "":
        # Deliberately allowed: an empty answer is still submitted (it just
        # scores zero), so we warn rather than abort.
        print("Warning: Submitting empty answer.")

    submit_url = f"{api_url.strip('/')}/submit"
    print(f"Submitting to: {submit_url}")

    # The agent's repr() is its reconstructable source code (see
    # MyLangChainAgent.__repr__); call the builtin instead of the dunder.
    try:
        agent_code = repr(agent_instance)
    except Exception as e:
        print(f"Error getting agent representation: {e}")
        return f"Error generating agent code for submission: {e}"

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": [
            {
                "task_id": task_id,
                "submitted_answer": agent_answer
            }
        ]
    }

    try:
        response = requests.post(submit_url, json=submission_data, timeout=30)
        response.raise_for_status()
        result_data = response.json()

        result_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Score: {result_data.get('score')}\n"
            f"Correct: {result_data.get('correct_count')}/{result_data.get('total_attempted')}\n"
            f"Message: {result_data.get('message')}\n"
            f"Timestamp: {result_data.get('timestamp')}"
        )
        print("Submission successful.")
        return result_message
    except requests.exceptions.HTTPError as e:
        # Prefer the API's structured "detail" field when the body is JSON.
        error_detail = e.response.text
        try:
            error_json = e.response.json()
            error_detail = error_json.get('detail', error_detail)
        except requests.exceptions.JSONDecodeError:
            pass
        print(f"HTTP Error during submission: {e.response.status_code} - {error_detail}")
        return f"Submission Failed (HTTP {e.response.status_code}): {error_detail}"
    except requests.exceptions.RequestException as e:
        print(f"Network error during submission: {e}")
        return f"Submission Failed: Network error - {e}"
    except Exception as e:
        print(f"An unexpected error occurred during submission: {e}")
        return f"Submission Failed: An unexpected error occurred - {e}"
|
|
|
|
| |
# --- Gradio UI definition -------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Agent Evaluation Interface")
    gr.Markdown(
        "Fetch a random question from the evaluation API, interact with the agent "
        "(Note: the default agent answers weather questions, not GAIA), "
        "and submit the agent's final answer to the leaderboard."
    )

    # Hidden per-session state shared between the callbacks below.
    current_task_id = gr.State("")
    current_question_text = gr.State("")
    current_agent_answer = gr.State("")

    with gr.Row():
        api_url_input = gr.Textbox(label="FastAPI API URL", value=DEFAULT_API_URL)
        hf_username_input = gr.Textbox(label="Hugging Face Username")

    with gr.Row():
        fetch_button = gr.Button("Get Random Question")
        submission_status_display = gr.Textbox(label="Status", interactive=False)

    with gr.Row():
        question_display = gr.Textbox(label="Current Question", lines=3, interactive=False)

    gr.Markdown("---")
    gr.Markdown("## Agent Interaction")

    chatbot = gr.Chatbot(label="Agent Conversation", height=400)
    msg_input = gr.Textbox(label="Send a message to the Agent (or just observe)")

    # Mirrors the current_agent_answer state so the user can see what would
    # be submitted.
    final_answer_display = gr.Textbox(label="Agent's Final Answer (Extracted)", interactive=False)

    gr.Markdown("---")
    gr.Markdown("## Submission")
    with gr.Row():
        submit_button = gr.Button("Submit Current Answer to Leaderboard")

    submission_result_display = gr.Markdown(label="Submission Result", value="*Submit an answer to see the result here.*")

    # Fetching a new question also clears the previous answer box and chat.
    fetch_button.click(
        fn=fetch_and_display_question,
        inputs=[api_url_input],
        outputs=[
            submission_status_display,
            current_task_id,
            question_display,
            final_answer_display,
            chatbot
        ]
    )

    # Run the agent on message submit; it streams updates to the chatbot and
    # the answer state, then the .then() step copies the state into the
    # visible final-answer textbox.
    msg_input.submit(
        fn=run_agent_interaction,
        inputs=[
            msg_input,
            chatbot,
            current_task_id,
        ],
        outputs=[
            chatbot,
            current_agent_answer
        ]
    ).then(
        lambda answer_state: answer_state,
        inputs=[current_agent_answer],
        outputs=[final_answer_display]
    )

    # Clear the input box immediately (unqueued) after submission.
    msg_input.submit(lambda: "", None, msg_input, queue=False)

    submit_button.click(
        fn=submit_to_leaderboard,
        inputs=[
            api_url_input,
            hf_username_input,
            current_task_id,
            current_agent_answer,
        ],
        outputs=[submission_result_display]
    )
|
|
|
|
if __name__ == "__main__":
    # Launch only when the agent came up; otherwise explain why and exit.
    if agent_instance is not None:
        print("Launching Gradio Interface...")
        demo.launch(debug=True, server_name="0.0.0.0")
    else:
        print("\nFATAL: Agent could not be initialized. Gradio app will not run correctly.")
        print("Please ensure OPENAI_API_KEY is set and valid.\n")