# selenium-scraper / clickloom_scrape.py
# (original commit: f2c46e7, author: apexherbert200)
import chromedriver_autoinstaller
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException, WebDriverException
import json
import time
import threading
from typing import Optional, Dict, List
import atexit
from contextlib import contextmanager
# Install a matching chromedriver binary once at import time so every
# webdriver.Chrome() call below can locate it (may download on first run).
chromedriver_autoinstaller.install()
class DriverPool:
    """Thread-safe pool of reusable headless Chrome WebDriver instances.

    Up to ``max_drivers`` drivers are kept alive and handed out through the
    :meth:`get_driver` context manager.  When the pool is saturated, a
    temporary driver is created for the caller and destroyed on release
    instead of blocking.
    """

    def __init__(self, max_drivers: int = 3):
        self.max_drivers = max_drivers      # soft cap on pooled drivers
        self.available_drivers = []         # idle drivers ready for reuse
        self.in_use_drivers = set()         # drivers currently checked out
        self.lock = threading.Lock()        # guards the two collections above
        self._closed = False
        # Ensure every Chrome process is terminated when the interpreter exits.
        atexit.register(self.cleanup)

    def _create_driver(self) -> webdriver.Chrome:
        """Create and return a new headless Chrome driver tuned for scraping."""
        options = Options()
        # Performance / stability flags (Windows-compatible).
        options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--disable-extensions")
        options.add_argument("--disable-plugins")
        options.add_argument("--disable-images")  # skip image downloads for speed
        options.add_argument("--disable-background-timer-throttling")
        options.add_argument("--disable-backgrounding-occluded-windows")
        options.add_argument("--disable-renderer-backgrounding")
        # NOTE(review): the original also passed "--timeout=10000", which is not
        # a recognized Chrome switch; real timeouts are set on the driver below.
        # 'eager' returns control at DOMContentLoaded instead of the full load event.
        options.page_load_strategy = 'eager'
        # On Linux prefer the system chromium binary only when it actually
        # exists; otherwise let Selenium discover Chrome normally.
        import os
        import platform
        if platform.system() == "Linux" and os.path.exists("/usr/bin/chromium-browser"):
            options.binary_location = "/usr/bin/chromium-browser"
        driver = webdriver.Chrome(options=options)
        driver.set_page_load_timeout(10)  # hard cap on page loads (seconds)
        driver.implicitly_wait(2)         # short implicit wait for element lookups
        return driver

    @contextmanager
    def get_driver(self):
        """Yield a driver; return it to the pool (or quit it) on exit.

        Raises:
            RuntimeError: if the pool has already been cleaned up.
        """
        if self._closed:
            raise RuntimeError("Driver pool is closed")
        driver = None
        # Initialized up-front so the `finally` block can never see it unbound
        # (the original could raise NameError if driver creation failed mid-way).
        temp_driver = False
        try:
            with self.lock:
                if self.available_drivers:
                    driver = self.available_drivers.pop()
                elif len(self.in_use_drivers) < self.max_drivers:
                    driver = self._create_driver()
            if driver is None:
                # Pool is saturated: hand out a throwaway driver instead of blocking.
                driver = self._create_driver()
                temp_driver = True
            with self.lock:
                self.in_use_drivers.add(driver)
            yield driver
        finally:
            if driver:
                try:
                    driver.execute_script("window.stop();")  # abort any pending loads
                except Exception:
                    pass
                if temp_driver:
                    # Temporary drivers are never pooled.
                    try:
                        driver.quit()
                    except Exception:
                        pass
                else:
                    with self.lock:
                        self.in_use_drivers.discard(driver)
                        keep = (not self._closed
                                and len(self.available_drivers) < self.max_drivers)
                        if keep:
                            self.available_drivers.append(driver)
                    if not keep:
                        # Quit outside the lock so a slow shutdown can't stall
                        # other threads using the pool.
                        try:
                            driver.quit()
                        except Exception:
                            pass

    def cleanup(self):
        """Quit every pooled and in-use driver; the pool is unusable afterwards."""
        self._closed = True
        with self.lock:
            for driver in self.available_drivers + list(self.in_use_drivers):
                try:
                    driver.quit()
                except Exception:
                    pass
            self.available_drivers.clear()
            self.in_use_drivers.clear()
# Module-level singleton pool shared by every scraper() call in this process.
_driver_pool = DriverPool()
def safe_get_attributes_bulk(driver, tag_name: str, attr: str) -> List[str]:
    """Collect a non-empty attribute value from every element with a given tag.

    Runs a single JavaScript pass in the browser for speed.  The tag and
    attribute names are passed as script *arguments* rather than interpolated
    into the source (the original f-string interpolation broke on quotes and
    was an injection hazard).  Falls back to per-element Selenium lookups if
    script execution fails.

    Args:
        driver: Active WebDriver (anything exposing ``execute_script``).
        tag_name: HTML tag to scan, e.g. ``"script"`` or ``"link"``.
        attr: Attribute to read, e.g. ``"src"`` or ``"href"``.

    Returns:
        List of truthy attribute values; empty list when nothing is found or
        both extraction strategies fail.
    """
    script = """
        var elements = document.getElementsByTagName(arguments[0]);
        var results = [];
        for (var i = 0; i < elements.length; i++) {
            var value = elements[i].getAttribute(arguments[1]);
            if (value) {
                results.push(value);
            }
        }
        return results;
    """
    try:
        return driver.execute_script(script, tag_name, attr) or []
    except Exception:
        # Fallback: slower Selenium path; read each attribute only once
        # (the original called get_attribute twice per element).
        try:
            values = (elem.get_attribute(attr)
                      for elem in driver.find_elements(By.TAG_NAME, tag_name))
            return [v for v in values if v]
        except Exception:
            return []
def scraper(link: str, timeout: int = 10) -> Dict:
    """Scrape a page's visible text plus its script/link resource URLs.

    Borrows a pooled Chrome driver, waits up to ``timeout`` seconds for
    ``document.readyState`` to reach ``"complete"``, then extracts the body
    text and bulk attribute lists.  Never raises: any failure yields an
    otherwise-empty result dict carrying an ``"error"`` key.

    Args:
        link: URL to visit.
        timeout: Maximum seconds to wait for the document to finish loading.

    Returns:
        Dict with ``page_text``, ``script_sources`` and ``link_sources``
        keys (plus ``error`` when something went wrong).
    """
    try:
        with _driver_pool.get_driver() as driver:
            driver.get(link)
            # Poll readyState instead of sleeping a fixed amount.
            def _ready(d):
                return d.execute_script("return document.readyState") == "complete"
            try:
                WebDriverWait(driver, timeout).until(_ready)
            except TimeoutException:
                pass  # proceed with whatever has loaded so far
            body_text = ""
            try:
                body_text = driver.execute_script("return document.body.innerText || '';")
                if not body_text:
                    body_text = driver.find_element(By.TAG_NAME, "body").text
            except Exception:
                body_text = ""
            return {
                "page_text": body_text,
                "script_sources": safe_get_attributes_bulk(driver, "script", "src"),
                "link_sources": safe_get_attributes_bulk(driver, "link", "href"),
            }
    except Exception as e:
        # Degrade gracefully: report the failure instead of propagating it.
        return {
            "page_text": "",
            "script_sources": [],
            "link_sources": [],
            "error": str(e),
        }
# Legacy function for backward compatibility
def scraper_legacy(link: str, options=None):
    """Backward-compatible wrapper around :func:`scraper`.

    The ``options`` argument is accepted purely for signature compatibility
    with the old API and is intentionally ignored.
    """
    result = scraper(link)
    return result