Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| MCP Server with sample tools for Hugging Face Spaces deployment | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import sys | |
| from typing import Any, Dict, List, Optional, Union | |
| from datetime import datetime | |
| import os | |
| # MCP Server implementation | |
| class MCPServer: | |
| def __init__(self, name: str = "hf-mcp-server", version: str = "1.0.0"): | |
| self.name = name | |
| self.version = version | |
| self.tools = {} | |
| self.resources = {} | |
| self.setup_logging() | |
| def setup_logging(self): | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| self.logger = logging.getLogger(self.name) | |
| def tool(self, name: str, description: str = "", parameters: Dict = None): | |
| """Decorator to register tools""" | |
| def decorator(func): | |
| self.tools[name] = { | |
| "function": func, | |
| "description": description, | |
| "parameters": parameters or {} | |
| } | |
| return func | |
| return decorator | |
| def resource(self, uri: str, name: str = "", description: str = ""): | |
| """Decorator to register resources""" | |
| def decorator(func): | |
| self.resources[uri] = { | |
| "function": func, | |
| "name": name, | |
| "description": description | |
| } | |
| return func | |
| return decorator | |
| async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: | |
| """Handle incoming MCP requests""" | |
| try: | |
| method = request.get("method") | |
| params = request.get("params", {}) | |
| request_id = request.get("id", 1) | |
| if method == "initialize": | |
| return await self.handle_initialize(request_id, params) | |
| elif method == "tools/list": | |
| return await self.handle_list_tools(request_id) | |
| elif method == "tools/call": | |
| return await self.handle_call_tool(request_id, params) | |
| elif method == "resources/list": | |
| return await self.handle_list_resources(request_id) | |
| elif method == "resources/read": | |
| return await self.handle_read_resource(request_id, params) | |
| else: | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "error": { | |
| "code": -32601, | |
| "message": f"Method not found: {method}" | |
| } | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error handling request: {e}") | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request.get("id", 1), | |
| "error": { | |
| "code": -32603, | |
| "message": f"Internal error: {str(e)}" | |
| } | |
| } | |
| async def handle_initialize(self, request_id: int, params: Dict) -> Dict: | |
| """Handle initialization request""" | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "result": { | |
| "protocolVersion": "2024-11-05", | |
| "capabilities": { | |
| "tools": {}, | |
| "resources": {} | |
| }, | |
| "serverInfo": { | |
| "name": self.name, | |
| "version": self.version | |
| } | |
| } | |
| } | |
| async def handle_list_tools(self, request_id: int) -> Dict: | |
| """Handle tools list request""" | |
| tools_list = [] | |
| for name, tool_info in self.tools.items(): | |
| tools_list.append({ | |
| "name": name, | |
| "description": tool_info["description"], | |
| "inputSchema": { | |
| "type": "object", | |
| "properties": tool_info["parameters"], | |
| "required": list(tool_info["parameters"].keys()) if tool_info["parameters"] else [] | |
| } | |
| }) | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "result": { | |
| "tools": tools_list | |
| } | |
| } | |
| async def handle_call_tool(self, request_id: int, params: Dict) -> Dict: | |
| """Handle tool call request""" | |
| tool_name = params.get("name") | |
| arguments = params.get("arguments", {}) | |
| if tool_name not in self.tools: | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "error": { | |
| "code": -32602, | |
| "message": f"Tool not found: {tool_name}" | |
| } | |
| } | |
| try: | |
| tool_func = self.tools[tool_name]["function"] | |
| result = await tool_func(**arguments) | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "result": { | |
| "content": [ | |
| { | |
| "type": "text", | |
| "text": str(result) | |
| } | |
| ] | |
| } | |
| } | |
| except Exception as e: | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "error": { | |
| "code": -32603, | |
| "message": f"Tool execution error: {str(e)}" | |
| } | |
| } | |
| async def handle_list_resources(self, request_id: int) -> Dict: | |
| """Handle resources list request""" | |
| resources_list = [] | |
| for uri, resource_info in self.resources.items(): | |
| resources_list.append({ | |
| "uri": uri, | |
| "name": resource_info["name"], | |
| "description": resource_info["description"], | |
| "mimeType": "text/plain" | |
| }) | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "result": { | |
| "resources": resources_list | |
| } | |
| } | |
| async def handle_read_resource(self, request_id: int, params: Dict) -> Dict: | |
| """Handle resource read request""" | |
| uri = params.get("uri") | |
| if uri not in self.resources: | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "error": { | |
| "code": -32602, | |
| "message": f"Resource not found: {uri}" | |
| } | |
| } | |
| try: | |
| resource_func = self.resources[uri]["function"] | |
| content = await resource_func() | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "result": { | |
| "contents": [ | |
| { | |
| "uri": uri, | |
| "mimeType": "text/plain", | |
| "text": str(content) | |
| } | |
| ] | |
| } | |
| } | |
| except Exception as e: | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": request_id, | |
| "error": { | |
| "code": -32603, | |
| "message": f"Resource read error: {str(e)}" | |
| } | |
| } | |
| # Create server instance | |
| server = MCPServer("hf-mcp-server", "1.0.0") | |
| # Sample Tools | |
| async def echo_tool(text: str) -> str: | |
| """Simple echo tool""" | |
| return f"Echo: {text}" | |
| async def current_time_tool() -> str: | |
| """Get current time""" | |
| return f"Current time: {datetime.now().isoformat()}" | |
| async def calculate_tool(expression: str) -> str: | |
| """Simple calculator tool""" | |
| try: | |
| # Basic safety check - only allow certain characters | |
| allowed_chars = set("0123456789+-*/()%. ") | |
| if not all(c in allowed_chars for c in expression): | |
| return "Error: Invalid characters in expression" | |
| result = eval(expression) | |
| return f"Result: {expression} = {result}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| async def word_count_tool(text: str) -> str: | |
| """Count words in text""" | |
| words = text.split() | |
| chars = len(text) | |
| chars_no_spaces = len(text.replace(" ", "")) | |
| return f"Text analysis:\n- Words: {len(words)}\n- Characters: {chars}\n- Characters (no spaces): {chars_no_spaces}" | |
| async def reverse_text_tool(text: str) -> str: | |
| """Reverse text""" | |
| return f"Reversed: {text[::-1]}" | |
| # Sample Resources | |
| async def server_info_resource() -> str: | |
| """Server information resource""" | |
| return f""" | |
| MCP Server Information: | |
| - Name: {server.name} | |
| - Version: {server.version} | |
| - Tools available: {len(server.tools)} | |
| - Resources available: {len(server.resources)} | |
| - Started at: {datetime.now().isoformat()} | |
| - Environment: Hugging Face Spaces | |
| """ | |
| async def capabilities_resource() -> str: | |
| """Server capabilities resource""" | |
| capabilities = "Available Tools:\n\n" | |
| for name, tool_info in server.tools.items(): | |
| capabilities += f"- {name}: {tool_info['description']}\n" | |
| capabilities += "\nAvailable Resources:\n\n" | |
| for uri, resource_info in server.resources.items(): | |
| capabilities += f"- {uri}: {resource_info['description']}\n" | |
| return capabilities | |
| # Main execution function for stdio mode | |
| async def main(): | |
| """Main function for stdio-based MCP server""" | |
| server.logger.info("Starting MCP server in stdio mode") | |
| try: | |
| while True: | |
| # Read from stdin | |
| line = await asyncio.get_event_loop().run_in_executor( | |
| None, sys.stdin.readline | |
| ) | |
| if not line: | |
| break | |
| try: | |
| request = json.loads(line.strip()) | |
| response = await server.handle_request(request) | |
| print(json.dumps(response)) | |
| sys.stdout.flush() | |
| except json.JSONDecodeError: | |
| error_response = { | |
| "jsonrpc": "2.0", | |
| "id": None, | |
| "error": { | |
| "code": -32700, | |
| "message": "Parse error" | |
| } | |
| } | |
| print(json.dumps(error_response)) | |
| sys.stdout.flush() | |
| except KeyboardInterrupt: | |
| server.logger.info("Server shutting down") | |
| except Exception as e: | |
| server.logger.error(f"Server error: {e}") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |