File size: 3,459 Bytes
5d89b25
 
 
 
5795c69
5d89b25
74d8962
5795c69
 
5d89b25
5795c69
5d89b25
5795c69
 
 
 
 
 
5d89b25
 
 
 
 
 
 
5795c69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d89b25
 
 
 
5795c69
5d89b25
5795c69
5d89b25
 
b1b78f4
 
 
 
 
 
 
5d89b25
b1b78f4
 
 
 
5d89b25
b1b78f4
5d89b25
5795c69
74d8962
 
 
5d89b25
 
 
5795c69
5d89b25
 
5795c69
5d89b25
 
 
5795c69
5d89b25
 
 
5795c69
5d89b25
 
 
5795c69
5d89b25
 
 
 
5795c69
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# nest_asyncio patches the event loop so Playwright's async machinery can run
# inside environments that already have a running loop (e.g. notebooks or
# certain hosted runtimes) — presumably why it is applied before anything else.
import nest_asyncio
nest_asyncio.apply()

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async
from bs4 import BeautifulSoup, Comment
import re
import asyncio

# ASGI application instance served by uvicorn (see the __main__ guard below).
app = FastAPI(title="Web Scraper API")

class ScrapeRequest(BaseModel):
    """Request body for POST /scrape."""
    # Pydantic rejects malformed URLs before the endpoint handler runs.
    url: HttpUrl

@app.get("/")
def read_root():
    """Landing route: describes how to call the scraping endpoints."""
    welcome = {
        "message": "Welcome to the Playwright Web Scraping Service! Send a POST request to /scrape with a JSON body {'url': '...'} or use GET /scrape?url=..."
    }
    return welcome

def clean_html(html_content: str):
    """Strip boilerplate from raw HTML and return ``(title, text)``.

    Removes scripts, styles, comments, and elements whose class/id matches
    common ad/navigation patterns, then collapses the remaining visible text
    into newline-separated content.

    Args:
        html_content: Raw HTML markup.

    Returns:
        Tuple of (page title, cleaned text). Title falls back to
        "No title found" when absent or empty.
    """
    soup = BeautifulSoup(html_content, "lxml")

    # Extract title before cleaning. Guard against an empty <title> tag:
    # its .string is None and calling .strip() on it would raise.
    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    else:
        title = "No title found"

    # Remove script, style, iframe, and other non-content tags
    for tag in soup(["script", "style", "iframe", "noscript", "meta", "link", "svg", "button", "input", "form"]):
        tag.decompose()

    # Remove HTML comments. "string=" is the current bs4 argument name;
    # the original "text=" spelling is deprecated.
    for comment in soup.find_all(string=lambda s: isinstance(s, Comment)):
        comment.extract()

    # Remove common ad and clutter containers matched by class or id.
    ad_patterns = re.compile(
        r"(ad|ads|advert|advertisement|banner|social|share|nav|footer|header|menu|sidebar|cookie|popup|modal|newsletter)",
        re.IGNORECASE
    )
    for attr in ("class", "id"):
        for tag in soup.find_all(attrs={attr: ad_patterns}):
            tag.decompose()

    # Extract text, then collapse runs of 3+ newlines to a single blank line.
    text = soup.get_text(separator="\n", strip=True)
    text = re.sub(r'\n{3,}', '\n\n', text)

    return title, text

async def scrape_with_playwright(url: str):
    """Fetch the fully-rendered HTML of *url* via a stealth Chromium session."""
    async with async_playwright() as p:
        # Hide the obvious automation fingerprint at launch time.
        browser = await p.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"]
        )

        # Present a realistic desktop browsing profile.
        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1920, "height": 1080},
            locale="en-US",
            timezone_id="America/New_York"
        )

        page = await context.new_page()
        # Patch JS-visible automation markers on the page itself.
        await stealth_async(page)

        try:
            # networkidle waits for JS-driven requests to settle so dynamic
            # content is present in the DOM; 30s navigation cap.
            await page.goto(url, wait_until="networkidle", timeout=30000)
            return await page.content()
        finally:
            # Release the browser (and its contexts) even when navigation fails.
            await browser.close()

@app.post("/scrape")
async def scrape_url(request: ScrapeRequest):
    """POST endpoint: scrape the URL from a validated JSON body."""
    target = str(request.url)
    return await process_scrape(target)

@app.get("/scrape")
async def scrape_url_get(url: str):
    """GET endpoint: scrape the URL supplied as a query parameter."""
    result = await process_scrape(url)
    return result

async def process_scrape(url: str):
    """Scrape *url*, clean the HTML, and return a JSON-serializable result.

    Args:
        url: Absolute URL to fetch.

    Returns:
        Dict with the url, extracted title, cleaned text content, and a
        "success" status marker.

    Raises:
        HTTPException: 500 with the underlying error message on any
            scraping/parsing failure; downstream HTTPExceptions propagate
            unchanged.
    """
    try:
        html_content = await scrape_with_playwright(url)
        title, text = clean_html(html_content)

        return {
            "url": url,
            "title": title,
            "content": text,
            "status": "success"
        }

    except HTTPException:
        # Don't re-wrap errors that are already well-formed HTTP responses
        # (the original blanket handler would have downgraded them to 500).
        raise
    except Exception as e:
        # Chain the cause so server logs keep the original traceback.
        raise HTTPException(status_code=500, detail=f"Scraping error: {str(e)}") from e

if __name__ == "__main__":
    # Local entry point: serve on all interfaces. Port 7860 is presumably
    # chosen for Hugging Face Spaces hosting — confirm with deployment config.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)