| from time import time |
| from asyncio import sleep as asyncio_sleep, Lock |
| from contextlib import contextmanager, asynccontextmanager |
|
|
| from playwright.sync_api._generated import Page |
| from playwright.sync_api import ( |
| Frame, |
| BrowserContext, |
| Response as SyncPlaywrightResponse, |
| ) |
| from playwright.async_api._generated import Page as AsyncPage |
| from playwright.async_api import ( |
| Frame as AsyncFrame, |
| Response as AsyncPlaywrightResponse, |
| BrowserContext as AsyncBrowserContext, |
| ) |
| from playwright._impl._errors import Error as PlaywrightError |
|
|
| from scrapling.parser import Selector |
| from scrapling.engines._browsers._page import PageInfo, PagePool |
| from scrapling.engines._browsers._validators import validate, PlaywrightConfig, StealthConfig |
| from scrapling.engines._browsers._config_tools import __default_chrome_useragent__, __default_useragent__ |
| from scrapling.engines.toolbelt.navigation import ( |
| construct_proxy_dict, |
| create_intercept_handler, |
| create_async_intercept_handler, |
| ) |
| from scrapling.core._types import ( |
| Any, |
| Dict, |
| List, |
| Set, |
| Optional, |
| Callable, |
| TYPE_CHECKING, |
| cast, |
| overload, |
| Tuple, |
| ProxyType, |
| Generator, |
| AsyncGenerator, |
| ) |
| from scrapling.engines.constants import STEALTH_ARGS, HARMFUL_ARGS, DEFAULT_ARGS |
|
|
|
|
| class SyncSession: |
| _config: "PlaywrightConfig | StealthConfig" |
| _context_options: Dict[str, Any] |
|
|
| def _build_context_with_proxy(self, proxy: Optional[ProxyType] = None) -> Dict[str, Any]: |
| raise NotImplementedError |
|
|
| def __init__(self, max_pages: int = 1): |
| self.max_pages = max_pages |
| self.page_pool = PagePool(max_pages) |
| self._max_wait_for_page = 60 |
| self.playwright: Any = None |
| self.context: Any = None |
| self.browser: Any = None |
| self._is_alive = False |
|
|
| def start(self) -> None: |
| pass |
|
|
| def close(self): |
| """Close all resources""" |
| if not self._is_alive: |
| return |
|
|
| if self.context: |
| self.context.close() |
| self.context = None |
|
|
| if self.browser: |
| self.browser.close() |
| self.browser = None |
|
|
| if self.playwright: |
| self.playwright.stop() |
| self.playwright = None |
|
|
| self._is_alive = False |
|
|
| def __enter__(self): |
| self.start() |
| return self |
|
|
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.close() |
|
|
| def _initialize_context(self, config: PlaywrightConfig | StealthConfig, ctx: BrowserContext) -> BrowserContext: |
| """Initialize the browser context.""" |
| if config.init_script: |
| ctx.add_init_script(path=config.init_script) |
|
|
| if config.cookies: |
| ctx.add_cookies(config.cookies) |
|
|
| return ctx |
|
|
| def _get_page( |
| self, |
| timeout: int | float, |
| extra_headers: Optional[Dict[str, str]], |
| disable_resources: bool, |
| blocked_domains: Optional[Set[str]] = None, |
| context: Optional[BrowserContext] = None, |
| ) -> PageInfo[Page]: |
| """Get a new page to use""" |
| |
| ctx = context if context is not None else self.context |
| assert ctx is not None, "Browser context not initialized" |
| page = ctx.new_page() |
| page.set_default_navigation_timeout(timeout) |
| page.set_default_timeout(timeout) |
| if extra_headers: |
| page.set_extra_http_headers(extra_headers) |
|
|
| if disable_resources or blocked_domains: |
| page.route("**/*", create_intercept_handler(disable_resources, blocked_domains)) |
| page_info = self.page_pool.add_page(page) |
| page_info.mark_busy() |
| return page_info |
|
|
| def get_pool_stats(self) -> Dict[str, int]: |
| """Get statistics about the current page pool""" |
| return { |
| "total_pages": self.page_pool.pages_count, |
| "busy_pages": self.page_pool.busy_count, |
| "max_pages": self.max_pages, |
| } |
|
|
| @staticmethod |
| def _wait_for_networkidle(page: Page | Frame, timeout: Optional[int] = None): |
| """Wait for the page to become idle (no network activity) even if there are never-ending requests.""" |
| try: |
| page.wait_for_load_state("networkidle", timeout=timeout) |
| except (PlaywrightError, Exception): |
| pass |
|
|
| def _wait_for_page_stability(self, page: Page | Frame, load_dom: bool, network_idle: bool): |
| page.wait_for_load_state(state="load") |
| if load_dom: |
| page.wait_for_load_state(state="domcontentloaded") |
| if network_idle: |
| self._wait_for_networkidle(page) |
|
|
| @staticmethod |
| def _create_response_handler(page_info: PageInfo[Page], response_container: List) -> Callable: |
| """Create a response handler that captures the final navigation response. |
| |
| :param page_info: The PageInfo object containing the page |
| :param response_container: A list to store the final response (mutable container) |
| :return: A callback function for page.on("response", ...) |
| """ |
|
|
| def handle_response(finished_response: SyncPlaywrightResponse): |
| if ( |
| finished_response.request.resource_type == "document" |
| and finished_response.request.is_navigation_request() |
| and finished_response.request.frame == page_info.page.main_frame |
| ): |
| response_container[0] = finished_response |
|
|
| return handle_response |
|
|
| @contextmanager |
| def _page_generator( |
| self, |
| timeout: int | float, |
| extra_headers: Optional[Dict[str, str]], |
| disable_resources: bool, |
| proxy: Optional[ProxyType] = None, |
| blocked_domains: Optional[Set[str]] = None, |
| ) -> Generator["PageInfo[Page]", None, None]: |
| """Acquire a page - either from persistent context or fresh context with proxy.""" |
| if proxy: |
| |
| if not self.browser: |
| raise RuntimeError("Browser not initialized for proxy rotation mode") |
| context_options = self._build_context_with_proxy(proxy) |
| context: BrowserContext = self.browser.new_context(**context_options) |
|
|
| try: |
| context = self._initialize_context(self._config, context) |
| page_info = self._get_page(timeout, extra_headers, disable_resources, blocked_domains, context=context) |
| yield page_info |
| finally: |
| context.close() |
| else: |
| |
| page_info = self._get_page(timeout, extra_headers, disable_resources, blocked_domains) |
| try: |
| yield page_info |
| finally: |
| page_info.page.close() |
| self.page_pool.pages.remove(page_info) |
|
|
|
|
| class AsyncSession: |
| _config: "PlaywrightConfig | StealthConfig" |
| _context_options: Dict[str, Any] |
|
|
| def _build_context_with_proxy(self, proxy: Optional[ProxyType] = None) -> Dict[str, Any]: |
| raise NotImplementedError |
|
|
| def __init__(self, max_pages: int = 1): |
| self.max_pages = max_pages |
| self.page_pool = PagePool(max_pages) |
| self._max_wait_for_page = 60 |
| self.playwright: Any = None |
| self.context: Any = None |
| self.browser: Any = None |
| self._is_alive = False |
| self._lock = Lock() |
|
|
| async def start(self) -> None: |
| pass |
|
|
| async def close(self): |
| """Close all resources""" |
| if not self._is_alive: |
| return |
|
|
| if self.context: |
| await self.context.close() |
| self.context = None |
|
|
| if self.browser: |
| await self.browser.close() |
| self.browser = None |
|
|
| if self.playwright: |
| await self.playwright.stop() |
| self.playwright = None |
|
|
| self._is_alive = False |
|
|
| async def __aenter__(self): |
| await self.start() |
| return self |
|
|
| async def __aexit__(self, exc_type, exc_val, exc_tb): |
| await self.close() |
|
|
| async def _initialize_context( |
| self, config: PlaywrightConfig | StealthConfig, ctx: AsyncBrowserContext |
| ) -> AsyncBrowserContext: |
| """Initialize the browser context.""" |
| if config.init_script: |
| await ctx.add_init_script(path=config.init_script) |
|
|
| if config.cookies: |
| await ctx.add_cookies(config.cookies) |
|
|
| return ctx |
|
|
| async def _get_page( |
| self, |
| timeout: int | float, |
| extra_headers: Optional[Dict[str, str]], |
| disable_resources: bool, |
| blocked_domains: Optional[Set[str]] = None, |
| context: Optional[AsyncBrowserContext] = None, |
| ) -> PageInfo[AsyncPage]: |
| """Get a new page to use""" |
| ctx = context if context is not None else self.context |
| if TYPE_CHECKING: |
| assert ctx is not None, "Browser context not initialized" |
|
|
| async with self._lock: |
| |
| if context is None and self.page_pool.pages_count >= self.max_pages: |
| |
| start_time = time() |
| while time() - start_time < self._max_wait_for_page: |
| await asyncio_sleep(0.05) |
| if self.page_pool.pages_count < self.max_pages: |
| break |
| else: |
| raise TimeoutError( |
| f"No pages finished to clear place in the pool within the {self._max_wait_for_page}s timeout period" |
| ) |
|
|
| page = await ctx.new_page() |
| page.set_default_navigation_timeout(timeout) |
| page.set_default_timeout(timeout) |
| if extra_headers: |
| await page.set_extra_http_headers(extra_headers) |
|
|
| if disable_resources or blocked_domains: |
| await page.route("**/*", create_async_intercept_handler(disable_resources, blocked_domains)) |
|
|
| return self.page_pool.add_page(page) |
|
|
| def get_pool_stats(self) -> Dict[str, int]: |
| """Get statistics about the current page pool""" |
| return { |
| "total_pages": self.page_pool.pages_count, |
| "busy_pages": self.page_pool.busy_count, |
| "max_pages": self.max_pages, |
| } |
|
|
| @staticmethod |
| async def _wait_for_networkidle(page: AsyncPage | AsyncFrame, timeout: Optional[int] = None): |
| """Wait for the page to become idle (no network activity) even if there are never-ending requests.""" |
| try: |
| await page.wait_for_load_state("networkidle", timeout=timeout) |
| except (PlaywrightError, Exception): |
| pass |
|
|
| async def _wait_for_page_stability(self, page: AsyncPage | AsyncFrame, load_dom: bool, network_idle: bool): |
| await page.wait_for_load_state(state="load") |
| if load_dom: |
| await page.wait_for_load_state(state="domcontentloaded") |
| if network_idle: |
| await self._wait_for_networkidle(page) |
|
|
| @staticmethod |
| def _create_response_handler(page_info: PageInfo[AsyncPage], response_container: List) -> Callable: |
| """Create an async response handler that captures the final navigation response. |
| |
| :param page_info: The PageInfo object containing the page |
| :param response_container: A list to store the final response (mutable container) |
| :return: A callback function for page.on("response", ...) |
| """ |
|
|
| async def handle_response(finished_response: AsyncPlaywrightResponse): |
| if ( |
| finished_response.request.resource_type == "document" |
| and finished_response.request.is_navigation_request() |
| and finished_response.request.frame == page_info.page.main_frame |
| ): |
| response_container[0] = finished_response |
|
|
| return handle_response |
|
|
| @asynccontextmanager |
| async def _page_generator( |
| self, |
| timeout: int | float, |
| extra_headers: Optional[Dict[str, str]], |
| disable_resources: bool, |
| proxy: Optional[ProxyType] = None, |
| blocked_domains: Optional[Set[str]] = None, |
| ) -> AsyncGenerator["PageInfo[AsyncPage]", None]: |
| """Acquire a page - either from persistent context or fresh context with proxy.""" |
| if proxy: |
| |
| if not self.browser: |
| raise RuntimeError("Browser not initialized for proxy rotation mode") |
| context_options = self._build_context_with_proxy(proxy) |
| context: AsyncBrowserContext = await self.browser.new_context(**context_options) |
|
|
| try: |
| context = await self._initialize_context(self._config, context) |
| page_info = await self._get_page( |
| timeout, extra_headers, disable_resources, blocked_domains, context=context |
| ) |
| yield page_info |
| finally: |
| await context.close() |
| else: |
| |
| page_info = await self._get_page(timeout, extra_headers, disable_resources, blocked_domains) |
| try: |
| yield page_info |
| finally: |
| await page_info.page.close() |
| self.page_pool.pages.remove(page_info) |
|
|
|
|
| class BaseSessionMixin: |
| _config: "PlaywrightConfig | StealthConfig" |
|
|
| @overload |
| def __validate_routine__(self, params: Dict, model: type[StealthConfig]) -> StealthConfig: ... |
|
|
| @overload |
| def __validate_routine__(self, params: Dict, model: type[PlaywrightConfig]) -> PlaywrightConfig: ... |
|
|
| def __validate_routine__( |
| self, params: Dict, model: type[PlaywrightConfig] | type[StealthConfig] |
| ) -> PlaywrightConfig | StealthConfig: |
| |
| self._context_options: Dict[str, Any] = {"color_scheme": "dark", "device_scale_factor": 2} |
| self._browser_options: Dict[str, Any] = { |
| "args": DEFAULT_ARGS, |
| "ignore_default_args": HARMFUL_ARGS, |
| } |
| if "__max_pages" in params: |
| params["max_pages"] = params.pop("__max_pages") |
|
|
| config = validate(params, model=model) |
| self._headers_keys = ( |
| {header.lower() for header in config.extra_headers.keys()} if config.extra_headers else set() |
| ) |
|
|
| return config |
|
|
| def __generate_options__(self, extra_flags: Tuple | None = None) -> None: |
| config: PlaywrightConfig | StealthConfig = self._config |
| self._context_options.update( |
| { |
| "proxy": config.proxy, |
| "locale": config.locale, |
| "timezone_id": config.timezone_id, |
| "extra_http_headers": config.extra_headers, |
| } |
| ) |
| |
| if config.useragent: |
| self._context_options["user_agent"] = config.useragent |
| elif not config.useragent and config.headless: |
| self._context_options["user_agent"] = ( |
| __default_chrome_useragent__ if config.real_chrome else __default_useragent__ |
| ) |
|
|
| if not config.cdp_url: |
| flags = self._browser_options["args"] |
| if config.extra_flags or extra_flags: |
| flags = list(set(tuple(flags) + tuple(config.extra_flags or extra_flags or ()))) |
|
|
| self._browser_options.update( |
| { |
| "args": flags, |
| "headless": config.headless, |
| "channel": "chrome" if config.real_chrome else "chromium", |
| } |
| ) |
|
|
| self._user_data_dir = config.user_data_dir |
| else: |
| self._browser_options = {} |
|
|
| if config.additional_args: |
| self._context_options.update(config.additional_args) |
|
|
| def _build_context_with_proxy(self, proxy: Optional[ProxyType] = None) -> Dict[str, Any]: |
| """ |
| Build context options with a specific proxy for rotation mode. |
| |
| :param proxy: Proxy URL string or Playwright-style proxy dict to use for this context. |
| :return: Dictionary of context options for browser.new_context(). |
| """ |
|
|
| context_options = self._context_options.copy() |
|
|
| |
| if proxy: |
| context_options["proxy"] = construct_proxy_dict(proxy) |
|
|
| return context_options |
|
|
|
|
class DynamicSessionMixin(BaseSessionMixin):
    """Configuration mixin that validates parameters against ``PlaywrightConfig``."""

    def __validate__(self, **params):
        """Validate ``params``, store the result as ``self._config`` and build the options."""
        validated = self.__validate_routine__(params, model=PlaywrightConfig)
        self._config = validated
        self.__generate_options__()
|
|
|
|
class StealthySessionMixin(BaseSessionMixin):
    """Configuration mixin that validates against ``StealthConfig`` and adds stealth options."""

    def __validate__(self, **params):
        """Validate ``params``, apply the fixed stealth context options and build the flags."""
        self._config = self.__validate_routine__(params, model=StealthConfig)

        full_hd = {"width": 1920, "height": 1080}
        stealth_context = {
            "is_mobile": False,
            "has_touch": False,
            "service_workers": "allow",
            "ignore_https_errors": True,
            # Separate copies so the two entries never share one dict object.
            "screen": dict(full_hd),
            "viewport": dict(full_hd),
            "permissions": ["geolocation", "notifications"],
        }
        self._context_options.update(stealth_context)
        self.__generate_stealth_options()

    def __generate_stealth_options(self) -> None:
        """Collect the stealth launch flags and forward them to ``__generate_options__``.

        No flags are collected when ``cdp_url`` is set; an empty tuple is forwarded instead.
        """
        config = cast(StealthConfig, self._config)
        collected: List[str] = []

        if not config.cdp_url:
            collected.extend(DEFAULT_ARGS)
            collected.extend(STEALTH_ARGS)

            if config.block_webrtc:
                collected.append("--webrtc-ip-handling-policy=disable_non_proxied_udp")
                collected.append("--force-webrtc-ip-handling-policy")

            if not config.allow_webgl:
                collected.append("--disable-webgl")
                collected.append("--disable-webgl-image-chromium")
                collected.append("--disable-webgl2")

            if config.hide_canvas:
                collected.append("--fingerprinting-canvas-image-data-noise")

        super().__generate_options__(tuple(collected))

    @staticmethod
    def _detect_cloudflare(page_content: str) -> str | None:
        """
        Identify which kind of Cloudflare challenge, if any, appears in a page.

        The content is first searched for a ``cType: '<type>'`` marker for each of
        the types "non-interactive", "managed" and "interactive", in that order.
        If none is present, the content is parsed and checked for a script tag
        whose ``src`` contains the Cloudflare Turnstile URL.

        Args:
            page_content (str): The page source to inspect.

        Returns:
            str: The matched challenge type, or "embedded" when only the
                Turnstile script is found. Returns None if nothing matches.
        """
        known_types = ("non-interactive", "managed", "interactive")
        matched = next((kind for kind in known_types if f"cType: '{kind}'" in page_content), None)
        if matched is not None:
            return matched

        # No explicit marker: look for an embedded Turnstile widget script instead.
        parsed = Selector(content=page_content)
        turnstile_scripts = parsed.css('script[src*="challenges.cloudflare.com/turnstile/v"]')
        return "embedded" if turnstile_scripts else None
|
|