Sinketji committed on
Commit
d0202f5
·
verified ·
1 Parent(s): 53ef5e6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +97 -48
app.py CHANGED
@@ -3,14 +3,20 @@ from fastapi.responses import JSONResponse, HTMLResponse
3
  import requests
4
  from bs4 import BeautifulSoup
5
  from fake_useragent import UserAgent
6
- from playwright.sync_api import sync_playwright
7
  import os
8
 
 
 
 
 
 
 
 
9
  app = FastAPI(title="Ultra Powerful Scraper")
10
 
11
- # ---------------------------
12
- # Utility: Anti-bot headers
13
- # ---------------------------
14
  def get_headers():
15
  ua = UserAgent()
16
  return {
@@ -21,76 +27,119 @@ def get_headers():
21
  "DNT": "1",
22
  }
23
 
24
- # ---------------------------
25
- # Static scraping
26
- # ---------------------------
27
- def static_scrape(url: str):
28
- r = requests.get(url, headers=get_headers(), timeout=20)
29
- r.raise_for_status()
30
-
31
- soup = BeautifulSoup(r.text, "lxml")
 
 
32
 
 
 
33
  return {
34
- "success": True,
35
- "mode": "static",
36
  "title": soup.title.string.strip() if soup.title else "",
37
  "headings": [h.get_text(strip=True) for h in soup.find_all(["h1", "h2", "h3"])],
38
  "paragraphs": [p.get_text(strip=True) for p in soup.find_all("p")],
39
  "links": list(set(a["href"] for a in soup.find_all("a", href=True))),
40
  }
41
 
42
- # ---------------------------
43
- # Dynamic scraping (JS-heavy)
44
- # ---------------------------
45
- def dynamic_scrape(url: str):
46
- with sync_playwright() as p:
47
- browser = p.chromium.launch(headless=True)
48
- page = browser.new_page()
49
- page.goto(url, timeout=30000)
50
- page.wait_for_timeout(3000)
51
- html = page.content()
52
- browser.close()
53
 
54
- soup = BeautifulSoup(html, "lxml")
 
55
 
 
56
  return {
57
  "success": True,
58
- "mode": "dynamic",
59
- "title": soup.title.string.strip() if soup.title else "",
60
- "headings": [h.get_text(strip=True) for h in soup.find_all(["h1", "h2", "h3"])],
61
- "paragraphs": [p.get_text(strip=True) for p in soup.find_all("p")],
62
- "links": list(set(a["href"] for a in soup.find_all("a", href=True))),
63
  }
64
 
65
- # ---------------------------
66
- # API endpoint
67
- # ---------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  @app.post("/scrape")
69
  def scrape(url: str = Form(...)):
70
  if not url.startswith("http"):
71
  return JSONResponse(
72
  status_code=400,
73
- content={"success": False, "error": "Invalid URL. Must start with http or https."}
 
 
 
74
  )
75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  try:
77
- return JSONResponse(static_scrape(url))
78
- except Exception:
79
- try:
80
- return JSONResponse(dynamic_scrape(url))
81
- except Exception as e:
82
- return JSONResponse(
83
- status_code=500,
84
- content={"success": False, "error": str(e)}
85
- )
86
-
87
- # ---------------------------
 
 
 
 
 
 
 
 
 
88
  # Serve UI
89
- # ---------------------------
90
  @app.get("/", response_class=HTMLResponse)
91
  def serve_ui():
92
  if not os.path.exists("index.html"):
93
- return "<h1>UI file (index.html) not found</h1>"
94
 
95
  with open("index.html", "r", encoding="utf-8") as f:
96
  return f.read()
 
3
  import requests
4
  from bs4 import BeautifulSoup
5
  from fake_useragent import UserAgent
 
6
  import os
7
 
8
+ # Stealth engine
9
+ from stealth_browser import (
10
+ launch_stealth_browser,
11
+ stealth_goto,
12
+ close_browser
13
+ )
14
+
15
  app = FastAPI(title="Ultra Powerful Scraper")
16
 
17
+ # --------------------------------
18
+ # Utils
19
+ # --------------------------------
20
  def get_headers():
21
  ua = UserAgent()
22
  return {
 
27
  "DNT": "1",
28
  }
29
 
30
def is_cloudflare_page(html: str) -> bool:
    """Return True if *html* looks like a Cloudflare challenge/block page.

    Performs a case-insensitive substring scan for well-known Cloudflare
    fingerprints (challenge script ids, interstitial titles, /cdn-cgi/ assets).
    """
    lowered = html.lower()
    fingerprints = (
        "cf-browser-verification",
        "cloudflare",
        "Attention Required!",
        "Checking your browser",
        "/cdn-cgi/",
    )
    for marker in fingerprints:
        if marker.lower() in lowered:
            return True
    return False
40
 
41
def parse_html(html: str):
    """Parse *html* and extract a summary of the page.

    Returns a dict with:
        title:      the <title> text (stripped), or "" when absent/empty
        headings:   text of every h1/h2/h3 element
        paragraphs: text of every <p> element
        links:      de-duplicated href values of every <a href=...>
    """
    soup = BeautifulSoup(html, "lxml")
    # soup.title.string is None when the <title> tag is empty or contains
    # nested markup; guard both the missing-tag and None-string cases so we
    # never call .strip() on None.
    title = soup.title.string.strip() if soup.title and soup.title.string else ""
    return {
        "title": title,
        "headings": [h.get_text(strip=True) for h in soup.find_all(["h1", "h2", "h3"])],
        "paragraphs": [p.get_text(strip=True) for p in soup.find_all("p")],
        "links": list(set(a["href"] for a in soup.find_all("a", href=True))),
    }
49
 
50
+ # --------------------------------
51
+ # Static scrape
52
+ # --------------------------------
53
def static_scrape(url: str):
    """Fetch *url* with a plain HTTP GET (requests) and return parsed page data.

    Raises for HTTP error statuses, and raises RuntimeError when the response
    looks like a Cloudflare challenge page, so the caller can fall back to the
    stealth browser engine.
    """
    response = requests.get(url, headers=get_headers(), timeout=20)
    response.raise_for_status()

    body = response.text
    if is_cloudflare_page(body):
        raise RuntimeError("Cloudflare detected")

    result = {
        "success": True,
        "engine": "requests",
        "bypass": "not_needed",
    }
    result.update(parse_html(body))
    return result
67
 
68
+ # --------------------------------
69
+ # Stealth scrape (Cloudflare-aware)
70
+ # --------------------------------
71
+ def stealth_scrape(url: str):
72
+ p, browser, context, page = launch_stealth_browser(headless=True)
73
+
74
+ try:
75
+ html = stealth_goto(page, url)
76
+ data = parse_html(html)
77
+
78
+ return {
79
+ "success": True,
80
+ "engine": "playwright-stealth",
81
+ "bypass": "attempted",
82
+ **data
83
+ }
84
+ finally:
85
+ close_browser(p, browser)
86
+
87
+ # --------------------------------
88
+ # API
89
+ # --------------------------------
90
@app.post("/scrape")
def scrape(url: str = Form(...)):
    """Scrape *url*, trying a fast static fetch first, then a stealth browser.

    Returns JSON with the extracted page data plus a `logs` list recording
    which engines were attempted; 400 for a malformed URL, 500 when every
    engine fails.
    """
    # startswith with a tuple checks the actual scheme prefix; a bare
    # startswith("http") would also accept junk like "httpfoo://...".
    if not url.startswith(("http://", "https://")):
        return JSONResponse(
            status_code=400,
            content={
                "success": False,
                "error": "Invalid URL. Must start with http or https."
            }
        )

    logs = []

    # 1) Fast path: plain requests. static_scrape raises (e.g. on a
    #    Cloudflare challenge page) to trigger the stealth fallback.
    try:
        logs.append("Trying static scraping (requests)")
        result = static_scrape(url)
        logs.append("Static scrape successful")

        result["logs"] = logs
        return JSONResponse(result)

    except Exception as e:
        logs.append(f"Static failed: {str(e)}")

    # 2) Fallback: stealth Playwright browser (Cloudflare bypass).
    try:
        logs.append("Switching to stealth browser (Cloudflare bypass)")
        result = stealth_scrape(url)
        logs.append("Stealth scrape completed")

        result["logs"] = logs
        return JSONResponse(result)

    except Exception as e:
        logs.append(f"Stealth failed: {str(e)}")

    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "error": "All scraping methods failed",
            "logs": logs
        }
    )
135
+
136
+ # --------------------------------
137
  # Serve UI
138
+ # --------------------------------
139
@app.get("/", response_class=HTMLResponse)
def serve_ui():
    """Serve the static front-end page, or a plain error heading if missing."""
    ui_path = "index.html"
    if not os.path.exists(ui_path):
        return "<h1>index.html not found</h1>"

    with open(ui_path, "r", encoding="utf-8") as f:
        contents = f.read()
    return contents