"""PageChat: Socket.IO (ASGI) chat server with Browser MCP chat commands.

The module exposes:

* ``asgi`` - the ASGI application (Socket.IO wrapped around a FastAPI app);
* Socket.IO events ``connect``, ``disconnect``, ``join_room``,
  ``send_message`` and ``message`` (alias of ``send_message``);
* chat commands ``/browse``, ``/browser`` and ``/web`` that are forwarded to a
  Browser MCP server running as a child process and spoken to over
  newline-delimited JSON-RPC on its stdin/stdout.
"""
import asyncio
import itertools
import json
import os
import subprocess  # noqa: F401  (kept: may be used by code outside this view)
import sys
import time
import traceback
from typing import Any, Dict, List, Optional

import socketio
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles  # noqa: F401  (kept from original)

# --- Socket.IO ASGI server (no eventlet) ---
sio = socketio.AsyncServer(
    async_mode="asgi",
    # An empty list disables Socket.IO's own CORS handling (original intent:
    # "FIX 403 Chrome extension").
    cors_allowed_origins=[],
    # Original intent: "FIX service worker origin handshake".
    # NOTE(review): `allow_eio3` does not look like a documented
    # python-socketio option - confirm that it has any effect.
    allow_eio3=True,
    transports=["websocket", "polling"],
)

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Socket.IO handles its own endpoint and forwards everything else to FastAPI.
asgi = socketio.ASGIApp(sio, other_asgi_app=app)

# Per-room message history, capped at MAX_HISTORY entries per room.
ROOM_HISTORY: Dict[str, List[Dict[str, Any]]] = {}
MAX_HISTORY = 50

# Chat prefixes that route a message to the Browser MCP integration.
BROWSER_COMMAND_PREFIXES = ("/browse ", "/browser ", "/web ")


# === Browser MCP Integration ===
class BrowserMCPClient:
    """Client for a Browser MCP server run as a child process.

    Messages are exchanged as one JSON-RPC object per line over the child's
    stdin/stdout.
    """

    # Maximum time (seconds) to wait for the response to one request.
    READ_TIMEOUT = 10.0
    # StreamReader line limit; the asyncio default (64 KiB) makes readline()
    # raise ValueError on long single-line responses.
    STREAM_LIMIT = 16 * 1024 * 1024

    def __init__(self):
        self.process = None
        self.is_connected = False
        # Monotonic JSON-RPC ids: unique even for calls in the same millisecond.
        self._request_ids = itertools.count(1)

    def _is_running(self) -> bool:
        """Return True when the child process exists and has not exited."""
        # asyncio.subprocess.Process has no poll(); returncode stays None
        # while the process is alive.
        return self.process is not None and self.process.returncode is None

    async def _stop_process(self) -> None:
        """Terminate and reap the child process, if any, and reset state."""
        process, self.process = self.process, None
        self.is_connected = False
        if process is None or process.returncode is not None:
            return
        try:
            process.terminate()
            await asyncio.wait_for(process.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()
        except ProcessLookupError:
            # The process exited between the check and the signal.
            pass

    async def start_server(self):
        """Start the Browser MCP server and perform the MCP handshake.

        Returns:
            True if the server is already running or the handshake succeeded,
            False otherwise (the child process is cleaned up in that case).
        """
        if self._is_running():
            return True

        # Forget any previous process that has already exited.
        await self._stop_process()

        try:
            # Pick the interpreter used to launch the server.
            python_path = sys.executable

            # On HF Spaces, use the system Python.
            if os.environ.get("SPACE_ID") or os.environ.get("HUGGINGFACE_HUB_CACHE"):
                python_path = "python"
                print("🚀 Mode HF Spaces détecté pour Browser MCP")

            # stderr is inherited rather than piped: a pipe that is never read
            # fills up and eventually blocks the child process.
            self.process = await asyncio.create_subprocess_exec(
                python_path,
                "browser_mcp_server.py",
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                # abspath: dirname() of a relative __file__ can be "", which
                # is not a valid cwd.
                cwd=os.path.dirname(os.path.abspath(__file__)),
                limit=self.STREAM_LIMIT,
            )

            # MCP handshake, step 1: initialize request.
            init_id = next(self._request_ids)
            init_msg = {
                "jsonrpc": "2.0",
                "id": init_id,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "pagechat", "version": "1.0.0"},
                },
            }
            await self._send_message(init_msg)
            response = await self._read_message(init_id)

            if response and "result" in response:
                # MCP handshake, step 2: the client must confirm with an
                # `initialized` notification before issuing other requests.
                await self._send_message(
                    {"jsonrpc": "2.0", "method": "notifications/initialized"}
                )
                self.is_connected = True
                print("✅ Browser MCP connecté")
                return True

            print("❌ Échec de connexion Browser MCP")
        except Exception as e:
            print(f"❌ Erreur Browser MCP: {e}")

        # Handshake failed: do not leave an orphaned child process behind.
        await self._stop_process()
        return False

    async def _send_message(self, message: dict):
        """Write one JSON message, newline-terminated, to the server's stdin."""
        if not self.process or not self.process.stdin:
            return

        msg_str = json.dumps(message) + "\n"
        self.process.stdin.write(msg_str.encode())
        await self.process.stdin.drain()

    async def _read_message(self, expected_id: Optional[int] = None) -> dict:
        """Read a JSON message from the server's stdout.

        Args:
            expected_id: When given, messages whose ``id`` differs (server
                notifications, late replies to timed-out requests) are
                skipped. When None, the first decodable message is returned.

        Returns:
            The decoded message, or ``{}`` on EOF, timeout (READ_TIMEOUT
            seconds overall) or an over-long line.
        """
        if not self.process or not self.process.stdout:
            return {}

        loop = asyncio.get_running_loop()
        deadline = loop.time() + self.READ_TIMEOUT
        try:
            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    raise asyncio.TimeoutError()
                line = await asyncio.wait_for(
                    self.process.stdout.readline(), timeout=remaining
                )
                if not line:
                    # EOF: the server closed its stdout.
                    return {}
                try:
                    payload = line.decode().strip()
                    if not payload:
                        continue
                    message = json.loads(payload)
                except ValueError as e:
                    # Covers JSONDecodeError and UnicodeDecodeError: skip the
                    # bad line and keep waiting for a usable one.
                    print(f"Erreur lecture MCP: {e}")
                    continue
                if expected_id is None:
                    return message
                if isinstance(message, dict) and message.get("id") == expected_id:
                    return message
        except (asyncio.TimeoutError, ValueError) as e:
            # ValueError: readline() hit the stream limit before a newline.
            print(f"Erreur lecture MCP: {e}")
        return {}

    async def call_tool(self, tool_name: str, args: dict) -> str:
        """Call a Browser MCP tool and return its first text content item.

        Args:
            tool_name: Name of the MCP tool to invoke.
            args: Arguments passed to the tool.

        Returns:
            The tool's text output, or a "❌ ..." message on failure. This
            method does not raise for request/response errors.
        """
        # Also restart when the server died after a successful handshake.
        if not (self.is_connected and self._is_running()):
            if not await self.start_server():
                return "❌ Impossible de démarrer Browser MCP"

        try:
            request_id = next(self._request_ids)
            request = {
                "jsonrpc": "2.0",
                "id": request_id,
                "method": "tools/call",
                "params": {
                    "name": tool_name,
                    "arguments": args,
                },
            }

            await self._send_message(request)
            response = await self._read_message(request_id)

            if response and "result" in response:
                content = response["result"].get("content", [])
                if content:
                    return content[0].get("text", "Pas de réponse")

            # Surface a JSON-RPC error instead of a generic failure message.
            error = response.get("error") if response else None
            if error:
                detail = error.get("message", error) if isinstance(error, dict) else error
                return f"❌ Erreur MCP: {detail}"

            return "❌ Erreur lors de l'appel MCP"

        except Exception as e:
            return f"❌ Erreur MCP: {str(e)}"


# Global Browser MCP client instance.
browser_mcp = BrowserMCPClient()


def _add_history(room: str, msg: Dict[str, Any]) -> None:
    """Append ``msg`` to the room's history, keeping the last MAX_HISTORY."""
    history = ROOM_HISTORY.setdefault(room, [])
    history.append(msg)
    if len(history) > MAX_HISTORY:
        del history[:-MAX_HISTORY]


def _field(payload: Dict[str, Any], key: str, default: str) -> str:
    """Return ``payload[key]`` as a string, or ``default`` if missing/empty.

    Non-string values are converted with ``str()`` so that later string
    operations and room lookups cannot fail on client-supplied data.
    """
    value = payload.get(key)
    if not value:
        return default
    return value if isinstance(value, str) else str(value)


@app.get("/")
async def root():
    """Health-check endpoint."""
    return {
        "ok": True,
        "msg": "PageChat ASGI Socket.IO up",
        "port": os.environ.get("PORT", 7860),
    }


@app.get("/test", response_class=HTMLResponse)
async def test_page():
    """Serve the Socket.IO test page."""
    # NOTE(review): the body below contains no HTML markup or client script -
    # it looks truncated; confirm against the intended test page.
    return """ SocketIO Chat Test

SocketIO Chat Test

Connecting...
"""


@sio.event
async def connect(sid, environ):
    """Put every new client in the "global" room and replay its history."""
    print(f"🔌 Client {sid} connected")
    # Auto-join global room for new connections
    await sio.enter_room(sid, "global")
    if "global" in ROOM_HISTORY:
        await sio.emit("history", ROOM_HISTORY["global"], room=sid)


@sio.event
async def disconnect(sid):
    """Log client disconnection."""
    print(f"❌ Client {sid} disconnected")


@sio.event
async def join_room(sid, data):
    """Join a room, replay its history to the client and announce the join.

    ``data`` is expected to be a dict with optional ``room`` (default
    "global") and ``name`` (default "Anon"); anything else is treated as an
    empty payload.
    """
    print(f"🏠 join_room event from {sid}: {data}")
    payload = data if isinstance(data, dict) else {}
    room = _field(payload, "room", "global")
    name = _field(payload, "name", "Anon")

    await sio.enter_room(sid, room)
    if room in ROOM_HISTORY:
        await sio.emit("history", ROOM_HISTORY[room], room=sid)

    sysmsg = {"type": "system", "text": f"{name} joined", "ts": time.time()}
    _add_history(room, sysmsg)
    await sio.emit("system", sysmsg, room=room)
    print(f"📢 System message sent to room {room}: {sysmsg}")


@sio.event
async def send_message(sid, data):
    """Broadcast a chat message to a room, or run a Browser MCP command.

    ``data`` may be a plain string (the text) or a dict with ``text`` and
    optional ``room`` (default "global") and ``name`` (default "Anon").
    Empty messages are ignored.
    """
    print(f"💬 send_message event from {sid}: {data}")

    # Accept both dict and plain string:
    if isinstance(data, str):
        data = {"text": data}
    elif not isinstance(data, dict):
        data = {}

    # A missing/None room must fall back to "global": emitting with
    # room=None would broadcast to every connected client.
    room = _field(data, "room", "global")
    text = _field(data, "text", "")
    name = _field(data, "name", "Anon")

    if not text:
        print(f"⚠️ Empty message from {sid}, ignoring")
        return

    # Detect Browser MCP commands.
    if text.startswith(BROWSER_COMMAND_PREFIXES):
        await handle_browser_command(sid, room, name, text)
        return

    msg = {"type": "message", "name": name, "text": text, "ts": time.time()}
    _add_history(room, msg)
    await sio.emit("message", msg, room=room)
    print(f"📤 Message sent to room {room}: {msg}")


# /web commands that take one mandatory argument:
# command -> (MCP tool, argument key, usage message).
_ARG_COMMANDS = {
    "click": ("click_element", "selector", "❌ Usage: /web click <selector>"),
    "search": ("search_text", "text", "❌ Usage: /web search <text>"),
    "wait": ("wait_for_element", "selector", "❌ Usage: /web wait <selector>"),
}

# /web commands that take no argument: command -> MCP tool.
_NO_ARG_COMMANDS = {
    "info": "get_page_info",
    "links": "extract_links",
}

_BROWSER_HELP = """🌐 **Commandes Browser MCP disponibles:**
• `/browse <url>` ou `/browser <url>` - Naviguer vers une URL
• `/web screenshot` - Capture d'écran
• `/web text [selector]` - Extraire le texte
• `/web click <selector>` - Cliquer sur un élément
• `/web fill <selector>=<value>` - Remplir un champ
• `/web info` - Infos de la page
• `/web search <text>` - Rechercher du texte
• `/web links` - Extrairer les liens
• `/web wait <selector>` - Attendre un élément
• `/web help` - Cette aide"""


async def _run_browser_command(prefix: str, target: str, args: str) -> str:
    """Map one parsed chat command to a Browser MCP tool call.

    Args:
        prefix: The slash command itself ("/browse", "/browser" or "/web").
        target: The first word after the prefix, in its original case.
        args: The remainder of the message ("" if none).

    Returns:
        The text to post back to the room.
    """
    command = target.lower()

    if command.startswith("http") or prefix == "/browser":
        # Navigation. The URL keeps its original case: paths and query
        # strings are case-sensitive.
        result = await browser_mcp.call_tool("navigate_to_url", {"url": target})
        # Automatic screenshot after a successful navigation.
        if "✅" in result:
            screenshot_result = await browser_mcp.call_tool(
                "take_screenshot", {"full_page": True}
            )
            result += f"\n\n{screenshot_result}"
        return result

    if command == "screenshot":
        return await browser_mcp.call_tool("take_screenshot", {"full_page": True})

    if command == "text":
        selector = args if args else None
        return await browser_mcp.call_tool(
            "extract_text", {"selector": selector, "clean": True}
        )

    if command == "fill":
        if "=" not in args:
            return "❌ Usage: /web fill <selector>=<value>"
        selector, value = args.split("=", 1)
        return await browser_mcp.call_tool(
            "fill_input", {"selector": selector.strip(), "value": value.strip()}
        )

    if command in _ARG_COMMANDS:
        tool, key, usage = _ARG_COMMANDS[command]
        if not args:
            return usage
        return await browser_mcp.call_tool(tool, {key: args})

    if command in _NO_ARG_COMMANDS:
        return await browser_mcp.call_tool(_NO_ARG_COMMANDS[command], {})

    if command == "help":
        return _BROWSER_HELP

    return f"❌ Commande inconnue: {command}. Tapez '/web help' pour l'aide."


async def handle_browser_command(sid: str, room: str, name: str, text: str):
    """Handle a web-navigation chat command through Browser MCP.

    Parses ``text`` ("/browse <url>", "/browser <url>" or
    "/web <command> [args]"), runs the matching tool and posts the result to
    ``room`` as a message from "🤖 Browser" (also stored in the room history).
    A command with nothing after the prefix gets a usage message that is not
    stored in the history.
    """
    print(f"🌐 Browser command from {name}: {text}")

    # Split on any whitespace so "/web " or "/web   help" parse as expected.
    parts = text.split(None, 2)
    if len(parts) < 2:
        error_msg = {
            "type": "message",
            "name": "🤖 Browser",
            "text": "Usage: /browse <url> ou /web <commande>",
            "ts": time.time(),
        }
        await sio.emit("message", error_msg, room=room)
        return

    prefix, target = parts[0], parts[1]
    args = parts[2] if len(parts) > 2 else ""

    try:
        result = await _run_browser_command(prefix, target, args)
    except Exception as e:
        error_details = traceback.format_exc()
        print(f"❌ Browser MCP Error: {error_details}")
        result = f"❌ Erreur lors de l'appel MCP: {str(e)}"

    # Send the response to the room.
    response_msg = {
        "type": "message",
        "name": "🤖 Browser",
        "text": result,
        "ts": time.time(),
    }
    _add_history(room, response_msg)
    await sio.emit("message", response_msg, room=room)
    print(f"🌐 Browser response sent to room {room}")


@sio.event
async def message(sid, data):
    """Alias of ``send_message`` kept for backward compatibility."""
    print(f"📨 message event from {sid}: {data}")
    await send_message(sid, data)


if __name__ == "__main__":
    import uvicorn

    port = int(os.environ.get("PORT", 7860))  # HF Spaces uses port 7860
    print(f"🚀 Server on http://0.0.0.0:{port}")
    uvicorn.run(asgi, host="0.0.0.0", port=port)