Spaces:
Running
Running
| import json | |
| import logging | |
| from typing import Optional, Dict, Any, List | |
| from langchain_core.tools import tool | |
| from patchright.async_api import async_playwright, Browser, BrowserContext, Page | |
| import asyncio | |
| logger = logging.getLogger(__name__) | |
| # Global state for persistent browser session | |
| # Note: In a production API with multiple workers, this should be managed per-session/request. | |
| # For now, we use a simple mechanism to store session-specific browsers if session_id is provided via context, | |
| # but since tools interface doesn't easily pass that, we default to a singleton for single-user/cli usage. | |
class BrowserManager:
    """Process-wide singleton owning one persistent Playwright browser session.

    All state lives on the class itself (not on instances), so every tool in
    this module shares the same browser, context and page across calls.
    Teardown must go through :meth:`close`.

    BUGFIX: ``get_page`` and ``close`` were written to receive ``cls`` but were
    never decorated with ``@classmethod``; calling ``BrowserManager.get_page()``
    therefore raised ``TypeError`` (missing positional argument ``cls``).
    """

    _instance = None          # reserved for a future instance-based API; unused today
    _playwright = None        # handle returned by async_playwright().start()
    _browser = None           # launched Chromium process
    _context = None           # browser context (cookies, viewport, UA)
    _page = None              # the single shared tab
    _lock = asyncio.Lock()    # serializes lazy init / teardown across tasks

    @classmethod
    async def get_page(cls, headless: bool = True) -> Page:
        """Return the shared Page, lazily building playwright -> browser -> context -> page.

        Note: ``headless`` only has an effect on the very first call, when the
        browser is actually launched; later calls reuse the existing browser.
        """
        async with cls._lock:
            if cls._playwright is None:
                cls._playwright = await async_playwright().start()
            if cls._browser is None:
                # Use Scrapling-compatible browser launch if needed, or standard playwright
                # Using standard playwright for tools to ensure full interactivity
                cls._browser = await cls._playwright.chromium.launch(headless=headless)
            if cls._context is None:
                cls._context = await cls._browser.new_context(
                    viewport={'width': 1280, 'height': 800},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'
                )
            if cls._page is None:
                cls._page = await cls._context.new_page()
            return cls._page

    @classmethod
    async def close(cls):
        """Tear everything down in reverse creation order and reset the cached handles."""
        async with cls._lock:
            if cls._context:
                await cls._context.close()
                cls._context = None
                cls._page = None
            if cls._browser:
                await cls._browser.close()
                cls._browser = None
            if cls._playwright:
                await cls._playwright.stop()
                cls._playwright = None
| # Helper to run async code synchronously for tool execution | |
def run_sync(coro):
    """Drive *coro* to completion from synchronous code and return its result.

    Reuses (or lazily creates and installs) the current thread's event loop
    instead of ``asyncio.run`` so loop-bound state — the persistent Playwright
    browser held by ``BrowserManager`` — survives across tool calls.

    If a loop is already running (e.g. inside FastAPI), ``run_until_complete``
    would normally raise; in that case the loop is patched with ``nest_asyncio``
    to allow re-entrant blocking. This is a known limitation of tools that must
    return strings synchronously.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No usable loop for this thread yet (or the runtime refuses to create
        # one implicitly): make one and install it for subsequent calls.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    if loop.is_running():
        # Already inside an async context — make the loop re-entrant.
        # nest_asyncio is third-party; imported lazily and only on this path.
        import nest_asyncio
        nest_asyncio.apply()
    # Previously both branches duplicated this identical call; merged.
    return loop.run_until_complete(coro)
def browse_and_extract(url: str, selector: str = "body", use_persistent: bool = True) -> str:
    """Browse to a URL and extract text content from the specified CSS selector."""
    async def _action():
        try:
            page = await BrowserManager.get_page()
            await page.goto(url, wait_until="domcontentloaded")
            # Only wait explicitly for non-default selectors; <body> exists as
            # soon as domcontentloaded fires.
            if selector != "body":
                try:
                    await page.wait_for_selector(selector, timeout=5000)
                except Exception:
                    # Best-effort wait. Was a bare `except:`, which also swallowed
                    # BaseException (CancelledError/KeyboardInterrupt) — narrowed.
                    pass  # Continue if selector not found immediately
            element = await page.query_selector(selector)
            if element:
                return await element.inner_text()
            return f"Element '{selector}' not found."
        except Exception as e:
            # Tools report failures as strings so the agent can read and adapt.
            return f"Error: {str(e)}"
    return run_sync(_action())
def click_element(url: str, selector: str, use_persistent: bool = True) -> str:
    """Click an element on the page identified by the CSS selector. URL is ignored if persistent session is active."""
    async def _do_click():
        try:
            page = await BrowserManager.get_page()
            # Navigate only when the session is still on a blank tab; otherwise
            # stay wherever the previous tool call left us.
            on_blank_tab = page.url == "about:blank"
            if on_blank_tab and url:
                await page.goto(url)
            await page.click(selector)
        except Exception as exc:
            return f"Error: {str(exc)}"
        return "Clicked element."
    return run_sync(_do_click())
def fill_field(url: str, selector: str, text: str, use_persistent: bool = True) -> str:
    """Fill a text field or form element identified by the CSS selector with the provided text."""
    async def _do_fill():
        try:
            target = await BrowserManager.get_page()
            await target.fill(selector, text)
        except Exception as exc:
            return f"Error: {str(exc)}"
        return f"Filled '{selector}' with text."
    return run_sync(_do_fill())
def execute_javascript(url: str, script: str, use_persistent: bool = True) -> str:
    """Execute custom JavaScript on the page and return the result."""
    async def _run_js():
        try:
            page = await BrowserManager.get_page()
            outcome = await page.evaluate(script)
        except Exception as exc:
            return f"Error: {str(exc)}"
        # Stringify so the tool always hands the agent plain text.
        return str(outcome)
    return run_sync(_run_js())
def get_cookies(url: str, use_persistent: bool = True) -> str:
    """Get all cookies for the current domain in JSON format."""
    async def _dump_cookies():
        try:
            page = await BrowserManager.get_page()
            jar = await page.context.cookies()
            return json.dumps(jar)
        except Exception as exc:
            return f"Error: {str(exc)}"
    return run_sync(_dump_cookies())
def set_cookies(url: str, cookies_json: str, use_persistent: bool = True) -> str:
    """Set cookies on the page from a JSON string."""
    async def _load_cookies():
        try:
            page = await BrowserManager.get_page()
            # Parse after acquiring the page to keep the original side-effect
            # order (browser is launched even if the JSON turns out invalid).
            cookie_list = json.loads(cookies_json)
            await page.context.add_cookies(cookie_list)
        except Exception as exc:
            return f"Error: {str(exc)}"
        return "Cookies set."
    return run_sync(_load_cookies())
def scroll_page(url: str, direction: str = "bottom", pixels: float = 500, use_persistent: bool = True) -> str:
    """Scroll the page in a specified direction ('bottom', 'top', 'down', 'up')."""
    async def _action():
        try:
            page = await BrowserManager.get_page()
            if direction == "bottom":
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            elif direction == "top":
                await page.evaluate("window.scrollTo(0, 0)")
            elif direction == "down":
                await page.evaluate(f"window.scrollBy(0, {pixels})")
            elif direction == "up":
                await page.evaluate(f"window.scrollBy(0, -{pixels})")
            else:
                # Previously an unknown direction fell through silently yet
                # still reported "Scrolled ...", misleading the agent.
                return f"Error: unknown direction '{direction}' (use 'bottom', 'top', 'down' or 'up')."
            return f"Scrolled {direction}."
        except Exception as e:
            return f"Error: {str(e)}"
    return run_sync(_action())
def take_screenshot(url: str, full_page: bool = False, use_persistent: bool = True) -> str:
    """Take a screenshot of the current page and return base64 string."""
    async def _capture():
        try:
            import base64
            page = await BrowserManager.get_page()
            raw = await page.screenshot(full_page=full_page)
            encoded = base64.b64encode(raw).decode()
            # Truncated preview: a full base64 payload would flood the context.
            return f"Screenshot captured (base64): {encoded[:100]}..."
        except Exception as exc:
            return f"Error: {str(exc)}"
    return run_sync(_capture())
def get_html_source(url: str, use_persistent: bool = True) -> str:
    """Get the full HTML source code of the current page."""
    async def _dump_html():
        try:
            current = await BrowserManager.get_page()
            html = await current.content()
        except Exception as exc:
            return f"Error: {str(exc)}"
        return html
    return run_sync(_dump_html())
def get_page_info(url: str, use_persistent: bool = True) -> str:
    """Get comprehensive page information including title, URL."""
    async def _summarize():
        try:
            page = await BrowserManager.get_page()
            info = {
                "title": await page.title(),
                "url": page.url,
                # Only the first 500 chars of HTML, to keep the payload small.
                "content_preview": (await page.content())[:500],
            }
            return json.dumps(info)
        except Exception as exc:
            return f"Error: {str(exc)}"
    return run_sync(_summarize())
def wait_for_element(url: str, selector: str, timeout: float = 10, use_persistent: bool = True) -> str:
    """Wait for an element matching the CSS selector to appear on the page."""
    async def _await_selector():
        try:
            page = await BrowserManager.get_page()
            # Tool API takes seconds; Playwright expects milliseconds.
            await page.wait_for_selector(selector, timeout=timeout * 1000)
        except Exception as exc:
            return f"Error: {str(exc)}"
        return f"Element '{selector}' appeared."
    return run_sync(_await_selector())
def task_complete(reason: str) -> str:
    """Call this tool when you have successfully completed the task and have the final data or answer."""
    # Sentinel prefix the agent loop matches on to terminate the run.
    return "TASK COMPLETE: " + reason
def agent_reflection(thought: str, adaptation_plan: str) -> str:
    """Call this tool to reflect on your progress, especially after an error or unexpected result.
    Explain what you've learned and how you're adapting your strategy."""
    # Two labeled lines joined with a newline; format is load-bearing for the agent.
    parts = ("REFLECTION: " + thought, "ADAPTATION PLAN: " + adaptation_plan)
    return "\n".join(parts)
def get_all_browser_tools():
    """Returns a list of all browser automation tools."""
    # Order: navigation/extraction, interaction, state, inspection, meta-tools.
    tools = [
        browse_and_extract,
        click_element,
        fill_field,
        execute_javascript,
        get_cookies,
        set_cookies,
        scroll_page,
        take_screenshot,
        get_html_source,
        get_page_info,
        wait_for_element,
        task_complete,
        agent_reflection,
    ]
    return tools