# mcp / app.py
# GamerC0der's picture
# Rename mcp.py to app.py
# 6fc0401 verified
# raw
# history blame
# 5.05 kB
import asyncio
import json
import sys
import urllib.request
import re
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
async def scrape_site(url: str) -> str:
    """Fetch *url* over HTTP(S) and return the response body as text.

    The URL is supplied by remote clients, so only ``http`` and ``https``
    are accepted; ``urllib`` would otherwise also open ``file:`` and other
    schemes, letting a caller read local files.

    Note: the download itself is a blocking call; the coroutine does not
    yield to the event loop while the request is in flight.

    Args:
        url: Absolute ``http://`` or ``https://`` URL to download.

    Returns:
        The decoded response body. The charset declared in the response's
        Content-Type header is used when present (UTF-8 otherwise), and
        undecodable bytes are replaced rather than raising.

    Raises:
        ValueError: If the URL scheme is not ``http`` or ``https``.
        urllib.error.URLError: If the request fails or times out.
    """
    # Function-scope import keeps this block independent of the module's
    # import list.
    from urllib.parse import urlsplit

    scheme = urlsplit(url).scheme.lower()
    if scheme not in ("http", "https"):
        raise ValueError(
            f"Unsupported URL scheme {scheme!r}; only http and https are allowed"
        )

    # Without a timeout a stalled remote host would hang the caller forever.
    fetch_timeout_seconds = 30

    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urllib.request.urlopen(req, timeout=fetch_timeout_seconds) as response:
        raw = response.read()
        charset = response.headers.get_content_charset() or 'utf-8'

    try:
        return raw.decode(charset, errors='replace')
    except LookupError:
        # The server declared a charset Python does not know; fall back.
        return raw.decode('utf-8', errors='replace')
async def plaintext(url: str) -> str:
    """Download *url* and return its visible text with markup removed.

    Processing steps, in order:
      1. ``<script>`` and ``<style>`` elements are dropped with their content.
      2. HTML comments are dropped.
      3. Every remaining tag is replaced by a space, so text from adjacent
         elements (``<h1>Title</h1><p>Body</p>``) does not fuse into one word.
      4. Character references (``&amp;``, ``&nbsp;``, ``&#39;`` ...) are decoded.
      5. Runs of whitespace are collapsed to single spaces.

    This is a regex-based approximation, not a full HTML parser. Because
    tags become spaces, an inline tag in the middle of a word
    (``bo<b>ld</b>``) also yields a space.

    Args:
        url: Address of the page, passed through to ``scrape_site``.

    Returns:
        The page text as a single line of space-separated words.
    """
    # Function-scope import keeps this block independent of the module's
    # import list.
    from html import unescape

    markup = await scrape_site(url)

    block_flags = re.DOTALL | re.IGNORECASE
    # ``\s*`` before ``>`` also matches closers written as ``</script >``.
    markup = re.sub(r'<script[^>]*>.*?</script\s*>', ' ', markup, flags=block_flags)
    markup = re.sub(r'<style[^>]*>.*?</style\s*>', ' ', markup, flags=block_flags)
    # Remove comments explicitly: the generic tag pattern below stops at the
    # first ``>`` and would leak the rest of a comment that contains one.
    markup = re.sub(r'<!--.*?-->', ' ', markup, flags=re.DOTALL)
    text = re.sub(r'<[^>]+>', ' ', markup)

    # Decode entities only after tags are gone, so an escaped ``&lt;b&gt;``
    # stays literal text instead of being stripped as a tag.
    return ' '.join(unescape(text).split())
class SimpleMCPServer:
    """Minimal JSON-RPC 2.0 dispatcher that exposes registered async tools.

    Supported methods are ``initialize``, ``tools/list`` and ``tools/call``.
    ``handle_request`` always returns a JSON string: a result object on
    success, or an error object (carrying the request ``id`` when it is
    known) on failure.
    """

    def __init__(self):
        # Maps tool name -> {"function": async callable, "description": str}.
        self.tools = {}

    def register_tool(self, name, func, description=""):
        """Register the async callable *func* as tool *name*.

        Registering an existing name replaces the earlier entry.
        """
        self.tools[name] = {"function": func, "description": description}

    @staticmethod
    def _result(request_id, result):
        """Serialize a JSON-RPC success response."""
        return json.dumps({"jsonrpc": "2.0", "id": request_id, "result": result})

    @staticmethod
    def _error(request_id, code, message):
        """Serialize a JSON-RPC error response."""
        return json.dumps({
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        })

    async def handle_request(self, request):
        """Process one JSON-RPC request and return the JSON response text.

        Args:
            request: The raw JSON text of a single request object.

        Returns:
            A JSON string. Failures are reported as JSON-RPC error objects
            instead of being raised:
              * -32700: the payload is not valid JSON
              * -32600: the payload is not a JSON object
              * -32601: the method is not supported
              * -32602: ``tools/call`` params are missing or name no known tool
              * -32000: any other exception, including one raised by a tool
        """
        try:
            req = json.loads(request)
        except (TypeError, ValueError) as e:
            return self._error(None, -32700, f"Parse error: {e}")
        if not isinstance(req, dict):
            return self._error(None, -32600, "Invalid Request: expected a JSON object")

        request_id = req.get("id")
        method = req.get("method")
        try:
            if method == "initialize":
                return self._result(request_id, {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {"listChanged": True}},
                    "serverInfo": {"name": "simple-mcp", "version": "1.0.0"},
                })

            if method == "tools/list":
                tools = [
                    {"name": name, "description": info["description"]}
                    for name, info in self.tools.items()
                ]
                return self._result(request_id, {"tools": tools})

            if method == "tools/call":
                params = req.get("params")
                if not isinstance(params, dict) or "name" not in params:
                    return self._error(
                        request_id, -32602, "Invalid params: 'name' is required"
                    )
                tool_name = params["name"]
                tool = self.tools.get(tool_name)
                if tool is None:
                    return self._error(
                        request_id, -32602, f"Unknown tool: {tool_name}"
                    )
                # A missing or null "arguments" means "call with no arguments".
                args = params.get("arguments") or {}
                result = await tool["function"](**args)
                return self._result(
                    request_id, {"content": [{"type": "text", "text": result}]}
                )

            # Returning None here would make the HTTP layer fail while
            # encoding the body; answer with an explicit error instead.
            return self._error(request_id, -32601, f"Method not found: {method}")
        except Exception as e:
            # Boundary handler: a failing tool must yield an error response,
            # not take the server down.
            return self._error(request_id, -32000, str(e))
class MCPRequestHandler(BaseHTTPRequestHandler):
    """HTTP front end that forwards POSTed JSON-RPC bodies to an MCP server.

    POST bodies are passed to ``server_instance.handle_request`` and the
    returned JSON text is sent back with permissive CORS headers. OPTIONS
    answers CORS preflight requests.
    """

    def __init__(self, server_instance, *args, **kwargs):
        # Must be assigned before super().__init__(): the base class
        # constructor processes the request, so do_POST runs inside it.
        self.server_instance = server_instance
        super().__init__(*args, **kwargs)

    def _send_cors_headers(self):
        """Emit the CORS headers shared by every response."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')

    def _send_json(self, status, body_text):
        """Send *body_text* as a complete JSON response with *status*."""
        # Encode before any header is written so an encoding failure cannot
        # leave a half-sent response on the wire.
        payload = body_text.encode('utf-8')
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self._send_cors_headers()
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_POST(self):
        """Run the request body through the MCP server and reply with JSON.

        Replies 200 with the server's response text, or 500 with a JSON-RPC
        error object if reading, decoding or handling the request fails.
        """
        try:
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length).decode('utf-8')
            # asyncio.run() creates a fresh loop, runs the coroutine and
            # closes the loop again, leaving no closed loop installed as
            # this thread's current event loop.
            response = asyncio.run(self.server_instance.handle_request(post_data))
            if not isinstance(response, str):
                raise TypeError("request handler produced no JSON response")
            status, body = 200, response
        except Exception as e:
            # Nothing has been written yet, so exactly one status line
            # (the 500) reaches the client.
            status = 500
            body = json.dumps({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}})
        self._send_json(status, body)

    def do_OPTIONS(self):
        """Answer a CORS preflight request."""
        self.send_response(200)
        self._send_cors_headers()
        self.end_headers()
def run_http_server(server_instance):
    """Serve MCP requests over HTTP on 0.0.0.0:7860 until interrupted.

    Every incoming connection gets a new ``MCPRequestHandler`` bound to
    *server_instance*. This call blocks in ``serve_forever()``.

    Args:
        server_instance: The MCP server object handed to each handler.
    """
    def build_handler(*handler_args, **handler_kwargs):
        # HTTPServer calls this with (request, client_address, server);
        # prepend the MCP server so the handler can reach it.
        return MCPRequestHandler(server_instance, *handler_args, **handler_kwargs)

    bind_address = ('0.0.0.0', 7860)
    httpd = HTTPServer(bind_address, build_handler)

    print("Simple MCP HTTP Server started on port 7860")
    print("Available tools: scrape_site, plaintext")

    httpd.serve_forever()
async def main():
    """Register the tools, start the HTTP server thread and idle until stopped.

    The HTTP server runs in a daemon thread, so it ends with the process.
    This coroutine only keeps the main thread alive and prints a shutdown
    message when it is interrupted.

    Under ``asyncio.run()`` a Ctrl-C normally reaches this coroutine as task
    cancellation rather than as ``KeyboardInterrupt``, so both are handled.
    The cancellation is re-raised after the message is printed so that
    normal asyncio cancellation semantics are preserved.
    """
    server = SimpleMCPServer()
    server.register_tool("scrape_site", scrape_site, "Get HTML content from a website URL")
    server.register_tool("plaintext", plaintext, "Get plain text content from a website URL (removes HTML tags, scripts, and styles)")

    server_thread = threading.Thread(target=run_http_server, args=(server,))
    server_thread.daemon = True
    server_thread.start()

    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("Server shutting down...")
    except asyncio.CancelledError:
        print("Server shutting down...")
        raise


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises Ctrl-C once main() has been cancelled; the
        # shutdown message was already printed there, so exit without a
        # traceback.
        pass