import os
import chromedriver_autoinstaller
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException, WebDriverException
import json
import time
import threading
from typing import Optional, Dict, List
import atexit
from contextlib import contextmanager

# Install chromedriver once at module import so every driver creation can
# assume a matching chromedriver binary is available on PATH.
chromedriver_autoinstaller.install()


class DriverPool:
    """Thread-safe driver pool for reusing Chrome instances.

    Up to ``max_drivers`` drivers are kept alive and handed out through the
    :meth:`get_driver` context manager.  When the pool is exhausted a
    temporary (non-pooled) driver is created and destroyed after use rather
    than blocking the caller.
    """

    def __init__(self, max_drivers: int = 3):
        self.max_drivers = max_drivers
        self.available_drivers: List[webdriver.Chrome] = []  # idle, reusable drivers
        self.in_use_drivers: set = set()                     # drivers currently checked out
        self.lock = threading.Lock()
        self._closed = False
        # Register cleanup on exit so lingering Chrome processes are killed.
        atexit.register(self.cleanup)

    def _create_driver(self) -> webdriver.Chrome:
        """Create a new Chrome driver tuned for fast, headless scraping."""
        options = Options()
        # Performance optimizations (Windows-compatible).
        options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--disable-extensions")
        options.add_argument("--disable-plugins")
        options.add_argument("--disable-images")  # Don't load images for faster loading
        options.add_argument("--disable-background-timer-throttling")
        options.add_argument("--disable-backgrounding-occluded-windows")
        options.add_argument("--disable-renderer-backgrounding")
        # 'eager' returns control once the DOM is interactive, without waiting
        # for subresources (images, stylesheets) to finish loading.
        options.page_load_strategy = 'eager'
        # NOTE(review): "--timeout" is not a documented Chrome switch; Chrome
        # silently ignores unknown switches.  Kept for parity with previous
        # behavior — confirm whether it can be dropped.
        options.add_argument("--timeout=10000")
        # On Linux prefer the distro chromium binary when it actually exists;
        # otherwise fall back to the default Chrome location.  (The previous
        # try/except around the assignment could never trigger the fallback,
        # because assigning `binary_location` does not raise.)
        import platform
        if platform.system() == "Linux":
            chromium_path = "/usr/bin/chromium-browser"
            if os.path.exists(chromium_path):
                options.binary_location = chromium_path
        driver = webdriver.Chrome(options=options)
        # Hard timeouts so a hung page cannot stall the pool.
        driver.set_page_load_timeout(10)  # 10 seconds max for page load
        driver.implicitly_wait(2)         # 2 seconds max for element finding
        return driver

    @contextmanager
    def get_driver(self):
        """Context manager yielding a pooled Chrome driver.

        Pops an idle driver, creates a new pooled one while under capacity,
        or — when the pool is saturated — creates a temporary driver that is
        quit (not returned to the pool) on exit.

        Raises:
            RuntimeError: if the pool has already been closed.
        """
        if self._closed:
            raise RuntimeError("Driver pool is closed")
        driver = None
        temp_driver = False  # initialized up front so `finally` can always read it
        return_to_pool = False
        try:
            with self.lock:
                if self.available_drivers:
                    driver = self.available_drivers.pop()
                elif len(self.in_use_drivers) < self.max_drivers:
                    driver = self._create_driver()
            if driver is None:
                # Pool saturated: hand out a throwaway driver instead of blocking.
                driver = self._create_driver()
                temp_driver = True
            with self.lock:
                self.in_use_drivers.add(driver)
            yield driver
        finally:
            if driver:
                try:
                    # Abort any still-pending page loads before reuse/teardown.
                    driver.execute_script("window.stop();")
                except Exception:
                    pass
                with self.lock:
                    # BUGFIX: temporary drivers were previously never removed
                    # from in_use_drivers, so the set grew with dead references
                    # and the capacity check above saturated permanently.
                    self.in_use_drivers.discard(driver)
                    return_to_pool = (
                        not temp_driver
                        and not self._closed
                        and len(self.available_drivers) < self.max_drivers
                    )
                    if return_to_pool:
                        self.available_drivers.append(driver)
                if not return_to_pool:
                    try:
                        driver.quit()
                    except Exception:
                        pass

    def cleanup(self):
        """Quit every known driver and close the pool (idempotent)."""
        self._closed = True
        with self.lock:
            for driver in self.available_drivers + list(self.in_use_drivers):
                try:
                    driver.quit()
                except Exception:
                    pass
            self.available_drivers.clear()
            self.in_use_drivers.clear()


# Global driver pool instance shared by all scraper() calls.
_driver_pool = DriverPool()


def safe_get_attributes_bulk(driver, tag_name: str, attr: str) -> List[str]:
    """Return the non-empty values of *attr* across all *tag_name* elements.

    Uses a single injected JavaScript pass for speed, falling back to
    element-by-element Selenium calls if script execution fails.

    NOTE(review): *tag_name* / *attr* are interpolated into JavaScript —
    callers in this module pass only literals ("script", "src", ...); do not
    feed untrusted strings here.
    """
    try:
        # One round-trip instead of two driver calls per element.
        script = f"""
            var elements = document.getElementsByTagName('{tag_name}');
            var results = [];
            for (var i = 0; i < elements.length; i++) {{
                var attr_value = elements[i].getAttribute('{attr}');
                if (attr_value) {{
                    results.push(attr_value);
                }}
            }}
            return results;
        """
        return driver.execute_script(script) or []
    except Exception:
        # Fallback to the Selenium API; fetch each attribute only once
        # (previously get_attribute() was called twice per element).
        try:
            elements = driver.find_elements(By.TAG_NAME, tag_name)
            values = (elem.get_attribute(attr) for elem in elements)
            return [value for value in values if value]
        except Exception:
            return []


def scraper(link: str, timeout: int = 10) -> Dict:
    """
    Optimized web scraper with driver pooling and performance enhancements.

    Args:
        link: URL to scrape.
        timeout: Maximum time to wait for page readiness (seconds).

    Returns:
        Dictionary with "page_text", "script_sources" and "link_sources";
        on failure all three are empty and an "error" key holds the message.
    """
    try:
        with _driver_pool.get_driver() as driver:
            driver.get(link)
            # Smart wait for page readiness instead of a fixed sleep.
            try:
                WebDriverWait(driver, timeout).until(
                    lambda d: d.execute_script("return document.readyState") == "complete"
                )
            except TimeoutException:
                pass  # Continue with whatever partially loaded.
            # Extract visible page text; JS is faster than .text.
            try:
                page_text = driver.execute_script("return document.body.innerText || '';")
                if not page_text:
                    page_text = driver.find_element(By.TAG_NAME, "body").text
            except Exception:
                page_text = ""
            # Bulk-extract external resource references.
            script_sources = safe_get_attributes_bulk(driver, "script", "src")
            link_sources = safe_get_attributes_bulk(driver, "link", "href")
            return {
                "page_text": page_text,
                "script_sources": script_sources,
                "link_sources": link_sources,
            }
    except Exception as e:
        # Return an empty result on error rather than crashing the caller.
        return {
            "page_text": "",
            "script_sources": [],
            "link_sources": [],
            "error": str(e),
        }


def scraper_legacy(link: str, options=None):
    """Legacy scraper function for backward compatibility.

    The *options* argument is accepted but ignored; the pooled drivers use
    their own optimized Chrome options.
    """
    return scraper(link)