File size: 3,474 Bytes
a559920
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# playwright_tool.py
from typing import Dict, Optional
from playwright.async_api import async_playwright
import re
import html
import time

try:
    # if you have the same decorator you used for serper
    from agents import function_tool  # or wherever your decorator is
except Exception:
    # fallback no-op if you call it directly
    def function_tool(fn): return fn

def _collapse_ws(s: str) -> str:
    s = html.unescape(s or "")
    s = re.sub(r"\r\n|\r", "\n", s)
    s = re.sub(r"[ \t\f\v]+", " ", s)
    s = re.sub(r"\n[ \t]+", "\n", s)
    s = re.sub(r"\n{3,}", "\n\n", s)
    return s.strip()

@function_tool
async def playwright_web_read(
    url: str,
    wait_selector: Optional[str] = None,
    render_js: bool = True,
    timeout_ms: int = 120000,
    max_chars: int = 200_000,
    user_agent: Optional[str] = None,
) -> Dict[str, object]:
    """
    Fetch visible page text using Playwright (Chromium, headless).
    Args:
      url: The URL to visit.
      wait_selector: CSS selector to wait for (optional).
      render_js: If False, disable JS for faster loads on static pages.
      timeout_ms: Overall nav+wait timeout.
      max_chars: Truncate returned text to avoid huge payloads.
      user_agent: Optional UA string.
    Returns:
      { "title", "final_url", "status", "text", "elapsed_ms" }
    """
    t0 = time.time()
    title = ""
    final_url = url
    status = 0
    text = ""

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            context_kwargs = {}
            if user_agent:
                context_kwargs["user_agent"] = user_agent
            if not render_js:
                context_kwargs["java_script_enabled"] = False

            context = await browser.new_context(**context_kwargs)
            page = await context.new_page()

            # Conservative wait_until to get dynamic content when render_js=True
            wait_until = "networkidle" if render_js else "domcontentloaded"
            resp = await page.goto(url, wait_until=wait_until, timeout=timeout_ms)
            if resp:
                status = resp.status or 0
                final_url = page.url

            if wait_selector:
                try:
                    await page.wait_for_selector(wait_selector, timeout=timeout_ms)
                except Exception:
                    pass  # don't fail just because selector not found

            try:
                title = await page.title() or ""
            except Exception:
                title = ""

            # Prefer visible text; fall back to body textContent.
            try:
                # inner_text("body") respects visibility better than content()
                text = await page.inner_text("body", timeout=2000)
            except Exception:
                try:
                    text = await page.evaluate("document.body ? document.body.innerText : ''") or ""
                except Exception:
                    text = ""

            text = _collapse_ws(text)
            if len(text) > max_chars:
                text = text[:max_chars]

            return {
                "title": title,
                "final_url": final_url,
                "status": status,
                "text": text,
                "elapsed_ms": int((time.time() - t0) * 1000),
            }
        finally:
            try:
                await browser.close()
            except Exception:
                pass