"""Selenium-based web scraping utilities built around a reusable headless-Chrome driver pool."""
import chromedriver_autoinstaller
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException, WebDriverException
import json
import time
import threading
from typing import Optional, Dict, List
import atexit
from contextlib import contextmanager
# Install a chromedriver matching the local Chrome once at import time, so
# every webdriver.Chrome() call below can find the binary on PATH.
chromedriver_autoinstaller.install()
class DriverPool:
    """Thread-safe pool of reusable headless Chrome instances.

    Up to ``max_drivers`` idle drivers are kept for reuse. When every pooled
    driver is busy, ``get_driver`` creates a *temporary* driver that is quit
    after use, so ``max_drivers`` bounds the pooled (not the total) count.
    """

    def __init__(self, max_drivers: int = 3):
        self.max_drivers = max_drivers
        self.available_drivers = []   # idle drivers ready for checkout
        self.in_use_drivers = set()   # drivers currently checked out
        self.lock = threading.Lock()  # guards the two collections above
        self._closed = False
        # Make sure every pooled Chrome process is terminated at interpreter exit.
        atexit.register(self.cleanup)

    def _create_driver(self) -> "webdriver.Chrome":
        """Create a new headless Chrome driver tuned for fast scraping."""
        options = Options()
        # Performance optimizations (Windows-compatible)
        options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--disable-extensions")
        options.add_argument("--disable-plugins")
        options.add_argument("--disable-images")  # Don't load images for faster loading
        options.add_argument("--disable-background-timer-throttling")
        options.add_argument("--disable-backgrounding-occluded-windows")
        options.add_argument("--disable-renderer-backgrounding")
        # 'eager' hands control back at DOMContentLoaded instead of full load.
        options.page_load_strategy = 'eager'
        # NOTE: the former "--timeout=10000" switch was removed — it is not a
        # recognized Chrome flag; real timeouts are configured on the driver below.
        import platform
        import os
        # On Linux, prefer the distro chromium binary when it actually exists;
        # otherwise fall back to Selenium's default Chrome discovery.
        if platform.system() == "Linux" and os.path.exists("/usr/bin/chromium-browser"):
            options.binary_location = "/usr/bin/chromium-browser"
        driver = webdriver.Chrome(options=options)
        driver.set_page_load_timeout(10)  # seconds: max page load
        driver.implicitly_wait(2)         # seconds: max element lookup
        return driver

    @contextmanager
    def get_driver(self):
        """Context manager: check out a driver, return (or quit) it on exit.

        Raises:
            RuntimeError: if the pool has already been closed.
        """
        if self._closed:
            raise RuntimeError("Driver pool is closed")
        driver = None
        temp_driver = False  # set up front so `finally` can never see it unbound
        try:
            with self.lock:
                if self.available_drivers:
                    driver = self.available_drivers.pop()
                elif len(self.in_use_drivers) < self.max_drivers:
                    driver = self._create_driver()
            if driver is None:
                # Pool is saturated: create a throwaway driver rather than
                # blocking the caller; it is quit (not pooled) afterwards.
                driver = self._create_driver()
                temp_driver = True
            with self.lock:
                self.in_use_drivers.add(driver)
            yield driver
        finally:
            if driver:
                try:
                    # Abort any in-flight loads before reuse or teardown.
                    driver.execute_script("window.stop();")
                except Exception:
                    pass
                if temp_driver:
                    # Temporary overflow driver: never pooled, always quit.
                    try:
                        driver.quit()
                    except Exception:
                        pass
                else:
                    with self.lock:
                        self.in_use_drivers.discard(driver)
                        if not self._closed and len(self.available_drivers) < self.max_drivers:
                            self.available_drivers.append(driver)
                        else:
                            try:
                                driver.quit()
                            except Exception:
                                pass

    def cleanup(self):
        """Quit every pooled and checked-out driver and close the pool."""
        self._closed = True
        with self.lock:
            for driver in self.available_drivers + list(self.in_use_drivers):
                try:
                    driver.quit()
                except Exception:
                    pass
            self.available_drivers.clear()
            self.in_use_drivers.clear()
# Global driver pool shared by scraper(); constructed at import time, but no
# Chrome process is started until the first get_driver() checkout.
_driver_pool = DriverPool()
def safe_get_attributes_bulk(driver, tag_name: str, attr: str) -> List[str]:
    """Efficiently collect a given attribute from all elements of a tag.

    Args:
        driver: Selenium WebDriver (or compatible) instance.
        tag_name: HTML tag name to look up (e.g. ``"script"``).
        attr: attribute to read from each element (e.g. ``"src"``).

    Returns:
        List of truthy attribute values; empty list on any failure.
    """
    # Tag/attr are passed via arguments[0]/arguments[1] instead of being
    # interpolated into the script text, so quoting/injection in the inputs
    # cannot break the JavaScript.
    script = """
        var elements = document.getElementsByTagName(arguments[0]);
        var results = [];
        for (var i = 0; i < elements.length; i++) {
            var value = elements[i].getAttribute(arguments[1]);
            if (value) {
                results.push(value);
            }
        }
        return results;
    """
    try:
        # Single round-trip bulk extraction via JavaScript.
        return driver.execute_script(script, tag_name, attr) or []
    except Exception:
        # Fallback: slower per-element Selenium API; read each attribute once.
        try:
            return [value
                    for elem in driver.find_elements(By.TAG_NAME, tag_name)
                    if (value := elem.get_attribute(attr))]
        except Exception:
            return []
def scraper(link: str, timeout: int = 10) -> Dict:
    """Scrape a page using a pooled headless Chrome driver.

    Args:
        link: URL to scrape.
        timeout: Maximum seconds to wait for the document to become ready.

    Returns:
        Dict with ``page_text``, ``script_sources`` and ``link_sources``;
        on failure the same keys are present (empty) plus an ``error`` string.
    """
    try:
        with _driver_pool.get_driver() as driver:
            driver.get(link)
            # Poll readyState instead of sleeping a fixed amount; a timeout
            # here is tolerated and we proceed with whatever has loaded.
            try:
                WebDriverWait(driver, timeout).until(
                    lambda d: d.execute_script("return document.readyState") == "complete"
                )
            except TimeoutException:
                pass
            # Grab visible text via JS first; fall back to the body element.
            try:
                body_text = driver.execute_script("return document.body.innerText || '';")
                if not body_text:
                    body_text = driver.find_element(By.TAG_NAME, "body").text
            except Exception:
                body_text = ""
            return {
                "page_text": body_text,
                "script_sources": safe_get_attributes_bulk(driver, "script", "src"),
                "link_sources": safe_get_attributes_bulk(driver, "link", "href"),
            }
    except Exception as exc:
        # Degrade to an empty payload (with the error recorded) instead of raising.
        return {
            "page_text": "",
            "script_sources": [],
            "link_sources": [],
            "error": str(exc)
        }
# Legacy function for backward compatibility
def scraper_legacy(link: str, options=None):
    """Backward-compatible entry point.

    The historical ``options`` argument is accepted but ignored; all driver
    configuration now lives in the pooled driver setup.
    """
    result = scraper(link)
    return result