# File size: 6,918 Bytes
# ec55f11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import re
import requests
from bs4 import BeautifulSoup
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type


class WebScraperInput(BaseModel):
    # Input schema for CleanWebScraperTool (wired in through its
    # ``args_schema`` attribute): a single URL string.
    website_url: str = Field(
        description="The URL of the website to scrape",
    )


class CleanWebScraperTool(BaseTool):
    """Scrapes a website using a Playwright headless browser so JavaScript
    renders and 'See more' buttons get clicked before extraction.
    Falls back to a plain requests fetch if the Playwright path fails for
    any reason (package missing, launch error, navigation timeout, ...)."""

    name: str = "Scrape and Clean Website"
    description: str = (
        "Scrapes a website URL using a headless browser, expands collapsed "
        "sections (e.g. LinkedIn 'See more'), and returns cleaned text content."
    )
    args_schema: Type[BaseModel] = WebScraperInput

    def _run(self, website_url: str) -> str:
        """Scrape ``website_url`` and return its cleaned text.

        The headless-browser path is tried first; if it raises, the plain
        HTTP path is tried. This method never raises: when both paths fail
        it returns a string starting with ``"Error scraping website:"``.

        Args:
            website_url: URL of the page to scrape.

        Returns:
            The cleaned page text, or an error description.
        """
        try:
            return self._scrape_with_playwright(website_url)
        except Exception as browser_error:
            # Deliberate best-effort boundary: any browser failure (including
            # a missing playwright package) falls through to plain HTTP.
            try:
                return self._scrape_with_requests(website_url)
            except Exception as e:
                # Keep the browser failure visible too, but only its first
                # line so a long multi-line error cannot flood the result.
                browser_reason = (
                    f"{type(browser_error).__name__}: {browser_error}"
                ).splitlines()[0]
                return (
                    f"Error scraping website: {e} "
                    f"(headless browser attempt also failed: {browser_reason})"
                )

    # ------------------------------------------------------------------
    # Playwright path
    # ------------------------------------------------------------------

    def _scrape_with_playwright(self, url: str) -> str:
        """Render ``url`` in headless Chromium and return the cleaned text.

        For URLs containing ``linkedin.com`` only the job-description block
        is extracted when one can be found; otherwise the whole ``<body>``
        text is used.

        Args:
            url: URL of the page to load.

        Returns:
            The extracted text after ``_clean_content``.

        Raises:
            Exception: anything raised by the playwright import, browser
                launch, navigation or extraction propagates to the caller.
        """
        # Imported lazily so a missing playwright install surfaces as an
        # exception here, which ``_run`` turns into the requests fallback.
        from playwright.sync_api import sync_playwright

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                context = browser.new_context(
                    user_agent=(
                        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                        "AppleWebKit/537.36 (KHTML, like Gecko) "
                        "Chrome/120.0.0.0 Safari/537.36"
                    ),
                    viewport={"width": 1280, "height": 800},
                )
                page = context.new_page()
                page.goto(url, wait_until="domcontentloaded", timeout=30000)
                # Fixed pause to give client-side rendering a chance to run.
                page.wait_for_timeout(2000)

                self._expand_see_more(page)

                text = ""
                if "linkedin.com" in url:
                    text = self._extract_linkedin_jd(page)
                if not text:
                    text = page.locator("body").inner_text(timeout=10000)
            finally:
                # Single close point for every exit path (the original closed
                # the browser twice on the LinkedIn path).
                browser.close()

        return self._clean_content(text)

    def _expand_see_more(self, page) -> None:
        """Click any visible 'See more' / 'Show more' expand buttons.

        Best effort: a failure while handling one selector is ignored and
        the next selector is tried.

        Args:
            page: the Playwright page to operate on.
        """
        selectors = [
            "button.show-more-less-button",
            "button[aria-label*='See more']",
            "button[aria-label*='Show more']",
            "button:has-text('See more')",
            "button:has-text('Show more')",
        ]
        for selector in selectors:
            try:
                for btn in page.locator(selector).all():
                    if btn.is_visible():
                        btn.click(timeout=2000)
                        # Short pause so the expanded content can render.
                        page.wait_for_timeout(500)
            except Exception:
                # Expanding is optional; give up on this selector after the
                # first failure rather than paying a click timeout per button.
                continue

    def _extract_linkedin_jd(self, page) -> str:
        """Extract only the job description block from a LinkedIn page.

        Args:
            page: the Playwright page to read from.

        Returns:
            The inner text of the first candidate element that is visible
            and non-blank, or ``""`` when none qualifies.
        """
        selectors = [
            ".show-more-less-html__markup",
            ".description__text",
            "[data-test-id='job-description']",
            ".job-details-jobs-unified-top-card__job-description",
        ]
        for selector in selectors:
            try:
                el = page.locator(selector).first
                # is_visible() returns immediately; Playwright documents its
                # ``timeout`` option as ignored, so none is passed.
                if not el.is_visible():
                    continue
                text = el.inner_text(timeout=3000)
            except Exception:
                continue
            # An empty match should not stop the search through the
            # remaining candidate selectors.
            if text.strip():
                return text
        return ""

    # ------------------------------------------------------------------
    # Requests fallback
    # ------------------------------------------------------------------

    def _scrape_with_requests(self, url: str) -> str:
        """Fetch ``url`` over plain HTTP and return the cleaned page text.

        Args:
            url: URL of the page to fetch.

        Returns:
            The page text after ``_clean_content``.

        Raises:
            requests.RequestException: on connection problems, timeouts, or
                a 4xx/5xx response status.
        """
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/120.0.0.0 Safari/537.36"
            ),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
        }
        response = requests.get(url, headers=headers, timeout=15)
        # Without this, an error or block page would be returned to the
        # caller as if it were the scraped content.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # Drop script/style elements so their source can never end up in the
        # extracted text.
        for tag in soup(["script", "style"]):
            tag.decompose()
        return self._clean_content(soup.get_text(" "))

    # ------------------------------------------------------------------
    # Shared cleaning
    # ------------------------------------------------------------------

    def _clean_content(self, raw_text: str, max_chars: int = 5000) -> str:
        """Remove navigation noise and truncate to control token usage.

        The text is processed line by line. A line is dropped when it is
        shorter than 3 characters after stripping, when it is exactly one of
        the generic UI labels, or when it contains one of the noise phrases.

        Args:
            raw_text: text extracted from the page.
            max_chars: maximum length of the cleaned text; longer results
                are cut at this length and a truncation marker is appended.

        Returns:
            The surviving lines joined with newlines; ``""`` for empty input.
        """
        if not raw_text:
            return ""

        # Phrases that mark a line as site chrome wherever they appear in it.
        noise_fragments = (
            "Sign in", "Join now", "Clear text", "Show more", "Show less",
            "Show fewer", "Similar jobs", "People also viewed", "Similar Searches",
            "More searches", "Agree & Join", "Set alert", "Get notified",
            "Forgot password?", "Cookie Policy", "Privacy Policy",
            "User Agreement", "Brand Policy", "Guest Controls",
            "Community Guidelines", "Get the app", "Referrals increase",
            "See who you know", "Sign in with Email", "New to LinkedIn",
            "open jobs", "LinkedIn is better on the app",
            "By clicking Continue", "Email or phone",
            "Expand search", "Skip to main content",
            # Entries of a language picker, e.g. "... (German)".
            "(Arabic)", "(Bangla)", "(Czech)", "(Danish)", "(German)",
            "(Greek)", "(English)", "(Spanish)", "(Persian)", "(Finnish)",
            "(French)", "(Hindi)", "(Hungarian)", "(Indonesian)", "(Italian)",
            "(Hebrew)", "(Japanese)", "(Korean)", "(Marathi)", "(Malay)",
            "(Dutch)", "(Norwegian)", "(Punjabi)", "(Polish)", "(Portuguese)",
            "(Romanian)", "(Russian)", "(Swedish)", "(Telugu)", "(Thai)",
            "(Tagalog)", "(Turkish)", "(Ukrainian)", "(Vietnamese)",
            "(Chinese",
        )
        # Generic single words are only noise when they are the whole line
        # (a form label or menu heading). Matching them as substrings would
        # also discard real content such as "Programming Languages: Python".
        noise_labels = frozenset({"Password", "Language"})

        cleaned = []
        for line in raw_text.split("\n"):
            stripped = line.strip()
            if len(stripped) < 3:
                # Covers blank lines as well as stray one/two-character bits.
                continue
            if stripped in noise_labels:
                continue
            if any(fragment in stripped for fragment in noise_fragments):
                continue
            cleaned.append(stripped)

        # No blank-line collapsing is needed: every kept line is non-empty,
        # so the joined text cannot contain consecutive newlines.
        result = "\n".join(cleaned)

        if len(result) > max_chars:
            result = result[:max_chars] + "\n\n[Content truncated to save tokens]"

        return result