File size: 5,735 Bytes
2cf9499
 
 
 
 
 
 
68c317d
2cf9499
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c317d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2cf9499
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c317d
2cf9499
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import asyncio
import json
import sys
import urllib.request
import re
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import time

async def scrape_site(url: str) -> str:
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urllib.request.urlopen(req) as response:
        return response.read().decode('utf-8')

async def plaintext(url: str) -> str:
    html = await scrape_site(url)
    html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
    html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r'<[^>]+>', '', html)
    return ' '.join(text.split())

class SimpleMCPServer:
    def __init__(self):
        self.tools = {}

    def register_tool(self, name, func, description=""):
        self.tools[name] = {"function": func, "description": description}

    async def handle_request(self, request):
        try:
            req = json.loads(request)
            method = req.get("method")
            if method == "initialize":
                return json.dumps({
                    "jsonrpc": "2.0",
                    "id": req.get("id"),
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {"listChanged": True}},
                        "serverInfo": {"name": "simple-mcp", "version": "1.0.0"}
                    }
                })
            if method == "tools/list":
                tools = [{"name": name, "description": info["description"]} for name, info in self.tools.items()]
                return json.dumps({"jsonrpc": "2.0", "id": req.get("id"), "result": {"tools": tools}})
            if method == "tools/call":
                tool_name = req["params"]["name"]
                args = req["params"].get("arguments", {})
                if tool_name in self.tools:
                    result = await self.tools[tool_name]["function"](**args)
                    return json.dumps({
                        "jsonrpc": "2.0",
                        "id": req.get("id"),
                        "result": {"content": [{"type": "text", "text": result}]}
                    })
        except Exception as e:
            return json.dumps({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}})

class MCPRequestHandler(BaseHTTPRequestHandler):
    def __init__(self, server_instance, *args, **kwargs):
        self.server_instance = server_instance
        super().__init__(*args, **kwargs)

    def do_POST(self):
        try:
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length).decode('utf-8')
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                response = loop.run_until_complete(self.server_instance.handle_request(post_data))
            finally:
                loop.close()
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
            self.send_header('Access-Control-Allow-Headers', 'Content-Type')
            self.end_headers()
            self.wfile.write(response.encode('utf-8'))
        except Exception as e:
            self.send_response(500)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            error_response = json.dumps({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}})
            self.wfile.write(error_response.encode('utf-8'))

    def do_OPTIONS(self):
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()

    def do_GET(self):
        if self.path == "/sse":
            self.send_response(200)
            self.send_header('Content-Type', 'text/event-stream')
            self.send_header('Cache-Control', 'no-cache')
            self.send_header('Connection', 'keep-alive')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()
            try:
                for i in range(1000):
                    data = f"data: {{\"message\": \"ping {i}\"}}\n\n"
                    self.wfile.write(data.encode('utf-8'))
                    self.wfile.flush()
                    time.sleep(1)
            except BrokenPipeError:
                pass
        else:
            self.send_response(404)
            self.end_headers()

def run_http_server(server_instance):
    def handler_factory(server_instance):
        def create_handler(*args, **kwargs):
            return MCPRequestHandler(server_instance, *args, **kwargs)
        return create_handler
    httpd = HTTPServer(('0.0.0.0', 7860), handler_factory(server_instance))
    print("Simple MCP HTTP Server started on port 7860")
    httpd.serve_forever()

async def main():
    server = SimpleMCPServer()
    server.register_tool("scrape_site", scrape_site, "Get HTML content from a website URL")
    server.register_tool("plaintext", plaintext, "Get plain text content from a website URL (removes HTML tags, scripts, and styles)")
    server_thread = threading.Thread(target=run_http_server, args=(server,))
    server_thread.daemon = True
    server_thread.start()
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        pass

if __name__ == "__main__":
    asyncio.run(main())