# NOTE(review): the original lines here ("Spaces:", "No application file")
# were artifacts of the hosting page this file was extracted from, not code;
# they are preserved as this comment so the module stays valid Python.
| import chromedriver_autoinstaller | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import StaleElementReferenceException, TimeoutException, WebDriverException | |
| import json | |
| import time | |
| import threading | |
| from typing import Optional, Dict, List | |
| import atexit | |
| from contextlib import contextmanager | |
# Install a matching chromedriver binary once at import time so every
# webdriver.Chrome() created below can find it without per-call setup.
# NOTE: this performs network/disk I/O as a module import side effect.
chromedriver_autoinstaller.install()
| class DriverPool: | |
| """Thread-safe driver pool for reusing Chrome instances""" | |
| def __init__(self, max_drivers: int = 3): | |
| self.max_drivers = max_drivers | |
| self.available_drivers = [] | |
| self.in_use_drivers = set() | |
| self.lock = threading.Lock() | |
| self._closed = False | |
| # Register cleanup on exit | |
| atexit.register(self.cleanup) | |
| def _create_driver(self) -> webdriver.Chrome: | |
| """Create a new optimized Chrome driver""" | |
| options = Options() | |
| # Performance optimizations (Windows-compatible) | |
| options.add_argument("--headless") | |
| options.add_argument("--no-sandbox") | |
| options.add_argument("--disable-dev-shm-usage") | |
| options.add_argument("--disable-gpu") | |
| options.add_argument("--disable-extensions") | |
| options.add_argument("--disable-plugins") | |
| options.add_argument("--disable-images") # Don't load images for faster loading | |
| options.add_argument("--disable-background-timer-throttling") | |
| options.add_argument("--disable-backgrounding-occluded-windows") | |
| options.add_argument("--disable-renderer-backgrounding") | |
| # Set page load strategy to 'eager' for faster loading | |
| options.page_load_strategy = 'eager' | |
| # Set timeouts | |
| options.add_argument("--timeout=10000") | |
| # For Linux environments (skip on Windows) | |
| import platform | |
| if platform.system() == "Linux": | |
| try: | |
| options.binary_location = "/usr/bin/chromium-browser" | |
| except: | |
| pass # Use default Chrome location | |
| driver = webdriver.Chrome(options=options) | |
| # Set timeouts | |
| driver.set_page_load_timeout(10) # 10 seconds max for page load | |
| driver.implicitly_wait(2) # 2 seconds max for element finding | |
| return driver | |
| def get_driver(self): | |
| """Context manager to get and return a driver""" | |
| if self._closed: | |
| raise RuntimeError("Driver pool is closed") | |
| driver = None | |
| try: | |
| with self.lock: | |
| if self.available_drivers: | |
| driver = self.available_drivers.pop() | |
| elif len(self.in_use_drivers) < self.max_drivers: | |
| driver = self._create_driver() | |
| else: | |
| # Wait for a driver to become available | |
| pass | |
| if driver is None: | |
| # Create a temporary driver if pool is full | |
| driver = self._create_driver() | |
| temp_driver = True | |
| else: | |
| temp_driver = False | |
| with self.lock: | |
| self.in_use_drivers.add(driver) | |
| yield driver | |
| finally: | |
| if driver: | |
| try: | |
| # Clear any alerts or popups | |
| driver.execute_script("window.stop();") | |
| except: | |
| pass | |
| if temp_driver: | |
| # Close temporary driver | |
| try: | |
| driver.quit() | |
| except: | |
| pass | |
| else: | |
| # Return driver to pool | |
| with self.lock: | |
| self.in_use_drivers.discard(driver) | |
| if not self._closed and len(self.available_drivers) < self.max_drivers: | |
| self.available_drivers.append(driver) | |
| else: | |
| try: | |
| driver.quit() | |
| except: | |
| pass | |
| def cleanup(self): | |
| """Clean up all drivers""" | |
| self._closed = True | |
| with self.lock: | |
| for driver in self.available_drivers + list(self.in_use_drivers): | |
| try: | |
| driver.quit() | |
| except: | |
| pass | |
| self.available_drivers.clear() | |
| self.in_use_drivers.clear() | |
# Process-wide driver pool shared by every scraper() call; created at
# import time and torn down via the atexit hook registered in __init__.
_driver_pool = DriverPool()
def safe_get_attributes_bulk(driver, tag_name: str, attr: str) -> List[str]:
    """Collect the non-empty ``attr`` value from every ``tag_name`` element.

    Uses a single JavaScript pass (one WebDriver round-trip) for speed,
    falling back to per-element Selenium calls if the script fails.

    Args:
        driver: A live WebDriver with a page loaded.
        tag_name: HTML tag to scan, e.g. "script" or "link".
        attr: Attribute name to read, e.g. "src" or "href".

    Returns:
        List of truthy attribute values; empty list on any error.
    """
    # Fix: quote the names with json.dumps instead of splicing them raw
    # into the script — a quote character in either string previously
    # broke (or injected into) the generated JavaScript.
    tag_js = json.dumps(tag_name)
    attr_js = json.dumps(attr)
    script = f"""
        var elements = document.getElementsByTagName({tag_js});
        var results = [];
        for (var i = 0; i < elements.length; i++) {{
            var attr_value = elements[i].getAttribute({attr_js});
            if (attr_value) {{
                results.push(attr_value);
            }}
        }}
        return results;
    """
    try:
        return driver.execute_script(script) or []
    except Exception:
        # Fallback to Selenium; read each attribute once (the original
        # called get_attribute twice per element — two round-trips).
        try:
            elements = driver.find_elements(By.TAG_NAME, tag_name)
            values = (elem.get_attribute(attr) for elem in elements)
            return [v for v in values if v]
        except Exception:
            return []
def scraper(link: str, timeout: int = 10) -> Dict:
    """Scrape a page's visible text plus its script/link resource URLs.

    Args:
        link: URL to scrape.
        timeout: Maximum seconds to wait for document readiness.

    Returns:
        Dict with keys "page_text", "script_sources" and "link_sources".
        On failure all three are empty and an "error" key holds str(e).
    """
    try:
        with _driver_pool.get_driver() as driver:
            driver.get(link)

            # Poll readyState instead of sleeping a fixed interval; a
            # timeout simply means we proceed with the partial load.
            try:
                WebDriverWait(driver, timeout).until(
                    lambda d: d.execute_script("return document.readyState") == "complete"
                )
            except TimeoutException:
                pass

            # One JS round-trip for the body text; fall back to the
            # slower Selenium text property if it comes back empty.
            try:
                page_text = driver.execute_script("return document.body.innerText || '';")
                if not page_text:
                    page_text = driver.find_element(By.TAG_NAME, "body").text
            except Exception:
                page_text = ""

            return {
                "page_text": page_text,
                "script_sources": safe_get_attributes_bulk(driver, "script", "src"),
                "link_sources": safe_get_attributes_bulk(driver, "link", "href"),
            }
    except Exception as e:
        # Degrade gracefully: callers always receive the same dict shape.
        return {
            "page_text": "",
            "script_sources": [],
            "link_sources": [],
            "error": str(e),
        }
def scraper_legacy(link: str, options=None):
    """Backward-compatible wrapper around :func:`scraper`.

    The ``options`` argument is accepted only for signature compatibility
    with the old API and is intentionally ignored; driver configuration
    now lives entirely inside the pool.
    """
    result = scraper(link)
    return result