import json
import logging
import re
import time
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, quote_plus, urlparse

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

logger = logging.getLogger(__name__)


class FacebookScraper:
    """
    Enhanced Facebook Ads Library scraper with improved robustness and features.
    """

    def __init__(self, headless: bool = True, timeout: int = 10, use_proxy: bool = False, proxy: Optional[str] = None):
        """
        Initialize the Facebook scraper with configurable options.

        Args:
            headless: Whether to run the browser in headless mode
            timeout: Default timeout for waiting operations, in seconds
            use_proxy: Whether to route traffic through a proxy
            proxy: Proxy server address (e.g., "http://user:pass@ip:port")
        """
        self.driver = None
        self.headless = headless
        self.timeout = timeout
        self.use_proxy = use_proxy
        self.proxy = proxy

    def _setup_driver(self) -> webdriver.Chrome:
        """Configure and initialize the Chrome WebDriver with optimal settings."""
        options = webdriver.ChromeOptions()

        if self.headless:
            options.add_argument("--headless")

        # Stability flags, mainly for containerized and CI environments.
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--window-size=1920,1080")

        # Present a desktop user agent so the full desktop page is served.
        options.add_argument(
            "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"
        )

        if self.use_proxy and self.proxy:
            options.add_argument(f"--proxy-server={self.proxy}")

        # Remove the most obvious automation fingerprints Chrome exposes by default.
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option("useAutomationExtension", False)

        return webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    @contextmanager
    def _get_driver(self):
        """Context manager for a browser session that guarantees cleanup."""
        driver = None
        try:
            driver = self._setup_driver()
            self.driver = driver
            yield driver
        except Exception as e:
            logger.error(f"Error in WebDriver session: {e}")
            raise
        finally:
            # Quit the local reference rather than self.driver, so cleanup is
            # correct even if another session has reassigned self.driver.
            if driver:
                driver.quit()

    def _wait_for_element(self, driver, selector: str, by: str = By.CSS_SELECTOR, timeout: Optional[int] = None) -> Any:
        """
        Wait for an element to be present and return it.

        Args:
            driver: WebDriver instance
            selector: Element selector
            by: Selector strategy (By.CSS_SELECTOR, By.XPATH, etc.)
            timeout: Wait timeout in seconds (defaults to self.timeout)

        Returns:
            The found web element

        Raises:
            TimeoutException: If the element does not appear in time
        """
        if timeout is None:
            timeout = self.timeout

        wait = WebDriverWait(driver, timeout)
        return wait.until(EC.presence_of_element_located((by, selector)))

    def _wait_for_elements(self, driver, selector: str, by: str = By.CSS_SELECTOR, timeout: Optional[int] = None) -> List[Any]:
        """
        Wait for at least one matching element and return all matches.

        Args:
            driver: WebDriver instance
            selector: Elements selector
            by: Selector strategy (By.CSS_SELECTOR, By.XPATH, etc.)
            timeout: Wait timeout in seconds (defaults to self.timeout)

        Returns:
            List of found web elements
        """
        if timeout is None:
            timeout = self.timeout

        wait = WebDriverWait(driver, timeout)
        return wait.until(EC.presence_of_all_elements_located((by, selector)))

    def _scroll_to_load_more(self, driver, scroll_count: int = 5, scroll_pause: float = 2.0):
        """
        Scroll down the page to trigger lazy loading of more content.

        Args:
            driver: WebDriver instance
            scroll_count: Number of times to scroll
            scroll_pause: Pause between scrolls in seconds
        """
        for i in range(scroll_count):
            # Jump to the bottom of the page, then give the next batch of
            # results time to render before scrolling again.
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(scroll_pause)
            logger.debug(f"Completed scroll {i+1}/{scroll_count}")

    def _extract_ad_details(self, ad_element) -> Dict[str, Any]:
        """
        Extract detailed information from an ad element.

        Args:
            ad_element: WebElement containing the ad

        Returns:
            Dictionary with ad details
        """
        ad_data = {
            "scrape_time": datetime.now().isoformat(),
            "platform": "facebook",
            "raw_text": ad_element.text,
        }

        try:
            # The advertiser name is usually the first directional span in the card.
            advertiser_elem = ad_element.find_elements(By.CSS_SELECTOR, "span[dir='auto']")
            if advertiser_elem:
                ad_data["advertiser"] = advertiser_elem[0].text

            # Ad copy is spread across directional divs; join it into one block.
            content_elem = ad_element.find_elements(By.CSS_SELECTOR, "div[dir='auto']")
            if content_elem:
                ad_data["content"] = "\n".join(elem.text for elem in content_elem)

            # Creative image URLs.
            img_elems = ad_element.find_elements(By.TAG_NAME, "img")
            if img_elems:
                ad_data["images"] = [img.get_attribute("src") for img in img_elems if img.get_attribute("src")]

            # Outbound links, one of which may carry the ad's library ID.
            link_elems = ad_element.find_elements(By.TAG_NAME, "a")
            if link_elems:
                ad_data["links"] = [link.get_attribute("href") for link in link_elems if link.get_attribute("href")]

            # Pull the ad ID from the first link with an id= query parameter.
            for link in ad_data.get("links", []):
                id_match = re.search(r'id=(\d+)', link)
                if id_match:
                    ad_data["ad_id"] = id_match.group(1)
                    break

        except Exception as e:
            logger.warning(f"Error extracting ad details: {e}")

        return ad_data

    def scrape_ads(self, search_query: str, num_scrolls: int = 5, country_code: str = "ALL") -> List[Dict[str, Any]]:
        """
        Scrape ads from the Facebook Ads Library for a search query.

        Args:
            search_query: Keyword to search for
            num_scrolls: Number of times to scroll to load more ads
            country_code: Country code filter (e.g., "US", "GB", "ALL")

        Returns:
            List of dictionaries containing ad information
        """
        with self._get_driver() as driver:
            try:
                # URL-encode the query so multi-word searches survive the request.
                url = (
                    "https://www.facebook.com/ads/library/"
                    f"?active_status=all&ad_type=all&country={country_code}"
                    f"&q={quote_plus(search_query)}&search_type=keyword"
                )
                logger.info(f"Accessing Facebook Ads Library: {url}")
                driver.get(url)

                try:
                    self._wait_for_element(driver, "div[role='main']")
                except TimeoutException:
                    logger.warning("Timeout waiting for main content to load")

                self._scroll_to_load_more(driver, num_scrolls)

                # Facebook's obfuscated class names change often, so try several
                # candidate selectors until one matches.
                selectors = [
                    "div.x1yztbdb",
                    "div[role='article']",
                    "div.x1iorvi4",
                ]

                ad_elements = []
                for selector in selectors:
                    try:
                        elements = driver.find_elements(By.CSS_SELECTOR, selector)
                        if elements:
                            ad_elements = elements
                            logger.info(f"Found {len(elements)} ads using selector: {selector}")
                            break
                    except Exception as e:
                        logger.debug(f"Selector {selector} failed: {e}")

                if not ad_elements:
                    logger.warning("No ad elements found with any selector")
                    return []

                ads_data = []
                for i, ad_element in enumerate(ad_elements):
                    try:
                        ad_data = self._extract_ad_details(ad_element)
                        ad_data["position"] = i + 1
                        ad_data["search_query"] = search_query
                        ads_data.append(ad_data)
                    except Exception as e:
                        logger.error(f"Error processing ad {i+1}: {e}")

                logger.info(f"Successfully scraped {len(ads_data)} ads")
                return ads_data

            except (TimeoutException, WebDriverException) as e:
                logger.error(f"Error during scraping: {e}")
                return []

    def scrape_advertiser_details(self, advertiser_id: str) -> Dict[str, Any]:
        """
        Scrape details about a specific advertiser.

        Args:
            advertiser_id: Facebook page ID of the advertiser

        Returns:
            Dictionary with advertiser information
        """
        with self._get_driver() as driver:
            try:
                url = (
                    "https://www.facebook.com/ads/library/"
                    f"?active_status=all&ad_type=all&country=ALL&view_all_page_id={advertiser_id}"
                )
                logger.info(f"Accessing advertiser page: {url}")
                driver.get(url)

                try:
                    self._wait_for_element(driver, "div[role='main']")
                except TimeoutException:
                    logger.warning("Timeout waiting for advertiser page to load")

                advertiser_data = {
                    "id": advertiser_id,
                    "scrape_time": datetime.now().isoformat(),
                }

                # Advertiser name from the page heading, if present.
                try:
                    name_elem = self._wait_for_element(driver, "div[role='main'] h1", timeout=5)
                    advertiser_data["name"] = name_elem.text
                except (TimeoutException, NoSuchElementException):
                    logger.debug("Advertiser name element not found")

                # Total ad count, e.g. "1,234 ads"; strip thousands separators.
                try:
                    count_text = driver.find_element(By.XPATH, "//div[contains(text(), 'ads')]").text
                    count_match = re.search(r'([\d,]+)\s+ads', count_text)
                    if count_match:
                        advertiser_data["ad_count"] = int(count_match.group(1).replace(",", ""))
                except NoSuchElementException:
                    logger.debug("Ad count element not found")

                self._scroll_to_load_more(driver, 3)

                # Collect up to five sample ads from the advertiser's page.
                ad_elements = driver.find_elements(By.CSS_SELECTOR, "div.x1yztbdb")
                sample_ads = []

                for i, ad_element in enumerate(ad_elements[:5]):
                    try:
                        ad_data = self._extract_ad_details(ad_element)
                        sample_ads.append(ad_data)
                    except Exception as e:
                        logger.error(f"Error processing sample ad {i+1}: {e}")

                advertiser_data["sample_ads"] = sample_ads
                advertiser_data["sample_ad_count"] = len(sample_ads)

                return advertiser_data

            except Exception as e:
                logger.error(f"Error scraping advertiser details: {e}")
                return {"id": advertiser_id, "error": str(e)}

    def scrape_ads_by_topic(self, topic: str, num_scrolls: int = 5, country_code: str = "ALL") -> List[Dict[str, Any]]:
        """
        Scrape ads related to a specific topic.

        The Ads Library treats topics as keyword searches, so this is a thin
        wrapper around scrape_ads.

        Args:
            topic: Topic to search for (e.g., "politics", "health", "finance")
            num_scrolls: Number of times to scroll to load more ads
            country_code: Country code filter

        Returns:
            List of dictionaries containing ad information
        """
        return self.scrape_ads(topic, num_scrolls, country_code)

    def scrape_ads_by_page(self, page_name: str, num_scrolls: int = 5) -> List[Dict[str, Any]]:
        """
        Scrape ads from a specific Facebook page.

        Args:
            page_name: Name of the Facebook page
            num_scrolls: Number of times to scroll to load more ads

        Returns:
            List of dictionaries containing ad information
        """
        with self._get_driver() as driver:
            try:
                # Search by page name first to resolve the numeric page ID.
                search_url = (
                    "https://www.facebook.com/ads/library/"
                    "?active_status=all&ad_type=all&country=ALL"
                    f"&q={quote_plus(page_name)}&search_type=page"
                )
                logger.info(f"Searching for page: {search_url}")
                driver.get(search_url)

                try:
                    self._wait_for_element(driver, "div[role='main']")
                except TimeoutException:
                    logger.warning("Timeout waiting for page search results")

                try:
                    page_links = driver.find_elements(By.CSS_SELECTOR, "a[href*='view_all_page_id=']")
                    if page_links:
                        # The first matching result links to the page's ad listing.
                        href = page_links[0].get_attribute("href")
                        page_id_match = re.search(r'view_all_page_id=(\d+)', href)

                        if page_id_match:
                            page_id = page_id_match.group(1)
                            logger.info(f"Found page ID: {page_id}")

                            page_url = (
                                "https://www.facebook.com/ads/library/"
                                f"?active_status=all&ad_type=all&country=ALL&view_all_page_id={page_id}"
                            )
                            driver.get(page_url)

                            try:
                                self._wait_for_element(driver, "div[role='main']")
                            except TimeoutException:
                                logger.warning("Timeout waiting for page ads to load")

                            self._scroll_to_load_more(driver, num_scrolls)

                            ad_elements = driver.find_elements(By.CSS_SELECTOR, "div.x1yztbdb")

                            ads_data = []
                            for i, ad_element in enumerate(ad_elements):
                                try:
                                    ad_data = self._extract_ad_details(ad_element)
                                    ad_data["position"] = i + 1
                                    ad_data["page_name"] = page_name
                                    ad_data["page_id"] = page_id
                                    ads_data.append(ad_data)
                                except Exception as e:
                                    logger.error(f"Error processing ad {i+1}: {e}")

                            logger.info(f"Successfully scraped {len(ads_data)} ads from page {page_name}")
                            return ads_data
                except Exception as e:
                    logger.error(f"Error finding page: {e}")

            except Exception as e:
                logger.error(f"Error during page scraping: {e}")
                return []

        # Fall back to a keyword search outside the "with" block, so the first
        # browser session is fully closed before scrape_ads opens a new one.
        logger.warning(f"Could not find page {page_name}, falling back to keyword search")
        return self.scrape_ads(page_name, num_scrolls)
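

if __name__ == "__main__":
    # Minimal usage sketch (illustrative, not a guaranteed workflow): run a
    # keyword search and persist the results as JSON. Assumes a local Chrome
    # install so webdriver_manager can fetch a matching chromedriver; the
    # query and output filename below are arbitrary examples.
    logging.basicConfig(level=logging.INFO)

    scraper = FacebookScraper(headless=True, timeout=15)
    ads = scraper.scrape_ads("electric vehicles", num_scrolls=3, country_code="US")

    with open("facebook_ads.json", "w", encoding="utf-8") as f:
        json.dump(ads, f, ensure_ascii=False, indent=2)

    print(f"Saved {len(ads)} ads to facebook_ads.json")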