Spaces:
Runtime error
Runtime error
| from flask import Flask, render_template, request, jsonify | |
| from playwright.sync_api import sync_playwright | |
| import time | |
| import os | |
| import re | |
| app = Flask(__name__) | |
# ---------------- CONFIGURATION ---------------- #
# Path of the Playwright storage-state file that run_scraper() loads into the
# browser context. Override with the INSTAGRAM_SESSION_FILE environment variable.
SESSION_FILE = os.environ.get("INSTAGRAM_SESSION_FILE", "instagram_session.json")

# Whether Chromium is launched headless. The original note recommended False
# (a headed browser) to evade Instagram's headless-browser blocking, yet the
# shipped value was True. The True default is preserved; set SCRAPER_HEADLESS
# to 0/false/no/off to run headed without editing the source.
HEADLESS_MODE = os.environ.get("SCRAPER_HEADLESS", "true").strip().lower() not in (
    "0",
    "false",
    "no",
    "off",
)
# ----------------------------------------------- #
def identify_url_type(url):
    """Classify an Instagram URL by the kind of page it points to.

    Args:
        url: The URL string to classify; surrounding whitespace is ignored.

    Returns:
        One of ``"REEL"``, ``"POST"``, ``"SYSTEM"``, ``"PROFILE"`` or
        ``"UNKNOWN"``. ``"SYSTEM"`` covers the site home page and the
        explore / direct / stories sections.
    """
    url = url.strip()
    if "/reel/" in url:
        return "REEL"
    if "/p/" in url:
        return "POST"
    # Home page in any spelling: optional scheme, optional "www.", optional
    # trailing slashes and optional query/fragment. Without this, URLs such as
    # "https://instagram.com/" or ".../?hl=en" fell through to PROFILE.
    if re.match(
        r"^(?:https?://)?(?:www\.)?instagram\.com/*(?:[?#].*)?$",
        url,
        re.IGNORECASE,
    ):
        return "SYSTEM"
    if any(section in url for section in ("/explore/", "/direct/", "/stories/")):
        return "SYSTEM"
    if "instagram.com/" in url:
        return "PROFILE"
    return "UNKNOWN"
def _strip_query(url):
    """Return *url* without surrounding whitespace, query string or fragment."""
    return re.split(r"[?#]", url.strip(), maxsplit=1)[0]


def _extract_shortcode(url):
    """Return the path segment that follows ``/reel/`` (or ``/p/``) in *url*."""
    marker = "/reel/" if "/reel/" in url else "/p/"
    tail = url.split(marker, 1)[1]
    # Stop at "/", "?" or "#" so ".../reel/ABC?igsh=x" yields "ABC".
    return re.split(r"[/?#]", tail, maxsplit=1)[0]


def _find_owner_username(obj):
    """Depth-first search of decoded JSON for the first ``owner.username`` value."""
    if isinstance(obj, dict):
        owner = obj.get("owner")
        # Only index into "owner" when it is a mapping; a null/str owner used
        # to raise and abort the search of the whole response.
        if isinstance(owner, dict) and owner.get("username"):
            return owner["username"]
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return None
    for child in children:
        found = _find_owner_username(child)
        if found:
            return found
    return None


def _read_followers(page):
    """Return the follower count shown on the current page, or None if absent."""
    link = page.locator("a[href*='/followers/']").first
    if link.count() == 0:
        return None
    exact = link.locator("span[title]").first
    if exact.count() > 0:
        return exact.get_attribute("title")
    # No title attribute: fall back to the first line of the link's text.
    return link.inner_text().split("\n")[0]


def _scrape_profile(page, url, data):
    """Open a profile URL and fill ``followers``, ``author`` and ``status``."""
    page.goto(url, wait_until="domcontentloaded", timeout=60000)
    time.sleep(3)
    try:
        followers = _read_followers(page)
        if followers:
            data["followers"] = followers
    except Exception:
        # Best effort: the follower count stays "N/A" when it cannot be read.
        pass
    data["author"] = _strip_query(url).strip("/").split("/")[-1]
    data["status"] = "Success"


def _scrape_media(page, url, data):
    """Open a reel/post URL and fill ``author``, ``likes``, ``views``, ``followers``."""
    shortcode = _extract_shortcode(url)
    captured = {"username": None}

    def handle_response(response):
        # Inspect JSON responses from instagram.com for the media owner's name.
        content_type = response.headers.get("content-type", "")
        if "instagram.com" not in response.url or "json" not in content_type:
            return
        try:
            found = _find_owner_username(response.json())
        except Exception:
            return
        if found and not captured["username"]:
            captured["username"] = found

    page.on("response", handle_response)
    try:
        page.goto(url, wait_until="domcontentloaded", timeout=60000)
        time.sleep(4)
    finally:
        # Always detach, otherwise a failed navigation leaves the handler
        # attached for every later URL processed on this page.
        page.remove_listener("response", handle_response)

    # Likes come from the og:description meta tag ("1,234 likes, ...").
    try:
        meta_desc = page.locator('meta[property="og:description"]').get_attribute("content")
        if meta_desc:
            likes_match = re.search(r'^([0-9,.]+[KkMm]?) likes', meta_desc)
            if likes_match:
                data["likes"] = likes_match.group(1)
    except Exception:
        pass

    # Author: captured JSON first, then the page title, then a /reels/ link.
    if captured["username"]:
        data["author"] = captured["username"]
    if not data["author"]:
        try:
            title_match = re.search(r'\(@(.*?)\)', page.title())
            if title_match:
                data["author"] = title_match.group(1)
        except Exception:
            pass
    if not data["author"]:
        try:
            for link in page.locator("a[href*='/reels/']").all():
                href = link.get_attribute("href")
                if not href:
                    continue
                parts = href.strip("/").split("/")
                if len(parts) >= 2 and parts[-1] == "reels":
                    candidate = parts[-2]
                    if candidate not in ["reels", "instagram"]:
                        data["author"] = candidate
                        break
        except Exception:
            pass

    if not data["author"]:
        data["status"] = "Failed (No Author)"
        return

    # Reels are always video; other media are video when og:type says so.
    is_video = data["type"] == "REEL"
    if not is_video:
        try:
            og_type = page.locator('meta[property="og:type"]').get_attribute("content")
            is_video = bool(og_type and "video" in og_type)
        except Exception:
            pass

    if is_video:
        profile_reels_url = f"https://www.instagram.com/{data['author']}/reels/"
        page.goto(profile_reels_url, wait_until="domcontentloaded")
        time.sleep(3)
        if "/reels/" not in page.url:
            # The browser did not stay on the /reels/ tab.
            data["views"] = "Hidden (Main Grid)"
        else:
            try:
                target_selector = f"a[href*='{shortcode}']"
                page.wait_for_selector(target_selector, timeout=8000)
                card_text = page.locator(target_selector).first.inner_text()
                # The first line of the card containing a digit is the view count.
                data["views"] = next(
                    (
                        line.strip()
                        for line in card_text.split('\n')
                        if any(char.isdigit() for char in line)
                    ),
                    data["views"],
                )
            except Exception:
                data["views"] = "Not Found"
    else:
        data["views"] = "N/A (Photo)"

    try:
        followers = _read_followers(page)
        if followers:
            data["followers"] = followers
    except Exception:
        pass
    data["status"] = "Success"


def run_scraper(url_list):
    """Scrape each Instagram URL in *url_list* with a logged-in Playwright browser.

    Args:
        url_list: Iterable of URL strings. Blank entries are ignored.

    Returns:
        A list with one dict per processed URL, holding the keys ``url``,
        ``type``, ``author``, ``followers``, ``likes``, ``views`` and
        ``status``. URLs typed ``SYSTEM`` or ``UNKNOWN`` get status
        ``"Skipped"``. If ``SESSION_FILE`` does not exist, a single error
        record (with an extra ``msg`` key) is returned instead.
    """
    if not os.path.exists(SESSION_FILE):
        return [{"status": "Error", "author": "System", "likes": "N/A", "views": "N/A", "followers": "N/A", "type": "ERROR", "url": "", "msg": "Session file missing"}]

    results = []
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=HEADLESS_MODE)
        try:
            context = browser.new_context(storage_state=SESSION_FILE)
            page = context.new_page()
            for raw_url in url_list:
                url = raw_url.strip()
                if not url:
                    continue
                print(f"🔄 Processing: {url}")
                data = {
                    "url": url,
                    "type": identify_url_type(url),
                    "author": None,
                    "followers": "N/A",
                    "likes": "N/A",
                    "views": "N/A",
                    "status": "Starting",
                }
                if data["type"] in ("SYSTEM", "UNKNOWN"):
                    data["status"] = "Skipped"
                    results.append(data)
                    continue
                try:
                    if data["type"] == "PROFILE":
                        _scrape_profile(page, url, data)
                    elif data["type"] in ("REEL", "POST"):
                        _scrape_media(page, url, data)
                except Exception as e:
                    # One failing URL must not abort the rest of the batch.
                    data["status"] = "Error"
                    print(f"❌ Error: {e}")
                print(f"✅ Finished: {data}")
                results.append(data)
        finally:
            # Close the browser even if setup or an iteration raised.
            browser.close()
    return results
| # --- ROUTES --- | |
# No route decorator was visible on this view, so it was never registered with
# the Flask app; "/" is used because the view is named ``home``.
@app.route('/')
def home():
    """Render and return the ``index.html`` template."""
    return render_template('index.html')
# NOTE(review): no route decorator was visible on this view, so it was never
# registered. The "/scrape" path is an assumption — confirm it against the
# URL the front end actually posts to.
@app.route('/scrape', methods=['POST'])
def scrape_api():
    """Parse the submitted URLs, scrape them and return the results as JSON.

    The request body is expected to be a JSON object whose ``urls`` entry is
    either a list of URLs or a single string with URLs separated by commas
    and/or newlines.

    Returns:
        The JSON-encoded list produced by ``run_scraper``, or a JSON error
        object with HTTP status 400 when no usable URL was supplied.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body; a non-object body (e.g. a bare list) is treated as "no urls".
    payload = request.get_json(silent=True)
    raw_urls = payload.get('urls', []) if isinstance(payload, dict) else []
    if raw_urls is None:
        raw_urls = []

    # Unify list and string input into one comma-separated string.
    if isinstance(raw_urls, (list, tuple)):
        # str() guards against non-string items, which made ",".join() raise.
        raw_string = ",".join(str(item) for item in raw_urls if item is not None)
    else:
        raw_string = str(raw_urls)

    # Newlines count as separators too; drop blank entries.
    candidates = (piece.strip() for piece in raw_string.replace('\n', ',').split(','))
    final_urls = [link for link in candidates if link]

    if not final_urls:
        return jsonify({"error": "No valid URLs provided"}), 400

    results = run_scraper(final_urls)
    return jsonify(results)
if __name__ == '__main__':
    # host must be a string: the bare literal 0.0.0.0 is a SyntaxError.
    # NOTE(review): debug=True together with a 0.0.0.0 bind exposes the
    # Werkzeug debugger on every interface — confirm this is intended.
    app.run(debug=True, host="0.0.0.0", port=5000, use_reloader=False)