import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Tuple

import aiohttp
import gradio as gr
import openai
import requests


class MCPClient:
    """Small async JSON-RPC 2.0 client for an MCP server reachable over HTTP.

    Every request is POSTed to ``<server_url>/mcp``. The underlying
    ``aiohttp.ClientSession`` is created lazily on first use and stays open
    until :meth:`close_session` is awaited.
    """

    def __init__(self, server_url: str):
        """Remember the server base URL; no network I/O happens here.

        Args:
            server_url: Base URL of the server. Trailing slashes are removed
                so that appending ``/mcp`` never produces ``//mcp``.
        """
        self.server_url = server_url.rstrip('/')
        self.session = None

    async def initialize_session(self):
        """Create the shared HTTP session if one does not exist yet."""
        if not self.session:
            self.session = aiohttp.ClientSession()

    async def close_session(self):
        """Close the HTTP session (if any) and forget it so it can be reopened."""
        if self.session:
            await self.session.close()
            self.session = None

    async def _rpc(self, method: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """POST one JSON-RPC request and return the decoded JSON body.

        The session must already exist. Any transport or decoding error is
        propagated to the caller, which decides how to degrade.
        """
        payload = {"jsonrpc": "2.0", "id": 1, "method": method}
        if params is not None:
            payload["params"] = params
        async with self.session.post(f"{self.server_url}/mcp", json=payload) as response:
            return await response.json()

    async def call_tool(self, tool_name: str, arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Invoke a tool through the ``tools/call`` method.

        Args:
            tool_name: Name of the tool to run.
            arguments: Keyword arguments for the tool; ``None`` means none.

        Returns:
            The decoded JSON response, or ``{"success": False, "error": ...}``
            when the request or the decoding fails.
        """
        if arguments is None:
            arguments = {}

        await self.initialize_session()

        try:
            return await self._rpc("tools/call", {"name": tool_name, "arguments": arguments})
        except Exception as e:
            # Best-effort boundary: callers receive the failure as data.
            return {"success": False, "error": str(e)}

    async def list_tools(self) -> List[Dict[str, Any]]:
        """Fetch the tool descriptors advertised through ``tools/list``.

        Returns:
            The list found under ``result.tools`` in the response, or an empty
            list when the request fails or the response lacks that list.
        """
        await self.initialize_session()
        try:
            result = await self._rpc("tools/list")
            # ``or`` guards against an explicit null for "result" or "tools".
            return (result.get("result") or {}).get("tools") or []
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so task cancellation
            # and KeyboardInterrupt still propagate; the failure is logged
            # instead of vanishing silently.
            logging.getLogger(__name__).warning(
                "Could not list MCP tools from %s: %s", self.server_url, exc
            )
            return []


class AIAssistant:
    """Chat assistant that lets an OpenAI chat model request MCP tool calls.

    The model is told (via the system prompt) to emit lines of the form
    ``CALL_TOOL: tool_name(parameter=value)``. Those lines are parsed, the
    tools are run through the MCP client, and the results are handed back to
    the model for a final answer.
    """

    # Marker the model must put at the start of a line to request a tool.
    _CALL_PREFIX = 'CALL_TOOL:'

    def __init__(self, openai_api_key: str, mcp_client: "MCPClient",
                 model: str = "gpt-3.5-turbo"):
        """Create the OpenAI client and remember the MCP client.

        Args:
            openai_api_key: API key passed to ``openai.OpenAI``.
            mcp_client: Client used to list and call tools.
            model: Chat model name used for every completion.
        """
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.mcp_client = mcp_client
        self.model = model
        self.available_tools = []

    async def initialize(self):
        """Load the list of available tools from the MCP client."""
        self.available_tools = await self.mcp_client.list_tools()

    def get_system_prompt(self) -> str:
        """Build the system prompt listing each known tool as ``- name: description``."""
        # ``.get`` so a descriptor without a description does not raise KeyError.
        tools_description = "\n".join(
            f"- {tool.get('name', '')}: {tool.get('description', '')}"
            for tool in self.available_tools
        )
        return (
            "You are an AI assistant with access to tools:\n"
            f"{tools_description}\n"
            "\n"
            "Use these tools explicitly if user queries require external data.\n"
            "\n"
            "Respond with 'CALL_TOOL: tool_name(parameter=value)' to invoke tools.\n"
        )

    @staticmethod
    def _split_arguments(raw_args: str) -> List[str]:
        """Split an argument list on commas that are outside quotes."""
        parts = []
        current = []
        open_quote = None
        for ch in raw_args:
            if open_quote:
                current.append(ch)
                if ch == open_quote:
                    open_quote = None
            elif ch in "\"'":
                open_quote = ch
                current.append(ch)
            elif ch == ',':
                parts.append(''.join(current))
                current = []
            else:
                current.append(ch)
        parts.append(''.join(current))
        return parts

    def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Parse ``CALL_TOOL:`` lines out of a model reply.

        Args:
            response: Raw text of the model reply; ``None`` or empty is allowed.

        Returns:
            One ``{'name': str, 'arguments': dict}`` entry per tool-call line,
            in order of appearance. Argument values are strings with
            surrounding quotes removed. Malformed pieces (no tool name, an
            argument without ``=``) are skipped rather than raising.
        """
        tool_calls = []
        if not response:
            return tool_calls

        for raw_line in response.splitlines():
            line = raw_line.strip()
            if not line.startswith(self._CALL_PREFIX):
                continue

            call_text = line[len(self._CALL_PREFIX):].strip()
            # ``partition`` tolerates a call written without parentheses.
            tool_name, _, raw_args = call_text.partition('(')
            tool_name = tool_name.strip()
            if not tool_name:
                continue

            raw_args = raw_args.strip()
            if raw_args.endswith(')'):
                # Drop only the closing parenthesis of the call itself.
                raw_args = raw_args[:-1]

            arg_dict = {}
            for piece in self._split_arguments(raw_args):
                # Split on the first '=' only so values may contain '='.
                key, sep, value = piece.partition('=')
                key = key.strip()
                if not sep or not key:
                    continue
                arg_dict[key] = value.strip().strip('"\'')
            tool_calls.append({'name': tool_name, 'arguments': arg_dict})
        return tool_calls

    async def _complete(self, messages: List[Dict[str, str]]) -> str:
        """Run one chat completion and return its text ('' when there is none).

        The OpenAI client used here is synchronous, so the call is pushed to
        the default executor to keep the event loop free while it waits.
        """
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: self.openai_client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0,
            ),
        )
        return response.choices[0].message.content or ""

    async def process_message(self, user_message: str) -> Tuple[str, str]:
        """Answer a user message, running any tools the model asks for.

        Args:
            user_message: Text typed by the user.

        Returns:
            ``(answer, tool_info)``. ``tool_info`` has one
            ``"Called <name>: <result>"`` line per executed tool, or is ``""``
            when the model requested no tools.
        """
        messages = [
            {"role": "system", "content": self.get_system_prompt()},
            {"role": "user", "content": user_message},
        ]
        ai_response = await self._complete(messages)
        tool_calls = self.extract_tool_calls(ai_response)
        if not tool_calls:
            return ai_response, ""

        tool_results = []
        tool_log = []
        for call in tool_calls:
            result = await self.mcp_client.call_tool(call['name'], call['arguments'])
            tool_results.append(result)
            tool_log.append(f"Called {call['name']}: {result}\n")

        # Second round: show the model its own request plus the tool output.
        final_messages = messages + [
            {"role": "assistant", "content": ai_response},
            {"role": "user", "content": f"Tool results:\n{json.dumps(tool_results)}"},
        ]
        final_response = await self._complete(final_messages)
        return final_response, "".join(tool_log)


# Module-wide assistant shared by every chat request; created on first use.
assistant = None


async def initialize_assistant(openai_key, mcp_url):
    """Build the shared assistant from the given credentials.

    Args:
        openai_key: OpenAI API key; ``None`` or blank is rejected.
        mcp_url: Base URL of the MCP server; ``None`` or blank is rejected.

    Returns:
        A status string starting with "✅" on success, or with "❌" when either
        value is missing. Callers detect failure by looking for "❌".
    """
    global assistant
    # Treat None and whitespace-only input the same as an empty field.
    openai_key = (openai_key or "").strip()
    mcp_url = (mcp_url or "").strip()
    if not openai_key or not mcp_url:
        return "❌ Provide valid OpenAI API key and MCP URL"

    mcp_client = MCPClient(mcp_url)
    candidate = AIAssistant(openai_key, mcp_client)
    try:
        await candidate.initialize()
    except Exception:
        # Do not leave an open HTTP session behind a half-built assistant.
        await mcp_client.close_session()
        raise

    # Publish only after initialisation succeeded, then release the session
    # held by the assistant being replaced (if there was one).
    previous, assistant = assistant, candidate
    if previous is not None:
        await previous.mcp_client.close_session()
    return f"✅ Initialized with {len(candidate.available_tools)} tools"


async def chat_interface(message, history, openai_key, mcp_url):
    """Gradio handler: answer one chat message and append it to the history.

    Args:
        message: Text from the input box.
        history: Chat history as a list of ``[user, assistant]`` pairs;
            ``None`` is treated as an empty history.
        openai_key: OpenAI API key, used only while no assistant exists yet.
        mcp_url: MCP server URL, used only while no assistant exists yet.

    Returns:
        ``(history, "")``: the updated history and an empty string that
        clears the input box.
    """
    if history is None:
        history = []
    # Nothing to send: leave the conversation untouched.
    if not message or not message.strip():
        return history, ""

    try:
        if not assistant:
            init_result = await initialize_assistant(openai_key, mcp_url)
            if "❌" in init_result:
                history.append([message, init_result])
                return history, ""

        response, tool_info = await assistant.process_message(message)
    except Exception as exc:
        # UI boundary: show the failure in the chat instead of letting the
        # handler crash with an opaque error.
        history.append([message, f"❌ Error: {exc}"])
        return history, ""

    full_response = f"{response}\n\n{tool_info}" if tool_info else response
    history.append([message, full_response])
    return history, ""


# Gradio UI: a chat window, the message box, and the two credential fields.
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 MCP Assistant")
    chatbot = gr.Chatbot(height=400)
    msg = gr.Textbox(placeholder="Ask me anything...")
    openai_key = gr.Textbox(type="password", label="OpenAI API Key")
    mcp_url = gr.Textbox(label="MCP Server URL")
    submit_btn = gr.Button("Send")

    # The Send button and pressing Enter in the message box share one wiring:
    # both run chat_interface, refresh the chat, and clear the message box.
    chat_inputs = [msg, chatbot, openai_key, mcp_url]
    chat_outputs = [chatbot, msg]
    submit_btn.click(chat_interface, chat_inputs, chat_outputs)
    msg.submit(chat_interface, chat_inputs, chat_outputs)


# Start the server only when run as a script, so importing this module
# (for tests or reuse of `demo`) has no side effects.
if __name__ == "__main__":
    demo.launch()