Baskar2005 committed on
Commit
43ed985
·
verified ·
1 Parent(s): 8b5cd50

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +158 -129
app.py CHANGED
@@ -1,157 +1,188 @@
1
  from flask import Flask, render_template, request, jsonify
2
  from playwright.sync_api import sync_playwright
3
- from concurrent.futures import ThreadPoolExecutor
4
  import time
5
  import os
6
  import re
7
- import json
8
 
9
  app = Flask(__name__)
10
 
11
  # ---------------- CONFIGURATION ---------------- #
12
  SESSION_FILE = "instagram_session.json"
13
- MAX_WORKERS = 3
 
14
  # ----------------------------------------------- #
15
 
16
  def identify_url_type(url):
17
  if "/reel/" in url: return "REEL"
18
  if "/p/" in url: return "POST"
19
  if url.strip("/") == "https://www.instagram.com": return "SYSTEM"
 
20
  if "instagram.com/" in url: return "PROFILE"
21
  return "UNKNOWN"
22
 
23
- # 🔥 MANUAL STEALTH: The Key to Headless=True 🔥
24
- def apply_stealth(page):
25
- """
26
- Overwrites browser variables so Instagram thinks
27
- this is a real mobile device, not a server.
28
- """
29
- page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
30
- page.add_init_script("window.navigator.chrome = { runtime: {} };")
31
- page.add_init_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})")
32
- page.add_init_script("Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})")
33
-
34
- # --- HELPER: RECURSIVE SEARCH ---
35
- def safe_find_key(obj, key):
36
- if isinstance(obj, dict):
37
- if key in obj: return obj[key]
38
- for k, v in obj.items():
39
- res = safe_find_key(v, key)
40
- if res is not None: return res
41
- elif isinstance(obj, list):
42
- for item in obj:
43
- res = safe_find_key(item, key)
44
- if res is not None: return res
45
- return None
46
-
47
- def scrape_single_url(url):
48
- if not url or not url.strip(): return None
49
 
50
  with sync_playwright() as p:
51
- # 1. LAUNCH BROWSER (Headless=True is REQUIRED for Server)
52
- browser = p.chromium.launch(
53
- headless=True,
54
- args=["--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-dev-shm-usage"]
55
- )
56
-
57
- # 2. CONFIGURE CONTEXT (Fake Android Phone)
58
- context = browser.new_context(
59
- user_agent="Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36",
60
- viewport={"width": 412, "height": 915},
61
- locale="en-US"
62
- )
63
-
64
  page = context.new_page()
65
- apply_stealth(page)
66
-
67
- print(f"⚡ Processing: {url}")
68
-
69
- data = {
70
- "url": url,
71
- "type": identify_url_type(url),
72
- "author": None,
73
- "followers": "N/A",
74
- "likes": "N/A",
75
- "views": "N/A",
76
- "status": "Starting"
77
- }
78
-
79
- # --- 3. NETWORK SNIFFER ---
80
- captured_data = {"play_count": None, "username": None, "like_count": None}
81
-
82
- def handle_response(response):
83
- if "instagram.com" in response.url and ("json" in response.headers.get("content-type", "") or "graphql" in response.url):
84
- try:
85
- json_data = response.json()
86
- if not captured_data["play_count"]:
87
- plays = safe_find_key(json_data, "play_count") or safe_find_key(json_data, "video_view_count")
88
- if plays: captured_data["play_count"] = plays
89
- if not captured_data["like_count"]:
90
- likes = safe_find_key(json_data, "like_count")
91
- if likes: captured_data["like_count"] = likes
92
- if not captured_data["username"]:
93
- user = safe_find_key(json_data, "username")
94
- if user: captured_data["username"] = user
95
- except: pass
96
-
97
- page.on("response", handle_response)
98
-
99
- try:
100
- # === NAVIGATION ===
101
- page.goto(url, wait_until="commit", timeout=45000)
102
- page.wait_for_timeout(5000)
103
-
104
- if "Login" in page.title():
105
- data["status"] = "Failed (Login Block)"
106
- browser.close()
107
- return data
108
-
109
- if captured_data["play_count"]: data["views"] = str(captured_data["play_count"])
110
- if captured_data["like_count"]: data["likes"] = str(captured_data["like_count"])
111
- if captured_data["username"]: data["author"] = captured_data["username"]
112
-
113
- # --- 4. FALLBACK: VISUAL SCRAPING ---
114
- if (data["views"] == "N/A" and data["type"] == "REEL") or not data["author"]:
115
- if not data["author"]:
116
  try:
117
- title = page.title()
118
- match = re.search(r'\(@(.*?)\)', title)
119
- if match: data["author"] = match.group(1)
 
 
 
 
120
  except: pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
 
122
- if data["author"]:
123
- if "/reels/" not in page.url:
124
- page.goto(f"https://www.instagram.com/{data['author']}/reels/", wait_until="domcontentloaded")
125
- page.wait_for_timeout(3000)
126
-
127
  try:
128
- meta = page.locator('meta[property="og:description"]').get_attribute("content")
129
- if meta:
130
- parts = meta.split("Followers")
131
- if len(parts) > 1: data["followers"] = parts[0].strip().split(" ")[-1]
132
  except: pass
133
 
134
- if data["views"] == "N/A":
 
 
135
  try:
136
- shortcode = url.split("/reel/")[1].split("/")[0]
137
- card = page.locator(f"a[href*='{shortcode}']").first
138
- if card.count() > 0:
139
- txt = card.inner_text()
140
- for line in txt.split('\n'):
141
- if any(c.isdigit() for c in line):
142
- data["views"] = line.strip()
143
- break
144
  except: pass
145
 
146
- data["status"] = "Success"
 
 
 
 
 
 
 
 
 
 
 
 
147
 
148
- except Exception as e:
149
- data["status"] = "Error"
150
- print(f"❌ Error: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
151
 
 
 
 
 
 
 
 
152
  browser.close()
153
- return data
154
 
 
155
  @app.route('/')
156
  def home():
157
  return render_template('index.html')
@@ -160,14 +191,20 @@ def home():
160
  def scrape_api():
161
  data = request.json
162
  raw_urls = data.get('urls', [])
 
 
163
  final_urls = []
164
 
165
  if isinstance(raw_urls, list):
 
166
  raw_string = ",".join(raw_urls)
167
  else:
168
  raw_string = str(raw_urls)
169
 
 
 
170
  cleaned_items = raw_string.replace('\n', ',').split(',')
 
171
  for item in cleaned_items:
172
  clean_link = item.strip()
173
  if clean_link:
@@ -176,16 +213,8 @@ def scrape_api():
176
  if not final_urls:
177
  return jsonify({"error": "No valid URLs provided"}), 400
178
 
179
- print(f"🔥 Processing {len(final_urls)} links...")
180
- results = []
181
- with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
182
- results_iterator = executor.map(scrape_single_url, final_urls)
183
- for res in results_iterator:
184
- if res: results.append(res)
185
-
186
  return jsonify(results)
187
 
188
  if __name__ == '__main__':
189
- # Use ENV Port or default to 10000
190
- port = int(os.environ.get("PORT", 10000))
191
- app.run(host='0.0.0.0', port=port)
 
1
  from flask import Flask, render_template, request, jsonify
2
  from playwright.sync_api import sync_playwright
 
3
  import time
4
  import os
5
  import re
 
6
 
7
  app = Flask(__name__)
8
 
9
  # ---------------- CONFIGURATION ---------------- #
10
  SESSION_FILE = "instagram_session.json"
11
+ # Headless toggle: True for server deployments; set False locally if Instagram blocks headless browsing
12
+ HEADLESS_MODE = True
13
  # ----------------------------------------------- #
14
 
15
def identify_url_type(url):
    """Classify an Instagram URL as REEL, POST, SYSTEM, PROFILE, or UNKNOWN."""
    # Media links are the most specific patterns, so test them first.
    if "/reel/" in url:
        return "REEL"
    if "/p/" in url:
        return "POST"
    # The bare homepage (with or without trailing slashes) is a SYSTEM page.
    if url.strip("/") == "https://www.instagram.com":
        return "SYSTEM"
    # Non-content app areas are also treated as SYSTEM pages.
    if any(section in url for section in ("/explore/", "/direct/", "/stories/")):
        return "SYSTEM"
    # Any other instagram.com path is assumed to be a profile page.
    if "instagram.com/" in url:
        return "PROFILE"
    return "UNKNOWN"
22
 
23
def run_scraper(url_list):
    """Scrape Instagram metadata (author, followers, likes, views) for each URL.

    Drives one Playwright Chromium session, authenticated via the storage
    state saved in SESSION_FILE, and visits every URL in url_list in
    sequence. Returns a list of result dicts — one per non-empty URL — each
    with url/type/author/followers/likes/views/status keys.
    """
    # Without the saved login session Instagram serves the login wall, so
    # bail out early with a single synthetic error record.
    if not os.path.exists(SESSION_FILE):
        return [{"status": "Error", "author": "System", "likes": "N/A", "views": "N/A", "followers": "N/A", "type": "ERROR", "url": "", "msg": "Session file missing"}]

    results = []

    with sync_playwright() as p:
        # One browser/context/page is reused for every URL; the stored
        # storage_state supplies the logged-in cookies.
        browser = p.chromium.launch(headless=HEADLESS_MODE)
        context = browser.new_context(storage_state=SESSION_FILE)
        page = context.new_page()

        for url in url_list:
            if not url.strip(): continue
            print(f"🔄 Processing: {url}")

            # Result skeleton; fields are filled in as each strategy succeeds.
            data = {
                "url": url,
                "type": identify_url_type(url),
                "author": None,
                "followers": "N/A",
                "likes": "N/A",
                "views": "N/A",
                "status": "Starting"
            }

            # Non-content pages (homepage, explore, DMs, ...) are recorded
            # but never navigated to.
            if data["type"] == "SYSTEM" or data["type"] == "UNKNOWN":
                data["status"] = "Skipped"
                results.append(data)
                continue

            try:
                # --- PATH A: PROFILE ---
                if data["type"] == "PROFILE":
                    page.goto(url, wait_until="domcontentloaded", timeout=60000)
                    time.sleep(3)
                    # Follower count lives behind the "/followers/" link; a
                    # span[title] child holds the exact (un-abbreviated) count.
                    try:
                        followers_link = page.locator("a[href*='/followers/']").first
                        if followers_link.count() > 0:
                            title = followers_link.locator("span[title]").first
                            if title.count() > 0:
                                data["followers"] = title.get_attribute("title")
                            else:
                                # Fall back to the link's visible text (may be abbreviated, e.g. "1.2M").
                                data["followers"] = followers_link.inner_text().split("\n")[0]
                    except: pass  # best-effort: selector layout changes often
                    # The profile handle is the last path segment of the URL.
                    data["author"] = url.strip("/").split("/")[-1]
                    data["status"] = "Success"

                # --- PATH B: MEDIA (reel or post) ---
                elif data["type"] in ["REEL", "POST"]:
                    # Shortcode identifies the media; reused later to find the
                    # matching card on the author's reels grid.
                    if "/reel/" in url:
                        shortcode = url.split("/reel/")[1].split("/")[0]
                    else:
                        shortcode = url.split("/p/")[1].split("/")[0]

                    # Sniff JSON/XHR responses during navigation to recover the
                    # owner's username (more reliable than the DOM).
                    captured_info = {"username": None}
                    def handle_response(response):
                        if "instagram.com" in response.url and "json" in response.headers.get("content-type", ""):
                            try:
                                json_data = response.json()
                                # Depth-first search for an owner.username field
                                # anywhere in the payload.
                                def find_user(obj):
                                    if isinstance(obj, dict):
                                        if "owner" in obj and "username" in obj["owner"]:
                                            return obj["owner"]["username"]
                                        for v in obj.values():
                                            res = find_user(v)
                                            if res: return res
                                    elif isinstance(obj, list):
                                        for item in obj:
                                            res = find_user(item)
                                            if res: return res
                                    return None
                                found = find_user(json_data)
                                # Keep only the first username seen.
                                if found and not captured_info["username"]:
                                    captured_info["username"] = found
                            except: pass  # non-JSON or truncated body

                    page.on("response", handle_response)
                    page.goto(url, wait_until="domcontentloaded", timeout=60000)
                    time.sleep(4)
                    page.remove_listener("response", handle_response)

                    # Likes come from the og:description meta tag, e.g.
                    # "1,234 likes, 56 comments - ...".
                    try:
                        meta_desc = page.locator('meta[property="og:description"]').get_attribute("content")
                        if meta_desc:
                            likes_match = re.search(r'^([0-9,.]+[KkMm]?) likes', meta_desc)
                            if likes_match: data["likes"] = likes_match.group(1)
                    except: pass

                    if captured_info["username"]: data["author"] = captured_info["username"]

                    # Author fallback #1: the page title usually contains "(@handle)".
                    if not data["author"]:
                        try:
                            title = page.title()
                            match = re.search(r'\(@(.*?)\)', title)
                            if match: data["author"] = match.group(1)
                        except: pass

                    # Author fallback #2: scan anchors pointing at ".../<user>/reels".
                    if not data["author"]:
                        try:
                            links = page.locator("a[href*='/reels/']").all()
                            for link in links:
                                href = link.get_attribute("href")
                                if href:
                                    parts = href.strip("/").split("/")
                                    if len(parts) >= 2 and parts[-1] == "reels":
                                        candidate = parts[-2]
                                        # Skip generic navigation links.
                                        if candidate not in ["reels", "instagram"]:
                                            data["author"] = candidate
                                            break
                        except: pass

                    if data["author"]:
                        # View counts only exist for video media; detect via og:type.
                        is_video = False
                        try:
                            og_type = page.locator('meta[property="og:type"]').get_attribute("content")
                            if og_type and "video" in og_type: is_video = True
                        except: pass
                        if data["type"] == "REEL": is_video = True

                        if is_video:
                            # View counts are not on the media page itself, but
                            # appear on the author's reels grid — locate the card
                            # for this shortcode and read its overlay text.
                            profile_reels_url = f"https://www.instagram.com/{data['author']}/reels/"
                            page.goto(profile_reels_url, wait_until="domcontentloaded")
                            time.sleep(3)

                            # Instagram redirects off /reels/ when the tab is hidden.
                            if "/reels/" not in page.url:
                                data["views"] = "Hidden (Main Grid)"
                            else:
                                try:
                                    target_selector = f"a[href*='{shortcode}']"
                                    page.wait_for_selector(target_selector, timeout=8000)
                                    target_card = page.locator(target_selector).first
                                    card_text = target_card.inner_text()
                                    # First line containing a digit is taken as the view count.
                                    for line in card_text.split('\n'):
                                        if any(char.isdigit() for char in line):
                                            data["views"] = line.strip()
                                            break
                                except:
                                    data["views"] = "Not Found"
                        else:
                            data["views"] = "N/A (Photo)"

                        # While on the author's profile, grab the follower count too.
                        try:
                            fol_link = page.locator("a[href*='/followers/']").first
                            if fol_link.count() > 0:
                                title = fol_link.locator("span[title]").first
                                if title.count() > 0:
                                    data["followers"] = title.get_attribute("title")
                        except: pass
                        data["status"] = "Success"
                    else:
                        data["status"] = "Failed (No Author)"

            except Exception as e:
                # Navigation/timeout failures mark this URL but do not abort the batch.
                data["status"] = "Error"
                print(f"❌ Error: {e}")

            print(f"✅ Finished: {data}")
            results.append(data)

        browser.close()
    return results
184
 
185
+ # --- ROUTES ---
186
@app.route('/')
def home():
    # Serve the single-page UI; scraping itself is driven via the /scrape API.
    return render_template('index.html')
 
191
  def scrape_api():
192
  data = request.json
193
  raw_urls = data.get('urls', [])
194
+
195
+ # Logic to handle both List and String input
196
  final_urls = []
197
 
198
  if isinstance(raw_urls, list):
199
+ # Convert list to comma-separated string first to unify handling
200
  raw_string = ",".join(raw_urls)
201
  else:
202
  raw_string = str(raw_urls)
203
 
204
+ # 1. Replace newlines with commas
205
+ # 2. Split by comma
206
  cleaned_items = raw_string.replace('\n', ',').split(',')
207
+
208
  for item in cleaned_items:
209
  clean_link = item.strip()
210
  if clean_link:
 
213
  if not final_urls:
214
  return jsonify({"error": "No valid URLs provided"}), 400
215
 
216
+ results = run_scraper(final_urls)
 
 
 
 
 
 
217
  return jsonify(results)
218
 
219
if __name__ == '__main__':
    # Honor the platform-assigned PORT (e.g. on hosted deployments) and bind
    # to all interfaces so the container is reachable; default stays 5000 for
    # local runs. Debug mode is opt-in via FLASK_DEBUG — leaving the Werkzeug
    # interactive debugger enabled in production is a remote-code-execution risk.
    port = int(os.environ.get("PORT", 5000))
    debug = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=port, debug=debug, use_reloader=False)