mcp / app.py
GamerC0der's picture
Update app.py
68c317d verified
import asyncio
import json
import sys
import urllib.request
import re
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import time
async def scrape_site(url: str) -> str:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response:
return response.read().decode('utf-8')
async def plaintext(url: str) -> str:
html = await scrape_site(url)
html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.IGNORECASE)
text = re.sub(r'<[^>]+>', '', html)
return ' '.join(text.split())
class SimpleMCPServer:
def __init__(self):
self.tools = {}
def register_tool(self, name, func, description=""):
self.tools[name] = {"function": func, "description": description}
async def handle_request(self, request):
try:
req = json.loads(request)
method = req.get("method")
if method == "initialize":
return json.dumps({
"jsonrpc": "2.0",
"id": req.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {"listChanged": True}},
"serverInfo": {"name": "simple-mcp", "version": "1.0.0"}
}
})
if method == "tools/list":
tools = [{"name": name, "description": info["description"]} for name, info in self.tools.items()]
return json.dumps({"jsonrpc": "2.0", "id": req.get("id"), "result": {"tools": tools}})
if method == "tools/call":
tool_name = req["params"]["name"]
args = req["params"].get("arguments", {})
if tool_name in self.tools:
result = await self.tools[tool_name]["function"](**args)
return json.dumps({
"jsonrpc": "2.0",
"id": req.get("id"),
"result": {"content": [{"type": "text", "text": result}]}
})
except Exception as e:
return json.dumps({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}})
class MCPRequestHandler(BaseHTTPRequestHandler):
def __init__(self, server_instance, *args, **kwargs):
self.server_instance = server_instance
super().__init__(*args, **kwargs)
def do_POST(self):
try:
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
response = loop.run_until_complete(self.server_instance.handle_request(post_data))
finally:
loop.close()
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
self.wfile.write(response.encode('utf-8'))
except Exception as e:
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
error_response = json.dumps({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}})
self.wfile.write(error_response.encode('utf-8'))
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def do_GET(self):
if self.path == "/sse":
self.send_response(200)
self.send_header('Content-Type', 'text/event-stream')
self.send_header('Cache-Control', 'no-cache')
self.send_header('Connection', 'keep-alive')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
try:
for i in range(1000):
data = f"data: {{\"message\": \"ping {i}\"}}\n\n"
self.wfile.write(data.encode('utf-8'))
self.wfile.flush()
time.sleep(1)
except BrokenPipeError:
pass
else:
self.send_response(404)
self.end_headers()
def run_http_server(server_instance):
def handler_factory(server_instance):
def create_handler(*args, **kwargs):
return MCPRequestHandler(server_instance, *args, **kwargs)
return create_handler
httpd = HTTPServer(('0.0.0.0', 7860), handler_factory(server_instance))
print("Simple MCP HTTP Server started on port 7860")
httpd.serve_forever()
async def main():
server = SimpleMCPServer()
server.register_tool("scrape_site", scrape_site, "Get HTML content from a website URL")
server.register_tool("plaintext", plaintext, "Get plain text content from a website URL (removes HTML tags, scripts, and styles)")
server_thread = threading.Thread(target=run_http_server, args=(server,))
server_thread.daemon = True
server_thread.start()
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
pass
if __name__ == "__main__":
asyncio.run(main())