| from gradio_client import Client |
| import os, json, re |
|
|
class MCPWebDriver:
    """Selenium-style driver facade backed by a remote Gradio Space.

    Every operation is forwarded through ``self.client.predict`` to a named
    endpoint of the Space (``/get_page_info``, ``/screenshot``,
    ``/execute_js``, ``/close_persistent_driver``).  The positional boolean
    flags sent to those endpoints are defined by the remote Space; their
    meaning is not visible from this file.

    Attributes:
        client: The ``gradio_client.Client`` used for all remote calls.
        current_url: URL most recently passed to :meth:`get` ("" initially).
        width, height: Values recorded by :meth:`set_window_size`; nothing in
            this class sends them to the remote side.
    """

    class ChromeOptions:
        """No-op stand-in for an options object; every setting is discarded."""

        def __init__(self):
            pass

        def add_argument(self, arg):
            """Accept and ignore a browser command-line argument."""

        def add_experimental_option(self, name, value):
            """Accept and ignore an experimental option."""

    # Prefix the remote endpoint may put in front of the JSON payload.
    _RESULT_PREFIX = "Script executed. Result: "

    # JavaScript emitted before the user's script.  ``wrapResult`` tags every
    # DOM element found in the return value with a ``data-mcp-id`` attribute
    # and replaces it with a plain descriptor, so the result can travel back
    # as JSON and be turned into ``MCPWebElement`` objects on the Python side.
    _WRAPPER_HEAD = """
(function() {
    const wrapResult = (obj) => {
        if (!obj) return obj;
        if (typeof Element !== 'undefined' && obj instanceof Element) {
            let id = obj.getAttribute('data-mcp-id');
            if (!id) {
                id = 'mcp-' + Math.random().toString(36).substr(2, 9);
                obj.setAttribute('data-mcp-id', id);
            }
            return { __mcp_type: 'element', selector: '[data-mcp-id="' + id + '"]', tag_name: obj.tagName.toLowerCase() };
        }
        if (Array.isArray(obj)) return obj.map(wrapResult);
        if (typeof obj === 'object' && obj !== null) {
            const newObj = {};
            for (let key in obj) {
                if (Object.prototype.hasOwnProperty.call(obj, key)) {
                    try { newObj[key] = wrapResult(obj[key]); }
                    catch(e) { }
                }
            }
            return newObj;
        }
        return obj;
    };
    const result = (function() {
"""

    # JavaScript emitted after the user's script and its argument list.
    _WRAPPER_TAIL = """
    return wrapResult(result);
})()
"""

    def __init__(self, space_id="diamond-in/Browser-Use-mcp"):
        """Connect to the Gradio Space identified by *space_id*."""
        self.client = Client(space_id)
        self.current_url = ""
        self.width = 1920
        self.height = 1080

    def set_window_size(self, width, height):
        """Record the requested window size locally."""
        self.width, self.height = width, height

    def get(self, url):
        """Remember *url* as the current page and request its page info."""
        self.current_url = url
        self.client.predict(url, True, api_name="/get_page_info")

    def save_screenshot(self, path):
        """Request a screenshot of the current URL and move it to *path*.

        The endpoint result is accepted either as a dict carrying a ``'path'``
        key or as a string naming an existing file.

        Returns:
            ``True`` when a file was moved to *path*, ``False`` when the
            endpoint returned nothing usable.

        Raises:
            OSError: If the reported file cannot be moved.
        """
        # Local import keeps this block self-contained.
        import shutil

        res = self.client.predict(self.current_url, False, True, api_name="/screenshot")
        if isinstance(res, dict) and 'path' in res:
            source = res['path']
        elif isinstance(res, str) and os.path.exists(res):
            source = res
        else:
            return False
        # shutil.move falls back to copy+delete when source and destination
        # are on different filesystems, where a bare os.rename raises.
        shutil.move(source, path)
        return True

    @classmethod
    def _to_js_argument(cls, value):
        """Return a JavaScript expression that evaluates to *value*.

        Objects exposing a string ``selector`` attribute (element handles) are
        re-resolved in the page with ``document.querySelector``; lists, tuples
        and dicts are converted recursively; anything JSON cannot encode
        becomes ``null`` so an unused exotic argument cannot break the call.
        """
        selector = getattr(value, "selector", None)
        if isinstance(selector, str):
            return "document.querySelector(" + json.dumps(selector) + ")"
        if isinstance(value, (list, tuple)):
            return "[" + ", ".join(cls._to_js_argument(item) for item in value) + "]"
        if isinstance(value, dict):
            pairs = (
                json.dumps(str(key)) + ": " + cls._to_js_argument(item)
                for key, item in value.items()
            )
            return "{" + ", ".join(pairs) + "}"
        try:
            return json.dumps(value)
        except (TypeError, ValueError):
            return "null"

    @classmethod
    def _parse_script_result(cls, res):
        """Decode the ``/execute_js`` reply, tolerating the textual prefix.

        Tries plain JSON first, then strips ``_RESULT_PREFIX`` and tries
        again; if that also fails the (possibly stripped) text is returned.
        """
        try:
            return json.loads(res)
        except (TypeError, ValueError):
            pass
        if res.startswith(cls._RESULT_PREFIX):
            res = res[len(cls._RESULT_PREFIX):]
        try:
            return json.loads(res)
        except (TypeError, ValueError):
            return res

    def execute_script(self, script, *args):
        """Run *script* as a function body in the remote page.

        Args:
            script: JavaScript function body; use ``return`` to yield a value.
            *args: Values exposed to the script as ``arguments[0]``,
                ``arguments[1]``, ...  Element handles are passed as the DOM
                nodes they select.

        Returns:
            The decoded result, with element descriptors replaced by
            ``MCPWebElement`` objects.  On any failure of the remote call or
            of decoding, the error is printed and ``[]`` is returned.
        """
        args_js = "[" + ", ".join(self._to_js_argument(arg) for arg in args) + "]"
        # Concatenate instead of str.replace so marker-like text inside the
        # user's script or arguments can never be substituted by accident.
        # The newline before the closing brace keeps a trailing ``//`` comment
        # in the script from swallowing it.
        final_script = "".join((
            self._WRAPPER_HEAD,
            script,
            "\n    }).apply(undefined, ",
            args_js,
            ");",
            self._WRAPPER_TAIL,
        ))

        try:
            res = self.client.predict(self.current_url, final_script, True, api_name="/execute_js")
            return self._reconstruct_elements(self._parse_script_result(res))
        except Exception as e:
            # Deliberate best-effort: callers receive an empty list on error.
            print(f"Error executing script: {e}")

        return []

    def _reconstruct_elements(self, data):
        """Recursively replace element descriptors in *data* with handles.

        A dict whose ``'__mcp_type'`` is ``'element'`` becomes an
        ``MCPWebElement``; other dicts and lists are rebuilt element-wise;
        everything else is returned unchanged.
        """
        if isinstance(data, dict):
            if data.get('__mcp_type') == 'element':
                return MCPWebElement(self, data['selector'], data.get('tag_name', 'div'))
            return {k: self._reconstruct_elements(v) for k, v in data.items()}
        if isinstance(data, list):
            return [self._reconstruct_elements(v) for v in data]
        return data

    def quit(self):
        """Call the Space's ``/close_persistent_driver`` endpoint."""
        self.client.predict(api_name="/close_persistent_driver")

    def find_elements(self, by, value):
        """Stub: element lookup is not implemented and always yields ``[]``."""
        return []

    @property
    def title(self):
        """Title reported by ``/get_page_info`` for the current URL.

        Returns ``""`` when the reply is not a JSON object.
        """
        res = self.client.predict(self.current_url, True, api_name="/get_page_info")
        try:
            return json.loads(res).get("title", "")
        except (TypeError, ValueError, AttributeError):
            # Not JSON text, or JSON that is not an object with ``.get``.
            return ""
|
|
class MCPWebElement:
    """Handle to a page element, identified by a selector string.

    Interactions are forwarded to the owning driver's ``client`` together
    with the driver's ``current_url``.
    """

    def __init__(self, driver, selector, tag_name="div"):
        """Store the owning driver, the selector and the tag name."""
        self.driver = driver
        self.selector = selector
        self.tag_name = tag_name

    def click(self):
        """Send this element's selector to the ``/click`` endpoint."""
        owner = self.driver
        owner.client.predict(owner.current_url, self.selector, True, api_name="/click")

    def send_keys(self, *keys):
        """Concatenate the string items of *keys* and send them to ``/fill``.

        Non-string items are dropped.
        """
        typed = "".join(str(key) for key in keys if isinstance(key, str))
        owner = self.driver
        owner.client.predict(owner.current_url, self.selector, typed, True, api_name="/fill")

    def get_attribute(self, name):
        """Stub: attribute lookup is not implemented; always returns ``""``."""
        return ""

    def clear(self):
        """Stub: does nothing."""
        return None
|
|
class ActionChains:
    """Minimal fluent action builder bound to a driver.

    ``click`` and ``send_keys`` act immediately when called, so ``perform``
    has nothing left to do; the remaining methods only keep chaining working.
    """

    def __init__(self, driver):
        """Keep a reference to the driver whose client performs the actions."""
        self.driver = driver

    def click(self, element=None):
        """Click *element* right away if one is given; return ``self``."""
        if not element:
            return self
        element.click()
        return self

    def send_keys(self, *keys):
        """Send the string items of *keys* to ``/fill`` targeting ``"body"``.

        Non-string items are dropped.  Returns ``self`` for chaining.
        """
        typed = "".join(str(key) for key in keys if isinstance(key, str))
        browser = self.driver
        browser.client.predict(browser.current_url, "body", typed, True, api_name="/fill")
        return self

    def perform(self):
        """Do nothing; chained actions have already run."""
        return None

    def pause(self, seconds):
        """Ignore *seconds* and return ``self``."""
        return self

    def key_down(self, key):
        """Ignore *key* and return ``self``."""
        return self

    def key_up(self, key):
        """Ignore *key* and return ``self``."""
        return self
|
|