# client_agent.py
import asyncio
import concurrent.futures
import os
from contextlib import AsyncExitStack
from typing import Optional

from dotenv import load_dotenv
from anthropic import Anthropic
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

load_dotenv()


class DocsNavigatorClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.anthropic = Anthropic()
        self._tools_cache = None

    async def connect(self, server_script_path: str = "src/server/server.py"):
        """
        Start the docs MCP server (via stdio) and initialize a session.
        """
        # Prefer the project's virtual-environment interpreter; fall back
        # to whatever "python" resolves to on PATH.
        if os.path.exists(".venv/Scripts/python.exe"):
            # Windows virtual environment
            command = ".venv/Scripts/python.exe"
        elif os.path.exists(".venv/bin/python"):
            # Unix virtual environment
            command = ".venv/bin/python"
        else:
            # Fallback to the system python
            command = "python"
        params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None,
        )
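        # An equivalent launch via `uv run` (an untested sketch, assuming uv
        # is on PATH) would be:
        #   params = StdioServerParameters(
        #       command="uv",
        #       args=["run", server_script_path],
        #       env=None,
        #   )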
        # Open the stdio transport and hand its lifetime to the exit stack.
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()
        # Cache the server's tool metadata so answer() does not have to
        # re-fetch it on every request.
        tools_response = await self.session.list_tools()
        self._tools_cache = [
            {
                "name": t.name,
                "description": t.description,
                "input_schema": t.inputSchema,
            }
            for t in tools_response.tools
        ]
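        # Each cached entry mirrors the shape the Anthropic Messages API
        # expects in its `tools` parameter, e.g. (illustrative tool name):
        #   {"name": "search_docs", "description": "...", "input_schema": {...}}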

    async def close(self):
        await self.exit_stack.aclose()

    async def answer(self, user_query: str) -> str:
        """
        Ask the LLM to answer a question, using docs tools when needed.
        Supports multi-turn conversations with multiple tool calls.
        """
        if not self.session:
            raise RuntimeError("MCP session not initialized. Call connect() first.")
        # Refresh the tool cache if connect() did not populate it.
        if self._tools_cache is None:
            tools_response = await self.session.list_tools()
            self._tools_cache = [
                {
                    "name": t.name,
                    "description": t.description,
                    "input_schema": t.inputSchema,
                }
                for t in tools_response.tools
            ]
        messages = [
            {
                "role": "user",
                "content": (
                    "You are a documentation assistant. "
                    "Use the available MCP tools to search and read docs in order "
                    "to answer the question. You can use multiple tools and think through "
                    "your response step by step. Always reference the files you used.\n\n"
                    f"User question: {user_query}"
                ),
            }
        ]
        tools = self._tools_cache
        max_iterations = 10  # Safety cap to prevent infinite tool-call loops
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            # Call the LLM
            response = self.anthropic.messages.create(
                model="claude-3-haiku-20240307",
                max_tokens=2500,  # Generous limit for longer responses
                messages=messages,
                tools=tools,
            )
            # Add the assistant's response to the conversation
            messages.append({
                "role": "assistant",
                "content": response.content,
            })
            # Check whether there are any tool calls to execute
            tool_calls = [content for content in response.content if content.type == "tool_use"]
            if not tool_calls:
                # No more tool calls - we're done
                text_content = [content.text for content in response.content if content.type == "text"]
                return "\n".join(text_content) if text_content else "[no text response from model]"
            # Execute all tool calls in this round
            tool_results = []
            for tool_call in tool_calls:
                try:
                    tool_name = tool_call.name
                    tool_args = tool_call.input
                    # Call the MCP tool
                    result = await self.session.call_tool(tool_name, tool_args)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tool_call.id,
                        "content": result.content,
                    })
                except Exception as e:
                    # Handle tool errors gracefully
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tool_call.id,
                        "content": f"Error calling tool {tool_call.name}: {str(e)}",
                        "is_error": True,
                    })
            # Add tool results to the conversation as the next user turn
            messages.append({
                "role": "user",
                "content": tool_results,
            })
        # If we hit max iterations, return whatever text we have so far
        text_content = []
        for message in messages:
            if message["role"] == "assistant":
                for content in message["content"]:
                    if hasattr(content, "type") and content.type == "text":
                        text_content.append(content.text)
                    elif isinstance(content, dict) and content.get("type") == "text":
                        text_content.append(content.get("text", ""))
        return "\n".join(text_content) if text_content else "[reached max iterations without final response]"


def answer_sync(user_query: str) -> str:
    """
    Synchronous wrapper so Gradio can call into our async flow easily.
    Creates a new client for each request to avoid event loop conflicts.
    """
    def run_in_new_loop():
        # Create a new event loop in this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(_answer_async(user_query))
        finally:
            loop.close()

    # Run in a separate thread to avoid conflicts with Gradio's event loop
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(run_in_new_loop)
        return future.result()
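

# Example Gradio wiring (a minimal sketch; the interface layout below is an
# assumption, not part of this module):
#
#   import gradio as gr
#   demo = gr.Interface(fn=answer_sync, inputs="text", outputs="text")
#   demo.launch()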


async def _answer_async(user_query: str) -> str:
    """
    Create a fresh client for each request to avoid event loop issues.
    """
    client = DocsNavigatorClient()
    try:
        await client.connect("src/server/server.py")
        return await client.answer(user_query)
    finally:
        await client.close()
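

if __name__ == "__main__":
    # Quick manual smoke test (a sketch: assumes the server script exists at
    # the default path and ANTHROPIC_API_KEY is set in the environment; the
    # query below is illustrative).
    print(answer_sync("What docs are available?"))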