Philippe
­ƒñû Ajout Browser MCP - Navigation web intelligente
39b89ab
import os, time, asyncio, subprocess, json
from typing import Any, Dict, List
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import socketio
# --- Socket.IO ASGI server (no eventlet) ---
sio = socketio.AsyncServer(
async_mode="asgi",
cors_allowed_origins=[], # <= FIX 403 Chrome extension
allow_eio3=True, # <= FIX service worker origin handshake
transports=["websocket", "polling"],
)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)
asgi = socketio.ASGIApp(sio, other_asgi_app=app)
ROOM_HISTORY: Dict[str, List[Dict[str, Any]]] = {}
MAX_HISTORY = 50
# === Browser MCP Integration ===
class BrowserMCPClient:
"""Client pour communiquer avec le serveur Browser MCP"""
def __init__(self):
self.process = None
self.is_connected = False
async def start_server(self):
"""Démarrer le serveur Browser MCP"""
if self.process and self.process.poll() is None:
return True
try:
# Déterminer le chemin Python approprié
import sys
python_path = sys.executable
# Dans HF Spaces, utiliser le Python du système
if os.environ.get('SPACE_ID') or os.environ.get('HUGGINGFACE_HUB_CACHE'):
python_path = "python"
print("🚀 Mode HF Spaces détecté pour Browser MCP")
# Démarrer le serveur MCP en tant que subprocess
self.process = await asyncio.create_subprocess_exec(
python_path,
"browser_mcp_server.py",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=os.path.dirname(__file__)
)
# Initialiser la connexion MCP
init_msg = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "pagechat", "version": "1.0.0"}
}
}
await self._send_message(init_msg)
response = await self._read_message()
if response and "result" in response:
self.is_connected = True
print("✅ Browser MCP connecté")
return True
else:
print("❌ Échec de connexion Browser MCP")
return False
except Exception as e:
print(f"❌ Erreur Browser MCP: {e}")
return False
async def _send_message(self, message: dict):
"""Envoyer un message au serveur MCP"""
if not self.process or not self.process.stdin:
return
msg_str = json.dumps(message) + "\n"
self.process.stdin.write(msg_str.encode())
await self.process.stdin.drain()
async def _read_message(self) -> dict:
"""Lire une réponse du serveur MCP"""
if not self.process or not self.process.stdout:
return {}
try:
line = await asyncio.wait_for(self.process.stdout.readline(), timeout=10.0)
if line:
return json.loads(line.decode().strip())
except (asyncio.TimeoutError, json.JSONDecodeError) as e:
print(f"Erreur lecture MCP: {e}")
return {}
async def call_tool(self, tool_name: str, args: dict) -> str:
"""Appeler un outil Browser MCP"""
if not self.is_connected:
if not await self.start_server():
return "❌ Impossible de démarrer Browser MCP"
try:
request = {
"jsonrpc": "2.0",
"id": int(time.time() * 1000),
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": args
}
}
await self._send_message(request)
response = await self._read_message()
if response and "result" in response:
content = response["result"].get("content", [])
if content and len(content) > 0:
return content[0].get("text", "Pas de réponse")
return "❌ Erreur lors de l'appel MCP"
except Exception as e:
return f"❌ Erreur MCP: {str(e)}"
# Instance globale du client Browser MCP
browser_mcp = BrowserMCPClient()
def _add_history(room: str, msg: Dict[str, Any]) -> None:
ROOM_HISTORY.setdefault(room, []).append(msg)
if len(ROOM_HISTORY[room]) > MAX_HISTORY:
ROOM_HISTORY[room] = ROOM_HISTORY[room][-MAX_HISTORY:]
@app.get("/")
async def root():
return {"ok": True, "msg": "PageChat ASGI Socket.IO up", "port": os.environ.get("PORT", 7860)}
@app.get("/test", response_class=HTMLResponse)
async def test_page():
return """
<!DOCTYPE html>
<html>
<head>
<title>SocketIO Chat Test</title>
<style>
body { font-family: Arial, sans-serif; padding: 20px; }
#messages { border: 1px solid #ccc; height: 300px; overflow-y: auto; padding: 10px; margin: 10px 0; background: #f9f9f9; }
.message { margin: 5px 0; }
.system { color: #666; font-style: italic; }
input[type="text"] { width: 300px; padding: 5px; }
button { padding: 5px 10px; margin: 2px; }
#status { font-weight: bold; margin: 10px 0; }
</style>
</head>
<body>
<h1>SocketIO Chat Test</h1>
<div id="status">Connecting...</div>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="Type a message..." />
<button onclick="sendMessage()">Send</button>
<button onclick="joinRoom()">Join Room</button>
<button onclick="clearMessages()">Clear</button>
<script src="https://cdn.socket.io/4.7.5/socket.io.min.js"></script>
<script>
const socket = io(window.location.origin, {
transports: ["websocket", "polling"]
});
const statusDiv = document.getElementById('status');
const messagesDiv = document.getElementById('messages');
const messageInput = document.getElementById('messageInput');
socket.on("connect", () => {
console.log("✅ CONNECTED");
statusDiv.textContent = "Connected! ID: " + socket.id;
statusDiv.style.color = "green";
// Auto-join room when connected
setTimeout(() => {
joinRoom();
}, 100);
});
socket.on("disconnect", () => {
console.log("❌ DISCONNECTED");
statusDiv.textContent = "Disconnected";
statusDiv.style.color = "red";
});
socket.on("message", msg => {
console.log("📩 Message received:", msg);
addMessage(`${msg.name}: ${msg.text}`, 'message');
});
socket.on("system", msg => {
console.log("📢 System message:", msg);
addMessage(msg.text, 'system');
});
socket.on("history", messages => {
console.log("📚 History received:", messages);
messages.forEach(msg => {
if (msg.type === 'message') {
addMessage(`${msg.name}: ${msg.text}`, 'message');
} else if (msg.type === 'system') {
addMessage(msg.text, 'system');
}
});
});
function addMessage(text, type) {
const div = document.createElement('div');
div.className = `message ${type}`;
div.textContent = new Date().toLocaleTimeString() + ' - ' + text;
messagesDiv.appendChild(div);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
function sendMessage() {
const text = messageInput.value.trim();
if (text) {
console.log("📤 Sending message:", text);
socket.emit("message", {
text: text,
name: "TestUser_" + Math.floor(Math.random()*1000),
room: "global"
});
messageInput.value = '';
}
}
function joinRoom() {
const userName = "TestUser_" + Math.floor(Math.random()*1000);
console.log("🏠 Joining room as", userName);
socket.emit("join_room", {
room: "global",
name: userName
});
}
function clearMessages() {
messagesDiv.innerHTML = '';
}
messageInput.addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
</script>
</body>
</html>
"""
@sio.event
async def connect(sid, environ):
print(f"🔌 Client {sid} connected")
# Auto-join global room for new connections
await sio.enter_room(sid, "global")
if "global" in ROOM_HISTORY:
await sio.emit("history", ROOM_HISTORY["global"], room=sid)
@sio.event
async def disconnect(sid):
print(f"❌ Client {sid} disconnected")
@sio.event
async def join_room(sid, data):
print(f"🏠 join_room event from {sid}: {data}")
room = (data or {}).get("room") or "global"
name = (data or {}).get("name") or "Anon"
await sio.enter_room(sid, room)
if room in ROOM_HISTORY:
await sio.emit("history", ROOM_HISTORY[room], room=sid)
sysmsg = {"type": "system", "text": f"{name} joined", "ts": time.time()}
_add_history(room, sysmsg)
await sio.emit("system", sysmsg, room=room)
print(f"📢 System message sent to room {room}: {sysmsg}")
@sio.event
async def send_message(sid, data):
print(f"💬 send_message event from {sid}: {data}")
# Accept both dict and plain string:
if isinstance(data, str):
data = {"text": data}
elif not isinstance(data, dict):
data = {}
room = data.get("room", "global")
text = data.get("text", "")
name = data.get("name", "Anon")
if not text:
print(f"⚠️ Empty message from {sid}, ignoring")
return
# 🤖 Détecter les commandes Browser MCP
if text.startswith("/browse ") or text.startswith("/browser ") or text.startswith("/web "):
await handle_browser_command(sid, room, name, text)
return
msg = {"type": "message", "name": name, "text": text, "ts": time.time()}
_add_history(room, msg)
await sio.emit("message", msg, room=room)
print(f"📤 Message sent to room {room}: {msg}")
async def handle_browser_command(sid: str, room: str, name: str, text: str):
"""Gérer les commandes de navigation web avec Browser MCP"""
print(f"🌐 Browser command from {name}: {text}")
# Extraction des commandes
parts = text.split(" ", 2)
if len(parts) < 2:
error_msg = {"type": "message", "name": "🤖 Browser", "text": "Usage: /browse <url> ou /web <commande>", "ts": time.time()}
await sio.emit("message", error_msg, room=room)
return
command = parts[1].lower()
args = parts[2] if len(parts) > 2 else ""
try:
if command.startswith("http") or text.startswith("/browser "):
# Navigation vers URL (supporter /browser et /browse)
url = command if command.startswith("http") else command
result = await browser_mcp.call_tool("navigate_to_url", {"url": url})
# Auto-capture d'écran après navigation
if "✅" in result:
screenshot_result = await browser_mcp.call_tool("take_screenshot", {"full_page": True})
result += f"\n\n{screenshot_result}"
elif command == "screenshot":
result = await browser_mcp.call_tool("take_screenshot", {"full_page": True})
elif command == "text":
selector = args if args else None
result = await browser_mcp.call_tool("extract_text", {"selector": selector, "clean": True})
elif command == "click":
if not args:
result = "❌ Usage: /web click <selector>"
else:
result = await browser_mcp.call_tool("click_element", {"selector": args})
elif command == "fill":
if "=" not in args:
result = "❌ Usage: /web fill <selector>=<value>"
else:
selector, value = args.split("=", 1)
result = await browser_mcp.call_tool("fill_input", {"selector": selector.strip(), "value": value.strip()})
elif command == "info":
result = await browser_mcp.call_tool("get_page_info", {})
elif command == "search":
if not args:
result = "❌ Usage: /web search <text>"
else:
result = await browser_mcp.call_tool("search_text", {"text": args})
elif command == "links":
result = await browser_mcp.call_tool("extract_links", {})
elif command == "wait":
if not args:
result = "❌ Usage: /web wait <selector>"
else:
result = await browser_mcp.call_tool("wait_for_element", {"selector": args})
elif command == "help":
result = """🌐 **Commandes Browser MCP disponibles:**
• `/browse <url>` ou `/browser <url>` - Naviguer vers une URL
• `/web screenshot` - Capture d'écran
• `/web text [selector]` - Extraire le texte
• `/web click <selector>` - Cliquer sur un élément
• `/web fill <selector>=<value>` - Remplir un champ
• `/web info` - Infos de la page
• `/web search <text>` - Rechercher du texte
• `/web links` - Extrairer les liens
• `/web wait <selector>` - Attendre un élément
• `/web help` - Cette aide"""
else:
result = f"❌ Commande inconnue: {command}. Tapez '/web help' pour l'aide."
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"❌ Browser MCP Error: {error_details}")
result = f"❌ Erreur lors de l'appel MCP: {str(e)}"
# Envoyer la réponse
response_msg = {"type": "message", "name": "🤖 Browser", "text": result, "ts": time.time()}
_add_history(room, response_msg)
await sio.emit("message", response_msg, room=room)
print(f"🌐 Browser response sent to room {room}")
@sio.event
async def message(sid, data):
print(f"📨 message event from {sid}: {data}")
# Alias for backward compatibility
await send_message(sid, data)
if __name__ == "__main__":
import uvicorn
port = int(os.environ.get("PORT", 7860)) # HF Spaces utilise le port 7860
print(f"🚀 Server on http://0.0.0.0:{port}")
uvicorn.run(asgi, host="0.0.0.0", port=port)