import asyncio
import inspect
import json
import os
import re
import shlex
import subprocess
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import gradio as gr
from huggingface_hub import InferenceClient
|
|
|
|
class MCPContextManager:
    """In-memory registry for conversation context, tools, resources and MCP servers.

    All state lives in plain Python containers on the instance. Nothing is
    persisted, and this class never starts an MCP server process; it only
    stores the configuration it is given.
    """

    # Hard cap on how many of the most recent entries a window may contain.
    MAX_WINDOW_ENTRIES = 10
    # Characters-per-token ratio used to estimate the cost of an entry.
    # This is a rough heuristic, not a real tokenizer.
    CHARS_PER_TOKEN = 4

    def __init__(self):
        self.context = []      # chronological list of context entries
        self.tools = {}        # tool name -> tool spec (including its handler)
        self.resources = {}    # resource URI -> resource record
        self.mcp_servers = {}  # server name -> launch configuration

    def add_context(self, role: str, content: str, metadata: Optional[Dict] = None):
        """Append a context entry and return it.

        Args:
            role: Speaker of the entry (e.g. "user" or "assistant").
            content: Text of the entry.
            metadata: Optional extra data; an empty dict is stored when omitted.

        Returns:
            The stored entry dict, stamped with the current UTC time as an
            ISO-8601 string so the entry stays JSON-serializable.
        """
        entry = {
            "role": role,
            "content": content,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metadata": metadata or {},
        }
        self.context.append(entry)
        return entry

    def register_tool(self, name: str, description: str, parameters: Dict, handler):
        """Register (or replace) a tool under ``name``.

        Args:
            name: Unique tool identifier.
            description: Human-readable summary of the tool.
            parameters: JSON-schema style description of the arguments.
            handler: Callable taking the arguments dict; may be sync or async.
        """
        self.tools[name] = {
            "name": name,
            "description": description,
            "parameters": parameters,
            "handler": handler,
        }

    def register_resource(self, uri: str, name: str, mime_type: str, content: Any):
        """Store (or replace) a resource record keyed by ``uri``."""
        self.resources[uri] = {
            "uri": uri,
            "name": name,
            "mimeType": mime_type,
            "content": content,
        }

    def register_mcp_server(self, server_name: str, config: Dict):
        """Store (or replace) the configuration for ``server_name``."""
        self.mcp_servers[server_name] = config

    def get_context_window(self, max_tokens: int = 4096) -> List[Dict]:
        """Return the most recent entries that fit an approximate token budget.

        At most ``MAX_WINDOW_ENTRIES`` entries are considered. Each entry is
        charged ``len(content) // CHARS_PER_TOKEN + 1`` tokens, and entries are
        kept from newest to oldest until the next one would exceed
        ``max_tokens``.

        Args:
            max_tokens: Approximate token budget for the whole window.

        Returns:
            The selected entries in chronological order (possibly empty).
        """
        remaining = max_tokens
        kept = []
        for entry in reversed(self.context[-self.MAX_WINDOW_ENTRIES:]):
            cost = len(str(entry["content"])) // self.CHARS_PER_TOKEN + 1
            if cost > remaining:
                break
            remaining -= cost
            kept.append(entry)
        kept.reverse()  # walked newest-first; callers expect oldest-first
        return kept

    async def call_tool(self, tool_name: str, arguments: Dict) -> Any:
        """Run a registered tool and return its result.

        Args:
            tool_name: Name the tool was registered under.
            arguments: Arguments dict passed to the handler unchanged.

        Raises:
            ValueError: If no tool named ``tool_name`` is registered.
        """
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")

        result = self.tools[tool_name]["handler"](arguments)
        # Plain (synchronous) handlers return their value directly; only
        # coroutine-style handlers need to be awaited.
        if inspect.isawaitable(result):
            result = await result
        return result

    def get_available_tools(self) -> List[Dict]:
        """Return the public description of every tool, without its handler."""
        return [
            {
                "name": tool["name"],
                "description": tool["description"],
                "parameters": tool["parameters"],
            }
            for tool in self.tools.values()
        ]

    def export_context(self) -> str:
        """Serialize context, tool descriptions, resource URIs and server names to JSON.

        Only resource URIs and server names are exported, never resource
        contents or server configurations.
        """
        return json.dumps(
            {
                "context": self.context,
                "tools": self.get_available_tools(),
                "resources": list(self.resources.keys()),
                "mcp_servers": list(self.mcp_servers.keys()),
            },
            indent=2,
        )
|
|
|
|
| |
# Shared manager instance used by every tool handler and UI callback below.
mcp_manager = MCPContextManager()

# SECURITY: the PostgreSQL connection string carries the database password, so
# it is read from the DATABASE_URL environment variable instead of being
# committed to source control. The credential that used to be hard-coded here
# must be treated as leaked and rotated.
_postgres_args = ["-y", "mcp-postgres-full-access"]
_postgres_url = os.environ.get("DATABASE_URL")
if _postgres_url:
    _postgres_args.append(_postgres_url)

mcp_manager.register_mcp_server("postgres-full", {
    "command": "npx",
    "args": _postgres_args,
    "env": {
        "TRANSACTION_TIMEOUT_MS": "60000",
        "MAX_CONCURRENT_TRANSACTIONS": "5",
        "PG_STATEMENT_TIMEOUT_MS": "30000",
    },
})

# The filesystem root can be overridden with MCP_FILESYSTEM_ROOT; the default
# is the original hard-coded project directory.
mcp_manager.register_mcp_server("filesystem", {
    "command": "npx",
    "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        os.environ.get("MCP_FILESYSTEM_ROOT", "C:\\Users\\CHAN\\Documents\\PROJECTS"),
    ],
})
|
|
|
|
| |
async def calculator_handler(args):
    """Apply the requested arithmetic operation to ``a`` and ``b``.

    ``a`` and ``b`` default to 0 and are converted with ``float``. Returns the
    numeric result, the string "Error: Division by zero" for a zero divisor,
    or "Invalid operation" when ``operation`` is not one of
    add / subtract / multiply / divide.
    """
    op_name = args.get("operation")
    lhs = float(args.get("a", 0))
    rhs = float(args.get("b", 0))

    # Dispatch table of thunks: only the selected operation is evaluated.
    dispatch = {
        "add": lambda: lhs + rhs,
        "subtract": lambda: lhs - rhs,
        "multiply": lambda: lhs * rhs,
        "divide": lambda: lhs / rhs if rhs != 0 else "Error: Division by zero",
    }
    compute = dispatch.get(op_name)
    if compute is None:
        return "Invalid operation"
    return compute()
|
|
|
|
async def memory_handler(args):
    """Store or retrieve a value in the shared manager's resource table.

    ``action`` "store" saves ``value`` under the URI ``memory://<key>`` and
    returns a confirmation string; "retrieve" returns the stored content or
    "Not found". Any other action returns "Invalid action".
    """
    requested = args.get("action")
    slot = args.get("key")
    payload = args.get("value")

    if requested == "store":
        mcp_manager.register_resource(
            uri=f"memory://{slot}",
            name=slot,
            mime_type="text/plain",
            content=payload,
        )
        return f"Stored {slot}"

    if requested == "retrieve":
        record = mcp_manager.resources.get(f"memory://{slot}")
        if not record:
            return "Not found"
        return record["content"]

    return "Invalid action"
|
|
|
|
async def web_search_handler(args):
    """Return a placeholder result string for ``query``; no real search is performed."""
    search_terms = args.get("query")
    placeholder = f"Search results for: {search_terms} (MCP tool simulation)"
    return placeholder
|
|
|
|
async def git_command_handler(args):
    """Run an allow-listed git subcommand and report its outcome.

    Args:
        args: Dict with ``command`` (the git arguments as one string, e.g.
            ``commit -am "my message"``) and optional ``repo_path`` (working
            directory; defaults to the current directory).

    Returns:
        On execution, a dict with ``stdout``, ``stderr``, ``returncode`` and
        ``success``. Otherwise a dict with a single ``error`` key (disallowed
        or malformed command, timeout, or any failure starting git).
    """
    allowed_commands = frozenset(
        ["status", "log", "branch", "diff", "commit", "push", "pull", "add"]
    )
    command = args.get("command", "")
    # Treat a missing, None or empty repo_path the same way: use the cwd.
    repo_path = args.get("repo_path") or os.getcwd()

    if not isinstance(command, str):
        return {"error": "Command not allowed or invalid"}

    try:
        # Quote-aware splitting keeps `commit -am "my message"` as three
        # arguments; a plain str.split() would tear the message apart.
        # Backslash escaping is disabled so Windows paths survive unchanged.
        lexer = shlex.shlex(command, posix=True)
        lexer.whitespace_split = True
        lexer.commenters = ""
        lexer.escape = ""
        cmd_parts = list(lexer)
    except ValueError as exc:  # e.g. unbalanced quotes
        return {"error": f"Invalid command syntax: {exc}"}

    if not cmd_parts or cmd_parts[0] not in allowed_commands:
        return {"error": "Command not allowed or invalid"}

    # NOTE(review): only the subcommand is allow-listed. Options are passed
    # through untouched, and some git options can write files or run programs;
    # tighten this before exposing the tool to untrusted users.
    try:
        # An argument list (no shell) means the text is never shell-interpreted.
        result = subprocess.run(
            ["git"] + cmd_parts,
            cwd=repo_path,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return {"error": "Command timed out"}
    except Exception as e:
        # Boundary handler: report any launch failure (git missing, bad cwd, ...)
        # to the caller instead of raising.
        return {"error": str(e)}

    return {
        "stdout": result.stdout,
        "stderr": result.stderr,
        "returncode": result.returncode,
        "success": result.returncode == 0,
    }
|
|
|
|
async def postgres_query_handler(args):
    """Describe the query that would be run; nothing is sent to a database.

    Reads ``query`` (default "") and ``operation`` (default "SELECT") and
    returns a dict whose ``status`` is "simulated".
    """
    sql_text = args.get("query", "")
    statement_kind = args.get("operation", "SELECT")

    simulated = {}
    simulated["message"] = f"Would execute {statement_kind} query: {sql_text}"
    simulated["note"] = "Connect to: postgresql://neondb_owner@ep-summer-art-a1jpcb05-pooler.ap-southeast-1.aws.neon.tech/neondb"
    simulated["status"] = "simulated"
    return simulated
|
|
|
|
async def filesystem_handler(args):
    """Perform a read / list / write / search operation inside the sandbox root.

    The root directory is taken from the ``MCP_FILESYSTEM_ROOT`` environment
    variable when set, otherwise the original project directory is used.

    Args:
        args: Dict with ``operation`` ("read", "list", "write" or "search"),
            ``path`` (relative to the root), and optionally ``content`` (for
            "write") and ``pattern`` (case-insensitive file-name substring for
            "search").

    Returns:
        A dict that always contains ``success``; failures carry an ``error``
        message instead of raising.
    """
    operation = args.get("operation")
    path = args.get("path", "")
    content = args.get("content", "")

    base_path = os.environ.get("MCP_FILESYSTEM_ROOT", r"C:\Users\CHAN\Documents\PROJECTS")

    try:
        # Resolve symlinks and ".." first, then require the result to stay
        # under the root. Without this, "../x" or an absolute path would let a
        # caller read or overwrite any file the process can reach.
        base_real = os.path.realpath(base_path)
        full_path = os.path.realpath(os.path.join(base_real, path))
        common = os.path.commonpath([base_real, full_path])
        if os.path.normcase(common) != os.path.normcase(base_real):
            return {"error": "Path escapes the allowed root directory", "success": False}

        if operation == "read":
            if os.path.exists(full_path):
                with open(full_path, 'r', encoding='utf-8') as f:
                    return {"content": f.read(), "success": True}
            return {"error": "File not found", "success": False}

        if operation == "list":
            if os.path.exists(full_path):
                return {"items": os.listdir(full_path), "success": True}
            return {"error": "Directory not found", "success": False}

        if operation == "write":
            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return {"message": "File written successfully", "success": True}

        if operation == "search":
            needle = args.get("pattern", "").lower()
            # Fall back to the whole root when the path is not a directory.
            search_root = full_path if os.path.isdir(full_path) else base_real
            results = [
                os.path.join(root, file_name)
                for root, _dirs, files in os.walk(search_root)
                for file_name in files
                if needle in file_name.lower()
            ]
            return {"results": results, "count": len(results), "success": True}

        return {"error": "Invalid operation", "success": False}

    except Exception as e:
        # Boundary handler: tool callers expect an error dict, not an exception.
        return {"error": str(e), "success": False}
|
|
|
|
| |
# Built-in tools, registered in this order:
# (name, description, JSON-schema parameters, handler).
_BUILTIN_TOOLS = [
    (
        "calculator",
        "Perform basic arithmetic operations",
        {
            "type": "object",
            "properties": {
                "operation": {"type": "string", "enum": ["add", "subtract", "multiply", "divide"]},
                "a": {"type": "number"},
                "b": {"type": "number"},
            },
            "required": ["operation", "a", "b"],
        },
        calculator_handler,
    ),
    (
        "memory",
        "Store and retrieve information in memory",
        {
            "type": "object",
            "properties": {
                "action": {"type": "string", "enum": ["store", "retrieve"]},
                "key": {"type": "string"},
                "value": {"type": "string"},
            },
            "required": ["action", "key"],
        },
        memory_handler,
    ),
    (
        "web_search",
        "Search the web for information",
        {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
            },
            "required": ["query"],
        },
        web_search_handler,
    ),
    (
        "git",
        "Execute git commands (status, log, commit, push, pull, add, branch, diff)",
        {
            "type": "object",
            "properties": {
                "command": {"type": "string", "description": "Git command (e.g., 'status', 'commit -am \"message\"', 'push')"},
                "repo_path": {"type": "string", "description": "Repository path (optional)"},
            },
            "required": ["command"],
        },
        git_command_handler,
    ),
    (
        "postgres",
        "Execute PostgreSQL queries on Neon database",
        {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "operation": {"type": "string", "enum": ["SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "ALTER"]},
            },
            "required": ["query"],
        },
        postgres_query_handler,
    ),
    (
        "filesystem",
        "Perform filesystem operations (read, write, list, search)",
        {
            "type": "object",
            "properties": {
                "operation": {"type": "string", "enum": ["read", "write", "list", "search"]},
                "path": {"type": "string"},
                "content": {"type": "string"},
                "pattern": {"type": "string"},
            },
            "required": ["operation", "path"],
        },
        filesystem_handler,
    ),
]

for _tool_name, _tool_description, _tool_schema, _tool_handler in _BUILTIN_TOOLS:
    mcp_manager.register_tool(
        name=_tool_name,
        description=_tool_description,
        parameters=_tool_schema,
        handler=_tool_handler,
    )
|
|
|
|
def respond(
    message,
    history: list[dict[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    enable_mcp,
    hf_token: gr.OAuthToken,
):
    """Stream a chat completion for ``message``, yielding the reply so far.

    When ``enable_mcp`` is true, the user message and the finished reply are
    recorded in the shared MCP context, and a JSON listing of the registered
    tools is appended to the system prompt.
    """
    if enable_mcp:
        mcp_manager.add_context("user", message)

    client = InferenceClient(token=hf_token.token, model="openai/gpt-oss-20b")

    system_prompt = system_message
    if enable_mcp:
        tool_listing = json.dumps(mcp_manager.get_available_tools(), indent=2)
        system_prompt += "\n\nAvailable MCP Tools:\n" + tool_listing

    # System prompt first, then prior turns, then the new user message.
    messages = [
        {"role": "system", "content": system_prompt},
        *history,
        {"role": "user", "content": message},
    ]

    response = ""
    stream = client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    )
    for chunk in stream:
        choices = chunk.choices
        # Some chunks carry no choices or no text; the accumulated reply is
        # still yielded for them.
        if len(choices) and choices[0].delta.content:
            response += choices[0].delta.content
        yield response

    if enable_mcp:
        mcp_manager.add_context("assistant", response)
|
|
|
|
def call_mcp_tool(tool_name: str, arguments_json: str):
    """Run a registered tool from the UI and return a JSON report.

    ``arguments_json`` is parsed as JSON (an empty value means no arguments).
    The result is a JSON string holding either ``success: true`` with the
    tool's ``result``, or ``success: false`` with the ``error`` message of
    whatever exception occurred.
    """
    try:
        if arguments_json:
            parsed_arguments = json.loads(arguments_json)
        else:
            parsed_arguments = {}
        outcome = asyncio.run(mcp_manager.call_tool(tool_name, parsed_arguments))
        report = {"success": True, "result": outcome}
        return json.dumps(report, indent=2)
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return json.dumps(failure, indent=2)
|
|
|
|
def export_mcp_context():
    """Return the shared manager's context export (a JSON string) for the UI."""
    exported_json = mcp_manager.export_context()
    return exported_json
|
|
|
|
def get_mcp_tools_list():
    """Return the registered tool descriptions as pretty-printed JSON."""
    return json.dumps(mcp_manager.get_available_tools(), indent=2)
|
|
|
|
# Matches the password part of "scheme://user:password@host" so it can be masked.
_URL_PASSWORD_RE = re.compile(r"(://[^/\s:@]+:)[^/\s@]+(@)")


def _mask_url_passwords(value):
    """Return a copy of ``value`` with URL passwords inside any string replaced by ``***``."""
    if isinstance(value, str):
        return _URL_PASSWORD_RE.sub(r"\1***\2", value)
    if isinstance(value, dict):
        return {key: _mask_url_passwords(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_mask_url_passwords(item) for item in value]
    return value


def get_mcp_servers():
    """Return the configured MCP servers as pretty-printed JSON for display.

    Server arguments can contain connection strings with embedded passwords.
    Those are masked before serialization so the UI never shows a credential.
    The stored configuration itself is left untouched.
    """
    return json.dumps(_mask_url_passwords(mcp_manager.mcp_servers), indent=2)
|
|
|
|
# Enhanced Gradio interface with Model Context Protocol (MCP) support.
# The extra inputs are passed to `respond` after (message, history), in the
# order listed in `additional_inputs` below.
system_message_box = gr.Textbox(
    value="You are a powerful AI assistant with MCP tools including: Git operations, PostgreSQL database access (Neon), filesystem operations, calculator, memory, and web search.",
    label="System message",
    lines=3,
)
max_tokens_slider = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens")
temperature_slider = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature")
top_p_slider = gr.Slider(
    minimum=0.1,
    maximum=1.0,
    value=0.95,
    step=0.05,
    label="Top-p (nucleus sampling)",
)
enable_mcp_checkbox = gr.Checkbox(value=True, label="Enable MCP (Model Context Protocol)")

chatbot = gr.ChatInterface(
    respond,
    type="messages",
    additional_inputs=[
        system_message_box,
        max_tokens_slider,
        temperature_slider,
        top_p_slider,
        enable_mcp_checkbox,
    ],
)
|
|
# Top-level UI: a tabbed layout wrapping the chat interface plus panels for
# calling tools by hand, inspecting server configs and exporting the context.
# Emoji in the labels were restored from mis-encoded (mojibake) text.
with gr.Blocks(title="AI Chat with Full MCP Support") as demo:
    gr.Markdown("# 🤖 AI Chatbot with Full Model Context Protocol (MCP)")
    gr.Markdown("✨ Connected to PostgreSQL (Neon), Filesystem, Git, and more!")

    with gr.Tabs():
        with gr.Tab("💬 Chat"):
            with gr.Sidebar():
                gr.LoginButton()
            chatbot.render()

        with gr.Tab("🛠️ MCP Tools"):
            gr.Markdown("### Available MCP Tools")
            tools_display = gr.Code(language="json", label="Registered Tools")
            refresh_tools_btn = gr.Button("🔄 Refresh Tools List")

            gr.Markdown("### 🎯 Quick Actions")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("**Git Commands**")
                    git_cmd = gr.Textbox(label="Git Command", placeholder="status")
                    git_btn = gr.Button("Execute Git")

                with gr.Column():
                    gr.Markdown("**Filesystem**")
                    fs_operation = gr.Dropdown(["list", "read", "search"], label="Operation")
                    fs_path = gr.Textbox(label="Path", placeholder="PROJECTS")
                    fs_btn = gr.Button("Execute FS")

            gr.Markdown("### 🔧 Custom Tool Call")
            with gr.Row():
                tool_name_input = gr.Textbox(label="Tool Name", placeholder="calculator")
                tool_args_input = gr.Code(
                    label="Arguments (JSON)",
                    value='{"operation": "add", "a": 5, "b": 3}',
                    language="json",
                    lines=5,
                )
            call_tool_btn = gr.Button("▶️ Call Tool")
            tool_result = gr.Code(language="json", label="Tool Result", lines=10)

            # All three action buttons write into the shared result panel.
            refresh_tools_btn.click(fn=get_mcp_tools_list, outputs=tools_display)

            git_btn.click(
                fn=lambda cmd: call_mcp_tool("git", json.dumps({"command": cmd})),
                inputs=[git_cmd],
                outputs=tool_result,
            )

            fs_btn.click(
                fn=lambda op, path: call_mcp_tool("filesystem", json.dumps({"operation": op, "path": path})),
                inputs=[fs_operation, fs_path],
                outputs=tool_result,
            )

            call_tool_btn.click(
                fn=call_mcp_tool,
                inputs=[tool_name_input, tool_args_input],
                outputs=tool_result,
            )

        with gr.Tab("🔌 MCP Servers"):
            gr.Markdown("### Configured MCP Servers")
            servers_display = gr.Code(language="json", label="MCP Server Configuration")
            refresh_servers_btn = gr.Button("🔄 Refresh Servers")

            refresh_servers_btn.click(fn=get_mcp_servers, outputs=servers_display)

            gr.Markdown("""
            #### Connected Servers:
            - **postgres-full**: Neon PostgreSQL Database
            - **filesystem**: Local file system access (C:\\Users\\CHAN\\Documents\\PROJECTS)
            """)

        with gr.Tab("📋 MCP Context"):
            gr.Markdown("### Export MCP Context")
            gr.Markdown("View and export the current context window managed by MCP")
            export_btn = gr.Button("📥 Export Context")
            context_display = gr.Code(language="json", label="MCP Context", lines=15)

            export_btn.click(fn=export_mcp_context, outputs=context_display)

        with gr.Tab("ℹ️ About MCP"):
            gr.Markdown("""
            ### Model Context Protocol (MCP)

            This AI assistant is powered by MCP with full access to:

            #### 🗄️ Database Access
            - **PostgreSQL (Neon)**: Full database operations
            - Connection: `ep-summer-art-a1jpcb05-pooler.ap-southeast-1.aws.neon.tech`

            #### 📁 Filesystem Access
            - **Root**: `C:\\Users\\CHAN\\Documents\\PROJECTS`
            - Operations: read, write, list, search

            #### 🔧 Available Tools:
            1. **Git**: Execute git commands (status, commit, push, pull, etc.)
            2. **PostgreSQL**: Query and manage your Neon database
            3. **Filesystem**: Read/write files, list directories, search
            4. **Calculator**: Perform arithmetic operations
            5. **Memory**: Store and retrieve session data
            6. **Web Search**: Search for information

            #### 📝 Example Commands:

            **Git:**
            ```json
            {
              "tool": "git",
              "arguments": {
                "command": "status"
              }
            }
            ```

            **Filesystem:**
            ```json
            {
              "tool": "filesystem",
              "arguments": {
                "operation": "list",
                "path": "PROJECTS"
              }
            }
            ```

            **PostgreSQL:**
            ```json
            {
              "tool": "postgres",
              "arguments": {
                "query": "SELECT * FROM users LIMIT 10",
                "operation": "SELECT"
              }
            }
            ```
            """)
|
|
|
|
# Start the Gradio server only when this file is executed as a script, so
# importing the module does not launch the app.
if __name__ == "__main__":
    demo.launch()
|
|
|
|