Spaces:
Sleeping
Sleeping
File size: 6,654 Bytes
5c3dc0d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
import streamlit as st
import requests
import json
import re
import time
import random
from datetime import datetime
class InstagramScraperV2:
    """Best-effort Instagram page scraper.

    Fetches public Instagram pages over a persistent ``requests.Session``
    with browser-like headers, then extracts images, profile fields and
    recent post shortcodes with regexes over the raw HTML. Status and
    error reporting goes through streamlit (``st.warning`` / ``st.error``).
    """

    def __init__(self):
        # One shared Session so headers and any cookies persist across requests.
        self.session = requests.Session()
        self.setup_session()

    def setup_session(self):
        """Setup session with better anti-detection measures"""
        # Rotate among common desktop user agents to reduce trivial bot blocking.
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        ]
        self.session.headers.update({
            'User-Agent': random.choice(user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })

    @staticmethod
    def _clean_url(raw):
        """Undo JSON string escaping found in scraped URLs (\\u0026 -> &, \\/ -> /)."""
        return raw.replace('\\u0026', '&').replace('\\/', '/')

    def get_page_with_retry(self, url, max_retries=3):
        """Get page with retry mechanism.

        Returns the response body as text on success. Sleeps a random
        2-4 s before each attempt to avoid hammering the server. If every
        attempt fails, the last request exception is re-raised (a warning
        is shown per failed attempt). Returns None only when
        ``max_retries <= 0`` (no attempt made), matching the original
        fall-through behavior; the original also had an unreachable
        ``return None`` after the loop, removed here.
        """
        last_error = None
        for attempt in range(max_retries):
            try:
                time.sleep(random.uniform(2, 4))
                response = self.session.get(url, timeout=20)
                response.raise_for_status()
                return response.text
            except Exception as e:
                last_error = e
                st.warning(f"Attempt {attempt + 1} failed: {str(e)}")
        if last_error is not None:
            raise last_error
        return None

    def extract_instagram_data(self, url):
        """Extract data from Instagram with improved error handling.

        Returns a dict with keys: url, timestamp (ISO), platform,
        images (list of dicts), posts (list of dicts), profile_info (dict),
        errors (list of str). Never raises; failures are recorded in
        ``errors`` so the caller always gets the full structure back.
        """
        scraped_data = {
            "url": url,
            "timestamp": datetime.now().isoformat(),
            "platform": "instagram",
            "images": [],
            "posts": [],
            "profile_info": {},
            "errors": []
        }
        try:
            page_text = self.get_page_with_retry(url)
            if not page_text:
                scraped_data["errors"].append("Failed to load Instagram page")
                return scraped_data
            scraped_data["images"] = self.extract_images_from_page(page_text)
            scraped_data["profile_info"] = self.extract_profile_info(page_text)
            scraped_data["posts"] = self.extract_recent_posts(page_text)
        except Exception as e:
            scraped_data["errors"].append(f"Instagram scraping error: {str(e)}")
        return scraped_data

    def extract_images_from_page(self, page_text):
        """Extract images with improved patterns.

        Returns a list of dicts with keys src/alt/title/width/height.
        Duplicate URLs are collapsed via a set; results are sorted so the
        output order is deterministic (raw set iteration order was not).
        """
        images = []
        try:
            # Bare scontent CDN URLs plus JSON fields from embedded page data.
            patterns = [
                r'https://scontent[^"]*\.jpg[^"]*',
                r'https://scontent[^"]*\.jpeg[^"]*',
                r'https://scontent[^"]*\.png[^"]*',
                r'"display_url":"([^"]+)"',
                r'"display_src":"([^"]+)"'
            ]
            found_images = set()
            for pattern in patterns:
                for match in re.findall(pattern, page_text):
                    # Keep only URLs that plausibly belong to Instagram's CDN.
                    if match and ('scontent' in match.lower() or 'instagram' in match.lower()):
                        found_images.add(self._clean_url(match))
            # sorted() -> stable, reproducible ordering across runs.
            for i, img_url in enumerate(sorted(found_images)):
                images.append({
                    "src": img_url,
                    "alt": f"Instagram image {i+1}",
                    "title": f"Instagram image {i+1}",
                    "width": "",
                    "height": ""
                })
        except Exception as e:
            st.error(f"Failed to extract images: {str(e)}")
        return images

    def extract_profile_info(self, page_text):
        """Extract profile information.

        Returns a dict with username/display_name/bio/followers/following/
        posts_count (the last three are never populated here — left "" as
        in the original). On failure an "error" key is added instead of
        raising.
        """
        profile_info = {
            "username": "",
            "display_name": "",
            "bio": "",
            "followers": "",
            "following": "",
            "posts_count": ""
        }
        try:
            # The <title> usually looks like "Full Name (@user) ..." — take
            # the parenthesized part as a first guess at the username.
            title_match = re.search(r'<title>([^<]+)</title>', page_text)
            if title_match:
                title = title_match.group(1)
                if '(' in title and ')' in title:
                    username = title.split('(')[1].split(')')[0]
                    profile_info["username"] = username
            # Embedded JSON fields override the title-derived guess.
            json_patterns = [
                r'"username":"([^"]+)"',
                r'"full_name":"([^"]+)"',
                r'"biography":"([^"]+)"'
            ]
            for pattern in json_patterns:
                matches = re.findall(pattern, page_text)
                if matches:
                    if "username" in pattern:
                        profile_info["username"] = matches[0]
                    elif "full_name" in pattern:
                        profile_info["display_name"] = matches[0]
                    elif "biography" in pattern:
                        profile_info["bio"] = matches[0]
        except Exception as e:
            profile_info["error"] = f"Failed to extract profile info: {str(e)}"
        return profile_info

    def extract_recent_posts(self, page_text):
        """Extract recent posts.

        Returns up to 10 dicts with shortcode/url/index. Shortcodes are
        deduplicated via a set and sorted before the [:10] cut so the
        selected subset is deterministic (previously the cut was taken
        from arbitrary set order, so repeated runs could return different
        posts).
        """
        posts = []
        try:
            post_patterns = [
                r'"shortcode":"([^"]+)"',
                r'/p/([^/"]+)'
            ]
            found_posts = set()
            for pattern in post_patterns:
                for match in re.findall(pattern, page_text):
                    if match:
                        found_posts.add(match)
            for i, post_code in enumerate(sorted(found_posts)[:10]):
                posts.append({
                    "shortcode": post_code,
                    "url": f"https://www.instagram.com/p/{post_code}/",
                    "index": i + 1
                })
        except Exception as e:
            st.error(f"Failed to extract posts: {str(e)}")
        return posts
# Global instance: module-level singleton shared by importers. NOTE(review):
# constructing it runs __init__ at import time, which creates a
# requests.Session and sets its headers as a side effect of importing
# this module.
instagram_scraper_v2 = InstagramScraperV2()