|
|
import gradio as gr |
|
|
import asyncio |
|
|
import json |
|
|
import atexit |
|
|
import signal |
|
|
import sys |
|
|
from ..mcp_agent.agent import MCP_Agent |
|
|
|
|
|
class GradioMCPApp: |
|
|
def __init__(self): |
|
|
self.agent = None |
|
|
self.chat_history = [] |
|
|
self._loop = None |
|
|
self.server_count = 1 |
|
|
|
|
|
def get_or_create_loop(self): |
|
|
"""Get existing event loop or create a new one""" |
|
|
if self._loop is None or self._loop.is_closed(): |
|
|
self._loop = asyncio.new_event_loop() |
|
|
asyncio.set_event_loop(self._loop) |
|
|
return self._loop |
|
|
|
|
|
async def initialize_agent(self, openai_api_key, *server_configs): |
|
|
"""Initialize the MCP Agent with provided configuration""" |
|
|
try: |
|
|
|
|
|
if self.agent: |
|
|
await self.disconnect_agent() |
|
|
|
|
|
|
|
|
|
|
|
mcp_servers = [] |
|
|
for i in range(0, len(server_configs), 4): |
|
|
if i + 3 < len(server_configs): |
|
|
server_url = server_configs[i] |
|
|
server_name = server_configs[i + 1] |
|
|
server_type = server_configs[i + 2] |
|
|
headers = server_configs[i + 3] |
|
|
|
|
|
if server_url and server_url.strip(): |
|
|
server_config = { |
|
|
'url': server_url.strip(), |
|
|
'name': server_name.strip() if server_name and server_name.strip() else f'server_{i//4 + 1}', |
|
|
'type': server_type if server_type else 'http', |
|
|
'headers': headers.strip() if headers and headers.strip() else None |
|
|
} |
|
|
mcp_servers.append(server_config) |
|
|
|
|
|
|
|
|
api_keys = {'openai_api_key': openai_api_key} |
|
|
self.agent = MCP_Agent(api_keys=api_keys, mpc_server_urls=mcp_servers) |
|
|
|
|
|
|
|
|
await self.agent.connect() |
|
|
|
|
|
server_count = len(mcp_servers) |
|
|
if server_count == 0: |
|
|
return True, "Agent initialized successfully (no MCP servers configured)!" |
|
|
else: |
|
|
return True, f"Agent initialized successfully with {server_count} MCP server(s)!" |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Error initializing agent: {str(e)}" |
|
|
|
|
|
async def chat_with_agent(self, message): |
|
|
"""Handle text chat with the agent""" |
|
|
if not self.agent: |
|
|
return self.chat_history, "Please initialize the agent first by providing your OpenAI API key and clicking 'Initialize Agent'." |
|
|
|
|
|
if not message or not message.strip(): |
|
|
return self.chat_history, "Please provide a message." |
|
|
|
|
|
try: |
|
|
|
|
|
response = await self.agent.chat(message.strip()) |
|
|
|
|
|
|
|
|
self.chat_history.append([message.strip(), str(response)]) |
|
|
|
|
|
return self.chat_history, "" |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Error during chat: {str(e)}" |
|
|
self.chat_history.append([message, error_msg]) |
|
|
return self.chat_history, error_msg |
|
|
|
|
|
async def reset_agent(self): |
|
|
"""Reset the agent's conversation history""" |
|
|
if self.agent: |
|
|
self.agent.reset() |
|
|
self.chat_history = [] |
|
|
return [], "Agent conversation history reset successfully!" |
|
|
else: |
|
|
return [], "No agent to reset. Please initialize the agent first." |
|
|
|
|
|
async def disconnect_agent(self): |
|
|
"""Disconnect from MCP servers""" |
|
|
if self.agent: |
|
|
try: |
|
|
await self.agent.disconnect() |
|
|
except Exception as e: |
|
|
print(f"Error during disconnect: {e}") |
|
|
finally: |
|
|
self.agent = None |
|
|
self.chat_history = [] |
|
|
return [], "Agent disconnected successfully!" |
|
|
|
|
|
async def cleanup(self): |
|
|
"""Clean up resources""" |
|
|
if self.agent: |
|
|
await self.disconnect_agent() |
|
|
if self._loop and not self._loop.is_closed(): |
|
|
self._loop.close() |
|
|
|
|
|
|
|
|
app_instance = GradioMCPApp() |
|
|
|
|
|
def run_async_safely(coro, *args): |
|
|
"""Safely run async function with proper error handling""" |
|
|
loop = app_instance.get_or_create_loop() |
|
|
try: |
|
|
return loop.run_until_complete(coro(*args)) |
|
|
except Exception as e: |
|
|
print(f"Error in async operation: {e}") |
|
|
return None, f"Error: {str(e)}" |
|
|
|
|
|
|
|
|
def initialize_agent_wrapper(openai_api_key, *server_configs): |
|
|
success, message = run_async_safely( |
|
|
app_instance.initialize_agent, |
|
|
openai_api_key, *server_configs |
|
|
) |
|
|
if success is None: |
|
|
return gr.update(visible=False), gr.update(visible=True), message |
|
|
return gr.update(visible=success), gr.update(visible=not success), message |
|
|
|
|
|
def chat_wrapper(message): |
|
|
chat_history, error_msg = run_async_safely(app_instance.chat_with_agent, message) |
|
|
if chat_history is None: |
|
|
return [], error_msg, "" |
|
|
return chat_history, error_msg, "" |
|
|
|
|
|
def reset_wrapper(): |
|
|
chat_history, message = run_async_safely(app_instance.reset_agent) |
|
|
if chat_history is None: |
|
|
return [], message |
|
|
return chat_history, message |
|
|
|
|
|
def disconnect_wrapper(): |
|
|
chat_history, message = run_async_safely(app_instance.disconnect_agent) |
|
|
if chat_history is None: |
|
|
return [], gr.update(visible=False), gr.update(visible=True), message |
|
|
return chat_history, gr.update(visible=False), gr.update(visible=True), message |
|
|
|
|
|
|
|
|
def cleanup_on_exit(): |
|
|
"""Cleanup function to run on exit""" |
|
|
try: |
|
|
loop = app_instance.get_or_create_loop() |
|
|
if not loop.is_closed(): |
|
|
loop.run_until_complete(app_instance.cleanup()) |
|
|
except Exception as e: |
|
|
print(f"Error during cleanup: {e}") |
|
|
|
|
|
|
|
|
atexit.register(cleanup_on_exit) |
|
|
|
|
|
|
|
|
def signal_handler(signum, frame): |
|
|
print("\nReceived interrupt signal. Cleaning up...") |
|
|
cleanup_on_exit() |
|
|
sys.exit(0) |
|
|
|
|
|
signal.signal(signal.SIGINT, signal_handler) |
|
|
|
|
|
|
|
|
def add_server(current_count): |
|
|
"""Show the next server configuration""" |
|
|
new_count = min(current_count + 1, 3) |
|
|
return ( |
|
|
new_count, |
|
|
gr.update(visible=new_count >= 2), |
|
|
gr.update(visible=new_count >= 3), |
|
|
gr.update(interactive=new_count < 3), |
|
|
gr.update(interactive=new_count > 1) |
|
|
) |
|
|
|
|
|
def remove_server(current_count): |
|
|
"""Hide the last server configuration""" |
|
|
new_count = max(current_count - 1, 1) |
|
|
return ( |
|
|
new_count, |
|
|
gr.update(visible=new_count >= 2), |
|
|
gr.update(visible=new_count >= 3), |
|
|
gr.update(interactive=new_count < 3), |
|
|
gr.update(interactive=new_count > 1) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(title="MCP Agent Chat", theme=gr.themes.Soft()) as demo: |
|
|
with gr.Row(): |
|
|
|
|
|
gr.Markdown("# MCP Agent Chat Interface") |
|
|
gr.HTML("") |
|
|
|
|
|
|
|
|
|
|
|
with gr.Sidebar(): |
|
|
|
|
|
sidebar_column = gr.Column(scale=1, min_width=350) |
|
|
with sidebar_column: |
|
|
gr.Markdown("## π§ Configuration") |
|
|
|
|
|
openai_key = gr.Textbox( |
|
|
label="OpenAI API Key", |
|
|
type="password", |
|
|
placeholder="sk-...", |
|
|
info="Your OpenAI API key for the language model" |
|
|
) |
|
|
|
|
|
gr.Markdown("### MCP Servers Setup") |
|
|
gr.Markdown("Configure your MCP server connections (leave all URLs empty to run without MCP servers)") |
|
|
|
|
|
|
|
|
servers_container = gr.Column() |
|
|
|
|
|
|
|
|
with servers_container: |
|
|
|
|
|
with gr.Tab("Url based servers"): |
|
|
with gr.Group(): |
|
|
gr.Markdown("#### Server 1") |
|
|
server1_url = gr.Textbox( |
|
|
label="Server URL", |
|
|
placeholder="http://localhost:8000", |
|
|
info="The URL of your MCP server" |
|
|
) |
|
|
server1_name = gr.Textbox( |
|
|
label="Server Name", |
|
|
placeholder="server_1", |
|
|
info="A friendly name for your MCP server" |
|
|
) |
|
|
server1_type = gr.Dropdown( |
|
|
label="Server Type", |
|
|
choices=["http", "SSE"], |
|
|
value="http", |
|
|
info="The type of MCP server connection" |
|
|
) |
|
|
server1_headers = gr.Textbox( |
|
|
label="Headers (Optional)", |
|
|
type="password", |
|
|
placeholder="Leave empty if not required", |
|
|
info="Headers for the MCP server (if required), example: {'Authorization': 'Bearer 1234567890'}" |
|
|
) |
|
|
|
|
|
|
|
|
server2_group = gr.Group(visible=False) |
|
|
with server2_group: |
|
|
gr.Markdown("#### Server 2") |
|
|
server2_url = gr.Textbox( |
|
|
label="Server URL", |
|
|
placeholder="http://localhost:8001", |
|
|
info="The URL of your MCP server" |
|
|
) |
|
|
server2_name = gr.Textbox( |
|
|
label="Server Name", |
|
|
placeholder="server_2", |
|
|
info="A friendly name for your MCP server" |
|
|
) |
|
|
server2_type = gr.Dropdown( |
|
|
label="Server Type", |
|
|
choices=["http", "SSE"], |
|
|
value="http", |
|
|
info="The type of MCP server connection" |
|
|
) |
|
|
server2_headers = gr.Textbox( |
|
|
label="Headers (Optional)", |
|
|
type="password", |
|
|
placeholder="Leave empty if not required", |
|
|
info="Headers for the MCP server (if required), example: {'Authorization': 'Bearer 1234567890'}" |
|
|
) |
|
|
|
|
|
|
|
|
server3_group = gr.Group(visible=False) |
|
|
with server3_group: |
|
|
gr.Markdown("#### Server 3") |
|
|
server3_url = gr.Textbox( |
|
|
label="Server URL", |
|
|
placeholder="http://localhost:8002", |
|
|
info="The URL of your MCP server" |
|
|
) |
|
|
server3_name = gr.Textbox( |
|
|
label="Server Name", |
|
|
placeholder="server_3", |
|
|
info="A friendly name for your MCP server" |
|
|
) |
|
|
server3_type = gr.Dropdown( |
|
|
label="Server Type", |
|
|
choices=["http", "SSE"], |
|
|
value="http", |
|
|
info="The type of MCP server connection" |
|
|
) |
|
|
server3_headers = gr.Textbox( |
|
|
label="Headers (Optional)", |
|
|
type="password", |
|
|
placeholder="Leave empty if not required", |
|
|
info="Headers for the MCP server (if required), example: {'Authorization': 'Bearer 1234567890'}" |
|
|
) |
|
|
with gr.Tab("Stdio based servers"): |
|
|
with gr.Group(): |
|
|
gr.Markdown("#### Server 1") |
|
|
server1_command = gr.Textbox( |
|
|
label="Command", |
|
|
placeholder="npx", |
|
|
info="The command to use to run the MCP server, docker, npm, python, etc." |
|
|
) |
|
|
server1_args = gr.Textbox( |
|
|
label="Arguments", |
|
|
placeholder="['-y', '@modelcontextprotocol/server-memory']", |
|
|
info="The arguments to use to run the MCP server" |
|
|
) |
|
|
server1_name = gr.Textbox( |
|
|
label="Server Name", |
|
|
placeholder="server_1", |
|
|
info="A friendly name for your MCP server" |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
add_server_btn = gr.Button("+ Add Server", variant="secondary", size="sm") |
|
|
remove_server_btn = gr.Button("- Remove Server", variant="secondary", size="sm", interactive=False) |
|
|
|
|
|
|
|
|
server_count_state = gr.State(1) |
|
|
|
|
|
init_btn = gr.Button("Initialize Agent", variant="primary", size="lg") |
|
|
init_status = gr.Textbox(label="Status", interactive=False, max_lines=3) |
|
|
|
|
|
|
|
|
chat_column = gr.Column(scale=2) |
|
|
with chat_column: |
|
|
with gr.Row(): |
|
|
gr.Markdown("## π¬ Chat with your MCP Agent") |
|
|
config_status = gr.Markdown("", visible=False) |
|
|
|
|
|
chat_interface = gr.Column(visible=False) |
|
|
with chat_interface: |
|
|
chatbot = gr.Chatbot( |
|
|
label="Conversation", |
|
|
height=500, |
|
|
show_copy_button=True, |
|
|
avatar_images=("π€", "π€") |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
msg = gr.Textbox( |
|
|
label="Message", |
|
|
placeholder="Type your message here...", |
|
|
scale=4, |
|
|
lines=2 |
|
|
) |
|
|
send_btn = gr.Button("Send", variant="primary", scale=1) |
|
|
|
|
|
error_display = gr.Textbox( |
|
|
label="Error Messages", |
|
|
visible=False, |
|
|
interactive=False |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
reset_btn = gr.Button("Reset Conversation", variant="secondary") |
|
|
disconnect_btn = gr.Button("Disconnect Agent", variant="secondary") |
|
|
|
|
|
|
|
|
placeholder = gr.Markdown( |
|
|
"### π Welcome!\n\nPlease configure and initialize your MCP Agent using the sidebar to start chatting.", |
|
|
visible=True |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
add_server_btn.click( |
|
|
fn=add_server, |
|
|
inputs=[server_count_state], |
|
|
outputs=[server_count_state, server2_group, server3_group, add_server_btn, remove_server_btn] |
|
|
) |
|
|
|
|
|
remove_server_btn.click( |
|
|
fn=remove_server, |
|
|
inputs=[server_count_state], |
|
|
outputs=[server_count_state, server2_group, server3_group, add_server_btn, remove_server_btn] |
|
|
) |
|
|
|
|
|
|
|
|
init_btn.click( |
|
|
fn=initialize_agent_wrapper, |
|
|
inputs=[ |
|
|
openai_key, |
|
|
server1_url, server1_name, server1_type, server1_headers, |
|
|
server2_url, server2_name, server2_type, server2_headers, |
|
|
server3_url, server3_name, server3_type, server3_headers, |
|
|
server1_command, server1_args, server1_name |
|
|
], |
|
|
outputs=[chat_interface, placeholder, init_status] |
|
|
) |
|
|
|
|
|
|
|
|
def handle_chat(message): |
|
|
if not message or not message.strip(): |
|
|
return app_instance.chat_history, "Please provide a message.", "" |
|
|
return chat_wrapper(message) |
|
|
|
|
|
send_btn.click( |
|
|
fn=handle_chat, |
|
|
inputs=[msg], |
|
|
outputs=[chatbot, error_display, msg] |
|
|
).then( |
|
|
lambda error: gr.update(visible=bool(error)), |
|
|
inputs=[error_display], |
|
|
outputs=[error_display] |
|
|
) |
|
|
|
|
|
msg.submit( |
|
|
fn=handle_chat, |
|
|
inputs=[msg], |
|
|
outputs=[chatbot, error_display, msg] |
|
|
).then( |
|
|
lambda error: gr.update(visible=bool(error)), |
|
|
inputs=[error_display], |
|
|
outputs=[error_display] |
|
|
) |
|
|
|
|
|
reset_btn.click( |
|
|
fn=reset_wrapper, |
|
|
outputs=[chatbot, error_display] |
|
|
).then( |
|
|
lambda error: gr.update(visible=bool(error)), |
|
|
inputs=[error_display], |
|
|
outputs=[error_display] |
|
|
) |
|
|
|
|
|
disconnect_btn.click( |
|
|
fn=disconnect_wrapper, |
|
|
outputs=[chatbot, chat_interface, placeholder, init_status] |
|
|
) |
|
|
|
|
|
def main(): |
|
|
"""Main entry point for the application""" |
|
|
demo.launch( |
|
|
server_name="0.0.0.0", |
|
|
server_port=7860, |
|
|
share=False, |
|
|
show_error=True |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|