TestingI / app.py
Baskar2005's picture
Update app.py
a915f4d verified
from flask import Flask, render_template, request, jsonify
from playwright.sync_api import sync_playwright
import time
import os
import re
app = Flask(__name__)
# ---------------- CONFIGURATION ---------------- #
SESSION_FILE = "instagram_session.json"
# Keep False for successful scraping (Headless block evasion)
HEADLESS_MODE = True
# ----------------------------------------------- #
def identify_url_type(url):
if "/reel/" in url: return "REEL"
if "/p/" in url: return "POST"
if url.strip("/") == "https://www.instagram.com": return "SYSTEM"
if "/explore/" in url or "/direct/" in url or "/stories/" in url: return "SYSTEM"
if "instagram.com/" in url: return "PROFILE"
return "UNKNOWN"
def run_scraper(url_list):
if not os.path.exists(SESSION_FILE):
return [{"status": "Error", "author": "System", "likes": "N/A", "views": "N/A", "followers": "N/A", "type": "ERROR", "url": "", "msg": "Session file missing"}]
results = []
with sync_playwright() as p:
browser = p.chromium.launch(headless=HEADLESS_MODE)
context = browser.new_context(storage_state=SESSION_FILE)
page = context.new_page()
for url in url_list:
if not url.strip(): continue
print(f"🔄 Processing: {url}")
data = {
"url": url,
"type": identify_url_type(url),
"author": None,
"followers": "N/A",
"likes": "N/A",
"views": "N/A",
"status": "Starting"
}
if data["type"] == "SYSTEM" or data["type"] == "UNKNOWN":
data["status"] = "Skipped"
results.append(data)
continue
try:
# --- PATH A: PROFILE ---
if data["type"] == "PROFILE":
page.goto(url, wait_until="domcontentloaded", timeout=60000)
time.sleep(3)
try:
followers_link = page.locator("a[href*='/followers/']").first
if followers_link.count() > 0:
title = followers_link.locator("span[title]").first
if title.count() > 0:
data["followers"] = title.get_attribute("title")
else:
data["followers"] = followers_link.inner_text().split("\n")[0]
except: pass
data["author"] = url.strip("/").split("/")[-1]
data["status"] = "Success"
# --- PATH B: MEDIA ---
elif data["type"] in ["REEL", "POST"]:
if "/reel/" in url:
shortcode = url.split("/reel/")[1].split("/")[0]
else:
shortcode = url.split("/p/")[1].split("/")[0]
captured_info = {"username": None}
def handle_response(response):
if "instagram.com" in response.url and "json" in response.headers.get("content-type", ""):
try:
json_data = response.json()
def find_user(obj):
if isinstance(obj, dict):
if "owner" in obj and "username" in obj["owner"]:
return obj["owner"]["username"]
for v in obj.values():
res = find_user(v)
if res: return res
elif isinstance(obj, list):
for item in obj:
res = find_user(item)
if res: return res
return None
found = find_user(json_data)
if found and not captured_info["username"]:
captured_info["username"] = found
except: pass
page.on("response", handle_response)
page.goto(url, wait_until="domcontentloaded", timeout=60000)
time.sleep(4)
page.remove_listener("response", handle_response)
try:
meta_desc = page.locator('meta[property="og:description"]').get_attribute("content")
if meta_desc:
likes_match = re.search(r'^([0-9,.]+[KkMm]?) likes', meta_desc)
if likes_match: data["likes"] = likes_match.group(1)
except: pass
if captured_info["username"]: data["author"] = captured_info["username"]
if not data["author"]:
try:
title = page.title()
match = re.search(r'\(@(.*?)\)', title)
if match: data["author"] = match.group(1)
except: pass
if not data["author"]:
try:
links = page.locator("a[href*='/reels/']").all()
for link in links:
href = link.get_attribute("href")
if href:
parts = href.strip("/").split("/")
if len(parts) >= 2 and parts[-1] == "reels":
candidate = parts[-2]
if candidate not in ["reels", "instagram"]:
data["author"] = candidate
break
except: pass
if data["author"]:
is_video = False
try:
og_type = page.locator('meta[property="og:type"]').get_attribute("content")
if og_type and "video" in og_type: is_video = True
except: pass
if data["type"] == "REEL": is_video = True
if is_video:
profile_reels_url = f"https://www.instagram.com/{data['author']}/reels/"
page.goto(profile_reels_url, wait_until="domcontentloaded")
time.sleep(3)
if "/reels/" not in page.url:
data["views"] = "Hidden (Main Grid)"
else:
try:
target_selector = f"a[href*='{shortcode}']"
page.wait_for_selector(target_selector, timeout=8000)
target_card = page.locator(target_selector).first
card_text = target_card.inner_text()
for line in card_text.split('\n'):
if any(char.isdigit() for char in line):
data["views"] = line.strip()
break
except:
data["views"] = "Not Found"
else:
data["views"] = "N/A (Photo)"
try:
fol_link = page.locator("a[href*='/followers/']").first
if fol_link.count() > 0:
title = fol_link.locator("span[title]").first
if title.count() > 0:
data["followers"] = title.get_attribute("title")
except: pass
data["status"] = "Success"
else:
data["status"] = "Failed (No Author)"
except Exception as e:
data["status"] = "Error"
print(f"❌ Error: {e}")
print(f"✅ Finished: {data}")
results.append(data)
browser.close()
return results
# --- ROUTES ---
@app.route('/')
def home():
return render_template('index.html')
@app.route('/api/scrape', methods=['POST'])
def scrape_api():
data = request.json
raw_urls = data.get('urls', [])
# Logic to handle both List and String input
final_urls = []
if isinstance(raw_urls, list):
# Convert list to comma-separated string first to unify handling
raw_string = ",".join(raw_urls)
else:
raw_string = str(raw_urls)
# 1. Replace newlines with commas
# 2. Split by comma
cleaned_items = raw_string.replace('\n', ',').split(',')
for item in cleaned_items:
clean_link = item.strip()
if clean_link:
final_urls.append(clean_link)
if not final_urls:
return jsonify({"error": "No valid URLs provided"}), 400
results = run_scraper(final_urls)
return jsonify(results)
if __name__ == '__main__':
app.run(debug=True,host=0.0.0.0, port=5000, use_reloader=False)