| """ |
| mcp_tools.py — MCP Bridge for Enhanced Computer Control |
| ======================================================= |
| Playwright Browser MCP + Code Execution + FileSystem + HF Hub MCP |
| """ |
|
|
| import os |
| import json |
| import time |
| import base64 |
| import tempfile |
| from typing import Any, Dict, List, Optional, Tuple |
| from dataclasses import dataclass |
| from io import BytesIO |
|
|
| from PIL import Image |
|
|
| |
| from smolagents import tool |
|
|
| |
| try: |
| from playwright.sync_api import sync_playwright, Page, Browser, BrowserContext |
| HAS_PLAYWRIGHT = True |
| except ImportError: |
| HAS_PLAYWRIGHT = False |
| sync_playwright = None |
| Page = Browser = BrowserContext = Any |
|
|
| |
| try: |
| from e2b_code_interpreter import Sandbox as CodeSandbox |
| HAS_E2B_CODE = True |
| except ImportError: |
| HAS_E2B_CODE = False |
| CodeSandbox = Any |
|
|
|
|
| |
| |
| |
|
|
class BrowserMCP:
    """High-level browser automation via Playwright.

    Replaces raw coordinate clicking with semantic selectors. The browser is
    started lazily: every action goes through ``ensure_page()``, which opens a
    session when none is active.
    """

    def __init__(self, headless: bool = True, browser_type: str = "chromium"):
        """Store launch settings; nothing is started until first use.

        Args:
            headless: Forwarded to the Playwright ``launch`` call.
            browser_type: Name of the launcher attribute looked up on the
                Playwright object (for example ``"chromium"``).
        """
        self.headless = headless
        self.browser_type = browser_type
        self._playwright = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._page: Optional[Page] = None
        self._closed = True

    def start(self) -> "Page":
        """Launch the browser, open a context and a page, and return the page.

        Any session that is still open is torn down first so that repeated
        calls do not leak browsers.

        Raises:
            RuntimeError: If Playwright could not be imported.
        """
        if not HAS_PLAYWRIGHT:
            raise RuntimeError("Playwright not installed. Run: pip install playwright && playwright install chromium")
        # Best-effort release of a previous session; its errors must not
        # prevent a fresh start.
        self._teardown()
        self._playwright = sync_playwright().start()
        try:
            browser_cls = getattr(self._playwright, self.browser_type)
            self._browser = browser_cls.launch(headless=self.headless)
            self._context = self._browser.new_context(
                viewport={"width": 1280, "height": 720},
                user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            )
            self._page = self._context.new_page()
        except Exception:
            # Do not leave a half-started driver/browser behind; the launch
            # error itself is what the caller needs to see.
            self._teardown()
            raise
        self._closed = False
        return self._page

    def _teardown(self) -> Optional[Exception]:
        """Release context, browser and driver; return the first error, if any."""
        pending = (
            (self._context, "close"),
            (self._browser, "close"),
            (self._playwright, "stop"),
        )
        # Forget the handles before calling out so a failure part-way through
        # can never leave stale references behind.
        self._page = None
        self._context = None
        self._browser = None
        self._playwright = None
        self._closed = True

        first_error: Optional[Exception] = None
        for handle, method_name in pending:
            if handle is None:
                continue
            try:
                getattr(handle, method_name)()
            except Exception as exc:  # keep releasing the remaining handles
                if first_error is None:
                    first_error = exc
        return first_error

    def close(self):
        """Shut the session down. Safe to call repeatedly.

        Every handle is released even when an earlier one fails; the first
        failure is re-raised once cleanup is complete.
        """
        error = self._teardown()
        if error is not None:
            raise error

    def ensure_page(self) -> "Page":
        """Return the active page, starting a session if there is none."""
        if self._closed or self._page is None:
            self.start()
        return self._page

    def _locate(self, selector: str, by: str = "css"):
        """Resolve ``selector`` to the first matching locator.

        ``by`` may be ``"text"`` (visible text), ``"role"`` (``role::name``, or
        just ``role`` to match any accessible name); anything else is treated
        as a CSS selector.
        """
        page = self.ensure_page()
        if by == "text":
            return page.get_by_text(selector).first
        if by == "role":
            role, separator, name = selector.partition("::")
            role = role.strip()
            if separator:
                return page.get_by_role(role, name=name.strip()).first
            return page.get_by_role(role).first
        return page.locator(selector).first

    def goto(self, url: str, wait_until: str = "networkidle") -> str:
        """Navigate to ``url`` (30 s timeout) and return a confirmation string.

        ``https://`` is prepended unless the URL already starts with
        ``http://`` or ``https://``.
        """
        page = self.ensure_page()
        if not url.startswith(("http://", "https://")):
            url = "https://" + url
        page.goto(url, wait_until=wait_until, timeout=30000)
        return f"Navigated to {url}"

    def click(self, selector: str, by: str = "css") -> str:
        """Click the first element matching ``selector`` (see ``_locate``)."""
        self._locate(selector, by).click()
        return f"Clicked element: {selector}"

    def fill(self, selector: str, text: str, by: str = "css") -> str:
        """Fill the first element matching ``selector`` with ``text``."""
        self._locate(selector, by).fill(text)
        return f"Filled '{selector}' with '{text}'"

    def press(self, key: str) -> str:
        """Send a single key press to the page keyboard."""
        page = self.ensure_page()
        page.keyboard.press(key)
        return f"Pressed key: {key}"

    def scroll(self, direction: str = "down", amount: int = 500) -> str:
        """Scroll with the mouse wheel.

        ``"down"`` scrolls by ``+amount``; any other ``direction`` value
        scrolls by ``-amount``.
        """
        page = self.ensure_page()
        delta = amount if direction == "down" else -amount
        page.mouse.wheel(0, delta)
        return f"Scrolled {direction} by {amount}"

    def get_text(self, selector: str = "body") -> str:
        """Return the inner text of the first element matching ``selector``."""
        page = self.ensure_page()
        return page.locator(selector).first.inner_text()

    def get_html(self) -> str:
        """Return the page's current HTML content."""
        page = self.ensure_page()
        return page.content()

    def screenshot(self, path: Optional[str] = None) -> str:
        """Capture a full-page screenshot.

        Returns a confirmation message when ``path`` is given, otherwise the
        image bytes encoded as a base64 string.
        """
        page = self.ensure_page()
        if path:
            page.screenshot(path=path, full_page=True)
            return f"Screenshot saved to {path}"
        buf = page.screenshot(full_page=True)
        return base64.b64encode(buf).decode("utf-8")

    def find_and_click(self, text: str) -> str:
        """Semantic find-and-click by visible text."""
        self._locate(text, "text").click()
        return f"Found and clicked text: {text}"

    def search_on_page(self, query: str) -> str:
        """Send Ctrl+F, type ``query``, press Enter, then Escape.

        NOTE(review): this relies on the browser reacting to the Ctrl+F
        shortcut; whether an automated/headless browser does so is not
        established here -- confirm before depending on it.
        """
        page = self.ensure_page()
        page.keyboard.press("Control+f")
        page.keyboard.insert_text(query)
        page.keyboard.press("Enter")
        time.sleep(0.5)
        page.keyboard.press("Escape")
        return f"Searched for '{query}' on page"

    def download_file(self, url: str, save_path: str) -> str:
        """Navigate to ``url``, wait for the resulting download and save it.

        Raises:
            playwright.sync_api.Error: If no download starts. When the
                navigation itself failed, that navigation error is raised.
        """
        page = self.ensure_page()
        from playwright.sync_api import Error as PlaywrightError

        nav_error = None
        try:
            with page.expect_download() as dl_info:
                try:
                    page.goto(url)
                except PlaywrightError as exc:
                    # A URL that answers with a file aborts the navigation
                    # while the download itself still begins, so defer the
                    # verdict to the download wait below.
                    nav_error = exc
            download = dl_info.value
        except PlaywrightError as wait_error:
            if nav_error is not None:
                raise nav_error from wait_error
            raise
        download.save_as(save_path)
        return f"Downloaded to {save_path}"

    def extract_links(self) -> List[Dict[str, str]]:
        """Return ``{"href", "text"}`` for every ``<a>`` element on the page."""
        page = self.ensure_page()
        return page.eval_on_selector_all(
            "a",
            """elements => elements.map(a => ({href: a.href, text: a.innerText.trim()}))""",
        )

    def extract_tables(self) -> List[List[List[str]]]:
        """Return every ``<table>`` as rows of trimmed cell texts (td and th)."""
        page = self.ensure_page()
        return page.eval_on_selector_all("table", """
            tables => tables.map(t => {
                return Array.from(t.querySelectorAll('tr')).map(row =>
                    Array.from(row.querySelectorAll('td, th')).map(cell => cell.innerText.trim())
                );
            })
        """)

    def evaluate_js(self, script: str) -> Any:
        """Evaluate ``script`` in the page and return whatever it yields."""
        page = self.ensure_page()
        return page.evaluate(script)
|
|
|
|
| |
| |
| |
|
|
def make_browser_tools(browser_mcp: BrowserMCP) -> Dict[str, Any]:
    """Generate smolagents @tool functions from BrowserMCP.

    Each tool is a thin closure over ``browser_mcp``. The ``Args:`` sections
    are not decorative: the ``@tool`` decorator builds the tool schema from
    the docstring and rejects functions whose parameters are undocumented.

    Args:
        browser_mcp: Browser bridge that every returned tool delegates to.

    Returns:
        Mapping of tool name to the decorated tool object.
    """

    @tool
    def browser_goto(url: str) -> str:
        """Navigate the browser to a URL. Prefer this over clicking browser icons.

        Args:
            url: Address to open. An address without an http or https prefix is opened over https.
        """
        return browser_mcp.goto(url)

    @tool
    def browser_click(selector: str, by: str = "css") -> str:
        """Click an element by CSS selector, text content, or ARIA role.

        Args:
            selector: CSS selector, visible text, or a role given in 'role::name' format, depending on 'by'.
            by: How to interpret the selector, one of 'css', 'text', or 'role'.
        """
        return browser_mcp.click(selector, by)

    @tool
    def browser_fill(selector: str, text: str, by: str = "css") -> str:
        """Fill a form field with text.

        Args:
            selector: CSS selector, visible text, or a role given in 'role::name' format, depending on 'by'.
            text: Text to put into the field.
            by: How to interpret the selector, one of 'css', 'text', or 'role'.
        """
        return browser_mcp.fill(selector, text, by)

    @tool
    def browser_press_key(key: str) -> str:
        """Press a keyboard key (e.g., 'Enter', 'Tab', 'Escape').

        Args:
            key: Name of the key to press.
        """
        return browser_mcp.press(key)

    @tool
    def browser_scroll(direction: str = "down", amount: int = 500) -> str:
        """Scroll the page up or down.

        Args:
            direction: Use 'down' to scroll down; any other value scrolls up.
            amount: Wheel distance to scroll by.
        """
        return browser_mcp.scroll(direction, amount)

    @tool
    def browser_get_text(selector: str = "body") -> str:
        """Extract text content from a page element.

        Args:
            selector: CSS selector of the element to read; defaults to the whole body.
        """
        return browser_mcp.get_text(selector)

    @tool
    def browser_find_and_click(text: str) -> str:
        """Find an element by its visible text and click it.

        Args:
            text: Visible text of the element to click.
        """
        return browser_mcp.find_and_click(text)

    @tool
    def browser_screenshot(path: str = "") -> str:
        """Take a screenshot of the current page. If path is empty, returns base64.

        Args:
            path: File to save the screenshot to; leave empty to get the image back as base64 text.
        """
        return browser_mcp.screenshot(path or None)

    @tool
    def browser_extract_links() -> str:
        """Extract all links from the current page as JSON."""
        links = browser_mcp.extract_links()
        # Only the first 50 links are returned to keep the payload bounded.
        return json.dumps(links[:50], indent=2)

    @tool
    def browser_extract_tables() -> str:
        """Extract all tables from the current page as JSON."""
        tables = browser_mcp.extract_tables()
        # Only the first 5 tables are returned to keep the payload bounded.
        return json.dumps(tables[:5], indent=2)

    @tool
    def browser_evaluate_js(script: str) -> str:
        """Execute JavaScript in the browser context and return the result.

        Args:
            script: JavaScript source to evaluate in the page.
        """
        result = browser_mcp.evaluate_js(script)
        # default=str keeps values JSON cannot encode from raising.
        return json.dumps(result, default=str)

    return {
        "browser_goto": browser_goto,
        "browser_click": browser_click,
        "browser_fill": browser_fill,
        "browser_press_key": browser_press_key,
        "browser_scroll": browser_scroll,
        "browser_get_text": browser_get_text,
        "browser_find_and_click": browser_find_and_click,
        "browser_screenshot": browser_screenshot,
        "browser_extract_links": browser_extract_links,
        "browser_extract_tables": browser_extract_tables,
        "browser_evaluate_js": browser_evaluate_js,
    }
|
|
|
|
| |
| |
| |
|
|
class CodeExecutionMCP:
    """Sandboxed Python/JS code execution via E2B.

    The sandbox is created on first use and reused until ``close()``.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Remember the API key, falling back to the ``E2B_API_KEY`` variable."""
        self.api_key = api_key or os.getenv("E2B_API_KEY")
        self._sandbox: Optional[Any] = None

    def _get_sandbox(self):
        """Return the shared sandbox, creating it on first use.

        Raises:
            RuntimeError: If ``e2b_code_interpreter`` could not be imported.
        """
        if not HAS_E2B_CODE:
            raise RuntimeError("e2b_code_interpreter not installed")
        if self._sandbox is None:
            self._sandbox = CodeSandbox(api_key=self.api_key)
        return self._sandbox

    def run_python(self, code: str, timeout: int = 30) -> Dict[str, Any]:
        """Run ``code`` in the sandbox.

        Returns:
            Dict with ``stdout``, ``stderr``, ``results`` (each result
            converted with ``str``) and ``error`` from the execution.
        """
        sb = self._get_sandbox()
        execution = sb.run_code(code, timeout=timeout)
        return {
            "stdout": execution.logs.stdout,
            "stderr": execution.logs.stderr,
            "results": [str(r) for r in execution.results],
            "error": execution.error,
        }

    def run_shell(self, command: str, timeout: int = 30) -> Dict[str, Any]:
        """Run ``command`` by sending it to the sandbox prefixed with ``!``.

        NOTE(review): presumably the sandbox kernel treats a leading ``!`` as
        a shell escape for that one line -- confirm multi-line commands
        behave as intended.

        Returns:
            Dict with ``stdout``, ``stderr`` and ``error`` from the execution.
        """
        sb = self._get_sandbox()
        execution = sb.run_code(f"!{command}", timeout=timeout)
        return {
            "stdout": execution.logs.stdout,
            "stderr": execution.logs.stderr,
            "error": execution.error,
        }

    def install_package(self, package: str) -> str:
        """``pip install`` one or more whitespace-separated requirements.

        Each requirement is shell-quoted so that specifiers such as
        ``numpy>=1.0`` are not parsed as output redirections.

        Returns:
            ``"Installed ..."`` followed by up to 500 characters of output, or
            ``"Failed to install ..."`` when the execution reports an error.
        """
        import shlex

        try:
            parts = shlex.split(package)
        except ValueError:
            # Unbalanced quotes: treat the whole string as one requirement.
            parts = [package]
        requirements = " ".join(shlex.quote(part) for part in parts)

        result = self.run_shell(f"pip install {requirements}")
        if result["error"]:
            return f"Failed to install {package}: {result['error']}"

        stdout = result["stdout"]
        # run_shell hands back log lines; join them so the 500 limit below is
        # applied to characters rather than to list items.
        text = stdout if isinstance(stdout, str) else "\n".join(stdout)
        return f"Installed {package}: {text[:500]}"

    def close(self):
        """Kill the sandbox, if any, and forget it even when the kill fails."""
        sandbox = self._sandbox
        if sandbox is None:
            return
        try:
            sandbox.kill()
        finally:
            self._sandbox = None
|
|
|
|
def make_code_tools(code_mcp: CodeExecutionMCP) -> Dict[str, Any]:
    """Generate smolagents @tool functions from CodeExecutionMCP.

    The ``Args:`` sections are required: the ``@tool`` decorator builds the
    tool schema from the docstring and rejects undocumented parameters.

    Args:
        code_mcp: Code-execution bridge that every returned tool delegates to.

    Returns:
        Mapping of tool name to the decorated tool object.
    """

    @tool
    def execute_python(code: str) -> str:
        """Execute Python code in a sandboxed environment. Use for data processing, calculations, or parsing.

        Args:
            code: Python source code to run.
        """
        result = code_mcp.run_python(code)
        if result["error"]:
            return f"Error: {result['error']}\nStderr: {result['stderr']}"
        out = "\n".join(result["stdout"])
        if result["results"]:
            out += f"\nResults: {result['results']}"
        # Cap the reply at 3000 characters.
        return out[:3000]

    @tool
    def execute_shell(command: str) -> str:
        """Execute a shell command in the sandbox.

        Args:
            command: Shell command line to run.
        """
        result = code_mcp.run_shell(command)
        if result["error"]:
            return f"Error: {result['error']}"
        # Cap the reply at 3000 characters.
        return "\n".join(result["stdout"])[:3000]

    @tool
    def install_python_package(package: str) -> str:
        """Install a Python package in the sandbox.

        Args:
            package: Requirement to hand to pip install, for example a package name.
        """
        return code_mcp.install_package(package)

    return {
        "execute_python": execute_python,
        "execute_shell": execute_shell,
        "install_python_package": install_python_package,
    }
|
|
|
|
| |
| |
| |
|
|
class FileSystemMCP:
    """Read and write files confined to a workspace directory."""

    def __init__(self, base_dir: str = "./workspace"):
        """Resolve ``base_dir`` to an absolute path and create it if missing."""
        self.base_dir = os.path.abspath(base_dir)
        os.makedirs(self.base_dir, exist_ok=True)

    def _safe_path(self, path: str) -> str:
        """Resolve ``path`` inside the workspace and return the absolute path.

        Containment is checked on whole path components after resolving
        symlinks. A plain string-prefix test is not enough: it would accept a
        sibling directory such as ``workspace-evil`` for a ``workspace`` base.

        Raises:
            ValueError: If the resolved path lies outside the workspace.
        """
        base = os.path.realpath(self.base_dir)
        candidate = os.path.realpath(os.path.join(base, path))
        try:
            inside = os.path.commonpath([base, candidate]) == base
        except ValueError:
            # commonpath refuses paths that cannot share a root (for example
            # different drives); such a path is certainly outside.
            inside = False
        if not inside:
            raise ValueError("Path traversal attempt detected")
        return candidate

    def read_file(self, path: str) -> str:
        """Return the file's text, decoded as UTF-8 with bad bytes dropped."""
        sp = self._safe_path(path)
        with open(sp, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()

    def write_file(self, path: str, content: str) -> str:
        """Write ``content`` as UTF-8, creating parent directories as needed."""
        sp = self._safe_path(path)
        os.makedirs(os.path.dirname(sp), exist_ok=True)
        with open(sp, "w", encoding="utf-8") as f:
            f.write(content)
        return f"Wrote {len(content)} chars to {path}"

    def list_dir(self, path: str = ".") -> List[str]:
        """Return the entry names of a workspace directory."""
        sp = self._safe_path(path)
        return os.listdir(sp)

    def read_image(self, path: str) -> "Image.Image":
        """Open a workspace image with Pillow and return the image object.

        The image is returned as opened; closing it is left to the caller.
        """
        sp = self._safe_path(path)
        return Image.open(sp)
|
|
|
|
def make_fs_tools(fs_mcp: FileSystemMCP) -> Dict[str, Any]:
    """Generate smolagents @tool functions from FileSystemMCP.

    The ``Args:`` sections are required: the ``@tool`` decorator builds the
    tool schema from the docstring and rejects undocumented parameters.

    Args:
        fs_mcp: File-system bridge that every returned tool delegates to.

    Returns:
        Mapping of tool name to the decorated tool object.
    """

    @tool
    def fs_read(path: str) -> str:
        """Read a text file from the workspace.

        Args:
            path: File path relative to the workspace directory.
        """
        return fs_mcp.read_file(path)

    @tool
    def fs_write(path: str, content: str) -> str:
        """Write text content to a file in the workspace.

        Args:
            path: File path relative to the workspace directory.
            content: Text to write; an existing file is overwritten.
        """
        return fs_mcp.write_file(path, content)

    @tool
    def fs_list(path: str = ".") -> str:
        """List files in a workspace directory.

        Args:
            path: Directory path relative to the workspace directory; defaults to the workspace itself.
        """
        return json.dumps(fs_mcp.list_dir(path))

    return {
        "fs_read": fs_read,
        "fs_write": fs_write,
        "fs_list": fs_list,
    }
|
|
|
|
| |
| |
| |
|
|
class HFHubMCP:
    """Small facade over the Hugging Face Hub client for use by the agent."""

    def __init__(self, token: Optional[str] = None):
        """Pick the token and bind the Hub client helpers.

        An explicit ``token`` wins; otherwise the ``HF_TOKEN`` environment
        variable is consulted.
        """
        self.token = token if token else os.getenv("HF_TOKEN")
        # Imported here rather than at module level, so the dependency is only
        # needed when this class is actually instantiated.
        from huggingface_hub import HfApi, create_repo, upload_file

        self.api = HfApi(token=self.token)
        self._upload_file = upload_file
        self._create_repo = create_repo

    def search_models(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search models (sorted by "downloads"); return id, downloads and tags."""
        summaries = []
        for model in self.api.list_models(search=query, limit=limit, sort="downloads"):
            summaries.append(
                {"id": model.id, "downloads": model.downloads, "tags": model.tags}
            )
        return summaries

    def search_datasets(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search datasets; return id and tags for each hit."""
        summaries = []
        for dataset in self.api.list_datasets(search=query, limit=limit):
            summaries.append({"id": dataset.id, "tags": dataset.tags})
        return summaries

    def search_spaces(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search Spaces; return id and sdk ("unknown" when the hit has none)."""
        summaries = []
        for space in self.api.list_spaces(search=query, limit=limit):
            sdk = getattr(space, "sdk", "unknown")
            summaries.append({"id": space.id, "sdk": sdk})
        return summaries

    def upload_to_dataset(self, repo_id: str, file_path: str, path_in_repo: str) -> str:
        """Upload ``file_path`` to a dataset repo and return a confirmation."""
        upload_kwargs = {
            "path_or_fileobj": file_path,
            "path_in_repo": path_in_repo,
            "repo_id": repo_id,
            "repo_type": "dataset",
            "token": self.token,
        }
        self._upload_file(**upload_kwargs)
        return f"Uploaded {file_path} to {repo_id}/{path_in_repo}"
|
|
|
|
def make_hf_tools(hf_mcp: HFHubMCP) -> Dict[str, Any]:
    """Generate smolagents @tool functions from HFHubMCP.

    The ``Args:`` sections are required: the ``@tool`` decorator builds the
    tool schema from the docstring and rejects undocumented parameters.

    Args:
        hf_mcp: Hub bridge that every returned tool delegates to.

    Returns:
        Mapping of tool name to the decorated tool object.
    """

    @tool
    def hf_search_models(query: str, limit: int = 10) -> str:
        """Search Hugging Face Hub for models.

        Args:
            query: Search text to match models against.
            limit: Maximum number of models to return.
        """
        results = hf_mcp.search_models(query, limit)
        return json.dumps(results, indent=2)

    @tool
    def hf_search_datasets(query: str, limit: int = 10) -> str:
        """Search Hugging Face Hub for datasets.

        Args:
            query: Search text to match datasets against.
            limit: Maximum number of datasets to return.
        """
        results = hf_mcp.search_datasets(query, limit)
        return json.dumps(results, indent=2)

    @tool
    def hf_search_spaces(query: str, limit: int = 10) -> str:
        """Search Hugging Face Hub for Spaces.

        Args:
            query: Search text to match Spaces against.
            limit: Maximum number of Spaces to return.
        """
        results = hf_mcp.search_spaces(query, limit)
        return json.dumps(results, indent=2)

    @tool
    def hf_upload_dataset_file(repo_id: str, file_path: str, path_in_repo: str) -> str:
        """Upload a file to a Hugging Face dataset repository.

        Args:
            repo_id: Identifier of the target dataset repository.
            file_path: Path of the local file to upload.
            path_in_repo: Destination path of the file inside the repository.
        """
        return hf_mcp.upload_to_dataset(repo_id, file_path, path_in_repo)

    return {
        "hf_search_models": hf_search_models,
        "hf_search_datasets": hf_search_datasets,
        "hf_search_spaces": hf_search_spaces,
        "hf_upload_dataset_file": hf_upload_dataset_file,
    }
|
|