| | import os |
| | import gradio as gr |
| | import requests |
| | import inspect |
| | import asyncio |
| | from typing import Dict, List, AsyncGenerator, Union, Tuple, Optional |
| |
|
| | |
| | from langchain_core.messages import HumanMessage, AIMessage, BaseMessage |
| | from langchain_core.tools import tool |
| | from langchain_openai import ChatOpenAI |
| | from langgraph.checkpoint.memory import MemorySaver |
| | from langgraph.prebuilt import create_react_agent |
| |
|
| | |
| | DEFAULT_API_URL = "http://127.0.0.1:8000" |
| |
|
| | |
@tool
def get_lat_lng(location_description: str) -> dict[str, float]:
    """Get the latitude and longitude of a location.

    Hard-coded placeholder lookup: matches a few known city names by
    case-insensitive substring and falls back to London coordinates for
    anything unrecognised.
    """
    print(f"Tool: Getting lat/lng for {location_description}")

    # Ordered (needle, coordinates) pairs — first substring match wins.
    known_cities = (
        ("tokyo", {"lat": 35.6895, "lng": 139.6917}),
        ("paris", {"lat": 48.8566, "lng": 2.3522}),
        ("new york", {"lat": 40.7128, "lng": -74.0060}),
    )
    description = location_description.lower()
    for needle, coords in known_cities:
        if needle in description:
            return coords
    # Default: London.
    return {"lat": 51.5072, "lng": -0.1276}
|
@tool
def get_weather(lat: float, lng: float) -> dict[str, str]:
    """Get the weather at a location.

    Placeholder implementation: picks a canned report from latitude bands
    only; longitude is accepted but unused.
    """
    print(f"Tool: Getting weather for lat={lat}, lng={lng}")

    # (exclusive lower bound, report) pairs, checked from north to south.
    latitude_bands = (
        (45, {"temperature": "15°C", "description": "Cloudy"}),
        (30, {"temperature": "25°C", "description": "Sunny"}),
    )
    for threshold, report in latitude_bands:
        if lat > threshold:
            return report
    return {"temperature": "30°C", "description": "Very Sunny"}
|
| | |
class MyLangChainAgent:
    """
    A sample LangChain agent class designed for interaction and submission.

    NOTE: The current tools (weather/location) are placeholders and WILL NOT
          correctly answer GAIA benchmark questions. This class structure
          demonstrates how to integrate an agent with the submission API.
          Replace LLM, tools, and potentially the agent type for actual
          GAIA tasks.
    """

    def __init__(self, model_name="gpt-4", temperature=0):
        """Build the LLM, tools, memory, and ReAct agent executor.

        Args:
            model_name: OpenAI chat model name passed to ChatOpenAI.
            temperature: Sampling temperature for the LLM.

        Raises:
            ValueError: If OPENAI_API_KEY is not set (fail fast with an
                actionable message instead of a later API error).
        """
        if not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OPENAI_API_KEY environment variable not set.")

        self.llm = ChatOpenAI(temperature=temperature, model=model_name)
        self.tools = [get_lat_lng, get_weather]
        # MemorySaver checkpoints conversation state per thread_id.
        self.memory = MemorySaver()
        self.agent_executor = create_react_agent(self.llm, self.tools, checkpointer=self.memory)
        print("MyLangChainAgent initialized.")

    async def __call__(self, question: str, thread_id: str) -> AsyncGenerator[Union[str, Dict[str, str]], None]:
        """
        Run the agent asynchronously, yielding intermediate steps and,
        last of all, the final answer.

        Args:
            question: The input question string.
            thread_id: A unique identifier for the conversation thread.

        Yields:
            Tool-call / tool-result notifications as strings, followed by
            the final accumulated AI answer string as the LAST item.
            (An async generator cannot `return` a value — that is a
            SyntaxError per PEP 525 — so the final answer is yielded
            instead; callers should treat the last yielded string as the
            answer.)
        """
        print(f"Agent executing for thread_id: {thread_id} on question: {question[:50]}...")
        lc_messages: List[BaseMessage] = [HumanMessage(content=question)]
        full_response_content = ""

        async for chunk in self.agent_executor.astream_events(
            {"messages": lc_messages},
            config={"configurable": {"thread_id": thread_id}},
            version="v1"
        ):
            event = chunk["event"]
            data = chunk["data"]

            if event == "on_chat_model_stream":
                # Accumulate streamed LLM tokens into the final answer text.
                content = data["chunk"].content
                if content:
                    full_response_content += content

            elif event == "on_tool_start":
                tool_input_str = str(data.get('input', ''))
                yield f"🛠️ Using tool: **{data['name']}** with input: `{tool_input_str}`"

            elif event == "on_tool_end":
                tool_output_str = str(data.get('output', ''))
                yield f"✅ Tool **{data['name']}** finished.\nResult: `{tool_output_str}`"

        final_answer = full_response_content.strip()
        print(f"Agent execution finished. Final Answer: {final_answer[:100]}...")
        # Yield (do NOT `return <value>`) — returning a value from an async
        # generator is a SyntaxError.
        yield final_answer

    def __repr__(self) -> str:
        """
        Return the source code required to reconstruct this agent, including
        the class definition, tool functions, and necessary imports.
        """
        imports = [
            "import os",
            "from typing import Dict, List, AsyncGenerator, Union, Tuple, Optional",
            "from langchain_core.messages import HumanMessage, AIMessage, BaseMessage",
            "from langchain_core.tools import tool",
            "from langchain_openai import ChatOpenAI",
            "from langgraph.checkpoint.memory import MemorySaver",
            "from langgraph.prebuilt import create_react_agent",
            "import inspect",
            "import asyncio",
            "\n"
        ]

        tool_sources = []
        for t in self.tools:
            # @tool wraps the function in a BaseTool: the original callable
            # lives on `.func` and the tool's name on `.name`. Fall back
            # gracefully so plain functions (with __name__) also work and the
            # error path never raises AttributeError itself.
            func = getattr(t, "func", t)
            name = getattr(t, "name", getattr(func, "__name__", repr(t)))
            try:
                tool_sources.append(inspect.getsource(func))
            except (TypeError, OSError) as e:
                print(f"Warning: Could not get source for tool {name}: {e}")
                tool_sources.append(f"# Could not automatically get source for tool: {name}\n")

        class_source = inspect.getsource(MyLangChainAgent)

        full_source = "\n".join(imports) + "\n\n" + \
                      "\n\n".join(tool_sources) + "\n\n" + \
                      class_source
        return full_source
| |
|
| |
|
| | |
| |
|
| | |
| | |
# Instantiate the agent once at module load so the chat and submission
# handlers share it. If OPENAI_API_KEY is missing, MyLangChainAgent raises
# ValueError and agent_instance is left as None; the handlers below check
# for None and degrade gracefully instead of crashing.
try:
    agent_instance = MyLangChainAgent()
except ValueError as e:
    print(f"ERROR initializing agent: {e}")
    agent_instance = None
| |
|
def format_chat_history(history: List[List[Optional[str]]]) -> List[Tuple[Optional[str], Optional[str]]]:
    """Helper to format Gradio history for display.

    The handlers mutate history as mutable [user, bot] lists; this converts
    each pair to the (user, bot) tuple form for the Chatbot component.
    """
    return [tuple(pair) for pair in history]
| |
|
| |
|
async def fetch_and_display_question(api_url: str):
    """Calls the backend to get a random question.

    Returns a 5-tuple matching the Gradio outputs:
    (status message, task_id state, question text, final-answer display,
    chatbot history). On any failure the task id is cleared so a stale
    question cannot be submitted.
    """
    if not api_url:
        # Keep the return shape identical to every other branch: clear the
        # answer display with "" and reset the chatbot with an empty list
        # (a Chatbot value must be a list, not a string/update object).
        return "Please enter the API URL.", "", "", "", []

    question_url = f"{api_url.strip('/')}/random-question"
    print(f"Fetching question from: {question_url}")
    try:
        response = requests.get(question_url, timeout=10)
        response.raise_for_status()
        data = response.json()
        task_id = data.get("task_id")
        question_text = data.get("question")
        if task_id and question_text:
            print(f"Fetched Task ID: {task_id}")
            return "Question fetched successfully!", task_id, question_text, "", []
        else:
            return "Error: Invalid data format received from API.", "", "", "", []
    except requests.exceptions.RequestException as e:
        print(f"Error fetching question: {e}")
        return f"Error fetching question: {e}", "", "", "", []
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return f"An unexpected error occurred: {e}", "", "", "", []
| |
|
| |
|
async def run_agent_interaction(
    message: str,
    history: List[List[Optional[str]]],
    current_task_id: str,
):
    """Handles the chat interaction: runs the agent, streams intermediate
    steps into the chatbot, and records the final answer.

    This generator is wired to TWO Gradio outputs (chatbot,
    current_agent_answer), so EVERY yield must provide a 2-tuple — yielding
    a single value would break the output arity and raise in Gradio.

    Yields:
        (formatted chat history, final answer so far) tuples.
    """
    if agent_instance is None:
        # Surface the problem in the chat window; keep the answer state empty.
        history = history + [[message, "Agent not initialized. Please check API keys and restart."]]
        yield format_chat_history(history), ""
        return

    if not current_task_id:
        history = history + [[message, "Please fetch a question first using the button above."]]
        yield format_chat_history(history), ""
        return

    # Fresh thread id per run so MemorySaver state from earlier runs of the
    # same task cannot leak into this one.
    thread_id = f"gaia_task_{current_task_id}_{os.urandom(4).hex()}"

    print(f"Running agent for user message: {message[:50]}...")
    history.append([message, None])

    final_agent_answer = ""
    async for step in agent_instance(message, thread_id=thread_id):
        if isinstance(step, str):
            # Show the step in the pending bot slot. The agent yields its
            # final answer as the LAST string, so tracking the most recent
            # string leaves final_agent_answer holding the answer.
            history[-1][1] = step
            final_agent_answer = step
            yield format_chat_history(history), final_agent_answer

    if final_agent_answer:
        history[-1][1] = final_agent_answer
        print(f"Agent final answer received: {final_agent_answer[:100]}...")
    else:
        history[-1][1] = "Agent did not produce a final answer."

    # Final emission: completed history plus the answer for the State output.
    yield format_chat_history(history), final_agent_answer
| |
|
| |
|
def submit_to_leaderboard(
    api_url: str,
    username: str,
    task_id: str,
    agent_answer: str,
):
    """Submits the agent's answer and code to the FastAPI backend.

    Args:
        api_url: Base URL of the evaluation API.
        username: Hugging Face username for the leaderboard.
        task_id: The task the answer belongs to.
        agent_answer: The agent's final answer (may be empty).

    Returns:
        A human-readable status string for the submission result display.
    """
    if agent_instance is None:
        return "Agent not initialized. Cannot submit."
    if not api_url:
        return "Please enter the API URL."
    if not username:
        return "Please enter your Hugging Face username."
    if not task_id:
        return "No task ID available. Please fetch a question first."
    if agent_answer is None or agent_answer.strip() == "":
        # Submitting empty is allowed, but coerce None to "" so the JSON
        # payload never carries a null answer.
        print("Warning: Submitting empty answer.")
        agent_answer = agent_answer or ""

    submit_url = f"{api_url.strip('/')}/submit"
    print(f"Submitting to: {submit_url}")

    try:
        # repr() is the idiomatic spelling of __repr__(); it returns the
        # agent's reconstruction source code for the leaderboard.
        agent_code = repr(agent_instance)
    except Exception as e:
        print(f"Error getting agent representation: {e}")
        return f"Error generating agent code for submission: {e}"

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": [
            {
                "task_id": task_id,
                "submitted_answer": agent_answer
            }
        ]
    }

    try:
        response = requests.post(submit_url, json=submission_data, timeout=30)
        response.raise_for_status()
        result_data = response.json()

        result_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Score: {result_data.get('score')}\n"
            f"Correct: {result_data.get('correct_count')}/{result_data.get('total_attempted')}\n"
            f"Message: {result_data.get('message')}\n"
            f"Timestamp: {result_data.get('timestamp')}"
        )
        print("Submission successful.")
        return result_message
    except requests.exceptions.HTTPError as e:
        # Prefer the API's structured "detail" field; fall back to raw body.
        error_detail = e.response.text
        try:
            error_json = e.response.json()
            error_detail = error_json.get('detail', error_detail)
        except requests.exceptions.JSONDecodeError:
            pass
        print(f"HTTP Error during submission: {e.response.status_code} - {error_detail}")
        return f"Submission Failed (HTTP {e.response.status_code}): {error_detail}"
    except requests.exceptions.RequestException as e:
        print(f"Network error during submission: {e}")
        return f"Submission Failed: Network error - {e}"
    except Exception as e:
        print(f"An unexpected error occurred during submission: {e}")
        return f"Submission Failed: An unexpected error occurred - {e}"
| |
|
| |
|
| | |
# ---------------------------------------------------------------------------
# Gradio UI wiring
# ---------------------------------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Agent Evaluation Interface")
    gr.Markdown(
        "Fetch a random question from the evaluation API, interact with the agent "
        "(Note: the default agent answers weather questions, not GAIA), "
        "and submit the agent's final answer to the leaderboard."
    )

    # Per-session state: the fetched task id/question text and the agent's
    # latest final answer (filled in by run_agent_interaction).
    current_task_id = gr.State("")
    current_question_text = gr.State("")
    current_agent_answer = gr.State("")

    with gr.Row():
        api_url_input = gr.Textbox(label="FastAPI API URL", value=DEFAULT_API_URL)
        hf_username_input = gr.Textbox(label="Hugging Face Username")

    with gr.Row():
        fetch_button = gr.Button("Get Random Question")
        submission_status_display = gr.Textbox(label="Status", interactive=False)

    with gr.Row():
        question_display = gr.Textbox(label="Current Question", lines=3, interactive=False)

    gr.Markdown("---")
    gr.Markdown("## Agent Interaction")

    chatbot = gr.Chatbot(label="Agent Conversation", height=400)
    msg_input = gr.Textbox(label="Send a message to the Agent (or just observe)")

    # Read-only mirror of current_agent_answer, for the user to inspect.
    final_answer_display = gr.Textbox(label="Agent's Final Answer (Extracted)", interactive=False)

    gr.Markdown("---")
    gr.Markdown("## Submission")
    with gr.Row():
        submit_button = gr.Button("Submit Current Answer to Leaderboard")

    submission_result_display = gr.Markdown(label="Submission Result", value="*Submit an answer to see the result here.*")

    # Fetching a question updates the status/question displays, stores the
    # task id in state, and clears the answer display and chat history.
    fetch_button.click(
        fn=fetch_and_display_question,
        inputs=[api_url_input],
        outputs=[
            submission_status_display,
            current_task_id,
            question_display,
            final_answer_display,
            chatbot
        ]
    )

    # Submitting a chat message streams the agent run into the chatbot and
    # records the final answer in state; the .then() step then mirrors that
    # state into the visible answer textbox.
    msg_input.submit(
        fn=run_agent_interaction,
        inputs=[
            msg_input,
            chatbot,
            current_task_id,
        ],
        outputs=[
            chatbot,
            current_agent_answer
        ]
    ).then(
        # Identity function: copy the answer State into the display Textbox.
        lambda answer_state: answer_state,
        inputs=[current_agent_answer],
        outputs=[final_answer_display]
    )

    # Second submit handler clears the input box immediately (unqueued so it
    # doesn't wait behind the agent run).
    msg_input.submit(lambda: "", None, msg_input, queue=False)

    # Send the stored answer plus the agent's reconstruction code to the API.
    submit_button.click(
        fn=submit_to_leaderboard,
        inputs=[
            api_url_input,
            hf_username_input,
            current_task_id,
            current_agent_answer,
        ],
        outputs=[submission_result_display]
    )
| |
|
| |
|
if __name__ == "__main__":
    if agent_instance is None:
        # The UI wiring above still references agent_instance's handlers, so
        # launching without a working agent would only produce errors.
        print("\nFATAL: Agent could not be initialized. Gradio app will not run correctly.")
        print("Please ensure OPENAI_API_KEY is set and valid.\n")
    else:
        print("Launching Gradio Interface...")
        # server_name="0.0.0.0" exposes the app on all interfaces (needed in
        # containers); debug=True enables verbose Gradio error output.
        demo.launch(debug=True, server_name="0.0.0.0")