import os, time, asyncio, subprocess, json
from typing import Any, Dict, List
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import socketio
# --- Socket.IO ASGI server (no eventlet) ---
sio = socketio.AsyncServer(
async_mode="asgi",
cors_allowed_origins=[], # <= FIX 403 Chrome extension
allow_eio3=True, # <= FIX service worker origin handshake
transports=["websocket", "polling"],
)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)
asgi = socketio.ASGIApp(sio, other_asgi_app=app)
ROOM_HISTORY: Dict[str, List[Dict[str, Any]]] = {}
MAX_HISTORY = 50
# === Browser MCP Integration ===
class BrowserMCPClient:
"""Client pour communiquer avec le serveur Browser MCP"""
def __init__(self):
self.process = None
self.is_connected = False
async def start_server(self):
"""Démarrer le serveur Browser MCP"""
if self.process and self.process.poll() is None:
return True
try:
# Déterminer le chemin Python approprié
import sys
python_path = sys.executable
# Dans HF Spaces, utiliser le Python du système
if os.environ.get('SPACE_ID') or os.environ.get('HUGGINGFACE_HUB_CACHE'):
python_path = "python"
print("🚀 Mode HF Spaces détecté pour Browser MCP")
# Démarrer le serveur MCP en tant que subprocess
self.process = await asyncio.create_subprocess_exec(
python_path,
"browser_mcp_server.py",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=os.path.dirname(__file__)
)
# Initialiser la connexion MCP
init_msg = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "pagechat", "version": "1.0.0"}
}
}
await self._send_message(init_msg)
response = await self._read_message()
if response and "result" in response:
self.is_connected = True
print("✅ Browser MCP connecté")
return True
else:
print("❌ Échec de connexion Browser MCP")
return False
except Exception as e:
print(f"❌ Erreur Browser MCP: {e}")
return False
async def _send_message(self, message: dict):
"""Envoyer un message au serveur MCP"""
if not self.process or not self.process.stdin:
return
msg_str = json.dumps(message) + "\n"
self.process.stdin.write(msg_str.encode())
await self.process.stdin.drain()
async def _read_message(self) -> dict:
"""Lire une réponse du serveur MCP"""
if not self.process or not self.process.stdout:
return {}
try:
line = await asyncio.wait_for(self.process.stdout.readline(), timeout=10.0)
if line:
return json.loads(line.decode().strip())
except (asyncio.TimeoutError, json.JSONDecodeError) as e:
print(f"Erreur lecture MCP: {e}")
return {}
async def call_tool(self, tool_name: str, args: dict) -> str:
"""Appeler un outil Browser MCP"""
if not self.is_connected:
if not await self.start_server():
return "❌ Impossible de démarrer Browser MCP"
try:
request = {
"jsonrpc": "2.0",
"id": int(time.time() * 1000),
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": args
}
}
await self._send_message(request)
response = await self._read_message()
if response and "result" in response:
content = response["result"].get("content", [])
if content and len(content) > 0:
return content[0].get("text", "Pas de réponse")
return "❌ Erreur lors de l'appel MCP"
except Exception as e:
return f"❌ Erreur MCP: {str(e)}"
# Instance globale du client Browser MCP
browser_mcp = BrowserMCPClient()
def _add_history(room: str, msg: Dict[str, Any]) -> None:
ROOM_HISTORY.setdefault(room, []).append(msg)
if len(ROOM_HISTORY[room]) > MAX_HISTORY:
ROOM_HISTORY[room] = ROOM_HISTORY[room][-MAX_HISTORY:]
@app.get("/")
async def root():
return {"ok": True, "msg": "PageChat ASGI Socket.IO up", "port": os.environ.get("PORT", 7860)}
@app.get("/test", response_class=HTMLResponse)
async def test_page():
return """
SocketIO Chat Test
SocketIO Chat Test
Connecting...
"""
@sio.event
async def connect(sid, environ):
print(f"🔌 Client {sid} connected")
# Auto-join global room for new connections
await sio.enter_room(sid, "global")
if "global" in ROOM_HISTORY:
await sio.emit("history", ROOM_HISTORY["global"], room=sid)
@sio.event
async def disconnect(sid):
print(f"❌ Client {sid} disconnected")
@sio.event
async def join_room(sid, data):
print(f"🏠 join_room event from {sid}: {data}")
room = (data or {}).get("room") or "global"
name = (data or {}).get("name") or "Anon"
await sio.enter_room(sid, room)
if room in ROOM_HISTORY:
await sio.emit("history", ROOM_HISTORY[room], room=sid)
sysmsg = {"type": "system", "text": f"{name} joined", "ts": time.time()}
_add_history(room, sysmsg)
await sio.emit("system", sysmsg, room=room)
print(f"📢 System message sent to room {room}: {sysmsg}")
@sio.event
async def send_message(sid, data):
print(f"💬 send_message event from {sid}: {data}")
# Accept both dict and plain string:
if isinstance(data, str):
data = {"text": data}
elif not isinstance(data, dict):
data = {}
room = data.get("room", "global")
text = data.get("text", "")
name = data.get("name", "Anon")
if not text:
print(f"⚠️ Empty message from {sid}, ignoring")
return
# 🤖 Détecter les commandes Browser MCP
if text.startswith("/browse ") or text.startswith("/browser ") or text.startswith("/web "):
await handle_browser_command(sid, room, name, text)
return
msg = {"type": "message", "name": name, "text": text, "ts": time.time()}
_add_history(room, msg)
await sio.emit("message", msg, room=room)
print(f"📤 Message sent to room {room}: {msg}")
async def handle_browser_command(sid: str, room: str, name: str, text: str):
"""Gérer les commandes de navigation web avec Browser MCP"""
print(f"🌐 Browser command from {name}: {text}")
# Extraction des commandes
parts = text.split(" ", 2)
if len(parts) < 2:
error_msg = {"type": "message", "name": "🤖 Browser", "text": "Usage: /browse ou /web ", "ts": time.time()}
await sio.emit("message", error_msg, room=room)
return
command = parts[1].lower()
args = parts[2] if len(parts) > 2 else ""
try:
if command.startswith("http") or text.startswith("/browser "):
# Navigation vers URL (supporter /browser et /browse)
url = command if command.startswith("http") else command
result = await browser_mcp.call_tool("navigate_to_url", {"url": url})
# Auto-capture d'écran après navigation
if "✅" in result:
screenshot_result = await browser_mcp.call_tool("take_screenshot", {"full_page": True})
result += f"\n\n{screenshot_result}"
elif command == "screenshot":
result = await browser_mcp.call_tool("take_screenshot", {"full_page": True})
elif command == "text":
selector = args if args else None
result = await browser_mcp.call_tool("extract_text", {"selector": selector, "clean": True})
elif command == "click":
if not args:
result = "❌ Usage: /web click "
else:
result = await browser_mcp.call_tool("click_element", {"selector": args})
elif command == "fill":
if "=" not in args:
result = "❌ Usage: /web fill ="
else:
selector, value = args.split("=", 1)
result = await browser_mcp.call_tool("fill_input", {"selector": selector.strip(), "value": value.strip()})
elif command == "info":
result = await browser_mcp.call_tool("get_page_info", {})
elif command == "search":
if not args:
result = "❌ Usage: /web search "
else:
result = await browser_mcp.call_tool("search_text", {"text": args})
elif command == "links":
result = await browser_mcp.call_tool("extract_links", {})
elif command == "wait":
if not args:
result = "❌ Usage: /web wait "
else:
result = await browser_mcp.call_tool("wait_for_element", {"selector": args})
elif command == "help":
result = """🌐 **Commandes Browser MCP disponibles:**
• `/browse ` ou `/browser ` - Naviguer vers une URL
• `/web screenshot` - Capture d'écran
• `/web text [selector]` - Extraire le texte
• `/web click ` - Cliquer sur un élément
• `/web fill =` - Remplir un champ
• `/web info` - Infos de la page
• `/web search ` - Rechercher du texte
• `/web links` - Extrairer les liens
• `/web wait ` - Attendre un élément
• `/web help` - Cette aide"""
else:
result = f"❌ Commande inconnue: {command}. Tapez '/web help' pour l'aide."
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"❌ Browser MCP Error: {error_details}")
result = f"❌ Erreur lors de l'appel MCP: {str(e)}"
# Envoyer la réponse
response_msg = {"type": "message", "name": "🤖 Browser", "text": result, "ts": time.time()}
_add_history(room, response_msg)
await sio.emit("message", response_msg, room=room)
print(f"🌐 Browser response sent to room {room}")
@sio.event
async def message(sid, data):
print(f"📨 message event from {sid}: {data}")
# Alias for backward compatibility
await send_message(sid, data)
if __name__ == "__main__":
import uvicorn
port = int(os.environ.get("PORT", 7860)) # HF Spaces utilise le port 7860
print(f"🚀 Server on http://0.0.0.0:{port}")
uvicorn.run(asgi, host="0.0.0.0", port=port)