File size: 3,846 Bytes
a53470d
6e9b7bf
a53470d
 
 
6e9b7bf
a53470d
d0202f5
 
 
 
 
 
 
a53470d
 
d0202f5
 
 
a53470d
 
 
 
 
 
 
 
 
 
d0202f5
 
 
 
 
 
 
 
 
 
a53470d
d0202f5
 
a53470d
6e9b7bf
a53470d
 
6e9b7bf
a53470d
 
d0202f5
 
 
 
 
 
a53470d
d0202f5
 
a53470d
d0202f5
a53470d
6e9b7bf
d0202f5
 
 
a53470d
 
d0202f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a53470d
 
6e9b7bf
 
 
d0202f5
 
 
 
6e9b7bf
 
d0202f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a53470d
d0202f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6e9b7bf
d0202f5
6e9b7bf
 
 
d0202f5
a53470d
6e9b7bf
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from fastapi import FastAPI, Form
from fastapi.responses import JSONResponse, HTMLResponse
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
import os

# Stealth engine
from stealth_browser import (
    launch_stealth_browser,
    stealth_goto,
    close_browser
)

app = FastAPI(title="Ultra Powerful Scraper")

# --------------------------------
# Utils
# --------------------------------
def get_headers():
    """Build browser-like request headers with a randomized User-Agent."""
    user_agent = UserAgent()
    headers = {
        "User-Agent": user_agent.random,
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml",
        "Connection": "keep-alive",
        "DNT": "1",
    }
    return headers

def is_cloudflare_page(html: str) -> bool:
    """Heuristically detect whether *html* is a Cloudflare challenge page.

    Performs a case-insensitive substring scan for known challenge markers.
    """
    lowered = html.lower()
    for marker in (
        "cf-browser-verification",
        "cloudflare",
        "attention required!",
        "checking your browser",
        "/cdn-cgi/",
    ):
        if marker in lowered:
            return True
    return False

def parse_html(html: str):
    """Extract structured data from an HTML document.

    Returns a dict with keys ``title`` (str), ``headings`` (h1-h3 texts),
    ``paragraphs`` (p texts) and ``links`` (de-duplicated href values).
    """
    soup = BeautifulSoup(html, "lxml")

    # soup.title.string is None for an empty <title></title> (and for a
    # title containing nested tags) — guard before strip() to avoid an
    # AttributeError on such pages.
    title = ""
    if soup.title and soup.title.string:
        title = soup.title.string.strip()

    return {
        "title": title,
        "headings": [h.get_text(strip=True) for h in soup.find_all(["h1", "h2", "h3"])],
        "paragraphs": [p.get_text(strip=True) for p in soup.find_all("p")],
        "links": list(set(a["href"] for a in soup.find_all("a", href=True))),
    }

# --------------------------------
# Static scrape
# --------------------------------
def static_scrape(url: str):
    """Fetch *url* with plain ``requests`` and parse the response.

    Raises:
        RuntimeError: when the response looks like a Cloudflare challenge
            page (signals the caller to fall back to the stealth engine).
        requests.HTTPError: for other non-2xx responses.
    """
    r = requests.get(url, headers=get_headers(), timeout=20)

    # Check for Cloudflare BEFORE raise_for_status(): challenge pages are
    # usually served with a 403/503 status, so checking afterwards would
    # surface a bare HTTP error instead of "Cloudflare detected".
    if is_cloudflare_page(r.text):
        raise RuntimeError("Cloudflare detected")

    r.raise_for_status()

    data = parse_html(r.text)
    return {
        "success": True,
        "engine": "requests",
        "bypass": "not_needed",
        **data
    }

# --------------------------------
# Stealth scrape (Cloudflare-aware)
# --------------------------------
def stealth_scrape(url: str):
    """Scrape *url* with the stealth Playwright browser (Cloudflare-aware)."""
    p, browser, context, page = launch_stealth_browser(headless=True)

    try:
        page_html = stealth_goto(page, url)
        parsed = parse_html(page_html)

        result = {
            "success": True,
            "engine": "playwright-stealth",
            "bypass": "attempted",
        }
        result.update(parsed)
        return result
    finally:
        # Always release the browser, even when navigation or parsing fails.
        close_browser(p, browser)

# --------------------------------
# API
# --------------------------------
@app.post("/scrape")
def scrape(url: str = Form(...)):
    """Scrape *url*: try cheap static requests first, then fall back to the
    stealth browser for Cloudflare-protected pages.

    Returns a JSON payload with the parsed page data plus a ``logs`` trail
    describing which engines were attempted; 400 for an invalid URL, 500
    when every engine fails.
    """
    # startswith("http") alone would accept junk such as "httpfoo";
    # require an actual scheme separator, matching the error message.
    if not url.startswith(("http://", "https://")):
        return JSONResponse(
            status_code=400,
            content={
                "success": False,
                "error": "Invalid URL. Must start with http or https."
            }
        )

    logs = []

    # 1️⃣ Try static scraping first — fast and cheap when it works.
    try:
        logs.append("Trying static scraping (requests)")
        result = static_scrape(url)
        logs.append("Static scrape successful")

        result["logs"] = logs
        return JSONResponse(result)

    except Exception as e:
        logs.append(f"Static failed: {str(e)}")

    # 2️⃣ Fall back to the stealth browser (Cloudflare bypass).
    try:
        logs.append("Switching to stealth browser (Cloudflare bypass)")
        result = stealth_scrape(url)
        logs.append("Stealth scrape completed")

        result["logs"] = logs
        return JSONResponse(result)

    except Exception as e:
        logs.append(f"Stealth failed: {str(e)}")

    # Both engines failed — report the full log trail to the client.
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "error": "All scraping methods failed",
            "logs": logs
        }
    )

# --------------------------------
# Serve UI
# --------------------------------
@app.get("/", response_class=HTMLResponse)
def serve_ui():
    """Serve the front-end page, or a fallback message when it is missing."""
    if os.path.exists("index.html"):
        with open("index.html", "r", encoding="utf-8") as fh:
            return fh.read()

    return "<h1>index.html not found</h1>"