Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Browser MCP Server - Automatisation web pour PageChat | |
| Permet aux IA d'interagir avec des pages web, capturer des écrans, extraire du contenu | |
| """ | |
| import asyncio | |
| import base64 | |
| import json | |
| import logging | |
| from typing import Optional, Dict, Any, List | |
| from dataclasses import dataclass | |
| import mcp.server.stdio | |
| import mcp.types as types | |
| from mcp.server import NotificationOptions, Server | |
| from mcp.server.models import InitializationOptions | |
| import mcp.server.stdio | |
| from playwright.async_api import async_playwright, Browser, Page, BrowserContext | |
| from bs4 import BeautifulSoup | |
| import re | |
| from urllib.parse import urljoin, urlparse | |
| # Configuration du logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("browser-mcp") | |
class BrowserState:
    """Global browser state shared by every tool invocation.

    All fields start as None and are populated lazily by ensure_browser();
    cleanup() releases them.  Kept as class-level defaults on a single
    module-level instance (see `browser_state` below in the file).
    """

    browser: Optional[Browser] = None
    context: Optional[BrowserContext] = None
    page: Optional[Page] = None
    # Handle returned by async_playwright().start(); annotated with Any
    # because no Playwright driver type alias is imported in this file.
    playwright: Optional[Any] = None
# Single shared instance; ensure_browser() and cleanup() mutate it in place.
browser_state = BrowserState()

# MCP server instance for this process.
server = Server("browser-mcp")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """Advertise the browser-automation tools exposed by this server.

    Must be registered with @server.list_tools(); without the registration
    the MCP runtime never calls this function and clients see an empty
    tool list.  Returns the static catalogue of ten tools (navigation,
    screenshot, text/link extraction, clicking, form filling, waiting,
    searching, element inspection).
    """
    return [
        types.Tool(
            name="navigate_to_url",
            description="Naviguer vers une URL spécifique",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "URL à visiter"},
                    "wait_for": {"type": "string", "description": "Sélecteur CSS à attendre (optionnel)"}
                },
                "required": ["url"]
            },
        ),
        types.Tool(
            name="take_screenshot",
            description="Capturer une capture d'écran de la page actuelle",
            inputSchema={
                "type": "object",
                "properties": {
                    "full_page": {"type": "boolean", "description": "Capture complète de la page", "default": True},
                    "element_selector": {"type": "string", "description": "Sélecteur CSS d'un élément spécifique"}
                }
            },
        ),
        types.Tool(
            name="extract_text",
            description="Extraire le texte visible de la page ou d'un élément",
            inputSchema={
                "type": "object",
                "properties": {
                    "selector": {"type": "string", "description": "Sélecteur CSS (optionnel, toute la page si vide)"},
                    "clean": {"type": "boolean", "description": "Nettoyer le texte extrait", "default": True}
                }
            },
        ),
        types.Tool(
            name="click_element",
            description="Cliquer sur un élément de la page",
            inputSchema={
                "type": "object",
                "properties": {
                    "selector": {"type": "string", "description": "Sélecteur CSS de l'élément à cliquer"}
                },
                "required": ["selector"]
            },
        ),
        types.Tool(
            name="fill_input",
            description="Remplir un champ de saisie",
            inputSchema={
                "type": "object",
                "properties": {
                    "selector": {"type": "string", "description": "Sélecteur CSS du champ"},
                    "value": {"type": "string", "description": "Valeur à saisir"}
                },
                "required": ["selector", "value"]
            },
        ),
        types.Tool(
            name="get_page_info",
            description="Obtenir les informations de base de la page (titre, URL, description)",
            inputSchema={"type": "object"},
        ),
        types.Tool(
            name="wait_for_element",
            description="Attendre qu'un élément apparaisse sur la page",
            inputSchema={
                "type": "object",
                "properties": {
                    "selector": {"type": "string", "description": "Sélecteur CSS à attendre"},
                    "timeout": {"type": "number", "description": "Timeout en millisecondes", "default": 10000}
                },
                "required": ["selector"]
            },
        ),
        types.Tool(
            name="search_text",
            description="Rechercher du texte sur la page actuelle",
            inputSchema={
                "type": "object",
                "properties": {
                    "text": {"type": "string", "description": "Texte à rechercher"},
                    "case_sensitive": {"type": "boolean", "description": "Recherche sensible à la casse", "default": False}
                },
                "required": ["text"]
            },
        ),
        types.Tool(
            name="extract_links",
            description="Extraire tous les liens de la page",
            inputSchema={
                "type": "object",
                "properties": {
                    "filter_domain": {"type": "string", "description": "Filtrer par domaine (optionnel)"}
                }
            },
        ),
        types.Tool(
            name="get_element_info",
            description="Obtenir des informations sur un élément spécifique",
            inputSchema={
                "type": "object",
                "properties": {
                    "selector": {"type": "string", "description": "Sélecteur CSS de l'élément"}
                },
                "required": ["selector"]
            },
        )
    ]
async def ensure_browser():
    """Lazily start Playwright, launch Chromium, and open a context/page.

    Idempotent: returns immediately when a browser is already running.
    Tuned for containerized hosts (Hugging Face Spaces): sandbox and GPU
    disabled, single-process mode, extra compositor/memory flags when a
    Spaces environment is detected.  If the Chromium binaries are missing,
    attempts a one-shot `playwright install chromium` and retries once.

    Raises:
        Exception: when the browser cannot be launched and the automatic
            reinstall fails or times out.
    """
    import os

    if not browser_state.browser:
        browser_state.playwright = await async_playwright().start()
        # Launch flags for constrained container environments
        # (no sandbox, tiny /dev/shm, no GPU).
        launch_options = {
            'headless': True,
            'args': [
                '--no-sandbox',
                '--disable-dev-shm-usage',
                '--disable-gpu',
                '--disable-dev-tools',
                '--no-first-run',
                '--disable-extensions',
                '--disable-default-apps',
                '--disable-background-timer-throttling',
                '--disable-backgrounding-occluded-windows',
                '--disable-renderer-backgrounding',
                '--single-process'  # Important for memory-limited containers
            ]
        }
        # Detect a Hugging Face Spaces environment and add extra flags.
        if os.environ.get('SPACE_ID') or os.environ.get('HUGGINGFACE_HUB_CACHE'):
            launch_options['args'].extend([
                '--disable-features=VizDisplayCompositor',
                '--run-all-compositor-stages-before-draw',
                '--memory-pressure-off'
            ])
            logger.info("🚀 Configuration HF Spaces détectée")
        try:
            browser_state.browser = await browser_state.playwright.chromium.launch(**launch_options)
        except Exception as e:
            if "Executable doesn't exist" in str(e):
                # Chromium binaries are missing: install them once, then
                # retry the launch.
                logger.error("❌ Navigateurs Playwright non installés. Tentative de réinstallation...")
                import subprocess
                import sys
                try:
                    result = subprocess.run(
                        [sys.executable, "-m", "playwright", "install", "chromium"],
                        capture_output=True, text=True, timeout=120)
                    if result.returncode == 0:
                        logger.info("✅ Réinstallation Playwright réussie")
                        # Second (and last) launch attempt.
                        browser_state.browser = await browser_state.playwright.chromium.launch(**launch_options)
                    else:
                        logger.error(f"❌ Échec réinstallation: {result.stderr}")
                        raise Exception("Impossible d'installer les navigateurs Playwright")
                except subprocess.TimeoutExpired:
                    logger.error("❌ Timeout lors de l'installation Playwright")
                    raise Exception("Timeout installation Playwright")
                except Exception as install_error:
                    logger.error(f"❌ Erreur installation: {install_error}")
                    raise
            else:
                raise
        browser_state.context = await browser_state.browser.new_context(
            viewport={'width': 1280, 'height': 720},
            user_agent='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        )
        browser_state.page = await browser_state.context.new_page()
        # Generous timeouts for slow cloud environments, configured before
        # declaring the browser ready.
        browser_state.page.set_default_timeout(30000)              # 30 seconds per action
        browser_state.page.set_default_navigation_timeout(45000)  # 45 seconds per navigation
        logger.info("✅ Navigateur initialisé avec succès")
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    """Dispatch a tool invocation against the shared Playwright page.

    Must be registered with @server.call_tool(); without the registration
    the MCP runtime never routes tool calls here.  Every branch returns a
    single-element list of TextContent; unexpected errors are logged and
    reported as text instead of crashing the server.

    Args:
        name: Tool name as advertised by handle_list_tools.
        arguments: Tool arguments matching the tool's inputSchema.
    """
    try:
        await ensure_browser()
        if name == "navigate_to_url":
            url = arguments["url"]
            wait_for = arguments.get("wait_for")
            logger.info(f"Navigation vers: {url}")
            await browser_state.page.goto(url, wait_until="domcontentloaded")
            # Optionally block until a given selector is present.
            if wait_for:
                await browser_state.page.wait_for_selector(wait_for, timeout=10000)
            title = await browser_state.page.title()
            return [types.TextContent(
                type="text",
                text=f"✅ Navigation réussie vers {url}\nTitre: {title}"
            )]
        elif name == "take_screenshot":
            full_page = arguments.get("full_page", True)
            element_selector = arguments.get("element_selector")
            if element_selector:
                # Screenshot of one element only.
                element = await browser_state.page.query_selector(element_selector)
                if element:
                    screenshot_bytes = await element.screenshot()
                else:
                    return [types.TextContent(type="text", text="❌ Élément non trouvé")]
            else:
                screenshot_bytes = await browser_state.page.screenshot(full_page=full_page)
            # Base64-encode; only a 100-character preview is returned to
            # keep the text payload small.
            screenshot_b64 = base64.b64encode(screenshot_bytes).decode()
            return [types.TextContent(
                type="text",
                text=f"📸 Capture d'écran prise (taille: {len(screenshot_bytes)} bytes)\nBase64: data:image/png;base64,{screenshot_b64[:100]}..."
            )]
        elif name == "extract_text":
            selector = arguments.get("selector")
            clean = arguments.get("clean", True)
            if selector:
                elements = await browser_state.page.query_selector_all(selector)
                texts = []
                for element in elements:
                    text = await element.inner_text()
                    if clean:
                        # Collapse whitespace runs into single spaces.
                        text = re.sub(r'\s+', ' ', text.strip())
                    texts.append(text)
                result = "\n".join(texts)
            else:
                # No selector: extract the whole visible body text.
                result = await browser_state.page.inner_text("body")
                if clean:
                    result = re.sub(r'\s+', ' ', result.strip())
            # Truncate long extractions to 2000 characters.
            return [types.TextContent(
                type="text",
                text=f"📝 Texte extrait:\n{result[:2000]}..." if len(result) > 2000 else f"📝 Texte extrait:\n{result}"
            )]
        elif name == "click_element":
            selector = arguments["selector"]
            try:
                await browser_state.page.click(selector, timeout=5000)
                return [types.TextContent(type="text", text=f"👆 Clic effectué sur: {selector}")]
            except Exception as e:
                return [types.TextContent(type="text", text=f"❌ Erreur lors du clic: {str(e)}")]
        elif name == "fill_input":
            selector = arguments["selector"]
            value = arguments["value"]
            try:
                await browser_state.page.fill(selector, value)
                return [types.TextContent(type="text", text=f"✏️ Champ rempli: {selector} = '{value}'")]
            except Exception as e:
                return [types.TextContent(type="text", text=f"❌ Erreur lors de la saisie: {str(e)}")]
        elif name == "get_page_info":
            title = await browser_state.page.title()
            url = browser_state.page.url
            # Best-effort read of the <meta name="description"> tag.
            try:
                description_element = await browser_state.page.query_selector('meta[name="description"]')
                description = await description_element.get_attribute("content") if description_element else "Non disponible"
            except Exception:
                description = "Non disponible"
            return [types.TextContent(
                type="text",
                text=f"📄 Informations de la page:\n• Titre: {title}\n• URL: {url}\n• Description: {description}"
            )]
        elif name == "wait_for_element":
            selector = arguments["selector"]
            timeout = arguments.get("timeout", 10000)
            try:
                await browser_state.page.wait_for_selector(selector, timeout=timeout)
                return [types.TextContent(type="text", text=f"✅ Élément trouvé: {selector}")]
            except Exception:
                return [types.TextContent(type="text", text=f"⏰ Timeout: élément non trouvé dans les {timeout}ms")]
        elif name == "search_text":
            text = arguments["text"]
            case_sensitive = arguments.get("case_sensitive", False)
            page_text = await browser_state.page.inner_text("body")
            if not case_sensitive:
                found = text.lower() in page_text.lower()
            else:
                found = text in page_text
            return [types.TextContent(
                type="text",
                text=f"🔍 Recherche de '{text}': {'✅ Trouvé' if found else '❌ Non trouvé'}"
            )]
        elif name == "extract_links":
            filter_domain = arguments.get("filter_domain")
            # Collect every anchor's text/href/title in one page evaluation.
            links = await browser_state.page.evaluate('''
                () => {
                    const links = Array.from(document.querySelectorAll('a[href]'));
                    return links.map(link => ({
                        text: link.innerText.trim(),
                        href: link.href,
                        title: link.title || ''
                    }));
                }
            ''')
            if filter_domain:
                links = [link for link in links if filter_domain in link['href']]
            result = f"🔗 {len(links)} liens trouvés:\n"
            # Cap the listing at 20 links.
            for i, link in enumerate(links[:20]):
                result += f"{i+1}. {link['text'][:50]} -> {link['href']}\n"
            if len(links) > 20:
                result += f"... et {len(links) - 20} autres liens"
            return [types.TextContent(type="text", text=result)]
        elif name == "get_element_info":
            selector = arguments["selector"]
            try:
                element = await browser_state.page.query_selector(selector)
                if element:
                    info = await element.evaluate('''
                        (el) => ({
                            tagName: el.tagName,
                            text: el.innerText?.substring(0, 200),
                            href: el.href,
                            src: el.src,
                            id: el.id,
                            className: el.className,
                            visible: window.getComputedStyle(el).display !== 'none'
                        })
                    ''')
                    result = f"ℹ️ Informations sur l'élément '{selector}':\n"
                    # Only report attributes that have a truthy value.
                    for key, value in info.items():
                        if value:
                            result += f"• {key}: {value}\n"
                    return [types.TextContent(type="text", text=result)]
                else:
                    return [types.TextContent(type="text", text=f"❌ Élément non trouvé: {selector}")]
            except Exception as e:
                return [types.TextContent(type="text", text=f"❌ Erreur: {str(e)}")]
        else:
            return [types.TextContent(type="text", text=f"❌ Outil inconnu: {name}")]
    except Exception as e:
        # Last-resort handler: report the failure as tool output.
        logger.error(f"Erreur dans {name}: {str(e)}")
        return [types.TextContent(type="text", text=f"❌ Erreur: {str(e)}")]
async def cleanup():
    """Release Playwright resources and reset the shared browser state.

    Resets every field to None after closing so a subsequent
    ensure_browser() call performs a fresh launch instead of reusing
    closed handles (ensure_browser only checks `browser_state.browser`).
    """
    if browser_state.browser:
        await browser_state.browser.close()
    if browser_state.playwright:
        await browser_state.playwright.stop()
    browser_state.browser = None
    browser_state.context = None
    browser_state.page = None
    browser_state.playwright = None
async def main():
    """Serve MCP requests over stdio until EOF or Ctrl-C, then clean up."""
    try:
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            # Handshake metadata advertised to the connecting client.
            init_options = InitializationOptions(
                server_name="browser-mcp",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            )
            await server.run(read_stream, write_stream, init_options)
    except KeyboardInterrupt:
        logger.info("Arrêt du serveur")
    finally:
        # Always release the browser, even on abnormal exit.
        await cleanup()
| if __name__ == "__main__": | |
| asyncio.run(main()) |