Spaces:
Runtime error
Runtime error
| from flask import Flask, render_template, request, jsonify | |
| from playwright.sync_api import sync_playwright | |
| from concurrent.futures import ThreadPoolExecutor | |
| import time | |
| import os | |
| import re | |
| import json | |
| app = Flask(__name__) | |
| # ---------------- CONFIGURATION ---------------- # | |
| SESSION_FILE = "instagram_session.json" | |
| MAX_WORKERS = 3 | |
| # ----------------------------------------------- # | |
| def identify_url_type(url): | |
| if "/reel/" in url: return "REEL" | |
| if "/p/" in url: return "POST" | |
| if url.strip("/") == "https://www.instagram.com": return "SYSTEM" | |
| if "instagram.com/" in url: return "PROFILE" | |
| return "UNKNOWN" | |
| # 🔥 MANUAL STEALTH: The Key to Headless=True 🔥 | |
| def apply_stealth(page): | |
| """ | |
| Overwrites browser variables so Instagram thinks | |
| this is a real mobile device, not a server. | |
| """ | |
| page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") | |
| page.add_init_script("window.navigator.chrome = { runtime: {} };") | |
| page.add_init_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})") | |
| page.add_init_script("Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})") | |
| # --- HELPER: RECURSIVE SEARCH --- | |
| def safe_find_key(obj, key): | |
| if isinstance(obj, dict): | |
| if key in obj: return obj[key] | |
| for k, v in obj.items(): | |
| res = safe_find_key(v, key) | |
| if res is not None: return res | |
| elif isinstance(obj, list): | |
| for item in obj: | |
| res = safe_find_key(item, key) | |
| if res is not None: return res | |
| return None | |
| def scrape_single_url(url): | |
| if not url or not url.strip(): return None | |
| with sync_playwright() as p: | |
| # 1. LAUNCH BROWSER (Headless=True is REQUIRED for Server) | |
| browser = p.chromium.launch( | |
| headless=True, | |
| args=["--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-dev-shm-usage"] | |
| ) | |
| # 2. CONFIGURE CONTEXT (Fake Android Phone) | |
| context = browser.new_context( | |
| user_agent="Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", | |
| viewport={"width": 412, "height": 915}, | |
| locale="en-US" | |
| ) | |
| page = context.new_page() | |
| apply_stealth(page) | |
| print(f"⚡ Processing: {url}") | |
| data = { | |
| "url": url, | |
| "type": identify_url_type(url), | |
| "author": None, | |
| "followers": "N/A", | |
| "likes": "N/A", | |
| "views": "N/A", | |
| "status": "Starting" | |
| } | |
| # --- 3. NETWORK SNIFFER --- | |
| captured_data = {"play_count": None, "username": None, "like_count": None} | |
| def handle_response(response): | |
| if "instagram.com" in response.url and ("json" in response.headers.get("content-type", "") or "graphql" in response.url): | |
| try: | |
| json_data = response.json() | |
| if not captured_data["play_count"]: | |
| plays = safe_find_key(json_data, "play_count") or safe_find_key(json_data, "video_view_count") | |
| if plays: captured_data["play_count"] = plays | |
| if not captured_data["like_count"]: | |
| likes = safe_find_key(json_data, "like_count") | |
| if likes: captured_data["like_count"] = likes | |
| if not captured_data["username"]: | |
| user = safe_find_key(json_data, "username") | |
| if user: captured_data["username"] = user | |
| except: pass | |
| page.on("response", handle_response) | |
| try: | |
| # === NAVIGATION === | |
| page.goto(url, wait_until="commit", timeout=45000) | |
| page.wait_for_timeout(5000) | |
| if "Login" in page.title(): | |
| data["status"] = "Failed (Login Block)" | |
| browser.close() | |
| return data | |
| if captured_data["play_count"]: data["views"] = str(captured_data["play_count"]) | |
| if captured_data["like_count"]: data["likes"] = str(captured_data["like_count"]) | |
| if captured_data["username"]: data["author"] = captured_data["username"] | |
| # --- 4. FALLBACK: VISUAL SCRAPING --- | |
| if (data["views"] == "N/A" and data["type"] == "REEL") or not data["author"]: | |
| if not data["author"]: | |
| try: | |
| title = page.title() | |
| match = re.search(r'\(@(.*?)\)', title) | |
| if match: data["author"] = match.group(1) | |
| except: pass | |
| if data["author"]: | |
| if "/reels/" not in page.url: | |
| page.goto(f"https://www.instagram.com/{data['author']}/reels/", wait_until="domcontentloaded") | |
| page.wait_for_timeout(3000) | |
| try: | |
| meta = page.locator('meta[property="og:description"]').get_attribute("content") | |
| if meta: | |
| parts = meta.split("Followers") | |
| if len(parts) > 1: data["followers"] = parts[0].strip().split(" ")[-1] | |
| except: pass | |
| if data["views"] == "N/A": | |
| try: | |
| shortcode = url.split("/reel/")[1].split("/")[0] | |
| card = page.locator(f"a[href*='{shortcode}']").first | |
| if card.count() > 0: | |
| txt = card.inner_text() | |
| for line in txt.split('\n'): | |
| if any(c.isdigit() for c in line): | |
| data["views"] = line.strip() | |
| break | |
| except: pass | |
| data["status"] = "Success" | |
| except Exception as e: | |
| data["status"] = "Error" | |
| print(f"❌ Error: {e}") | |
| browser.close() | |
| return data | |
| def home(): | |
| return render_template('index.html') | |
| def scrape_api(): | |
| data = request.json | |
| raw_urls = data.get('urls', []) | |
| final_urls = [] | |
| if isinstance(raw_urls, list): | |
| raw_string = ",".join(raw_urls) | |
| else: | |
| raw_string = str(raw_urls) | |
| cleaned_items = raw_string.replace('\n', ',').split(',') | |
| for item in cleaned_items: | |
| clean_link = item.strip() | |
| if clean_link: | |
| final_urls.append(clean_link) | |
| if not final_urls: | |
| return jsonify({"error": "No valid URLs provided"}), 400 | |
| print(f"🔥 Processing {len(final_urls)} links...") | |
| results = [] | |
| with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: | |
| results_iterator = executor.map(scrape_single_url, final_urls) | |
| for res in results_iterator: | |
| if res: results.append(res) | |
| return jsonify(results) | |
| if __name__ == '__main__': | |
| # Use ENV Port or default to 10000 | |
| port = int(os.environ.get("PORT", 10000)) | |
| app.run(host='0.0.0.0', port=port) |