Spaces:
Sleeping
Sleeping
import asyncio
import html
import inspect
import json
import re
import sys
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer, ThreadingHTTPServer
from urllib.parse import urlsplit
async def scrape_site(url: str, timeout: float = 30.0) -> str:
    """Fetch *url* over HTTP(S) and return the response body as text.

    Args:
        url: Absolute ``http://`` or ``https://`` URL to download.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        The response body decoded with the charset announced in the
        ``Content-Type`` header, falling back to UTF-8. Undecodable bytes
        are replaced rather than raising.

    Raises:
        ValueError: If *url* is not an http/https URL with a host part.
        urllib.error.URLError: If the request itself fails.
    """
    # The URL comes from the tool caller. urllib would also open ``file:``
    # and ``ftp:`` URLs, so only plain web schemes are accepted up front.
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise ValueError(f"Unsupported URL (only http/https allowed): {url!r}")

    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})

    def _fetch() -> str:
        with urllib.request.urlopen(req, timeout=timeout) as response:
            raw = response.read()
            charset = response.headers.get_content_charset() or 'utf-8'
        try:
            return raw.decode(charset, errors='replace')
        except LookupError:
            # The server announced a charset Python does not know.
            return raw.decode('utf-8', errors='replace')

    # urlopen blocks, so run it in the default executor instead of stalling
    # the event loop that awaits this coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _fetch)
async def plaintext(url: str) -> str:
    """Download *url* and return its visible text with markup removed.

    ``<script>`` and ``<style>`` elements (including their contents), HTML
    comments and all remaining tags are stripped. Character references such
    as ``&amp;`` or ``&nbsp;`` are then decoded, and every run of whitespace
    is collapsed to a single space.

    Args:
        url: Address of the page, passed unchanged to ``scrape_site``.

    Returns:
        The page text on a single line, without leading/trailing whitespace.
    """
    markup = await scrape_site(url)
    markup = re.sub(r'<script[^>]*>.*?</script>', '', markup, flags=re.DOTALL | re.IGNORECASE)
    markup = re.sub(r'<style[^>]*>.*?</style>', '', markup, flags=re.DOTALL | re.IGNORECASE)
    # Drop comments as a unit; the generic tag pattern below would stop at
    # the first '>' inside a comment and leak the remainder into the text.
    markup = re.sub(r'<!--.*?-->', '', markup, flags=re.DOTALL)
    text = re.sub(r'<[^>]+>', '', markup)
    # Decode entities only after tag removal so an escaped "&lt;b&gt;" is
    # kept as literal text instead of being stripped as a tag.
    text = html.unescape(text)
    return ' '.join(text.split())
class SimpleMCPServer:
    """Minimal JSON-RPC 2.0 dispatcher that exposes registered async tools.

    Supported methods are ``initialize``, ``tools/list`` and ``tools/call``.
    ``self.tools`` maps a tool name to a dict holding its coroutine function,
    description and JSON Schema for its arguments.
    """

    # JSON-RPC 2.0 error codes used in responses.
    _PARSE_ERROR = -32700
    _INVALID_REQUEST = -32600
    _METHOD_NOT_FOUND = -32601
    _INVALID_PARAMS = -32602
    _SERVER_ERROR = -32000

    # Python annotation name -> JSON Schema primitive type.
    _JSON_TYPES = {
        "str": "string",
        "int": "integer",
        "float": "number",
        "bool": "boolean",
    }

    def __init__(self):
        self.tools = {}

    def register_tool(self, name, func, description="", input_schema=None):
        """Register coroutine function *func* under *name*.

        Args:
            name: Tool name used by ``tools/call``.
            func: Coroutine function invoked with the call's ``arguments``
                expanded as keyword arguments.
            description: Human-readable text returned by ``tools/list``.
            input_schema: JSON Schema (dict) describing the arguments. When
                omitted, a schema is derived from *func*'s signature.
        """
        if input_schema is None:
            input_schema = self._schema_from_signature(func)
        self.tools[name] = {
            "function": func,
            "description": description,
            "inputSchema": input_schema,
        }

    @classmethod
    def _schema_from_signature(cls, func):
        """Build an object schema from the keyword-capable parameters of *func*."""
        try:
            parameters = inspect.signature(func).parameters.values()
        except (TypeError, ValueError):
            # Not introspectable: advertise an unconstrained object.
            parameters = ()

        properties = {}
        required = []
        for param in parameters:
            # Tools are called with **arguments, so only parameters that can
            # be supplied by keyword are part of the schema.
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD, param.POSITIONAL_ONLY):
                continue
            annotation = param.annotation
            # Works for real types (str) and for string annotations ("str").
            type_name = getattr(annotation, "__name__", annotation)
            json_type = cls._JSON_TYPES.get(type_name) if isinstance(type_name, str) else None
            properties[param.name] = {"type": json_type} if json_type else {}
            if param.default is param.empty:
                required.append(param.name)

        schema = {"type": "object", "properties": properties}
        if required:
            schema["required"] = required
        return schema

    @staticmethod
    def _error(req_id, code, message):
        """Serialize a JSON-RPC error response for request *req_id*."""
        return json.dumps({
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message},
        })

    async def handle_request(self, request):
        """Process one JSON-RPC request and return the serialized response.

        Args:
            request: JSON text of a single request object.

        Returns:
            A JSON string containing either ``result`` or ``error``. An empty
            string is returned for a notification (a message without ``id``)
            whose method is not handled here, because JSON-RPC 2.0 forbids
            replying to notifications.
        """
        try:
            req = json.loads(request)
        except (TypeError, ValueError) as e:
            return self._error(None, self._PARSE_ERROR, f"Parse error: {e}")
        if not isinstance(req, dict):
            return self._error(None, self._INVALID_REQUEST, "Invalid Request: expected a JSON object")

        req_id = req.get("id")
        method = req.get("method")
        try:
            if method == "initialize":
                return json.dumps({
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {"listChanged": True}},
                        "serverInfo": {"name": "simple-mcp", "version": "1.0.0"}
                    }
                })

            if method == "tools/list":
                tools = [
                    {
                        "name": name,
                        "description": info["description"],
                        # Entries put into self.tools directly may lack a schema.
                        "inputSchema": info.get("inputSchema", {"type": "object"}),
                    }
                    for name, info in self.tools.items()
                ]
                return json.dumps({"jsonrpc": "2.0", "id": req_id, "result": {"tools": tools}})

            if method == "tools/call":
                params = req.get("params")
                if not isinstance(params, dict) or "name" not in params:
                    return self._error(req_id, self._INVALID_PARAMS, "Invalid params: 'name' is required")
                tool_name = params["name"]
                # Treat an explicit null the same as omitted arguments.
                args = params.get("arguments") or {}
                if not isinstance(args, dict):
                    return self._error(req_id, self._INVALID_PARAMS, "Invalid params: 'arguments' must be an object")
                if tool_name not in self.tools:
                    return self._error(req_id, self._INVALID_PARAMS, f"Unknown tool: {tool_name}")
                result = await self.tools[tool_name]["function"](**args)
                return json.dumps({
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": {"content": [{"type": "text", "text": result}]}
                })
        except Exception as e:
            # Boundary handler: any tool or serialization failure becomes a
            # JSON-RPC error tied to the originating request id.
            return self._error(req_id, self._SERVER_ERROR, str(e))

        if "id" not in req:
            return ""
        return self._error(req_id, self._METHOD_NOT_FOUND, f"Method not found: {method}")
class MCPRequestHandler(BaseHTTPRequestHandler):
    """HTTP front end for a JSON-RPC server object.

    ``POST`` bodies are passed to ``server_instance.handle_request`` and the
    returned JSON text is sent back. ``OPTIONS`` answers CORS preflight
    requests, and ``GET /sse`` streams periodic ping events.
    """

    def __init__(self, server_instance, *args, **kwargs):
        # Assign before calling the base initializer: it processes the
        # request right away, so do_* methods already need this attribute.
        self.server_instance = server_instance
        super().__init__(*args, **kwargs)

    def _send_cors_headers(self):
        """Emit the CORS headers shared by POST and OPTIONS responses."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')

    def _send_json(self, status, body):
        """Send *body* (JSON text) with HTTP status *status* and CORS headers."""
        payload = body.encode('utf-8')
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(payload)))
        self._send_cors_headers()
        self.end_headers()
        self.wfile.write(payload)

    @staticmethod
    def _error_body(message):
        """Build the JSON-RPC error document used for transport-level failures."""
        return json.dumps({"jsonrpc": "2.0", "id": None, "error": {"code": -32000, "message": message}})

    def do_POST(self):
        """Run the request body through the server and return its response.

        Responds 400 for an unusable ``Content-Length``, 500 when handling
        raises, 202 with no body when the server produced no response, and
        200 with the JSON response otherwise.
        """
        try:
            content_length = int(self.headers.get('Content-Length') or 0)
            if content_length < 0:
                # A negative size would make rfile.read() block until EOF.
                raise ValueError("negative length")
        except ValueError:
            self._send_json(400, self._error_body("Invalid Content-Length header"))
            return

        try:
            post_data = self.rfile.read(content_length).decode('utf-8')
            # asyncio.run creates a fresh event loop for this call and closes
            # it again afterwards.
            response = asyncio.run(self.server_instance.handle_request(post_data))
        except Exception as e:
            self._send_json(500, self._error_body(str(e)))
            return

        if not response:
            # Nothing to send back (None or empty string): acknowledge only.
            self.send_response(202)
            self.send_header('Content-Length', '0')
            self._send_cors_headers()
            self.end_headers()
            return

        self._send_json(200, response)

    def do_OPTIONS(self):
        """Answer a CORS preflight request."""
        self.send_response(200)
        self._send_cors_headers()
        self.end_headers()

    def do_GET(self):
        """Stream ping events on ``/sse``; respond 404 to any other path."""
        if self.path != "/sse":
            self.send_response(404)
            self.end_headers()
            return

        self.send_response(200)
        self.send_header('Content-Type', 'text/event-stream')
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Connection', 'keep-alive')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        try:
            for i in range(1000):
                data = f"data: {{\"message\": \"ping {i}\"}}\n\n"
                self.wfile.write(data.encode('utf-8'))
                self.wfile.flush()
                time.sleep(1)
        except ConnectionError:
            # The client went away (broken pipe, reset, abort): stop quietly.
            pass
def run_http_server(server_instance, host='0.0.0.0', port=7860):
    """Serve *server_instance* over HTTP until the server is stopped.

    Each connection is handled in its own thread, so a long-lived ``/sse``
    stream does not prevent other requests from being served.

    Args:
        server_instance: Object handed to every ``MCPRequestHandler``.
        host: Interface to bind to; the default listens on all interfaces.
        port: TCP port to listen on.
    """
    def create_handler(*args, **kwargs):
        # HTTPServer instantiates the handler with (request, client_address,
        # server); prepend the shared server object.
        return MCPRequestHandler(server_instance, *args, **kwargs)

    httpd = ThreadingHTTPServer((host, port), create_handler)
    try:
        # Report the bound port, which differs from *port* when 0 was given.
        print(f"Simple MCP HTTP Server started on port {httpd.server_address[1]}")
        httpd.serve_forever()
    finally:
        # Release the listening socket even if serving ends with an error.
        httpd.server_close()
async def main():
    """Register the scraping tools and run the HTTP server in a daemon thread.

    The coroutine returns once the server thread is no longer alive, for
    example when the server could not bind its port.
    """
    server = SimpleMCPServer()
    server.register_tool("scrape_site", scrape_site, "Get HTML content from a website URL")
    server.register_tool("plaintext", plaintext, "Get plain text content from a website URL (removes HTML tags, scripts, and styles)")

    server_thread = threading.Thread(target=run_http_server, args=(server,), daemon=True)
    server_thread.start()

    # The server thread is a daemon, so the process lives only as long as
    # this coroutine does. Stop waiting when the thread has exited instead
    # of idling forever with nothing listening.
    while server_thread.is_alive():
        await asyncio.sleep(1)


if __name__ == "__main__":
    # Ctrl+C surfaces as KeyboardInterrupt from asyncio.run, not inside the
    # coroutine, so it is handled here for a quiet shutdown.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass