Baskar2005 committed on
Commit
d376d1c
·
verified ·
1 Parent(s): 122e402

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +83 -149
app.py CHANGED
@@ -4,6 +4,7 @@ from concurrent.futures import ThreadPoolExecutor
4
  import time
5
  import os
6
  import re
 
7
 
8
  app = Flask(__name__)
9
 
@@ -16,79 +17,48 @@ def identify_url_type(url):
16
  if "/reel/" in url: return "REEL"
17
  if "/p/" in url: return "POST"
18
  if url.strip("/") == "https://www.instagram.com": return "SYSTEM"
19
- if "/explore/" in url or "/direct/" in url or "/stories/" in url: return "SYSTEM"
20
  if "instagram.com/" in url: return "PROFILE"
21
  return "UNKNOWN"
22
 
23
- # --- HELPER: MANUAL STEALTH (The Magic Fix) ---
24
  def apply_stealth(page):
25
- """
26
- Manually overrides browser variables to hide 'Headless' status.
27
- This replaces the broken 'playwright-stealth' library.
28
- """
29
- # 1. Hide WebDriver Flag
30
  page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
31
-
32
- # 2. Mock Chrome Runtime
33
  page.add_init_script("window.navigator.chrome = { runtime: {} };")
34
-
35
- # 3. Mock Plugins (Headless browsers have 0, Humans have many)
36
  page.add_init_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})")
37
-
38
- # 4. Mock Languages
39
  page.add_init_script("Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})")
40
 
41
- # --- HELPER: RECURSIVE SEARCH ---
42
- def find_username_in_json(obj):
 
43
  if isinstance(obj, dict):
44
- if "owner" in obj and isinstance(obj["owner"], dict):
45
- if "username" in obj["owner"]: return obj["owner"]["username"]
46
- if "username" in obj and "is_verified" in obj: return obj["username"]
47
  for k, v in obj.items():
48
- if isinstance(v, (dict, list)):
49
- res = find_username_in_json(v)
50
- if res: return res
51
  elif isinstance(obj, list):
52
  for item in obj:
53
- res = find_username_in_json(item)
54
- if res: return res
55
  return None
56
 
57
  def scrape_single_url(url):
58
  if not url or not url.strip(): return None
59
 
60
  with sync_playwright() as p:
61
- # 1. LAUNCH BROWSER (Headless=True for Server)
62
  browser = p.chromium.launch(
63
  headless=True,
64
- args=[
65
- "--disable-blink-features=AutomationControlled", # Standard bot hide
66
- "--no-sandbox",
67
- "--disable-dev-shm-usage"
68
- ]
69
  )
70
 
71
- # 2. CONFIGURE CONTEXT (Windows 10 Fingerprint)
72
- context_args = {
73
- "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
74
- "viewport": {"width": 1920, "height": 1080},
75
- "locale": "en-US",
76
- "timezone_id": "America/New_York"
77
- }
78
-
79
- # Load Session if available, else Guest
80
- if os.path.exists(SESSION_FILE):
81
- try:
82
- context = browser.new_context(storage_state=SESSION_FILE, **context_args)
83
- except:
84
- print("⚠️ Session Corrupt. Switching to Guest Mode.")
85
- context = browser.new_context(**context_args)
86
- else:
87
- context = browser.new_context(**context_args)
88
 
89
  page = context.new_page()
90
-
91
- # 3. APPLY MANUAL STEALTH
92
  apply_stealth(page)
93
 
94
  print(f"⚡ Processing: {url}")
@@ -103,131 +73,94 @@ def scrape_single_url(url):
103
  "status": "Starting"
104
  }
105
 
106
- if data["type"] in ["SYSTEM", "UNKNOWN"]:
107
- data["status"] = "Skipped"
108
- browser.close()
109
- return data
110
-
111
- try:
112
- # === NAVIGATION ===
113
- page.goto(url, wait_until="commit", timeout=60000)
114
-
115
- # Check Login Wall
116
- time.sleep(4)
117
- if "Login" in page.title():
118
- data["status"] = "Failed (Login Block)"
119
- browser.close()
120
- return data
121
 
122
- # === PATH A: PROFILE ===
123
- if data["type"] == "PROFILE":
124
- time.sleep(2)
125
  try:
126
- followers_link = page.locator("a[href*='/followers/']").first
127
- if followers_link.count() > 0:
128
- title = followers_link.locator("span[title]").first
129
- if title.count() > 0:
130
- data["followers"] = title.get_attribute("title")
131
- else:
132
- data["followers"] = followers_link.inner_text().split("\n")[0]
 
 
 
 
 
 
133
  except: pass
134
-
135
- if not data["author"]:
136
- data["author"] = url.strip("/").split("/")[-1]
137
- data["status"] = "Success"
138
 
139
- # === PATH B: MEDIA (REEL/POST) ===
140
- elif data["type"] in ["REEL", "POST"]:
141
- if "/reel/" in url:
142
- shortcode = url.split("/reel/")[1].split("/")[0]
143
- else:
144
- shortcode = url.split("/p/")[1].split("/")[0]
145
 
146
- captured_info = {"username": None}
147
-
148
- def handle_response(response):
149
- if "instagram.com" in response.url and "json" in response.headers.get("content-type", ""):
150
- try:
151
- json_data = response.json()
152
- found = find_username_in_json(json_data)
153
- if found and not captured_info["username"]:
154
- captured_info["username"] = found
155
- except: pass
156
 
157
- page.on("response", handle_response)
158
- time.sleep(3)
159
- page.remove_listener("response", handle_response)
 
 
 
 
160
 
161
- # Likes
162
- try:
163
- meta_desc = page.locator('meta[property="og:description"]').get_attribute("content")
164
- if meta_desc:
165
- likes_match = re.search(r'^([0-9,.]+[KkMm]?) likes', meta_desc)
166
- if likes_match: data["likes"] = likes_match.group(1)
167
- except: pass
168
 
169
- # Author
170
- if captured_info["username"]:
171
- data["author"] = captured_info["username"]
 
172
 
 
173
  if not data["author"]:
174
  try:
175
  title = page.title()
176
- match = re.search(r'\(@(.*?)\)', title)
177
  if match: data["author"] = match.group(1)
178
  except: pass
179
 
180
- if not data["author"]:
181
- try:
182
- links = page.locator("a[href*='/reels/']").all()
183
- for link in links:
184
- href = link.get_attribute("href")
185
- if href and "/reels/" in href:
186
- parts = href.strip("/").split("/")
187
- if len(parts) >= 2 and parts[-1] == "reels":
188
- data["author"] = parts[-2]
189
- break
190
- except: pass
191
-
192
- # Views
193
  if data["author"]:
194
- is_video = (data["type"] == "REEL")
 
 
 
 
195
  try:
196
- if "video" in page.locator('meta[property="og:type"]').get_attribute("content"): is_video = True
 
 
 
197
  except: pass
198
 
199
- if is_video:
200
- page.goto(f"https://www.instagram.com/{data['author']}/reels/", wait_until="domcontentloaded")
201
- time.sleep(3)
202
-
203
- if "/reels/" in page.url:
204
- try:
205
- target_card = page.locator(f"a[href*='{shortcode}']").first
206
- card_text = target_card.inner_text()
207
- for line in card_text.split('\n'):
208
- if any(char.isdigit() for char in line):
209
  data["views"] = line.strip()
210
  break
211
- except:
212
- data["views"] = "Not Found"
213
- else:
214
- data["views"] = "N/A (Photo)"
215
-
216
- # Followers (Bonus)
217
- try:
218
- fol_link = page.locator("a[href*='/followers/']").first
219
- if fol_link.count() > 0:
220
- t = fol_link.locator("span[title]").first
221
- data["followers"] = t.get_attribute("title")
222
- except: pass
223
-
224
- data["status"] = "Success"
225
- else:
226
- data["status"] = "Failed (No Author)"
227
 
228
  except Exception as e:
229
  data["status"] = "Error"
230
  print(f"❌ Error: {e}")
 
 
231
 
232
  browser.close()
233
  return data
@@ -266,4 +199,5 @@ def scrape_api():
266
  return jsonify(results)
267
 
268
  if __name__ == '__main__':
269
- app.run(host='0.0.0.0', port=7860)
 
 
4
  import time
5
  import os
6
  import re
7
+ import json
8
 
9
  app = Flask(__name__)
10
 
 
17
  if "/reel/" in url: return "REEL"
18
  if "/p/" in url: return "POST"
19
  if url.strip("/") == "https://www.instagram.com": return "SYSTEM"
 
20
  if "instagram.com/" in url: return "PROFILE"
21
  return "UNKNOWN"
22
 
23
# Manual stealth: hides the browser's "headless" status from Instagram.
def apply_stealth(page):
    """Register init scripts that mask common headless-automation signals.

    Every script is installed through ``page.add_init_script``, which
    Playwright evaluates in each new document before the page's own
    scripts run.

    Args:
        page: A Playwright page (any object exposing ``add_init_script``).

    Returns:
        None. The page is modified in place.
    """
    stealth_scripts = (
        # Hide the WebDriver automation flag.
        "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
        # Chrome exposes its runtime object as ``window.chrome`` (not
        # ``navigator.chrome``); keep the real object when it already exists.
        "window.chrome = window.chrome || { runtime: {} };",
        # Report a non-empty plugin list.
        "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
        # Report the language preferences of an en-US browser.
        "Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
    )
    for script in stealth_scripts:
        page.add_init_script(script)
29
 
30
+ # --- DATA HELPER ---
31
def safe_find_key(obj, key):
    """Return the first non-None value stored under *key* in nested JSON data.

    The search is depth-first: a dict is checked for *key* directly, then
    its values are searched in insertion order; a list is searched item by
    item. Any other type ends that branch.

    A key that is present but holds ``None`` is treated as "not found here",
    so the search continues into nested containers instead of stopping on
    the null placeholder.

    Args:
        obj: Decoded JSON data (dict, list, or scalar).
        key: The dictionary key to look for.

    Returns:
        The first non-None value found for *key*, or ``None`` if there is none.
    """
    if isinstance(obj, dict):
        direct = obj.get(key)
        if direct is not None:
            return direct
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        # Scalars cannot contain the key.
        return None

    for child in children:
        found = safe_find_key(child, key)
        if found is not None:
            return found
    return None
43
 
44
  def scrape_single_url(url):
45
  if not url or not url.strip(): return None
46
 
47
  with sync_playwright() as p:
48
+ # 1. LAUNCH BROWSER (Optimized for Server)
49
  browser = p.chromium.launch(
50
  headless=True,
51
+ args=["--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-dev-shm-usage"]
 
 
 
 
52
  )
53
 
54
+ # 2. CONTEXT (Mobile User Agent = Easier Data Access)
55
+ context = browser.new_context(
56
+ user_agent="Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36",
57
+ viewport={"width": 412, "height": 915},
58
+ locale="en-US"
59
+ )
 
 
 
 
 
 
 
 
 
 
 
60
 
61
  page = context.new_page()
 
 
62
  apply_stealth(page)
63
 
64
  print(f"⚡ Processing: {url}")
 
73
  "status": "Starting"
74
  }
75
 
76
+ # --- 3. NETWORK SNIFFER SETUP ---
77
+ captured_data = {"play_count": None, "username": None, "like_count": None}
 
 
 
 
 
 
 
 
 
 
 
 
 
78
 
79
+ def handle_response(response):
80
+ if "instagram.com" in response.url and ("json" in response.headers.get("content-type", "") or "graphql" in response.url):
 
81
  try:
82
+ json_data = response.json()
83
+ # Capture Views/Plays
84
+ if not captured_data["play_count"]:
85
+ plays = safe_find_key(json_data, "play_count") or safe_find_key(json_data, "video_view_count")
86
+ if plays: captured_data["play_count"] = plays
87
+ # Capture Likes
88
+ if not captured_data["like_count"]:
89
+ likes = safe_find_key(json_data, "like_count")
90
+ if likes: captured_data["like_count"] = likes
91
+ # Capture Author
92
+ if not captured_data["username"]:
93
+ user = safe_find_key(json_data, "username")
94
+ if user: captured_data["username"] = user
95
  except: pass
 
 
 
 
96
 
97
+ page.on("response", handle_response)
 
 
 
 
 
98
 
99
+ try:
100
+ # === NAVIGATION ===
101
+ page.goto(url, wait_until="commit", timeout=45000)
102
+ page.wait_for_timeout(5000) # Wait for network packets
 
 
 
 
 
 
103
 
104
+ # 📸 DEBUG: Take screenshot if blocked
105
+ if "Login" in page.title() or "Page Not Found" in page.title():
106
+ print(" ⚠️ Blocked! Saving debug_error.png")
107
+ page.screenshot(path="debug_error.png")
108
+ data["status"] = "Failed (Login Block)"
109
+ browser.close()
110
+ return data
111
 
112
+ # Fill data from Network Sniffer
113
+ if captured_data["play_count"]: data["views"] = str(captured_data["play_count"])
114
+ if captured_data["like_count"]: data["likes"] = str(captured_data["like_count"])
115
+ if captured_data["username"]: data["author"] = captured_data["username"]
 
 
 
116
 
117
+ # --- 4. FALLBACK: VISUAL SCRAPING ---
118
+ # If network failed, try reading the screen
119
+ if (data["views"] == "N/A" and data["type"] == "REEL") or not data["author"]:
120
+ print(" ⚠️ Network missed data. Switching to Visual Scraping...")
121
 
122
+ # Get Author from Title if missing
123
  if not data["author"]:
124
  try:
125
  title = page.title()
126
+ match = re.search(r'\(@(.*?)\)', title)
127
  if match: data["author"] = match.group(1)
128
  except: pass
129
 
130
+ # Go to Profile for Followers & Views
 
 
 
 
 
 
 
 
 
 
 
 
131
  if data["author"]:
132
+ if "/reels/" not in page.url:
133
+ page.goto(f"https://www.instagram.com/{data['author']}/reels/", wait_until="domcontentloaded")
134
+ page.wait_for_timeout(3000)
135
+
136
+ # Try to find Followers (Meta Description)
137
  try:
138
+ meta = page.locator('meta[property="og:description"]').get_attribute("content")
139
+ if meta:
140
+ parts = meta.split("Followers")
141
+ if len(parts) > 1: data["followers"] = parts[0].strip().split(" ")[-1]
142
  except: pass
143
 
144
+ # Try to find View Count on Grid
145
+ if data["views"] == "N/A":
146
+ try:
147
+ shortcode = url.split("/reel/")[1].split("/")[0]
148
+ card = page.locator(f"a[href*='{shortcode}']").first
149
+ if card.count() > 0:
150
+ txt = card.inner_text()
151
+ for line in txt.split('\n'):
152
+ if any(c.isdigit() for c in line):
 
153
  data["views"] = line.strip()
154
  break
155
+ except: pass
156
+
157
+ data["status"] = "Success"
 
 
 
 
 
 
 
 
 
 
 
 
 
158
 
159
  except Exception as e:
160
  data["status"] = "Error"
161
  print(f"❌ Error: {e}")
162
+ try: page.screenshot(path="debug_crash.png")
163
+ except: pass
164
 
165
  browser.close()
166
  return data
 
199
  return jsonify(results)
200
 
201
  if __name__ == '__main__':
202
+ port = int(os.environ.get("PORT", 10000))
203
+ app.run(host='0.0.0.0', port=port)