# TestingI / app.py
# Baskar2005's picture
# Update app.py
# 122e402 verified
# raw
# history blame
# 10.3 kB
from flask import Flask, render_template, request, jsonify
from playwright.sync_api import sync_playwright
from concurrent.futures import ThreadPoolExecutor
import time
import os
import re
# Flask application object; the route handlers in this file register on it.
app = Flask(__name__)
# ---------------- CONFIGURATION ---------------- #
# Path of a saved Playwright storage-state file. When this file exists,
# scrape_single_url passes it as `storage_state` to the new browser context;
# otherwise a context without saved state is created.
SESSION_FILE = "instagram_session.json"
# Size of the thread pool scrape_api uses to run scrape_single_url calls.
MAX_WORKERS = 3
# ----------------------------------------------- #
def identify_url_type(url):
    """Classify an Instagram URL by what it points at.

    Args:
        url: The URL to classify. Surrounding whitespace is ignored.

    Returns:
        One of:
        "REEL"    - the URL contains "/reel/".
        "POST"    - the URL contains "/p/".
        "SYSTEM"  - the Instagram homepage (with or without scheme, "www.",
                    trailing slash, query string or fragment) or an
                    /explore/, /direct/ or /stories/ page.
        "PROFILE" - any other URL containing "instagram.com/".
        "UNKNOWN" - anything else, including non-string input.
    """
    if not isinstance(url, str):
        return "UNKNOWN"
    url = url.strip()

    if "/reel/" in url:
        return "REEL"
    if "/p/" in url:
        return "POST"

    # Drop the query string / fragment and trailing slashes before testing for
    # the homepage, so "https://instagram.com/" and
    # "https://www.instagram.com/?hl=en" are not mistaken for a profile.
    bare = re.split(r"[?#]", url, maxsplit=1)[0].rstrip("/")
    if re.fullmatch(r"(?:https?://)?(?:www\.)?instagram\.com", bare, re.IGNORECASE):
        return "SYSTEM"

    if any(segment in url for segment in ("/explore/", "/direct/", "/stories/")):
        return "SYSTEM"
    if "instagram.com/" in url:
        return "PROFILE"
    return "UNKNOWN"
# --- HELPER: MANUAL STEALTH (The Magic Fix) ---
def apply_stealth(page):
    """
    Manually overrides browser variables to hide 'Headless' status.
    This replaces the broken 'playwright-stealth' library.

    Registers four init scripts on ``page`` via ``page.add_init_script``.

    Args:
        page: Object exposing ``add_init_script(script)``; the caller in this
            file passes a Playwright page.

    Returns:
        None.
    """
    stealth_scripts = (
        # 1. Hide WebDriver Flag
        "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
        # 2. Mock Chrome Runtime. Chrome exposes this object as `window.chrome`
        #    (not `navigator.chrome`), so that is the property to provide.
        #    An existing object is kept rather than overwritten.
        "window.chrome = window.chrome || { runtime: {} };",
        # 3. Mock Plugins (Headless browsers have 0, Humans have many)
        "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
        # 4. Mock Languages
        "Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
    )
    for script in stealth_scripts:
        page.add_init_script(script)
# --- HELPER: RECURSIVE SEARCH ---
def find_username_in_json(obj):
    """Depth-first search of decoded JSON for a username.

    A dict yields a result immediately when either
      * its "owner" value is a dict containing "username", or
      * it contains both "username" and "is_verified".
    Otherwise its dict/list values are searched in order. Every item of a
    list is searched in order. The first truthy result wins.

    Args:
        obj: Any decoded JSON value.

    Returns:
        The username found, or None when nothing matched. A matching dict's
        username is returned as-is, even if it is empty.
    """
    if isinstance(obj, dict):
        owner = obj.get("owner")
        if isinstance(owner, dict) and "username" in owner:
            return owner["username"]
        if "username" in obj and "is_verified" in obj:
            return obj["username"]
        # Only nested containers can hold further candidates.
        children = (value for value in obj.values() if isinstance(value, (dict, list)))
    elif isinstance(obj, list):
        children = obj
    else:
        return None

    for child in children:
        hit = find_username_in_json(child)
        if hit:
            return hit
    return None
def _read_follower_count(page):
    """Return the follower text found on the current page, or None.

    Looks for the first link whose href contains "/followers/". If it holds a
    ``span[title]``, that span's ``title`` attribute is returned; otherwise
    the first line of the link's visible text is returned.
    """
    followers_link = page.locator("a[href*='/followers/']").first
    if followers_link.count() == 0:
        return None
    titled_span = followers_link.locator("span[title]").first
    if titled_span.count() > 0:
        return titled_span.get_attribute("title")
    return followers_link.inner_text().split("\n")[0]


def _scrape_profile(page, url, data):
    """Fill ``data`` for a PROFILE url: followers, author and status."""
    page.wait_for_timeout(2000)
    try:
        followers = _read_follower_count(page)
        if followers is not None:
            data["followers"] = followers
    except Exception:
        # Best effort: a missing follower count must not fail the scrape.
        pass
    if not data["author"]:
        # Last path segment, ignoring any "?query" or "#fragment" suffix.
        path_only = re.split(r"[?#]", url, maxsplit=1)[0]
        data["author"] = path_only.strip("/").split("/")[-1]
    data["status"] = "Success"


def _scrape_media(page, url, data):
    """Fill ``data`` for a REEL/POST url: likes, author, views, followers, status."""
    marker = "/reel/" if "/reel/" in url else "/p/"
    # Shortcode is the path segment after the marker, cut at "/", "?" or "#".
    shortcode = re.split(r"[/?#]", url.split(marker, 1)[1], maxsplit=1)[0]

    captured_info = {"username": None}

    def handle_response(response):
        if "instagram.com" not in response.url:
            return
        if "json" not in response.headers.get("content-type", ""):
            return
        try:
            found = find_username_in_json(response.json())
        except Exception:
            # Body unavailable or not valid JSON: ignore this response.
            return
        if found and not captured_info["username"]:
            captured_info["username"] = found

    page.on("response", handle_response)
    try:
        # wait_for_timeout (not time.sleep) so Playwright can dispatch
        # response events to the listener while we wait.
        page.wait_for_timeout(3000)
    finally:
        page.remove_listener("response", handle_response)

    # Likes
    try:
        meta_desc = page.locator('meta[property="og:description"]').get_attribute("content")
        if meta_desc:
            likes_match = re.search(r'^([0-9,.]+[KkMm]?) likes', meta_desc)
            if likes_match:
                data["likes"] = likes_match.group(1)
    except Exception:
        # Best effort: leave "N/A" when the meta tag cannot be read.
        pass

    # Author: captured JSON first, then page title, then a "/<user>/reels/" link.
    if captured_info["username"]:
        data["author"] = captured_info["username"]
    if not data["author"]:
        try:
            match = re.search(r'\(@(.*?)\)', page.title())
            if match:
                data["author"] = match.group(1)
        except Exception:
            pass
    if not data["author"]:
        try:
            for link in page.locator("a[href*='/reels/']").all():
                href = link.get_attribute("href")
                if href and "/reels/" in href:
                    parts = href.strip("/").split("/")
                    if len(parts) >= 2 and parts[-1] == "reels":
                        data["author"] = parts[-2]
                        break
        except Exception:
            pass

    if not data["author"]:
        data["status"] = "Failed (No Author)"
        return

    # Views
    is_video = data["type"] == "REEL"
    try:
        if "video" in page.locator('meta[property="og:type"]').get_attribute("content"):
            is_video = True
    except Exception:
        pass

    if is_video:
        page.goto(f"https://www.instagram.com/{data['author']}/reels/", wait_until="domcontentloaded")
        page.wait_for_timeout(3000)
        if "/reels/" in page.url:
            try:
                card_text = page.locator(f"a[href*='{shortcode}']").first.inner_text()
                for line in card_text.split('\n'):
                    if any(char.isdigit() for char in line):
                        data["views"] = line.strip()
                        break
            except Exception:
                data["views"] = "Not Found"
    else:
        data["views"] = "N/A (Photo)"

    # Followers (Bonus) - same lookup as the profile path.
    try:
        followers = _read_follower_count(page)
        if followers is not None:
            data["followers"] = followers
    except Exception:
        pass
    data["status"] = "Success"


def scrape_single_url(url):
    """Scrape one Instagram URL with a headless Chromium browser.

    Args:
        url: Profile, post or reel URL.

    Returns:
        None when ``url`` is empty or whitespace. Otherwise a dict with the
        keys "url", "type", "author", "followers", "likes", "views" and
        "status". "status" ends as one of "Skipped" (SYSTEM/UNKNOWN urls, no
        browser is started), "Failed (Login Block)", "Failed (No Author)",
        "Success" or "Error" (an exception during navigation/extraction,
        which is printed rather than raised).
    """
    if not url or not url.strip():
        return None

    print(f"⚡ Processing: {url}")
    data = {
        "url": url,
        "type": identify_url_type(url),
        "author": None,
        "followers": "N/A",
        "likes": "N/A",
        "views": "N/A",
        "status": "Starting"
    }

    # Decide before launching Chromium: skipped URLs need no browser.
    if data["type"] in ["SYSTEM", "UNKNOWN"]:
        data["status"] = "Skipped"
        return data

    with sync_playwright() as p:
        # 1. LAUNCH BROWSER (Headless=True for Server)
        browser = p.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",  # Standard bot hide
                "--no-sandbox",
                "--disable-dev-shm-usage"
            ]
        )
        try:
            # 2. CONFIGURE CONTEXT (Windows 10 Fingerprint)
            context_args = {
                "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
                "viewport": {"width": 1920, "height": 1080},
                "locale": "en-US",
                "timezone_id": "America/New_York"
            }
            # Load Session if available, else Guest
            if os.path.exists(SESSION_FILE):
                try:
                    context = browser.new_context(storage_state=SESSION_FILE, **context_args)
                except Exception:
                    print("⚠️ Session Corrupt. Switching to Guest Mode.")
                    context = browser.new_context(**context_args)
            else:
                context = browser.new_context(**context_args)
            page = context.new_page()

            # 3. APPLY MANUAL STEALTH
            apply_stealth(page)

            try:
                # === NAVIGATION ===
                page.goto(url, wait_until="commit", timeout=60000)
                # Check Login Wall
                page.wait_for_timeout(4000)
                if "Login" in page.title():
                    data["status"] = "Failed (Login Block)"
                elif data["type"] == "PROFILE":
                    _scrape_profile(page, url, data)
                elif data["type"] in ["REEL", "POST"]:
                    _scrape_media(page, url, data)
            except Exception as e:
                data["status"] = "Error"
                print(f"❌ Error: {e}")
        finally:
            # Always release the browser, including when setup raises.
            browser.close()
    return data
@app.route('/')
def home():
    """Serve the landing page by rendering the 'index.html' template."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/api/scrape', methods=['POST'])
def scrape_api():
    """Scrape every URL supplied in the JSON request body.

    Expects a JSON object whose "urls" value is either a list of URLs or one
    string with URLs separated by commas and/or newlines.

    Returns:
        A JSON array holding one result dict per URL that produced a result,
        in input order. A 400 response with an "error" message is returned
        when the body is not a JSON object or contains no URLs.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so bad requests get the JSON 400 below rather than a 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # A null/empty "urls" value counts as "no URLs", not the text "None".
    raw_urls = payload.get('urls') or []
    if isinstance(raw_urls, list):
        # str() lets non-string list items through the join without a TypeError.
        raw_string = ",".join(str(item) for item in raw_urls)
    else:
        raw_string = str(raw_urls)

    # Commas and newlines both separate entries; blanks are dropped.
    candidates = (item.strip() for item in raw_string.replace('\n', ',').split(','))
    final_urls = [link for link in candidates if link]
    if not final_urls:
        return jsonify({"error": "No valid URLs provided"}), 400

    print(f"🔥 Processing {len(final_urls)} links...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # map() preserves input order; falsy results are left out.
        results = [res for res in executor.map(scrape_single_url, final_urls) if res]
    return jsonify(results)
# Script entry point: start the Flask server on all interfaces, port 7860.
if __name__ == "__main__":
    app.run(port=7860, host="0.0.0.0")