Spaces:
Sleeping
Sleeping
File size: 5,735 Bytes
2cf9499 68c317d 2cf9499 68c317d 2cf9499 68c317d 2cf9499 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
import asyncio
import json
import sys
import urllib.request
import re
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import time
async def scrape_site(url: str) -> str:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response:
return response.read().decode('utf-8')
async def plaintext(url: str) -> str:
html = await scrape_site(url)
html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.IGNORECASE)
text = re.sub(r'<[^>]+>', '', html)
return ' '.join(text.split())
class SimpleMCPServer:
def __init__(self):
self.tools = {}
def register_tool(self, name, func, description=""):
self.tools[name] = {"function": func, "description": description}
async def handle_request(self, request):
try:
req = json.loads(request)
method = req.get("method")
if method == "initialize":
return json.dumps({
"jsonrpc": "2.0",
"id": req.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {"listChanged": True}},
"serverInfo": {"name": "simple-mcp", "version": "1.0.0"}
}
})
if method == "tools/list":
tools = [{"name": name, "description": info["description"]} for name, info in self.tools.items()]
return json.dumps({"jsonrpc": "2.0", "id": req.get("id"), "result": {"tools": tools}})
if method == "tools/call":
tool_name = req["params"]["name"]
args = req["params"].get("arguments", {})
if tool_name in self.tools:
result = await self.tools[tool_name]["function"](**args)
return json.dumps({
"jsonrpc": "2.0",
"id": req.get("id"),
"result": {"content": [{"type": "text", "text": result}]}
})
except Exception as e:
return json.dumps({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}})
class MCPRequestHandler(BaseHTTPRequestHandler):
def __init__(self, server_instance, *args, **kwargs):
self.server_instance = server_instance
super().__init__(*args, **kwargs)
def do_POST(self):
try:
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
response = loop.run_until_complete(self.server_instance.handle_request(post_data))
finally:
loop.close()
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
self.wfile.write(response.encode('utf-8'))
except Exception as e:
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
error_response = json.dumps({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}})
self.wfile.write(error_response.encode('utf-8'))
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def do_GET(self):
if self.path == "/sse":
self.send_response(200)
self.send_header('Content-Type', 'text/event-stream')
self.send_header('Cache-Control', 'no-cache')
self.send_header('Connection', 'keep-alive')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
try:
for i in range(1000):
data = f"data: {{\"message\": \"ping {i}\"}}\n\n"
self.wfile.write(data.encode('utf-8'))
self.wfile.flush()
time.sleep(1)
except BrokenPipeError:
pass
else:
self.send_response(404)
self.end_headers()
def run_http_server(server_instance):
def handler_factory(server_instance):
def create_handler(*args, **kwargs):
return MCPRequestHandler(server_instance, *args, **kwargs)
return create_handler
httpd = HTTPServer(('0.0.0.0', 7860), handler_factory(server_instance))
print("Simple MCP HTTP Server started on port 7860")
httpd.serve_forever()
async def main():
server = SimpleMCPServer()
server.register_tool("scrape_site", scrape_site, "Get HTML content from a website URL")
server.register_tool("plaintext", plaintext, "Get plain text content from a website URL (removes HTML tags, scripts, and styles)")
server_thread = threading.Thread(target=run_http_server, args=(server,))
server_thread.daemon = True
server_thread.start()
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
pass
if __name__ == "__main__":
asyncio.run(main())
|