import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Tuple

import aiohttp
import gradio as gr
import openai
import requests


class MCPClient:
    """Minimal JSON-RPC 2.0 client for an MCP server reachable over HTTP.

    Every request is POSTed as JSON to ``<server_url>/mcp`` through an
    ``aiohttp.ClientSession`` that is created lazily on first use.
    """

    def __init__(self, server_url: str):
        """Remember the server base URL; no network activity happens here.

        Args:
            server_url: Base URL of the MCP server. Trailing slashes are
                stripped so the ``/mcp`` path can be appended safely.
        """
        self.server_url = server_url.rstrip('/')
        # Created on demand by initialize_session().
        self.session = None

    async def initialize_session(self):
        """Make sure a usable HTTP session exists.

        A new session is created when there is none yet or when the
        previous one has already been closed.
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    async def close_session(self):
        """Close the HTTP session, if any, and forget it."""
        if self.session is not None:
            try:
                await self.session.close()
            finally:
                # Drop the reference even if close() fails so the next call
                # starts from a fresh session.
                self.session = None

    async def _post(self, payload: Dict[str, Any]) -> Any:
        """POST one JSON-RPC payload to the server and return the decoded body."""
        async with self.session.post(f"{self.server_url}/mcp", json=payload) as response:
            return await response.json()

    async def call_tool(self, tool_name: str, arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Invoke a tool on the server via the ``tools/call`` method.

        Args:
            tool_name: Name of the tool to call.
            arguments: Arguments for the tool; ``None`` is sent as ``{}``.

        Returns:
            The decoded JSON response exactly as the server sent it, or
            ``{"success": False, "error": <message>}`` if the request or the
            decoding raised an exception.
        """
        if arguments is None:
            arguments = {}

        await self.initialize_session()

        # Every request uses the fixed JSON-RPC id 1.
        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments},
        }

        try:
            return await self._post(mcp_request)
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def list_tools(self) -> List[Dict[str, Any]]:
        """Ask the server for its tools via the ``tools/list`` method.

        Returns:
            The list found under ``result.tools`` in the response. An empty
            list is returned when that field is missing or null, or when the
            request fails; failures are logged rather than raised.
        """
        await self.initialize_session()
        mcp_request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
        try:
            result = await self._post(mcp_request)
            return (result.get("result") or {}).get("tools") or []
        except Exception:
            # Best effort: an unreachable server or a malformed reply means
            # "no tools". Only Exception is caught so task cancellation and
            # KeyboardInterrupt still propagate.
            logging.getLogger(__name__).warning(
                "Could not list tools from %s", self.server_url, exc_info=True
            )
            return []


class AIAssistant:
    """Chat assistant that answers messages with the OpenAI chat API.

    It also keeps an MCP client and caches the tool list that client
    reports. The cached tools are only stored here; they are not passed to
    the model by this class.
    """

    def __init__(self, openai_api_key: str, mcp_client: "MCPClient",
                 model: str = "gpt-3.5-turbo"):
        """Create the OpenAI client and remember the MCP client.

        Args:
            openai_api_key: API key handed to ``openai.OpenAI``.
            mcp_client: Client used by ``initialize`` to list tools.
            model: Chat model name used for every completion request.
        """
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.mcp_client = mcp_client
        self.model = model
        self.available_tools = []

    async def initialize(self):
        """Fetch the tool list from the MCP client and cache it."""
        self.available_tools = await self.mcp_client.list_tools()

    async def process_message(self, user_message: str) -> Tuple[str, str]:
        """Send one user message to the model and return its reply.

        Args:
            user_message: Text sent as a single ``user`` message; no earlier
                conversation turns are included.

        Returns:
            ``(reply, "")``. The second element is always an empty string.
            If the request fails, ``reply`` is an error message starting
            with "❌ Error:" instead of raising.
        """
        def _request():
            return self.openai_client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": user_message}],
                temperature=0.7,
                max_tokens=1000,
            )

        try:
            # The OpenAI client used here is synchronous; run it in the
            # default executor so the event loop is not blocked meanwhile.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, _request)
            # content can be None; keep the declared str return type.
            return response.choices[0].message.content or "", ""
        except Exception as e:
            return f"❌ Error: {e}", ""


# Assistant shared by all chat requests in this process; built on first use.
assistant = None
# (openai_key, mcp_url) pair the current assistant was built from, so a
# changed key or URL triggers a rebuild instead of being silently ignored.
_assistant_config = None


async def initialize_assistant(openai_key, mcp_url):
    """Build the shared assistant and return a human-readable status line.

    The module-level ``assistant`` is replaced only after the new instance
    has finished fetching its tool list, so a failure part-way through never
    leaves a half-initialized assistant behind.

    Args:
        openai_key: OpenAI API key.
        mcp_url: Base URL of the MCP server.

    Returns:
        A message starting with "❌" when either value is missing, otherwise
        a message starting with "✅" that reports how many tools were found.
    """
    global assistant, _assistant_config
    if not openai_key or not mcp_url:
        return "❌ Provide valid OpenAI API key and MCP URL"

    mcp_client = MCPClient(mcp_url)
    candidate = AIAssistant(openai_key, mcp_client)
    try:
        await candidate.initialize()
    finally:
        # chat_interface drives this coroutine with asyncio.run(), which
        # closes the event loop afterwards; an HTTP session must not be
        # kept alive beyond that loop.
        await mcp_client.close_session()

    assistant = candidate
    _assistant_config = (openai_key, mcp_url)
    return f"✅ Initialized with {len(assistant.available_tools)} tools"


async def _run_turn(active_assistant, message):
    """Answer one message, then release any HTTP session opened on this loop."""
    try:
        return await active_assistant.process_message(message)
    finally:
        await active_assistant.mcp_client.close_session()


def chat_interface(message, history, openai_key, mcp_url):
    """Handle one chat submission from the UI.

    Args:
        message: Text the user entered.
        history: List of ``[user_message, reply]`` pairs shown so far;
            ``None`` is treated as an empty list.
        openai_key: OpenAI API key from the form.
        mcp_url: MCP server URL from the form.

    Returns:
        ``(history, "")``: the history with the new exchange appended, and
        an empty string used to clear the message box. A blank message
        leaves the history unchanged. Initialization or request failures
        are appended as the reply instead of being raised.
    """
    if history is None:
        history = []
    if not message or not message.strip():
        return history, ""

    wanted = (openai_key, mcp_url)
    if assistant is None or _assistant_config != wanted:
        try:
            init_result = asyncio.run(initialize_assistant(openai_key, mcp_url))
        except Exception as exc:
            init_result = f"❌ Error: {exc}"
        # Judge success by the resulting state rather than by searching the
        # status text for a marker character.
        if assistant is None or _assistant_config != wanted:
            history.append([message, init_result])
            return history, ""

    try:
        response, _ = asyncio.run(_run_turn(assistant, message))
    except Exception as exc:
        response = f"❌ Error: {exc}"
    history.append([message, response])
    return history, ""


# Gradio UI: a chat window, a message box, and the two connection settings
# that chat_interface needs on every call.
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 MCP Assistant")
    chatbot = gr.Chatbot(height=400)
    msg = gr.Textbox(placeholder="Ask me anything...")
    openai_key = gr.Textbox(type="password", label="OpenAI API Key")
    mcp_url = gr.Textbox(label="MCP Server URL")

    submit_btn = gr.Button("Send")

    # Both triggers share the same wiring: chat_interface receives the
    # message, the chat history and the settings, and returns the updated
    # history plus the new (empty) content of the message box.
    chat_inputs = [msg, chatbot, openai_key, mcp_url]
    chat_outputs = [chatbot, msg]
    submit_btn.click(chat_interface, chat_inputs, chat_outputs)
    msg.submit(chat_interface, chat_inputs, chat_outputs)

# Started at module level, so loading this file launches the app.
demo.launch()