Baskar2005 committed on
Commit
701bf65
·
verified ·
1 Parent(s): 78a2b61

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +252 -252
app.py CHANGED
@@ -1,252 +1,252 @@
1
- from flask import Flask, render_template, request, jsonify
2
- from playwright.sync_api import sync_playwright
3
- from concurrent.futures import ThreadPoolExecutor
4
- import time
5
- import os
6
- import re
7
-
8
- app = Flask(__name__)
9
-
10
- # ---------------- CONFIGURATION ---------------- #
11
- SESSION_FILE = "instagram_session.json"
12
- HEADLESS_MODE = False # Keep False for stability
13
- MAX_WORKERS = 3 # Reduced to 3 to prevent lagging your PC
14
- # ----------------------------------------------- #
15
-
16
- def identify_url_type(url):
17
- if "/reel/" in url: return "REEL"
18
- if "/p/" in url: return "POST"
19
- if url.strip("/") == "https://www.instagram.com": return "SYSTEM"
20
- if "/explore/" in url or "/direct/" in url or "/stories/" in url: return "SYSTEM"
21
- if "instagram.com/" in url: return "PROFILE"
22
- return "UNKNOWN"
23
-
24
- # --- HELPER: RECURSIVE SEARCH ( The "Main.py" Logic ) ---
25
- def find_username_in_json(obj):
26
- if isinstance(obj, dict):
27
- # Priority 1: Check inside 'owner' object
28
- if "owner" in obj and isinstance(obj["owner"], dict):
29
- if "username" in obj["owner"]:
30
- return obj["owner"]["username"]
31
-
32
- # Priority 2: Check standard user object
33
- if "username" in obj and "is_verified" in obj:
34
- return obj["username"]
35
-
36
- # Recursive Loop
37
- for k, v in obj.items():
38
- if isinstance(v, (dict, list)):
39
- res = find_username_in_json(v)
40
- if res: return res
41
-
42
- elif isinstance(obj, list):
43
- for item in obj:
44
- res = find_username_in_json(item)
45
- if res: return res
46
- return None
47
-
48
- # --- WORKER FUNCTION ---
49
- def scrape_single_url(url):
50
- if not url or not url.strip(): return None
51
-
52
- # New Browser Instance for Thread Safety
53
- with sync_playwright() as p:
54
- browser = p.chromium.launch(headless=HEADLESS_MODE)
55
- if os.path.exists(SESSION_FILE):
56
- context = browser.new_context(storage_state=SESSION_FILE)
57
- else:
58
- context = browser.new_context()
59
-
60
- page = context.new_page()
61
- print(f"⚡ Processing: {url}")
62
-
63
- data = {
64
- "url": url,
65
- "type": identify_url_type(url),
66
- "author": None,
67
- "followers": "N/A",
68
- "likes": "N/A",
69
- "views": "N/A",
70
- "status": "Starting"
71
- }
72
-
73
- # Skip System Links
74
- if data["type"] in ["SYSTEM", "UNKNOWN"]:
75
- data["status"] = "Skipped"
76
- browser.close()
77
- return data
78
-
79
- try:
80
- # === PATH A: PROFILE ===
81
- if data["type"] == "PROFILE":
82
- page.goto(url, wait_until="domcontentloaded", timeout=60000)
83
- time.sleep(3)
84
- try:
85
- followers_link = page.locator("a[href*='/followers/']").first
86
- if followers_link.count() > 0:
87
- title = followers_link.locator("span[title]").first
88
- if title.count() > 0:
89
- data["followers"] = title.get_attribute("title")
90
- else:
91
- data["followers"] = followers_link.inner_text().split("\n")[0]
92
- except: pass
93
- data["author"] = url.strip("/").split("/")[-1]
94
- data["status"] = "Success"
95
-
96
- # === PATH B: MEDIA (REEL/POST) ===
97
- elif data["type"] in ["REEL", "POST"]:
98
- if "/reel/" in url:
99
- shortcode = url.split("/reel/")[1].split("/")[0]
100
- else:
101
- shortcode = url.split("/p/")[1].split("/")[0]
102
-
103
- # 1. NETWORK LISTENER (Restored Robust Logic)
104
- captured_info = {"username": None}
105
-
106
- def handle_response(response):
107
- if "instagram.com" in response.url and "json" in response.headers.get("content-type", ""):
108
- try:
109
- json_data = response.json()
110
- found = find_username_in_json(json_data)
111
- if found and not captured_info["username"]:
112
- captured_info["username"] = found
113
- except: pass
114
-
115
- page.on("response", handle_response)
116
- page.goto(url, wait_until="domcontentloaded", timeout=60000)
117
- time.sleep(4)
118
- page.remove_listener("response", handle_response)
119
-
120
- # 2. GET LIKES (Meta Tag)
121
- try:
122
- meta_desc = page.locator('meta[property="og:description"]').get_attribute("content")
123
- if meta_desc:
124
- likes_match = re.search(r'^([0-9,.]+[KkMm]?) likes', meta_desc)
125
- if likes_match: data["likes"] = likes_match.group(1)
126
- except: pass
127
-
128
- # 3. GET AUTHOR (Network > Title > Pattern)
129
- if captured_info["username"]:
130
- data["author"] = captured_info["username"]
131
-
132
- # Fallback: Title Tag
133
- if not data["author"]:
134
- try:
135
- title = page.title()
136
- # Matches "Username (@handle) on Instagram"
137
- match = re.search(r'\(@(.*?)\)', title)
138
- if match:
139
- data["author"] = match.group(1)
140
- else:
141
- # Matches "Username on Instagram" (Start of title)
142
- match_b = re.search(r'^(.*?)\son\sInstagram', title)
143
- if match_b:
144
- parts = match_b.group(1).split(" ")
145
- if len(parts) == 1: data["author"] = parts[0]
146
- except: pass
147
-
148
- # Fallback: Link Pattern
149
- if not data["author"]:
150
- try:
151
- links = page.locator("a[href*='/reels/']").all()
152
- for link in links:
153
- href = link.get_attribute("href")
154
- if href:
155
- parts = href.strip("/").split("/")
156
- if len(parts) >= 2 and parts[-1] == "reels":
157
- candidate = parts[-2]
158
- if candidate not in ["reels", "instagram"]:
159
- data["author"] = candidate
160
- break
161
- except: pass
162
-
163
- # 4. GET VIEWS (Hop to Profile)
164
- if data["author"]:
165
- is_video = False
166
- # Check if Reel or Video Post
167
- try:
168
- og_type = page.locator('meta[property="og:type"]').get_attribute("content")
169
- if og_type and "video" in og_type: is_video = True
170
- except: pass
171
- if data["type"] == "REEL": is_video = True
172
-
173
- if is_video:
174
- # Hop to Reels Tab
175
- profile_reels_url = f"https://www.instagram.com/{data['author']}/reels/"
176
- page.goto(profile_reels_url, wait_until="domcontentloaded")
177
- time.sleep(3)
178
-
179
- if "/reels/" not in page.url:
180
- data["views"] = "Hidden (Main Grid)"
181
- else:
182
- try:
183
- target_selector = f"a[href*='{shortcode}']"
184
- page.wait_for_selector(target_selector, timeout=8000)
185
- target_card = page.locator(target_selector).first
186
- card_text = target_card.inner_text()
187
- for line in card_text.split('\n'):
188
- if any(char.isdigit() for char in line):
189
- data["views"] = line.strip()
190
- break
191
- except:
192
- data["views"] = "Not Found"
193
- else:
194
- data["views"] = "N/A (Photo)"
195
-
196
- # Followers (Bonus)
197
- try:
198
- fol_link = page.locator("a[href*='/followers/']").first
199
- if fol_link.count() > 0:
200
- title = fol_link.locator("span[title]").first
201
- if title.count() > 0:
202
- data["followers"] = title.get_attribute("title")
203
- except: pass
204
-
205
- data["status"] = "Success"
206
- else:
207
- data["status"] = "Failed (No Author)"
208
-
209
- except Exception as e:
210
- data["status"] = f"Error"
211
- print(f"❌ Error: {e}")
212
-
213
- browser.close()
214
- return data
215
-
216
- # --- ROUTES ---
217
- @app.route('/')
218
- def home():
219
- return render_template('index.html')
220
-
221
- @app.route('/api/scrape', methods=['POST'])
222
- def scrape_api():
223
- data = request.json
224
- raw_urls = data.get('urls', [])
225
-
226
- final_urls = []
227
- if isinstance(raw_urls, list):
228
- raw_string = ",".join(raw_urls)
229
- else:
230
- raw_string = str(raw_urls)
231
-
232
- cleaned_items = raw_string.replace('\n', ',').split(',')
233
- for item in cleaned_items:
234
- clean_link = item.strip()
235
- if clean_link:
236
- final_urls.append(clean_link)
237
-
238
- if not final_urls:
239
- return jsonify({"error": "No valid URLs provided"}), 400
240
-
241
- print(f"🔥 API Request: Processing {len(final_urls)} links...")
242
-
243
- results = []
244
- with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
245
- results_iterator = executor.map(scrape_single_url, final_urls)
246
- for res in results_iterator:
247
- if res: results.append(res)
248
-
249
- return jsonify(results)
250
-
251
- if __name__ == '__main__':
252
- app.run(debug=True, port=7860, use_reloader=False)
 
1
+ from flask import Flask, render_template, request, jsonify
2
+ from playwright.sync_api import sync_playwright
3
+ from concurrent.futures import ThreadPoolExecutor
4
+ import time
5
+ import os
6
+ import re
7
+
8
app = Flask(__name__)

# ---------------- CONFIGURATION ---------------- #
# Playwright storage_state file holding a logged-in Instagram session
# (used when present; otherwise a fresh anonymous context is created).
SESSION_FILE = "instagram_session.json"
HEADLESS_MODE = False  # Keep False for stability
MAX_WORKERS = 3  # Reduced to 3 to prevent lagging your PC
# ----------------------------------------------- #
15
+
16
def identify_url_type(url):
    """Classify an Instagram URL.

    Returns one of: "REEL", "POST", "SYSTEM" (app-internal pages we skip),
    "PROFILE", or "UNKNOWN". Order matters: media paths are matched before
    the generic profile fallback.
    """
    if "/reel/" in url: return "REEL"
    if "/p/" in url: return "POST"
    # Bare homepage — accept both host spellings so "https://instagram.com/"
    # is not misclassified as a PROFILE.
    if url.strip("/") in ("https://www.instagram.com", "https://instagram.com"):
        return "SYSTEM"
    if "/explore/" in url or "/direct/" in url or "/stories/" in url: return "SYSTEM"
    if "instagram.com/" in url: return "PROFILE"
    return "UNKNOWN"
23
+
24
# --- HELPER: RECURSIVE SEARCH ( The "Main.py" Logic ) ---
def find_username_in_json(obj):
    """Depth-first search of a nested JSON structure for the owner's username.

    Priority within each dict:
      1. an ``owner`` sub-dict that carries ``username`` (post/reel payloads),
      2. the dict itself looking like a user object (``username`` alongside
         ``is_verified``).

    Returns the first username found, or None.
    """
    if isinstance(obj, dict):
        # Priority 1: the media's 'owner' object.
        owner = obj.get("owner")
        if isinstance(owner, dict) and "username" in owner:
            return owner["username"]

        # Priority 2: a standard user object.
        if "username" in obj and "is_verified" in obj:
            return obj["username"]

        # Recurse into nested containers only (scalar values can't match).
        for value in obj.values():
            if isinstance(value, (dict, list)):
                found = find_username_in_json(value)
                if found:
                    return found

    elif isinstance(obj, list):
        for item in obj:
            found = find_username_in_json(item)
            if found:
                return found
    return None
47
+
48
# --- WORKER FUNCTION ---
def _read_followers(page, data, text_fallback):
    """Best-effort: copy the follower count from the profile header into *data*.

    When *text_fallback* is True and the span[title] element is absent, the
    link's first text line is used instead (profile-page behaviour).
    """
    try:
        fol_link = page.locator("a[href*='/followers/']").first
        if fol_link.count() > 0:
            title = fol_link.locator("span[title]").first
            if title.count() > 0:
                data["followers"] = title.get_attribute("title")
            elif text_fallback:
                data["followers"] = fol_link.inner_text().split("\n")[0]
    except Exception:
        pass


def _scrape_profile(page, url, data):
    """PATH A: fill author/followers for a profile URL. Mutates *data*."""
    page.goto(url, wait_until="domcontentloaded", timeout=60000)
    time.sleep(3)  # let the header render before querying it
    _read_followers(page, data, text_fallback=True)
    data["author"] = url.strip("/").split("/")[-1]
    data["status"] = "Success"


def _author_from_title(page, data):
    """Fallback: parse the author handle out of the document <title>."""
    try:
        title = page.title()
        # Matches "Username (@handle) on Instagram"
        match = re.search(r'\(@(.*?)\)', title)
        if match:
            data["author"] = match.group(1)
        else:
            # Matches "Username on Instagram" — only trusted when the
            # leading segment is a single word (a plausible handle).
            match_b = re.search(r'^(.*?)\son\sInstagram', title)
            if match_b:
                parts = match_b.group(1).split(" ")
                if len(parts) == 1:
                    data["author"] = parts[0]
    except Exception:
        pass


def _author_from_links(page, data):
    """Fallback: infer the author from /<user>/reels/ links on the page."""
    try:
        for link in page.locator("a[href*='/reels/']").all():
            href = link.get_attribute("href")
            if not href:
                continue
            parts = href.strip("/").split("/")
            if len(parts) >= 2 and parts[-1] == "reels":
                candidate = parts[-2]
                if candidate not in ["reels", "instagram"]:
                    data["author"] = candidate
                    break
    except Exception:
        pass


def _views_from_reels_tab(page, shortcode, data):
    """Hop to the author's reels tab and read the view counter off the card."""
    page.goto(f"https://www.instagram.com/{data['author']}/reels/",
              wait_until="domcontentloaded")
    time.sleep(3)

    if "/reels/" not in page.url:
        # Instagram redirected us away — reels tab not reachable.
        data["views"] = "Hidden (Main Grid)"
        return
    try:
        target_selector = f"a[href*='{shortcode}']"
        page.wait_for_selector(target_selector, timeout=8000)
        card_text = page.locator(target_selector).first.inner_text()
        # The first line containing a digit is the overlaid view counter.
        for line in card_text.split('\n'):
            if any(char.isdigit() for char in line):
                data["views"] = line.strip()
                break
    except Exception:
        data["views"] = "Not Found"


def _scrape_media(page, url, data):
    """PATH B: fill author/likes/views/followers for a reel/post. Mutates *data*.

    Author resolution order: JSON network capture > <title> tag > link pattern.
    Views require a known author (we hop to their reels tab to read them).
    """
    if "/reel/" in url:
        shortcode = url.split("/reel/")[1].split("/")[0]
    else:
        shortcode = url.split("/p/")[1].split("/")[0]

    # 1. NETWORK LISTENER: scan every JSON response for a username while
    #    the page loads (most robust source).
    captured_info = {"username": None}

    def handle_response(response):
        if "instagram.com" in response.url and "json" in response.headers.get("content-type", ""):
            try:
                found = find_username_in_json(response.json())
                if found and not captured_info["username"]:
                    captured_info["username"] = found
            except Exception:
                pass

    page.on("response", handle_response)
    page.goto(url, wait_until="domcontentloaded", timeout=60000)
    time.sleep(4)  # give XHR traffic time to arrive
    page.remove_listener("response", handle_response)

    # 2. LIKES from the og:description meta tag.
    try:
        meta_desc = page.locator('meta[property="og:description"]').get_attribute("content")
        if meta_desc:
            likes_match = re.search(r'^([0-9,.]+[KkMm]?) likes', meta_desc)
            if likes_match:
                data["likes"] = likes_match.group(1)
    except Exception:
        pass

    # 3. AUTHOR: network capture first, then the fallbacks.
    if captured_info["username"]:
        data["author"] = captured_info["username"]
    if not data["author"]:
        _author_from_title(page, data)
    if not data["author"]:
        _author_from_links(page, data)

    if not data["author"]:
        data["status"] = "Failed (No Author)"
        return

    # 4. VIEWS: only videos have them; reels always count as video,
    #    posts are checked via the og:type meta tag.
    is_video = data["type"] == "REEL"
    if not is_video:
        try:
            og_type = page.locator('meta[property="og:type"]').get_attribute("content")
            if og_type and "video" in og_type:
                is_video = True
        except Exception:
            pass

    if is_video:
        _views_from_reels_tab(page, shortcode, data)
    else:
        data["views"] = "N/A (Photo)"

    # Followers (bonus) — we may now be on the author's profile page.
    _read_followers(page, data, text_fallback=False)

    data["status"] = "Success"


def scrape_single_url(url):
    """Scrape author / likes / views / followers for one Instagram URL.

    Launches a fresh Chromium instance per call so it is safe to run from
    ThreadPoolExecutor workers. Returns a result dict, or None for blank
    input. Errors are reported via the dict's "status" field, never raised.
    """
    if not url or not url.strip():
        return None

    data = {
        "url": url,
        "type": identify_url_type(url),
        "author": None,
        "followers": "N/A",
        "likes": "N/A",
        "views": "N/A",
        "status": "Starting"
    }

    # Skip system/unknown links without paying for a browser launch.
    if data["type"] in ["SYSTEM", "UNKNOWN"]:
        data["status"] = "Skipped"
        return data

    # New Browser Instance for Thread Safety
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=HEADLESS_MODE)
        try:
            if os.path.exists(SESSION_FILE):
                context = browser.new_context(storage_state=SESSION_FILE)
            else:
                context = browser.new_context()

            page = context.new_page()
            print(f"⚡ Processing: {url}")

            if data["type"] == "PROFILE":
                _scrape_profile(page, url, data)
            else:  # REEL / POST
                _scrape_media(page, url, data)
        except Exception as e:
            data["status"] = "Error"
            print(f"❌ Error: {e}")
        finally:
            # Always release the browser, even on unexpected failures.
            browser.close()
    return data
215
+
216
# --- ROUTES ---
@app.route('/')
def home():
    """Serve the single-page front-end."""
    return render_template('index.html')
220
+
221
@app.route('/api/scrape', methods=['POST'])
def scrape_api():
    """POST {"urls": [...] or "a,b\\nc"} -> JSON list of scrape results.

    Accepts either a list of URLs or a single string; entries may be
    separated by commas and/or newlines, and blank entries are dropped.
    Returns 400 when no usable URL remains. URLs are scraped concurrently
    with a bounded thread pool.
    """
    # silent=True: a missing/non-JSON body yields None instead of raising,
    # so a malformed request gets the 400 below rather than a 500.
    payload = request.get_json(silent=True) or {}
    raw_urls = payload.get('urls', [])

    if isinstance(raw_urls, list):
        raw_string = ",".join(raw_urls)
    else:
        raw_string = str(raw_urls)

    # Normalise newlines to commas, trim, and keep non-empty entries.
    final_urls = [item.strip()
                  for item in raw_string.replace('\n', ',').split(',')
                  if item.strip()]

    if not final_urls:
        return jsonify({"error": "No valid URLs provided"}), 400

    print(f"🔥 API Request: Processing {len(final_urls)} links...")

    # Bounded pool: each scrape_single_url call launches a full browser.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = [res for res in executor.map(scrape_single_url, final_urls) if res]

    return jsonify(results)
250
+
251
if __name__ == '__main__':
    # host='0.0.0.0' exposes the server outside localhost (needed for
    # container/Space hosting on port 7860); use_reloader=False prevents the
    # dev server from starting the process twice.
    app.run(debug=True, host='0.0.0.0',port=7860, use_reloader=False)