# Spaces status: Runtime error
| import gradio as gr | |
| from smolagents import InferenceClientModel, CodeAgent | |
| from smolagents.mcp_client import MCPClient | |
| import time | |
| import sys | |
# Connection settings passed to MCPClient. Replace "url" with your actual MCP
# server URL to target a different deployment, for example:
#     "https://japhari-mcp-client.hf.space/gradio_api/mcp/sse"
# The active value points at the public Hugging Face Space MCP server, which
# is the one to use when this app is deployed as a client.
MCP_SERVER_URL = {
    "url": "https://japhari-mcp-sentiment.hf.space/gradio_api/mcp/sse",
    "timeout": 60,  # seconds; raised to 60 to allow for a slow server
}
def create_mcp_client(max_retries=3, retry_delay=5):
    """Create the MCP client, retrying when construction fails.

    Each attempt builds ``MCPClient(MCP_SERVER_URL)``. Progress and failures
    are reported with ``print``; the log messages treat a successful
    construction as a successful connection.

    Args:
        max_retries: Total number of attempts to make; must be at least 1.
        retry_delay: Seconds to sleep between consecutive attempts.

    Returns:
        The ``MCPClient`` instance from the first attempt that succeeds.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: Whatever the last failed attempt raised, re-raised as is.
    """
    # Without this guard a non-positive count would skip the loop entirely and
    # the function would fall through, returning None instead of a client.
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    for attempt in range(1, max_retries + 1):
        print(f"Attempting to connect to MCP server (attempt {attempt}/{max_retries})...")
        try:
            # Keep the try body down to the one call that is expected to fail.
            client = MCPClient(MCP_SERVER_URL)
        except Exception as e:
            print(f"Connection attempt {attempt} failed: {str(e)}")
            if attempt == max_retries:
                print("All connection attempts failed. Please check if the MCP server is running.")
                # Bare raise keeps the original exception and traceback.
                raise
            print(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            print("Successfully connected to MCP server!")
            return client
# Stays None until a client has been created, so the cleanup step below can
# tell whether there is anything to disconnect.
mcp_client = None

try:
    # Create the MCP client (with retries) and ask it for the tools it exposes.
    mcp_client = create_mcp_client()
    tools = mcp_client.get_tools()

    # Model constructed with its default settings; it drives the agent.
    model = InferenceClientModel()

    # Agent that is given every discovered tool.
    agent = CodeAgent(tools=list(tools), model=model)

    def _respond(message, history):
        """Run the agent on the incoming message; the chat history is unused."""
        return str(agent.run(message))

    # Chat UI wired to the agent.
    demo = gr.ChatInterface(
        fn=_respond,
        type="messages",
        examples=["Prime factorization of 68"],
        title="Agent with MCP Tools",
        description="This is a simple agent that uses MCP tools to answer questions",
    )

    # Start serving the interface.
    demo.launch()
except Exception as exc:
    # Any failure during setup or launch is reported and turned into exit code 1.
    print(f"Error: {str(exc)}")
    sys.exit(1)
finally:
    # Runs on success, on failure and on sys.exit: disconnect the client if
    # one was created, reporting (not raising) any error from the disconnect.
    if mcp_client is not None:
        try:
            mcp_client.disconnect()
        except Exception as exc:
            print(f"Error during disconnect: {str(exc)}")