Spaces:
Sleeping
Sleeping
| """ | |
| mcp_tools.py — MCP Bridge for Enhanced Computer Control | |
| ======================================================= | |
| Playwright Browser MCP + Code Execution + FileSystem + HF Hub MCP | |
| """ | |
| import os | |
| import json | |
| import time | |
| import base64 | |
| import tempfile | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from dataclasses import dataclass | |
| from io import BytesIO | |
| from PIL import Image | |
| # Smolagents tool decorator | |
| from smolagents import tool | |
| # Playwright | |
| try: | |
| from playwright.sync_api import sync_playwright, Page, Browser, BrowserContext | |
| HAS_PLAYWRIGHT = True | |
| except ImportError: | |
| HAS_PLAYWRIGHT = False | |
| sync_playwright = None | |
| Page = Browser = BrowserContext = Any | |
| # E2B code execution | |
| try: | |
| from e2b_code_interpreter import Sandbox as CodeSandbox | |
| HAS_E2B_CODE = True | |
| except ImportError: | |
| HAS_E2B_CODE = False | |
| CodeSandbox = Any | |
| # --------------------------------------------------------------------------- | |
| # Playwright Browser MCP | |
| # --------------------------------------------------------------------------- | |
class BrowserMCP:
    """High-level browser automation via Playwright.

    Replaces raw coordinate clicking with semantic selectors. The browser is
    started lazily: every action goes through :meth:`ensure_page`, which calls
    :meth:`start` when no page is available (first use, or after
    :meth:`close`).
    """

    # Fragments of the error ``page.goto`` is expected to raise when the
    # navigation turns into a file download instead of a page load.
    # NOTE(review): the wording depends on the browser engine / Playwright
    # version -- confirm against the version deployed with this Space.
    _DOWNLOAD_NAV_ERRORS = ("Download is starting", "net::ERR_ABORTED")

    def __init__(self, headless: bool = True, browser_type: str = "chromium"):
        """Store launch options; nothing is started until a page is needed.

        Args:
            headless: Passed to the browser's ``launch`` call.
            browser_type: Attribute name looked up on the Playwright driver
                object in :meth:`start` (default ``"chromium"``).
        """
        self.headless = headless
        self.browser_type = browser_type
        self._playwright = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._page: Optional[Page] = None
        self._closed = True

    def start(self):
        """Launch the driver, a browser, a context and a page; return the page.

        Raises:
            RuntimeError: If Playwright could not be imported.
        """
        if not HAS_PLAYWRIGHT:
            raise RuntimeError("Playwright not installed. Run: pip install playwright && playwright install chromium")
        if not self._closed:
            # Restart requested: release the running browser instead of
            # overwriting (and leaking) its handles.
            self.close()
        try:
            self._playwright = sync_playwright().start()
            browser_cls = getattr(self._playwright, self.browser_type)
            self._browser = browser_cls.launch(headless=self.headless)
            self._context = self._browser.new_context(
                viewport={"width": 1280, "height": 720},
                user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            )
            self._page = self._context.new_page()
        except Exception:
            # Do not leave a half-started driver/browser behind.
            self.close()
            raise
        self._closed = False
        return self._page

    def close(self):
        """Release the context, browser and driver; safe to call repeatedly.

        The handles are cleared before anything is closed, so a failure in
        one step neither prevents the remaining steps from running nor leaves
        stale handles on the instance. The first error still propagates.
        """
        context, browser, playwright = self._context, self._browser, self._playwright
        self._page = None
        self._context = None
        self._browser = None
        self._playwright = None
        self._closed = True
        try:
            if context:
                context.close()
        finally:
            try:
                if browser:
                    browser.close()
            finally:
                if playwright:
                    playwright.stop()

    def ensure_page(self) -> "Page":
        """Return the current page, starting the browser first if needed."""
        if self._closed or self._page is None:
            self.start()
        return self._page

    @staticmethod
    def _locate(page, selector: str, by: str):
        """Resolve *selector* to the first matching locator on *page*.

        ``by="text"`` matches by text, ``by="role"`` expects ``role`` or
        ``role::name``, and any other value treats *selector* as a locator
        string passed to ``page.locator``.
        """
        if by == "text":
            return page.get_by_text(selector).first
        if by == "role":
            # partition() tolerates a bare role without the "::name" suffix,
            # which previously failed with an unpacking ValueError.
            role, sep, name = selector.partition("::")
            if sep:
                return page.get_by_role(role.strip(), name=name.strip()).first
            return page.get_by_role(role.strip()).first
        return page.locator(selector).first

    def goto(self, url: str, wait_until: str = "networkidle") -> str:
        """Navigate to *url*, prefixing ``https://`` when no http(s) scheme is given."""
        page = self.ensure_page()
        if not url.startswith(("http://", "https://")):
            url = "https://" + url
        page.goto(url, wait_until=wait_until, timeout=30000)
        return f"Navigated to {url}"

    def click(self, selector: str, by: str = "css") -> str:
        """Click the first element matching *selector* (see :meth:`_locate`)."""
        page = self.ensure_page()
        self._locate(page, selector, by).click()
        return f"Clicked element: {selector}"

    def fill(self, selector: str, text: str, by: str = "css") -> str:
        """Fill the first element matching *selector* with *text*."""
        page = self.ensure_page()
        self._locate(page, selector, by).fill(text)
        return f"Filled '{selector}' with '{text}'"

    def press(self, key: str) -> str:
        """Press a single key (or key combination) on the page keyboard."""
        page = self.ensure_page()
        page.keyboard.press(key)
        return f"Pressed key: {key}"

    def scroll(self, direction: str = "down", amount: int = 500) -> str:
        """Scroll with the mouse wheel by *amount*.

        ``"down"`` (case-insensitive, surrounding whitespace ignored) scrolls
        down; any other value scrolls up.
        """
        page = self.ensure_page()
        going_down = direction.strip().lower() == "down"
        page.mouse.wheel(0, amount if going_down else -amount)
        return f"Scrolled {direction} by {amount}"

    def get_text(self, selector: str = "body") -> str:
        """Return the inner text of the first element matching *selector*."""
        page = self.ensure_page()
        return page.locator(selector).first.inner_text()

    def get_html(self) -> str:
        """Return the page's current HTML content."""
        page = self.ensure_page()
        return page.content()

    def screenshot(self, path: Optional[str] = None) -> str:
        """Take a full-page screenshot.

        Returns:
            A confirmation message when *path* is given (the image is written
            there); otherwise the image bytes encoded as a base64 string.
        """
        page = self.ensure_page()
        if path:
            page.screenshot(path=path, full_page=True)
            return f"Screenshot saved to {path}"
        else:
            buf = page.screenshot(full_page=True)
            return base64.b64encode(buf).decode("utf-8")

    def find_and_click(self, text: str) -> str:
        """Semantic find-and-click by visible text."""
        page = self.ensure_page()
        page.get_by_text(text).first.click()
        return f"Found and clicked text: {text}"

    def search_on_page(self, query: str) -> str:
        """Send Control+f, the query text, Enter and Escape to the page.

        NOTE(review): this only sends keystrokes; whether a find bar opens
        (rather than the text landing in the focused element) depends on the
        browser -- confirm this works in headless mode.
        """
        page = self.ensure_page()
        page.keyboard.press("Control+f")
        page.keyboard.insert_text(query)
        page.keyboard.press("Enter")
        time.sleep(0.5)
        page.keyboard.press("Escape")
        return f"Searched for '{query}' on page"

    def download_file(self, url: str, save_path: str) -> str:
        """Navigate to *url*, wait for the resulting download, save it to *save_path*."""
        page = self.ensure_page()
        with page.expect_download() as dl_info:
            try:
                page.goto(url)
            except Exception as exc:
                # A URL that triggers a download makes goto() fail even though
                # the download itself starts; only that failure is ignored.
                message = str(exc)
                if not any(marker in message for marker in self._DOWNLOAD_NAV_ERRORS):
                    raise
        dl = dl_info.value
        dl.save_as(save_path)
        return f"Downloaded to {save_path}"

    def extract_links(self) -> List[Dict[str, str]]:
        """Return ``{"href", "text"}`` for every ``<a>`` element on the page."""
        page = self.ensure_page()
        links = page.eval_on_selector_all("a", """elements => elements.map(a => ({href: a.href, text: a.innerText.trim()}))""")
        return links

    def extract_tables(self) -> List[List[List[str]]]:
        """Return every ``<table>`` as rows of trimmed cell texts (``td`` and ``th``)."""
        page = self.ensure_page()
        tables = page.eval_on_selector_all("table", """
            tables => tables.map(t => {
                return Array.from(t.querySelectorAll('tr')).map(row =>
                    Array.from(row.querySelectorAll('td, th')).map(cell => cell.innerText.trim())
                );
            })
        """)
        return tables

    def evaluate_js(self, script: str) -> Any:
        """Evaluate *script* in the page and return its result."""
        page = self.ensure_page()
        return page.evaluate(script)
| # --------------------------------------------------------------------------- | |
| # Tool factory for smolagents integration | |
| # --------------------------------------------------------------------------- | |
def make_browser_tools(browser_mcp: BrowserMCP) -> Dict[str, Any]:
    """Build the browser tool callables bound to *browser_mcp*.

    Each inner function is a thin wrapper around one ``BrowserMCP`` method
    that always returns a string (structured results are JSON-encoded). The
    functions are returned undecorated in a dict keyed by function name.
    """

    def browser_goto(url: str) -> str:
        """Navigate the browser to a URL. Prefer this over clicking browser icons."""
        outcome = browser_mcp.goto(url)
        return outcome

    def browser_click(selector: str, by: str = "css") -> str:
        """Click an element by CSS selector, text content, or ARIA role.
        by can be 'css', 'text', or 'role' (role::name format)."""
        outcome = browser_mcp.click(selector, by)
        return outcome

    def browser_fill(selector: str, text: str, by: str = "css") -> str:
        """Fill a form field with text."""
        outcome = browser_mcp.fill(selector, text, by)
        return outcome

    def browser_press_key(key: str) -> str:
        """Press a keyboard key (e.g., 'Enter', 'Tab', 'Escape')."""
        outcome = browser_mcp.press(key)
        return outcome

    def browser_scroll(direction: str = "down", amount: int = 500) -> str:
        """Scroll the page up or down."""
        outcome = browser_mcp.scroll(direction, amount)
        return outcome

    def browser_get_text(selector: str = "body") -> str:
        """Extract text content from a page element."""
        outcome = browser_mcp.get_text(selector)
        return outcome

    def browser_find_and_click(text: str) -> str:
        """Find an element by its visible text and click it."""
        outcome = browser_mcp.find_and_click(text)
        return outcome

    def browser_screenshot(path: str = "") -> str:
        """Take a screenshot of the current page. If path is empty, returns base64."""
        # An empty path is forwarded as None so the base64 branch is taken.
        target = path if path else None
        return browser_mcp.screenshot(target)

    def browser_extract_links() -> str:
        """Extract all links from the current page as JSON."""
        first_links = browser_mcp.extract_links()[:50]  # cap at 50 entries
        return json.dumps(first_links, indent=2)

    def browser_extract_tables() -> str:
        """Extract all tables from the current page as JSON."""
        first_tables = browser_mcp.extract_tables()[:5]  # cap at 5 tables
        return json.dumps(first_tables, indent=2)

    def browser_evaluate_js(script: str) -> str:
        """Execute JavaScript in the browser context and return the result."""
        value = browser_mcp.evaluate_js(script)
        return json.dumps(value, default=str)

    exported = (
        browser_goto,
        browser_click,
        browser_fill,
        browser_press_key,
        browser_scroll,
        browser_get_text,
        browser_find_and_click,
        browser_screenshot,
        browser_extract_links,
        browser_extract_tables,
        browser_evaluate_js,
    )
    # Every key equals the wrapped function's own name.
    return {fn.__name__: fn for fn in exported}
| # --------------------------------------------------------------------------- | |
| # Code Execution MCP (E2B Code Interpreter) | |
| # --------------------------------------------------------------------------- | |
class CodeExecutionMCP:
    """Code and shell execution inside an E2B code-interpreter sandbox.

    The sandbox is created lazily on the first call that needs it and reused
    until :meth:`close` is called.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Remember the API key, falling back to the ``E2B_API_KEY`` env var."""
        self.api_key = api_key or os.getenv("E2B_API_KEY")
        self._sandbox: Optional[Any] = None

    def _get_sandbox(self):
        """Return the cached sandbox, creating it on first use.

        Raises:
            RuntimeError: If ``e2b_code_interpreter`` could not be imported.
        """
        if not HAS_E2B_CODE:
            raise RuntimeError("e2b_code_interpreter not installed")
        if self._sandbox is None:
            self._sandbox = CodeSandbox(api_key=self.api_key)
        return self._sandbox

    def run_python(self, code: str, timeout: int = 30) -> Dict[str, Any]:
        """Run *code* in the sandbox.

        Returns:
            Dict with ``stdout`` and ``stderr`` (taken as-is from the
            execution logs), ``results`` (each result converted with
            ``str``) and ``error`` (the execution's error attribute).
        """
        sb = self._get_sandbox()
        execution = sb.run_code(code, timeout=timeout)
        return {
            "stdout": execution.logs.stdout,
            "stderr": execution.logs.stderr,
            "results": [str(r) for r in execution.results],
            "error": execution.error,
        }

    def run_shell(self, command: str, timeout: int = 30) -> Dict[str, Any]:
        """Run *command* by submitting ``!<command>`` to the sandbox.

        NOTE(review): the leading ``!`` is presumably interpreted as a
        notebook-style shell escape by the sandbox kernel, which would apply
        to the first line only -- confirm for multi-line commands.

        Returns:
            Dict with ``stdout``, ``stderr`` and ``error``.
        """
        sb = self._get_sandbox()
        execution = sb.run_code(f"!{command}", timeout=timeout)
        return {
            "stdout": execution.logs.stdout,
            "stderr": execution.logs.stderr,
            "error": execution.error,
        }

    def install_package(self, package: str) -> str:
        """``pip install`` *package* in the sandbox and summarise the outcome.

        Returns:
            ``"Installed …"`` followed by at most 500 characters of the pip
            output, or ``"Failed to install …"`` when the run reported an
            error.
        """
        result = self.run_shell(f"pip install {package}")
        if result["error"]:
            return f"Failed to install {package}: {result['error']}"
        stdout = result["stdout"]
        # stdout arrives as a list of chunks; join it before truncating so the
        # limit applies to characters, not to list entries.
        text = stdout if isinstance(stdout, str) else "\n".join(stdout)
        return f"Installed {package}: {text[:500]}"

    def close(self):
        """Kill the sandbox if one was created; safe to call repeatedly."""
        # Drop the handle first so a failing kill() cannot leave a dead
        # sandbox cached for later calls.
        sandbox, self._sandbox = self._sandbox, None
        if sandbox:
            sandbox.kill()
def make_code_tools(code_mcp: "CodeExecutionMCP") -> Dict[str, Any]:
    """Build the code-execution tool callables bound to *code_mcp*.

    Returns a dict keyed by tool name. Every tool returns a string; output of
    ``execute_python`` and ``execute_shell`` is truncated to 3000 characters.
    """
    max_chars = 3000

    def _as_text(lines: Any) -> str:
        """Join a list of output chunks with newlines (strings pass through)."""
        if not lines:
            return ""
        if isinstance(lines, str):
            return lines
        return "\n".join(lines)

    def execute_python(code: str) -> str:
        """Execute Python code in a sandboxed environment. Use for data processing, calculations, or parsing."""
        result = code_mcp.run_python(code)
        if result["error"]:
            # Render stderr as text rather than as the repr of a list.
            return f"Error: {result['error']}\nStderr: {_as_text(result['stderr'])}"
        out = _as_text(result["stdout"])
        if result["results"]:
            out += f"\nResults: {result['results']}"
        return out[:max_chars]

    def execute_shell(command: str) -> str:
        """Execute a shell command in the sandbox."""
        result = code_mcp.run_shell(command)
        if result["error"]:
            return f"Error: {result['error']}"
        out = _as_text(result["stdout"])
        stderr_text = _as_text(result.get("stderr"))
        if stderr_text:
            # Surface stderr as well, so a command that only wrote there does
            # not come back as an empty string.
            out += f"\nStderr: {stderr_text}"
        return out[:max_chars]

    def install_python_package(package: str) -> str:
        """Install a Python package in the sandbox."""
        return code_mcp.install_package(package)

    return {
        "execute_python": execute_python,
        "execute_shell": execute_shell,
        "install_python_package": install_python_package,
    }
| # --------------------------------------------------------------------------- | |
| # FileSystem MCP (Local + E2B) | |
| # --------------------------------------------------------------------------- | |
class FileSystemMCP:
    """Read and write files inside a local workspace directory.

    Every path argument is resolved relative to ``base_dir`` and rejected
    when the result would fall outside that directory.
    """

    def __init__(self, base_dir: str = "./workspace"):
        """Resolve *base_dir* to an absolute path and create it if missing."""
        self.base_dir = os.path.abspath(base_dir)
        os.makedirs(self.base_dir, exist_ok=True)

    def _safe_path(self, path: str) -> str:
        """Return the absolute form of *path* inside the workspace.

        Containment is checked on whole path components, so a sibling such as
        ``<base_dir>_evil`` is rejected even though it shares the textual
        prefix. Symlinks are not resolved (``abspath`` only normalises).

        Raises:
            ValueError: If the resolved path lies outside ``base_dir``.
        """
        abs_path = os.path.abspath(os.path.join(self.base_dir, path))
        try:
            common = os.path.commonpath([abs_path, self.base_dir])
        except ValueError:
            # commonpath() refuses paths that cannot share an ancestor
            # (e.g. different drives); those are outside by definition.
            common = None
        inside = common is not None and (
            os.path.normcase(common) == os.path.normcase(self.base_dir)
        )
        if not inside:
            raise ValueError("Path traversal attempt detected")
        return abs_path

    def read_file(self, path: str) -> str:
        """Return the file's text, decoded as UTF-8 with undecodable bytes dropped."""
        sp = self._safe_path(path)
        with open(sp, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()

    def write_file(self, path: str, content: str) -> str:
        """Write *content* as UTF-8, creating parent directories as needed."""
        sp = self._safe_path(path)
        os.makedirs(os.path.dirname(sp), exist_ok=True)
        with open(sp, "w", encoding="utf-8") as f:
            f.write(content)
        return f"Wrote {len(content)} chars to {path}"

    def list_dir(self, path: str = ".") -> List[str]:
        """Return the entry names of a workspace directory."""
        sp = self._safe_path(path)
        return os.listdir(sp)

    def read_image(self, path: str) -> "Image.Image":
        """Open a workspace image with PIL and return the image object."""
        sp = self._safe_path(path)
        return Image.open(sp)
def make_fs_tools(fs_mcp: FileSystemMCP) -> Dict[str, Any]:
    """Build the workspace file tool callables bound to *fs_mcp*.

    Returns a dict keyed by tool name; every tool returns a string.
    """

    def fs_read(path: str) -> str:
        """Read a text file from the workspace."""
        contents = fs_mcp.read_file(path)
        return contents

    def fs_write(path: str, content: str) -> str:
        """Write text content to a file in the workspace."""
        summary = fs_mcp.write_file(path, content)
        return summary

    def fs_list(path: str = ".") -> str:
        """List files in a workspace directory."""
        entries = fs_mcp.list_dir(path)
        return json.dumps(entries)

    # Every key equals the wrapped function's own name.
    return {fn.__name__: fn for fn in (fs_read, fs_write, fs_list)}
| # --------------------------------------------------------------------------- | |
| # HF Hub MCP (Hugging Face ecosystem integration) | |
| # --------------------------------------------------------------------------- | |
class HFHubMCP:
    """Thin wrapper around ``huggingface_hub`` for searching the Hub and
    uploading files to dataset repositories."""

    def __init__(self, token: Optional[str] = None):
        """Create the API client; the token falls back to the ``HF_TOKEN`` env var.

        ``huggingface_hub`` is imported here rather than at module level.
        """
        from huggingface_hub import HfApi, upload_file, create_repo

        resolved_token = token or os.getenv("HF_TOKEN")
        self.token = resolved_token
        self.api = HfApi(token=resolved_token)
        self._upload_file = upload_file
        self._create_repo = create_repo

    def search_models(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search models (sorted by ``"downloads"``); return id, downloads and tags."""
        summaries = []
        for model in self.api.list_models(search=query, limit=limit, sort="downloads"):
            summaries.append(
                {"id": model.id, "downloads": model.downloads, "tags": model.tags}
            )
        return summaries

    def search_datasets(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search datasets; return id and tags for each hit."""
        summaries = []
        for dataset in self.api.list_datasets(search=query, limit=limit):
            summaries.append({"id": dataset.id, "tags": dataset.tags})
        return summaries

    def search_spaces(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search Spaces; return id and sdk (``"unknown"`` when the attribute is absent)."""
        summaries = []
        for space in self.api.list_spaces(search=query, limit=limit):
            summaries.append({"id": space.id, "sdk": getattr(space, "sdk", "unknown")})
        return summaries

    def upload_to_dataset(self, repo_id: str, file_path: str, path_in_repo: str) -> str:
        """Upload a local file to a dataset repository and return a confirmation."""
        upload_args = {
            "path_or_fileobj": file_path,
            "path_in_repo": path_in_repo,
            "repo_id": repo_id,
            "repo_type": "dataset",
            "token": self.token,
        }
        self._upload_file(**upload_args)
        return f"Uploaded {file_path} to {repo_id}/{path_in_repo}"
def make_hf_tools(hf_mcp: HFHubMCP) -> Dict[str, Any]:
    """Build the Hugging Face Hub tool callables bound to *hf_mcp*.

    Returns a dict keyed by tool name; search tools return their hits as
    indented JSON text.
    """

    def hf_search_models(query: str, limit: int = 10) -> str:
        """Search Hugging Face Hub for models."""
        return json.dumps(hf_mcp.search_models(query, limit), indent=2)

    def hf_search_datasets(query: str, limit: int = 10) -> str:
        """Search Hugging Face Hub for datasets."""
        return json.dumps(hf_mcp.search_datasets(query, limit), indent=2)

    def hf_search_spaces(query: str, limit: int = 10) -> str:
        """Search Hugging Face Hub for Spaces."""
        return json.dumps(hf_mcp.search_spaces(query, limit), indent=2)

    def hf_upload_dataset_file(repo_id: str, file_path: str, path_in_repo: str) -> str:
        """Upload a file to a Hugging Face dataset repository."""
        confirmation = hf_mcp.upload_to_dataset(repo_id, file_path, path_in_repo)
        return confirmation

    exported = (
        hf_search_models,
        hf_search_datasets,
        hf_search_spaces,
        hf_upload_dataset_file,
    )
    # Every key equals the wrapped function's own name.
    return {fn.__name__: fn for fn in exported}