"""Simple MCP (Model Context Protocol) server exposing web-scraping tools over HTTP.

Serves JSON-RPC 2.0 requests on port 7860 ("initialize", "tools/list",
"tools/call") plus a demo SSE endpoint at /sse.  Two tools are registered:
``scrape_site`` (raw HTML) and ``plaintext`` (tag-stripped text).
"""

import asyncio
import json
import re
import sys
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


async def scrape_site(url: str) -> str:
    """Fetch *url* and return the response body decoded as UTF-8.

    The blocking urllib call is pushed onto a worker thread via
    ``asyncio.to_thread`` so the event loop is not stalled while the
    request is in flight (the original called urlopen directly inside
    the coroutine, blocking the loop).
    """

    def _fetch() -> str:
        # Some sites reject requests without a browser-like User-Agent.
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req) as response:
            return response.read().decode('utf-8')

    return await asyncio.to_thread(_fetch)


async def plaintext(url: str) -> str:
    """Fetch *url* and return its visible text content.

    Removes <script> and <style> elements (including their contents),
    strips all remaining tags, and collapses whitespace runs to single
    spaces.  Note: regex-based HTML stripping is best-effort, not a
    full parser.
    """
    html = await scrape_site(url)
    # Drop script/style blocks entirely — their contents are not visible text.
    # (The original patterns were corrupted to r']*>.*?' and matched nothing useful.)
    html = re.sub(r'<script\b[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
    html = re.sub(r'<style\b[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.IGNORECASE)
    # Remove any remaining tags, then normalize whitespace.
    text = re.sub(r'<[^>]+>', '', html)
    return ' '.join(text.split())


class SimpleMCPServer:
    """Minimal JSON-RPC 2.0 dispatcher implementing a subset of MCP."""

    def __init__(self):
        # Maps tool name -> {"function": async callable, "description": str}.
        self.tools = {}

    def register_tool(self, name, func, description=""):
        """Register async callable *func* under *name* for tools/call dispatch."""
        self.tools[name] = {"function": func, "description": description}

    async def handle_request(self, request):
        """Handle one JSON-RPC request string; return the JSON response string.

        Supports "initialize", "tools/list" and "tools/call".  Unknown
        methods yield -32601, unknown tools -32602, and any other failure
        (including malformed JSON) -32000 — the original silently returned
        None for unknown methods/tools, which crashed the HTTP handler on
        ``response.encode``.
        """
        req_id = None
        try:
            req = json.loads(request)
            req_id = req.get("id")
            method = req.get("method")
            if method == "initialize":
                return json.dumps({
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {"listChanged": True}},
                        "serverInfo": {"name": "simple-mcp", "version": "1.0.0"},
                    },
                })
            if method == "tools/list":
                tools = [
                    {"name": name, "description": info["description"]}
                    for name, info in self.tools.items()
                ]
                return json.dumps({
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": {"tools": tools},
                })
            if method == "tools/call":
                tool_name = req["params"]["name"]
                args = req["params"].get("arguments", {})
                if tool_name in self.tools:
                    result = await self.tools[tool_name]["function"](**args)
                    return json.dumps({
                        "jsonrpc": "2.0",
                        "id": req_id,
                        "result": {"content": [{"type": "text", "text": result}]},
                    })
                # Tool name not registered: invalid params per JSON-RPC.
                return json.dumps({
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {"code": -32602, "message": f"Unknown tool: {tool_name}"},
                })
            # Unrecognized method: JSON-RPC "method not found".
            return json.dumps({
                "jsonrpc": "2.0",
                "id": req_id,
                "error": {"code": -32601, "message": f"Method not found: {method}"},
            })
        except Exception as e:
            # Catch-all boundary: report the failure as a JSON-RPC error
            # (including the request id when it was parseable).
            return json.dumps({
                "jsonrpc": "2.0",
                "id": req_id,
                "error": {"code": -32000, "message": str(e)},
            })


class MCPRequestHandler(BaseHTTPRequestHandler):
    """HTTP adapter that forwards POSTed JSON-RPC bodies to a SimpleMCPServer."""

    def __init__(self, server_instance, *args, **kwargs):
        # Bind the dispatcher before BaseHTTPRequestHandler.__init__ runs,
        # because __init__ handles the request synchronously.
        self.server_instance = server_instance
        super().__init__(*args, **kwargs)

    def do_POST(self):
        """Read the JSON-RPC body, dispatch it, and write the JSON response."""
        try:
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length).decode('utf-8')
            # Each request runs on its own short-lived event loop because
            # BaseHTTPRequestHandler is synchronous and thread-driven.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                response = loop.run_until_complete(
                    self.server_instance.handle_request(post_data)
                )
            finally:
                loop.close()
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
            self.send_header('Access-Control-Allow-Headers', 'Content-Type')
            self.end_headers()
            self.wfile.write(response.encode('utf-8'))
        except Exception as e:
            self.send_response(500)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            error_response = json.dumps({
                "jsonrpc": "2.0",
                "error": {"code": -32000, "message": str(e)},
            })
            self.wfile.write(error_response.encode('utf-8'))

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()

    def do_GET(self):
        """Serve a demo Server-Sent-Events ping stream at /sse; 404 elsewhere."""
        if self.path == "/sse":
            self.send_response(200)
            self.send_header('Content-Type', 'text/event-stream')
            self.send_header('Cache-Control', 'no-cache')
            self.send_header('Connection', 'keep-alive')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()
            try:
                # Emit one ping per second; stop when the client disconnects
                # or after 1000 pings.
                for i in range(1000):
                    data = f"data: {{\"message\": \"ping {i}\"}}\n\n"
                    self.wfile.write(data.encode('utf-8'))
                    self.wfile.flush()
                    time.sleep(1)
            except BrokenPipeError:
                pass  # client went away; nothing to clean up
        else:
            self.send_response(404)
            self.end_headers()


def run_http_server(server_instance):
    """Run an HTTPServer on 0.0.0.0:7860 that dispatches to *server_instance*.

    Blocks forever; intended to run on a daemon thread.
    """

    def handler_factory(server_instance):
        # HTTPServer instantiates the handler class itself, so we close over
        # the dispatcher and inject it as the first constructor argument.
        def create_handler(*args, **kwargs):
            return MCPRequestHandler(server_instance, *args, **kwargs)
        return create_handler

    httpd = HTTPServer(('0.0.0.0', 7860), handler_factory(server_instance))
    print("Simple MCP HTTP Server started on port 7860")
    httpd.serve_forever()


async def main():
    """Register the scraping tools, start the HTTP server thread, and idle."""
    server = SimpleMCPServer()
    server.register_tool(
        "scrape_site", scrape_site,
        "Get HTML content from a website URL",
    )
    server.register_tool(
        "plaintext", plaintext,
        "Get plain text content from a website URL (removes HTML tags, scripts, and styles)",
    )
    server_thread = threading.Thread(target=run_http_server, args=(server,))
    server_thread.daemon = True  # don't block interpreter exit
    server_thread.start()
    try:
        # Keep the main coroutine alive so the daemon server thread keeps serving.
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    asyncio.run(main())