Spaces:
Runtime error
Runtime error
| from flask import Flask, render_template, request, jsonify | |
| from playwright.sync_api import sync_playwright | |
| from concurrent.futures import ThreadPoolExecutor | |
| import time | |
| import os | |
| import re | |
# Flask application object for this module; it is started by the
# ``__main__`` guard at the bottom of the file.
app = Flask(__name__)
# ---------------- CONFIGURATION ---------------- #
# Storage-state file handed to Playwright's ``new_context`` when it exists
# on disk; when it is absent the scraper opens a fresh (guest) context.
SESSION_FILE = "instagram_session.json"
# Size of the thread pool used to scrape several URLs concurrently.
MAX_WORKERS = 3
# ----------------------------------------------- #
def identify_url_type(url):
    """Classify *url* by plain substring checks.

    Returns one of ``"REEL"``, ``"POST"``, ``"SYSTEM"``, ``"PROFILE"`` or
    ``"UNKNOWN"``. Checks run in that priority order, so a URL containing
    ``/reel/`` is a REEL even if it also contains other markers.
    """
    if "/reel/" in url:
        return "REEL"
    if "/p/" in url:
        return "POST"

    # The bare home page and the explore/direct/stories areas are not
    # scrape targets.
    system_markers = ("/explore/", "/direct/", "/stories/")
    is_home_page = url.strip("/") == "https://www.instagram.com"
    if is_home_page or any(marker in url for marker in system_markers):
        return "SYSTEM"

    # Any other instagram.com path is treated as a profile page.
    return "PROFILE" if "instagram.com/" in url else "UNKNOWN"
# --- HELPER: MANUAL STEALTH ---
def apply_stealth(page):
    """Register init scripts on *page* that mask common headless tells.

    Each script is passed to ``page.add_init_script`` so it is installed
    before the page's own scripts run. This is a hand-rolled replacement
    for the 'playwright-stealth' library.

    Args:
        page: Object exposing ``add_init_script(script)`` (a Playwright page).
    """
    stealth_scripts = (
        # 1. Hide the WebDriver flag.
        "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
        # 2. Mock the Chrome runtime. Chrome exposes this object as
        #    `window.chrome`; `navigator.chrome` does not exist in a real
        #    browser, so defining it there hid nothing and was itself an
        #    anomaly. An already-present object is left untouched.
        "window.chrome = window.chrome || {}; "
        "window.chrome.runtime = window.chrome.runtime || {};",
        # 3. Mock plugins (headless browsers report none).
        "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
        # 4. Mock languages.
        "Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
    )
    for script in stealth_scripts:
        page.add_init_script(script)
# --- HELPER: RECURSIVE SEARCH ---
def find_username_in_json(obj):
    """Depth-first search of a decoded JSON value for a username.

    A dict matches when it has an ``owner`` dict containing ``username``,
    or when it carries both ``username`` and ``is_verified`` keys; the
    matching value is returned as-is and that dict is not searched further.
    Otherwise nested dicts and lists are searched in order and the first
    truthy hit wins. Returns ``None`` when nothing is found.
    """
    if isinstance(obj, dict):
        owner = obj.get("owner")
        if isinstance(owner, dict) and "username" in owner:
            return owner["username"]
        if "username" in obj and "is_verified" in obj:
            return obj["username"]
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return None

    for child in children:
        # Scalars can never contain a username, so only containers recurse.
        if isinstance(child, (dict, list)):
            hit = find_username_in_json(child)
            if hit:
                return hit
    return None
def _extract_shortcode(url):
    """Return the media shortcode that follows ``/reel/`` or ``/p/`` in *url*."""
    marker = "/reel/" if "/reel/" in url else "/p/"
    tail = url.split(marker, 1)[1]
    # Cut at the next path separator, query string or fragment so share
    # links such as ".../reel/ABC?igsh=..." still yield "ABC".
    return re.split(r"[/?#]", tail, maxsplit=1)[0]


def _new_context(browser):
    """Create a browser context, reusing the saved session file when possible."""
    context_args = {
        "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
        "viewport": {"width": 1920, "height": 1080},
        "locale": "en-US",
        "timezone_id": "America/New_York",
    }
    if os.path.exists(SESSION_FILE):
        try:
            return browser.new_context(storage_state=SESSION_FILE, **context_args)
        except Exception:
            print("⚠️ Session Corrupt. Switching to Guest Mode.")
    return browser.new_context(**context_args)


def _read_follower_count(page):
    """Return the follower text shown on the followers link, or None if absent."""
    followers_link = page.locator("a[href*='/followers/']").first
    if followers_link.count() == 0:
        return None
    title = followers_link.locator("span[title]").first
    if title.count() > 0:
        return title.get_attribute("title")
    # No titled span: fall back to the first line of the link's visible text.
    return followers_link.inner_text().split("\n")[0]


def _find_author_fallback(page):
    """Derive the author from the page title, then from ``/reels/`` links."""
    try:
        match = re.search(r'\(@(.*?)\)', page.title())
        if match:
            return match.group(1)
    except Exception:
        pass  # best-effort; try the link-based fallback next
    try:
        for link in page.locator("a[href*='/reels/']").all():
            href = link.get_attribute("href")
            if href and "/reels/" in href:
                parts = href.strip("/").split("/")
                if len(parts) >= 2 and parts[-1] == "reels":
                    return parts[-2]
    except Exception:
        pass  # best-effort; the caller reports "Failed (No Author)"
    return None


def _scrape_profile(page, url, data):
    """Fill *data* for a profile URL (followers and author)."""
    page.wait_for_timeout(2000)
    try:
        followers = _read_follower_count(page)
        if followers is not None:
            data["followers"] = followers
    except Exception:
        pass  # best-effort; followers stays "N/A"
    # Drop any query string / fragment before taking the last path segment,
    # so ".../natgeo/?hl=en" yields "natgeo".
    path_only = re.split(r"[?#]", url, maxsplit=1)[0]
    data["author"] = path_only.strip("/").split("/")[-1]
    data["status"] = "Success"


def _scrape_media(page, url, data):
    """Fill *data* for a reel/post URL (likes, author, views, followers)."""
    shortcode = _extract_shortcode(url)

    captured_info = {"username": None}

    def handle_response(response):
        if captured_info["username"]:
            return
        if "instagram.com" not in response.url:
            return
        if "json" not in response.headers.get("content-type", ""):
            return
        try:
            found = find_username_in_json(response.json())
        except Exception:
            return  # body not decodable as JSON; ignore this response
        if found:
            captured_info["username"] = found

    page.on("response", handle_response)
    # wait_for_timeout (not time.sleep) so Playwright keeps dispatching
    # events during the window; a blocking sleep would starve the listener.
    page.wait_for_timeout(3000)
    page.remove_listener("response", handle_response)

    # Likes
    try:
        meta_desc = page.locator('meta[property="og:description"]').get_attribute("content")
        if meta_desc:
            likes_match = re.search(r'^([0-9,.]+[KkMm]?) likes', meta_desc)
            if likes_match:
                data["likes"] = likes_match.group(1)
    except Exception:
        pass  # best-effort; likes stays "N/A"

    # Author
    data["author"] = captured_info["username"] or _find_author_fallback(page)
    if not data["author"]:
        data["status"] = "Failed (No Author)"
        return

    # Views
    is_video = data["type"] == "REEL"
    if not is_video:
        # Only consult og:type when the URL alone did not already say video.
        try:
            og_type = page.locator('meta[property="og:type"]').get_attribute("content")
            is_video = "video" in (og_type or "")
        except Exception:
            pass
    if is_video:
        page.goto(f"https://www.instagram.com/{data['author']}/reels/", wait_until="domcontentloaded")
        page.wait_for_timeout(3000)
        if "/reels/" in page.url:
            if not shortcode:
                # An empty shortcode would match every link on the page.
                data["views"] = "Not Found"
            else:
                try:
                    card_text = page.locator(f"a[href*='{shortcode}']").first.inner_text()
                    for line in card_text.split('\n'):
                        if any(char.isdigit() for char in line):
                            data["views"] = line.strip()
                            break
                except Exception:
                    data["views"] = "Not Found"
    else:
        data["views"] = "N/A (Photo)"

    # Followers (Bonus)
    try:
        followers = _read_follower_count(page)
        if followers is not None:
            data["followers"] = followers
    except Exception:
        pass  # best-effort; followers stays "N/A"
    data["status"] = "Success"


def scrape_single_url(url):
    """Scrape one Instagram URL with a headless Chromium browser.

    Args:
        url: Profile, reel or post URL.

    Returns:
        ``None`` for an empty/blank *url*; otherwise a dict with the keys
        ``url``, ``type``, ``author``, ``followers``, ``likes``, ``views``
        and ``status``. ``status`` is one of "Skipped", "Success",
        "Failed (Login Block)", "Failed (No Author)" or "Error".
    """
    if not url or not url.strip():
        return None

    print(f"⚡ Processing: {url}")
    data = {
        "url": url,
        "type": identify_url_type(url),
        "author": None,
        "followers": "N/A",
        "likes": "N/A",
        "views": "N/A",
        "status": "Starting",
    }
    # Decide on skipping before paying for a browser launch.
    if data["type"] in ("SYSTEM", "UNKNOWN"):
        data["status"] = "Skipped"
        return data

    with sync_playwright() as p:
        # 1. LAUNCH BROWSER (Headless=True for Server)
        browser = p.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",  # Standard bot hide
                "--no-sandbox",
                "--disable-dev-shm-usage",
            ],
        )
        try:
            # 2. CONFIGURE CONTEXT, 3. APPLY MANUAL STEALTH
            page = _new_context(browser).new_page()
            apply_stealth(page)

            # === NAVIGATION ===
            page.goto(url, wait_until="commit", timeout=60000)
            # Give the page time to settle, then check for the login wall.
            page.wait_for_timeout(4000)
            if "Login" in page.title():
                data["status"] = "Failed (Login Block)"
                return data

            if data["type"] == "PROFILE":
                _scrape_profile(page, url, data)
            else:
                _scrape_media(page, url, data)
        except Exception as e:
            data["status"] = "Error"
            print(f"❌ Error: {e}")
        finally:
            # Runs on every exit path, including the early login-block return.
            browser.close()
    return data
def home():
    """Render the ``index.html`` template and return the resulting page."""
    # NOTE(review): no @app.route decorator is visible directly above this
    # function in this view — confirm the route is registered.
    landing_page = render_template('index.html')
    return landing_page
def _normalize_urls(raw_urls):
    """Flatten *raw_urls* (list or string) into a list of non-empty, stripped URLs."""
    if raw_urls is None:
        return []
    if isinstance(raw_urls, list):
        # str() each item so a stray number or null in the list cannot make
        # the join raise TypeError.
        raw_string = ",".join(str(item) for item in raw_urls if item is not None)
    else:
        raw_string = str(raw_urls)
    pieces = (piece.strip() for piece in raw_string.replace('\n', ',').split(','))
    return [piece for piece in pieces if piece]


def scrape_api():
    """Scrape every URL in the request's JSON body and return the results.

    Reads the ``urls`` key of the JSON body, which may be a list of strings
    or one string with entries separated by commas and/or newlines. URLs are
    scraped concurrently with ``scrape_single_url`` on a pool of
    ``MAX_WORKERS`` threads.

    Returns:
        A JSON array of result dicts, or a JSON error object with HTTP 400
        when the body is not a JSON object or contains no usable URL.
    """
    # NOTE(review): no @app.route decorator is visible directly above this
    # function in this view — confirm the route is registered.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A missing, malformed or non-object body used to raise on `.get`.
        return jsonify({"error": "Request body must be a JSON object"}), 400

    final_urls = _normalize_urls(payload.get('urls', []))
    if not final_urls:
        return jsonify({"error": "No valid URLs provided"}), 400

    print(f"🔥 Processing {len(final_urls)} links...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # executor.map keeps results in the same order as the input URLs.
        results = [res for res in executor.map(scrape_single_url, final_urls) if res]
    return jsonify(results)
if __name__ == "__main__":
    # Script entry point: serve the Flask app on every network interface,
    # port 7860.
    app.run(host="0.0.0.0", port=7860)