mcp / app.py
GamerC0der's picture
Update app.py
68c317d verified
raw
history blame
5.74 kB
import asyncio
import json
import sys
import urllib.request
import re
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import time
async def scrape_site(url: str) -> str:
    """Download ``url`` over HTTP(S) and return the response body as text.

    Args:
        url: Absolute ``http://`` or ``https://`` URL to fetch.

    Returns:
        The response body, decoded with the charset declared in the
        ``Content-Type`` header (UTF-8 when none is declared or the declared
        one is unknown). Undecodable bytes are replaced with U+FFFD instead
        of raising.

    Raises:
        ValueError: If ``url`` does not use the ``http`` or ``https`` scheme.
        OSError: If the request fails or times out (``urllib.error.URLError``
            and socket timeouts are both ``OSError`` subclasses).
    """
    from urllib.parse import urlsplit

    timeout_seconds = 30

    # urlopen() also understands file:// and ftp://. This function is exposed
    # as a remotely callable tool, so anything but plain web schemes is
    # refused before any I/O happens.
    if urlsplit(url).scheme.lower() not in ("http", "https"):
        raise ValueError(
            f"Unsupported URL {url!r}: only http and https are allowed"
        )

    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    # NOTE: urlopen() is a blocking call; the timeout bounds how long it can
    # stall the calling event loop.
    with urllib.request.urlopen(req, timeout=timeout_seconds) as response:
        body = response.read()
        charset = response.headers.get_content_charset() or 'utf-8'

    try:
        return body.decode(charset, errors='replace')
    except LookupError:
        # The server declared a charset Python has no codec for.
        return body.decode('utf-8', errors='replace')
async def plaintext(url: str) -> str:
    """Fetch ``url`` and return its visible text with markup removed.

    ``<script>`` and ``<style>`` elements are dropped together with their
    contents, HTML comments and all remaining tags are removed, character
    references such as ``&amp;`` are decoded, and every run of whitespace is
    collapsed to a single space.

    Args:
        url: Address of the page; passed unchanged to ``scrape_site``.

    Returns:
        The page text as one whitespace-normalised line.
    """
    from html import unescape

    markup = await scrape_site(url)
    multiline = re.DOTALL | re.IGNORECASE

    # Every removed construct is replaced by a space rather than nothing so
    # that text from adjacent elements ("<li>a</li><li>b</li>") is not glued
    # into one word; the final split/join collapses the surplus spaces.
    markup = re.sub(r'<script[^>]*>.*?</script>', ' ', markup, flags=multiline)
    markup = re.sub(r'<style[^>]*>.*?</style>', ' ', markup, flags=multiline)
    # Comments go after script/style so a stray "<!--" inside a script cannot
    # swallow page content up to some later "-->".
    markup = re.sub(r'<!--.*?-->', ' ', markup, flags=re.DOTALL)
    text = re.sub(r'<[^>]+>', ' ', markup)

    # Decode entities only after tags are gone, otherwise "&lt;b&gt;" would
    # turn into a tag and be stripped. str.split() also treats the
    # non-breaking space produced by "&nbsp;" as whitespace.
    return ' '.join(unescape(text).split())
class SimpleMCPServer:
    """Minimal JSON-RPC 2.0 dispatcher for a registry of async tools.

    Understands the ``initialize``, ``tools/list`` and ``tools/call``
    methods. ``handle_request`` always returns a JSON string: a result on
    success, otherwise a JSON-RPC error object carrying the request ``id``
    (``null`` when the id could not be determined).
    """

    # Reserved JSON-RPC 2.0 error codes.
    _PARSE_ERROR = -32700
    _INVALID_REQUEST = -32600
    _METHOD_NOT_FOUND = -32601
    _INVALID_PARAMS = -32602
    # Implementation-defined server error, used when a tool itself fails.
    _TOOL_FAILURE = -32000

    def __init__(self):
        # Maps tool name -> {"function", "description", "inputSchema"}.
        self.tools = {}

    def register_tool(self, name, func, description="", input_schema=None):
        """Make ``func`` callable through ``tools/call`` under ``name``.

        Args:
            name: Public tool name.
            func: Coroutine function invoked with the call's ``arguments``
                as keyword arguments.
            description: Human-readable summary reported by ``tools/list``.
            input_schema: JSON Schema for the arguments. When omitted, a
                best-effort schema is derived from ``func``'s signature.
        """
        if input_schema is None:
            input_schema = self._schema_from_signature(func)
        self.tools[name] = {
            "function": func,
            "description": description,
            "inputSchema": input_schema,
        }

    @staticmethod
    def _schema_from_signature(func):
        """Build a best-effort JSON Schema object from ``func``'s parameters."""
        import inspect

        # Annotations may be real types or, with postponed evaluation, names.
        json_types = {
            str: "string", "str": "string",
            bool: "boolean", "bool": "boolean",
            int: "integer", "int": "integer",
            float: "number", "float": "number",
        }
        schema = {"type": "object", "properties": {}}
        try:
            parameters = inspect.signature(func).parameters.values()
        except (TypeError, ValueError):
            # Not introspectable (e.g. some builtins): accept any object.
            return schema

        required = []
        for param in parameters:
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue
            try:
                json_type = json_types.get(param.annotation)
            except TypeError:
                # Unhashable annotation; leave the property untyped.
                json_type = None
            schema["properties"][param.name] = (
                {"type": json_type} if json_type else {}
            )
            if param.default is param.empty:
                required.append(param.name)
        if required:
            schema["required"] = required
        return schema

    @staticmethod
    def _result(request_id, result):
        """Serialise a successful JSON-RPC response."""
        return json.dumps({"jsonrpc": "2.0", "id": request_id, "result": result})

    @staticmethod
    def _error(request_id, code, message):
        """Serialise a JSON-RPC error response."""
        return json.dumps({
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        })

    async def handle_request(self, request):
        """Dispatch one JSON-RPC request and return the JSON response text.

        Args:
            request: The raw JSON-RPC request (``str`` or ``bytes``).

        Returns:
            A JSON string; never ``None``.
        """
        try:
            req = json.loads(request)
        except (TypeError, ValueError) as exc:
            return self._error(None, self._PARSE_ERROR, f"Parse error: {exc}")
        if not isinstance(req, dict):
            return self._error(
                None, self._INVALID_REQUEST,
                "Invalid Request: expected a JSON object",
            )

        request_id = req.get("id")
        method = req.get("method")

        if method == "initialize":
            return self._result(request_id, {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {"listChanged": True}},
                "serverInfo": {"name": "simple-mcp", "version": "1.0.0"},
            })

        if method == "tools/list":
            tools = [
                {
                    "name": name,
                    "description": info["description"],
                    # Entries written into self.tools directly may lack a schema.
                    "inputSchema": info.get("inputSchema", {"type": "object"}),
                }
                for name, info in self.tools.items()
            ]
            return self._result(request_id, {"tools": tools})

        if method == "tools/call":
            return await self._call_tool(request_id, req.get("params"))

        return self._error(
            request_id, self._METHOD_NOT_FOUND, f"Method not found: {method}"
        )

    async def _call_tool(self, request_id, params):
        """Validate ``tools/call`` params, run the tool, serialise the reply."""
        if not isinstance(params, dict) or not isinstance(params.get("name"), str):
            return self._error(
                request_id, self._INVALID_PARAMS,
                "Invalid params: a string 'name' is required",
            )
        tool_name = params["name"]
        arguments = params.get("arguments")
        if arguments is None:
            arguments = {}
        if not isinstance(arguments, dict):
            return self._error(
                request_id, self._INVALID_PARAMS,
                "Invalid params: 'arguments' must be an object",
            )
        tool = self.tools.get(tool_name)
        if tool is None:
            return self._error(
                request_id, self._INVALID_PARAMS, f"Unknown tool: {tool_name}"
            )

        try:
            output = await tool["function"](**arguments)
            return self._result(
                request_id, {"content": [{"type": "text", "text": output}]}
            )
        except Exception as exc:
            # Boundary between untrusted tool code and the transport: report
            # any failure (including a non-serialisable result) to the caller.
            return self._error(request_id, self._TOOL_FAILURE, str(exc))
class MCPRequestHandler(BaseHTTPRequestHandler):
    """HTTP front end for a JSON-RPC backend.

    ``POST`` bodies are passed to ``server_instance.handle_request`` and the
    returned JSON text is sent back; ``OPTIONS`` answers CORS preflights;
    ``GET /sse`` streams periodic ping events and every other path is a 404.
    """

    # Number of ping events sent on /sse and the pause between them (seconds).
    SSE_PING_COUNT = 1000
    SSE_PING_INTERVAL = 1

    def __init__(self, server_instance, *args, **kwargs):
        # Must be assigned before delegating: the base constructor processes
        # the request, so do_POST may run before super().__init__ returns.
        self.server_instance = server_instance
        super().__init__(*args, **kwargs)

    def _send_cors_headers(self):
        """Emit the permissive CORS headers shared by every JSON response."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')

    def _send_json(self, status, payload):
        """Send ``payload`` (JSON text) as a complete response with ``status``."""
        body = payload.encode('utf-8')
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self._send_cors_headers()
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _rpc_error(code, message):
        """Serialise a JSON-RPC error object for transport-level failures."""
        return json.dumps(
            {"jsonrpc": "2.0", "error": {"code": code, "message": message}}
        )

    def do_POST(self):
        """Run the request body through the backend and return its reply."""
        try:
            content_length = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            # A negative length would make rfile.read() wait for EOF.
            self._send_json(
                400,
                self._rpc_error(-32600, "Missing or invalid Content-Length header"),
            )
            return

        # Only producing the response is guarded, so a failure while writing
        # it out can never trigger a second status line on the same socket.
        try:
            post_data = self.rfile.read(content_length).decode('utf-8')
            # asyncio.run() creates a fresh loop for this thread and tears it
            # down again afterwards.
            response = asyncio.run(self.server_instance.handle_request(post_data))
            if response is None:
                raise RuntimeError("Request handler produced no response")
        except Exception as e:
            self._send_json(500, self._rpc_error(-32000, str(e)))
            return
        self._send_json(200, response)

    def do_OPTIONS(self):
        """Answer a CORS preflight request."""
        self.send_response(200)
        self._send_cors_headers()
        self.end_headers()

    def do_GET(self):
        """Stream ping events on ``/sse``; reply 404 for any other path."""
        if self.path != "/sse":
            self.send_response(404)
            self.end_headers()
            return

        self.send_response(200)
        self.send_header('Content-Type', 'text/event-stream')
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Connection', 'keep-alive')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        try:
            for i in range(self.SSE_PING_COUNT):
                event = "data: " + json.dumps({"message": f"ping {i}"}) + "\n\n"
                self.wfile.write(event.encode('utf-8'))
                self.wfile.flush()
                time.sleep(self.SSE_PING_INTERVAL)
        except ConnectionError:
            # Client disconnected (broken pipe, reset or abort): stop quietly.
            return
def run_http_server(server_instance, host='0.0.0.0', port=7860):
    """Serve ``server_instance`` over HTTP until the process exits.

    Blocks the calling thread inside ``serve_forever``.

    Args:
        server_instance: Backend object handed to every ``MCPRequestHandler``.
        host: Interface to bind; defaults to all interfaces.
        port: TCP port to listen on.
    """
    from functools import partial
    from http.server import ThreadingHTTPServer

    # The server calls the handler class as handler(request, client_address,
    # server); partial() prepends the backend as the first argument.
    handler = partial(MCPRequestHandler, server_instance)

    # One thread per connection: the /sse endpoint holds its connection open
    # for a long time and would otherwise stall every other request.
    with ThreadingHTTPServer((host, port), handler) as httpd:
        print(f"Simple MCP HTTP Server started on port {port}")
        httpd.serve_forever()
async def main():
    """Register the scraping tools, start the HTTP server, then idle.

    The HTTP server runs ``run_http_server`` on a daemon thread; this
    coroutine then sleeps in one-second steps and returns if a
    ``KeyboardInterrupt`` is raised inside it.
    """
    mcp = SimpleMCPServer()
    tool_table = (
        ("scrape_site", scrape_site, "Get HTML content from a website URL"),
        ("plaintext", plaintext, "Get plain text content from a website URL (removes HTML tags, scripts, and styles)"),
    )
    for tool_name, tool_func, summary in tool_table:
        mcp.register_tool(tool_name, tool_func, summary)

    # Daemon thread, so it does not keep the process alive on its own.
    http_worker = threading.Thread(
        target=run_http_server, args=(mcp,), daemon=True
    )
    http_worker.start()

    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        return
if __name__ == "__main__":
    # asyncio.run() re-raises KeyboardInterrupt after shutting the loop down,
    # so Ctrl-C normally surfaces here rather than inside main(). Catch it so
    # an interactive stop exits without a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass