"""Shared base classes for Playwright-backed browser sessions.

Defines ``SyncSession`` and ``AsyncSession`` (resource lifecycle, page pooling,
and page acquisition for the sync and async Playwright APIs) plus the
configuration mixins (``BaseSessionMixin``, ``DynamicSessionMixin``,
``StealthySessionMixin``) that validate user options and build the
browser/context launch arguments.
"""

from time import time
from asyncio import sleep as asyncio_sleep, Lock
from contextlib import contextmanager, asynccontextmanager

from playwright.sync_api._generated import Page
from playwright.sync_api import (
    Frame,
    BrowserContext,
    Response as SyncPlaywrightResponse,
)
from playwright.async_api._generated import Page as AsyncPage
from playwright.async_api import (
    Frame as AsyncFrame,
    Response as AsyncPlaywrightResponse,
    BrowserContext as AsyncBrowserContext,
)
from playwright._impl._errors import Error as PlaywrightError

from scrapling.parser import Selector
from scrapling.engines._browsers._page import PageInfo, PagePool
from scrapling.engines._browsers._validators import validate, PlaywrightConfig, StealthConfig
from scrapling.engines._browsers._config_tools import __default_chrome_useragent__, __default_useragent__
from scrapling.engines.toolbelt.navigation import (
    construct_proxy_dict,
    create_intercept_handler,
    create_async_intercept_handler,
)
from scrapling.core._types import (
    Any,
    Dict,
    List,
    Set,
    Optional,
    Callable,
    TYPE_CHECKING,
    cast,
    overload,
    Tuple,
    ProxyType,
    Generator,
    AsyncGenerator,
)
from scrapling.engines.constants import STEALTH_ARGS, HARMFUL_ARGS, DEFAULT_ARGS


class SyncSession:
    """Base for synchronous browser sessions.

    Owns the Playwright handle, browser, and persistent context, and hands out
    pages through a :class:`PagePool`. Subclasses (the mixins below populate
    ``_config``/``_context_options``) must implement
    ``_build_context_with_proxy`` to support per-request proxy rotation.
    """

    # Populated by the session mixins' __validate__/__validate_routine__.
    _config: "PlaywrightConfig | StealthConfig"
    _context_options: Dict[str, Any]

    def _build_context_with_proxy(self, proxy: Optional[ProxyType] = None) -> Dict[str, Any]:
        """Return context options carrying the given proxy (abstract hook; see BaseSessionMixin)."""
        raise NotImplementedError  # pragma: no cover

    def __init__(self, max_pages: int = 1):
        """Initialize pool bookkeeping; browser resources stay None until ``start``.

        :param max_pages: Maximum number of concurrently pooled pages.
        """
        self.max_pages = max_pages
        self.page_pool = PagePool(max_pages)
        self._max_wait_for_page = 60  # seconds to wait for a pool slot (used by the async variant)
        self.playwright: Any = None
        self.context: Any = None
        self.browser: Any = None
        self._is_alive = False

    def start(self) -> None:
        """Start the session (no-op here; subclasses launch the browser)."""
        pass

    def close(self):  # pragma: no cover
        """Close all resources"""
        # Idempotent: a session that never started (or already closed) is a no-op.
        if not self._is_alive:
            return

        if self.context:
            self.context.close()
            self.context = None

        if self.browser:
            self.browser.close()
            self.browser = None

        if self.playwright:
            self.playwright.stop()
            self.playwright = None  # pyright: ignore

        self._is_alive = False

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _initialize_context(self, config: PlaywrightConfig | StealthConfig, ctx: BrowserContext) -> BrowserContext:
        """Initialize the browser context.

        Applies the configured init script and cookies, then returns the same
        context for call-chaining convenience.
        """
        if config.init_script:
            ctx.add_init_script(path=config.init_script)
        if config.cookies:  # pragma: no cover
            ctx.add_cookies(config.cookies)
        return ctx

    def _get_page(
        self,
        timeout: int | float,
        extra_headers: Optional[Dict[str, str]],
        disable_resources: bool,
        blocked_domains: Optional[Set[str]] = None,
        context: Optional[BrowserContext] = None,
    ) -> PageInfo[Page]:  # pragma: no cover
        """Get a new page to use

        :param timeout: Default navigation/action timeout applied to the page (ms, per Playwright convention — TODO confirm).
        :param extra_headers: Extra HTTP headers to send with every request from this page.
        :param disable_resources: When True, install the intercept handler that drops heavy resources.
        :param blocked_domains: Domains whose requests the intercept handler should abort.
        :param context: Optional explicit context (proxy-rotation mode); defaults to the persistent one.
        :return: The pooled page wrapper, already marked busy.
        """
        # No need to check if a page is available or not in sync code because the code blocked before reaching here till the page closed, ofc.
        ctx = context if context is not None else self.context
        # NOTE(review): `assert` is stripped under `python -O`; a RuntimeError would be sturdier.
        assert ctx is not None, "Browser context not initialized"
        page = ctx.new_page()
        page.set_default_navigation_timeout(timeout)
        page.set_default_timeout(timeout)
        if extra_headers:
            page.set_extra_http_headers(extra_headers)

        # A single catch-all route performs both resource-blocking and domain-blocking.
        if disable_resources or blocked_domains:
            page.route("**/*", create_intercept_handler(disable_resources, blocked_domains))

        page_info = self.page_pool.add_page(page)
        page_info.mark_busy()
        return page_info

    def get_pool_stats(self) -> Dict[str, int]:
        """Get statistics about the current page pool"""
        return {
            "total_pages": self.page_pool.pages_count,
            "busy_pages": self.page_pool.busy_count,
            "max_pages": self.max_pages,
        }

    @staticmethod
    def _wait_for_networkidle(page: Page | Frame, timeout: Optional[int] = None):
        """Wait for the page to become idle (no network activity) even if there are never-ending requests."""
        try:
            page.wait_for_load_state("networkidle", timeout=timeout)
        # NOTE(review): PlaywrightError is redundant beside Exception; the broad
        # swallow looks deliberate (best-effort wait — chatty pages may never go idle).
        except (PlaywrightError, Exception):
            pass

    def _wait_for_page_stability(self, page: Page | Frame, load_dom: bool, network_idle: bool):
        """Block until the page reaches the configured load states (load, then optionally DOM + network idle)."""
        page.wait_for_load_state(state="load")
        if load_dom:
            page.wait_for_load_state(state="domcontentloaded")
        if network_idle:
            self._wait_for_networkidle(page)

    @staticmethod
    def _create_response_handler(page_info: PageInfo[Page], response_container: List) -> Callable:
        """Create a response handler that captures the final navigation response.

        :param page_info: The PageInfo object containing the page
        :param response_container: A list to store the final response (mutable container)
        :return: A callback function for page.on("response", ...)
        """

        def handle_response(finished_response: SyncPlaywrightResponse):
            # Only keep top-level document navigations of this page's main frame;
            # later matches overwrite earlier ones, so the last navigation wins.
            if (
                finished_response.request.resource_type == "document"
                and finished_response.request.is_navigation_request()
                and finished_response.request.frame == page_info.page.main_frame
            ):
                response_container[0] = finished_response

        return handle_response

    @contextmanager
    def _page_generator(
        self,
        timeout: int | float,
        extra_headers: Optional[Dict[str, str]],
        disable_resources: bool,
        proxy: Optional[ProxyType] = None,
        blocked_domains: Optional[Set[str]] = None,
    ) -> Generator["PageInfo[Page]", None, None]:
        """Acquire a page - either from persistent context or fresh context with proxy."""
        if proxy:
            # Rotation mode: create fresh context with the provided proxy
            if not self.browser:  # pragma: no cover
                raise RuntimeError("Browser not initialized for proxy rotation mode")

            context_options = self._build_context_with_proxy(proxy)
            context: BrowserContext = self.browser.new_context(**context_options)
            try:
                context = self._initialize_context(self._config, context)
                page_info = self._get_page(timeout, extra_headers, disable_resources, blocked_domains, context=context)
                yield page_info
            finally:
                # Closing the throwaway context also closes its page(s).
                context.close()
        else:
            # Standard mode: use PagePool with persistent context
            page_info = self._get_page(timeout, extra_headers, disable_resources, blocked_domains)
            try:
                yield page_info
            finally:
                # Pooled mode must close the page AND evict it so the pool count drops.
                page_info.page.close()
                self.page_pool.pages.remove(page_info)


class AsyncSession:
    """Async counterpart of :class:`SyncSession`.

    Same lifecycle and pooling responsibilities, but page acquisition is
    serialized with an asyncio ``Lock`` and waits (up to ``_max_wait_for_page``
    seconds) for a pool slot instead of relying on caller blocking.
    """

    # Populated by the session mixins' __validate__/__validate_routine__.
    _config: "PlaywrightConfig | StealthConfig"
    _context_options: Dict[str, Any]

    def _build_context_with_proxy(self, proxy: Optional[ProxyType] = None) -> Dict[str, Any]:
        """Return context options carrying the given proxy (abstract hook; see BaseSessionMixin)."""
        raise NotImplementedError  # pragma: no cover

    def __init__(self, max_pages: int = 1):
        """Initialize pool bookkeeping; browser resources stay None until ``start``.

        :param max_pages: Maximum number of concurrently pooled pages.
        """
        self.max_pages = max_pages
        self.page_pool = PagePool(max_pages)
        self._max_wait_for_page = 60  # seconds to wait for a free pool slot
        self.playwright: Any = None
        self.context: Any = None
        self.browser: Any = None
        self._is_alive = False
        # Serializes page acquisition across concurrent tasks (see _get_page).
        self._lock = Lock()

    async def start(self) -> None:
        """Start the session (no-op here; subclasses launch the browser)."""
        pass

    async def close(self):
        """Close all resources"""
        # Idempotent: a session that never started (or already closed) is a no-op.
        if not self._is_alive:  # pragma: no cover
            return

        if self.context:
            await self.context.close()
            self.context = None  # pyright: ignore

        if self.browser:
            await self.browser.close()
            self.browser = None

        if self.playwright:
            await self.playwright.stop()
            self.playwright = None  # pyright: ignore

        self._is_alive = False

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _initialize_context(
        self, config: PlaywrightConfig | StealthConfig, ctx: AsyncBrowserContext
    ) -> AsyncBrowserContext:
        """Initialize the browser context.

        Applies the configured init script and cookies, then returns the same
        context for call-chaining convenience.
        """
        if config.init_script:  # pragma: no cover
            await ctx.add_init_script(path=config.init_script)
        if config.cookies:  # pragma: no cover
            await ctx.add_cookies(config.cookies)
        return ctx

    async def _get_page(
        self,
        timeout: int | float,
        extra_headers: Optional[Dict[str, str]],
        disable_resources: bool,
        blocked_domains: Optional[Set[str]] = None,
        context: Optional[AsyncBrowserContext] = None,
    ) -> PageInfo[AsyncPage]:  # pragma: no cover
        """Get a new page to use

        :param timeout: Default navigation/action timeout applied to the page (ms, per Playwright convention — TODO confirm).
        :param extra_headers: Extra HTTP headers to send with every request from this page.
        :param disable_resources: When True, install the intercept handler that drops heavy resources.
        :param blocked_domains: Domains whose requests the intercept handler should abort.
        :param context: Optional explicit context (proxy-rotation mode); defaults to the persistent one.
        :return: The pooled page wrapper.
        :raises TimeoutError: If no pool slot frees up within ``_max_wait_for_page`` seconds.
        """
        ctx = context if context is not None else self.context
        if TYPE_CHECKING:
            assert ctx is not None, "Browser context not initialized"

        # The lock serializes the capacity check + page creation across tasks.
        async with self._lock:
            # If we're at max capacity after cleanup, wait for busy pages to finish
            if context is None and self.page_pool.pages_count >= self.max_pages:
                # Only applies when using persistent context
                start_time = time()
                # Poll every 50ms; the `else` clause fires only if the while
                # condition expired without a `break`, i.e. a genuine timeout.
                while time() - start_time < self._max_wait_for_page:
                    await asyncio_sleep(0.05)
                    if self.page_pool.pages_count < self.max_pages:
                        break
                else:
                    raise TimeoutError(
                        f"No pages finished to clear place in the pool within the {self._max_wait_for_page}s timeout period"
                    )

            page = await ctx.new_page()
            page.set_default_navigation_timeout(timeout)
            page.set_default_timeout(timeout)
            if extra_headers:
                await page.set_extra_http_headers(extra_headers)

            # A single catch-all route performs both resource-blocking and domain-blocking.
            if disable_resources or blocked_domains:
                await page.route("**/*", create_async_intercept_handler(disable_resources, blocked_domains))

            # NOTE(review): unlike SyncSession._get_page, the new PageInfo is NOT
            # mark_busy()'d here — confirm whether that asymmetry is intended.
            return self.page_pool.add_page(page)

    def get_pool_stats(self) -> Dict[str, int]:
        """Get statistics about the current page pool"""
        return {
            "total_pages": self.page_pool.pages_count,
            "busy_pages": self.page_pool.busy_count,
            "max_pages": self.max_pages,
        }

    @staticmethod
    async def _wait_for_networkidle(page: AsyncPage | AsyncFrame, timeout: Optional[int] = None):
        """Wait for the page to become idle (no network activity) even if there are never-ending requests."""
        try:
            await page.wait_for_load_state("networkidle", timeout=timeout)
        # NOTE(review): PlaywrightError is redundant beside Exception; the broad
        # swallow looks deliberate (best-effort wait — chatty pages may never go idle).
        except (PlaywrightError, Exception):
            pass

    async def _wait_for_page_stability(self, page: AsyncPage | AsyncFrame, load_dom: bool, network_idle: bool):
        """Block until the page reaches the configured load states (load, then optionally DOM + network idle)."""
        await page.wait_for_load_state(state="load")
        if load_dom:
            await page.wait_for_load_state(state="domcontentloaded")
        if network_idle:
            await self._wait_for_networkidle(page)

    @staticmethod
    def _create_response_handler(page_info: PageInfo[AsyncPage], response_container: List) -> Callable:
        """Create an async response handler that captures the final navigation response.

        :param page_info: The PageInfo object containing the page
        :param response_container: A list to store the final response (mutable container)
        :return: A callback function for page.on("response", ...)
        """

        async def handle_response(finished_response: AsyncPlaywrightResponse):
            # Only keep top-level document navigations of this page's main frame;
            # later matches overwrite earlier ones, so the last navigation wins.
            if (
                finished_response.request.resource_type == "document"
                and finished_response.request.is_navigation_request()
                and finished_response.request.frame == page_info.page.main_frame
            ):
                response_container[0] = finished_response

        return handle_response

    @asynccontextmanager
    async def _page_generator(
        self,
        timeout: int | float,
        extra_headers: Optional[Dict[str, str]],
        disable_resources: bool,
        proxy: Optional[ProxyType] = None,
        blocked_domains: Optional[Set[str]] = None,
    ) -> AsyncGenerator["PageInfo[AsyncPage]", None]:
        """Acquire a page - either from persistent context or fresh context with proxy."""
        if proxy:
            # Rotation mode: create fresh context with the provided proxy
            if not self.browser:  # pragma: no cover
                raise RuntimeError("Browser not initialized for proxy rotation mode")

            context_options = self._build_context_with_proxy(proxy)
            context: AsyncBrowserContext = await self.browser.new_context(**context_options)
            try:
                context = await self._initialize_context(self._config, context)
                page_info = await self._get_page(
                    timeout, extra_headers, disable_resources, blocked_domains, context=context
                )
                yield page_info
            finally:
                # Closing the throwaway context also closes its page(s).
                await context.close()
        else:
            # Standard mode: use PagePool with persistent context
            page_info = await self._get_page(timeout, extra_headers, disable_resources, blocked_domains)
            try:
                yield page_info
            finally:
                # Pooled mode must close the page AND evict it so the pool count drops.
                await page_info.page.close()
                self.page_pool.pages.remove(page_info)


class BaseSessionMixin:
    """Configuration plumbing shared by the dynamic and stealth session mixins.

    Validates user parameters into a config model and translates it into
    Playwright browser-launch and context-creation option dicts.
    """

    _config: "PlaywrightConfig | StealthConfig"

    @overload
    def __validate_routine__(self, params: Dict, model: type[StealthConfig]) -> StealthConfig: ...

    @overload
    def __validate_routine__(self, params: Dict, model: type[PlaywrightConfig]) -> PlaywrightConfig: ...

    def __validate_routine__(
        self, params: Dict, model: type[PlaywrightConfig] | type[StealthConfig]
    ) -> PlaywrightConfig | StealthConfig:
        """Validate ``params`` against ``model`` and seed the default option dicts.

        Side effects: initializes ``_context_options``, ``_browser_options``,
        and ``_headers_keys`` (lower-cased header names, for later lookups).
        """
        # Dark color scheme bypasses the 'prefersLightColor' check in creepjs
        self._context_options: Dict[str, Any] = {"color_scheme": "dark", "device_scale_factor": 2}
        self._browser_options: Dict[str, Any] = {
            "args": DEFAULT_ARGS,
            "ignore_default_args": HARMFUL_ARGS,
        }
        # Name-mangled kwarg from subclasses is normalized to the public field name.
        if "__max_pages" in params:
            params["max_pages"] = params.pop("__max_pages")

        config = validate(params, model=model)
        self._headers_keys = (
            {header.lower() for header in config.extra_headers.keys()} if config.extra_headers else set()
        )
        return config

    def __generate_options__(self, extra_flags: Tuple | None = None) -> None:
        """Fill ``_context_options``/``_browser_options`` from the validated config.

        :param extra_flags: Additional Chromium flags supplied by a subclass
            (e.g. stealth flags); merged with the defaults when not using CDP.
        """
        config: PlaywrightConfig | StealthConfig = self._config
        self._context_options.update(
            {
                "proxy": config.proxy,
                "locale": config.locale,
                "timezone_id": config.timezone_id,
                "extra_http_headers": config.extra_headers,
            }
        )
        # The default useragent in the headful is always correct now in the current versions of Playwright
        if config.useragent:
            self._context_options["user_agent"] = config.useragent
        elif not config.useragent and config.headless:
            self._context_options["user_agent"] = (
                __default_chrome_useragent__ if config.real_chrome else __default_useragent__
            )

        if not config.cdp_url:
            flags = self._browser_options["args"]
            # NOTE(review): `config.extra_flags or extra_flags` means user-supplied
            # extra_flags completely shadow the subclass's flags (e.g. stealth args
            # passed in by StealthySessionMixin) — confirm this is intended and not
            # meant to be a union of both.
            if config.extra_flags or extra_flags:
                flags = list(set(tuple(flags) + tuple(config.extra_flags or extra_flags or ())))

            self._browser_options.update(
                {
                    "args": flags,
                    "headless": config.headless,
                    "channel": "chrome" if config.real_chrome else "chromium",
                }
            )
            self._user_data_dir = config.user_data_dir
        else:
            # Connecting over CDP: the remote browser is already launched, so
            # local launch options do not apply.
            self._browser_options = {}

        if config.additional_args:
            self._context_options.update(config.additional_args)

    def _build_context_with_proxy(self, proxy: Optional[ProxyType] = None) -> Dict[str, Any]:
        """
        Build context options with a specific proxy for rotation mode.

        :param proxy: Proxy URL string or Playwright-style proxy dict to use for this context.
        :return: Dictionary of context options for browser.new_context().
        """
        context_options = self._context_options.copy()

        # Override proxy if provided
        if proxy:
            context_options["proxy"] = construct_proxy_dict(proxy)

        return context_options


class DynamicSessionMixin(BaseSessionMixin):
    """Config mixin for the plain (non-stealth) Playwright session."""

    def __validate__(self, **params):
        """Validate params into a PlaywrightConfig and generate launch options."""
        self._config = self.__validate_routine__(params, model=PlaywrightConfig)
        self.__generate_options__()


class StealthySessionMixin(BaseSessionMixin):
    """Config mixin for the stealth session: fingerprint-hardened context and flags."""

    def __validate__(self, **params):
        """Validate params into a StealthConfig and generate stealth launch options."""
        self._config = self.__validate_routine__(params, model=StealthConfig)
        # Fixed desktop fingerprint: consistent screen/viewport and desktop-like
        # capabilities reduce detection surface.
        self._context_options.update(
            {
                "is_mobile": False,
                "has_touch": False,
                # I'm thinking about disabling it to rest from all Service Workers' headache, but let's keep it as it is for now
                "service_workers": "allow",
                "ignore_https_errors": True,
                "screen": {"width": 1920, "height": 1080},
                "viewport": {"width": 1920, "height": 1080},
                "permissions": ["geolocation", "notifications"],
            }
        )
        self.__generate_stealth_options()

    def __generate_stealth_options(self) -> None:
        """Assemble the stealth Chromium flag set and delegate to ``__generate_options__``.

        Flags are only built for locally-launched browsers (no ``cdp_url``).
        """
        config = cast(StealthConfig, self._config)
        flags: Tuple[str, ...] = tuple()
        if not config.cdp_url:
            flags = tuple(DEFAULT_ARGS) + tuple(STEALTH_ARGS)
            if config.block_webrtc:
                flags += (
                    "--webrtc-ip-handling-policy=disable_non_proxied_udp",
                    "--force-webrtc-ip-handling-policy",  # Ensures the policy is enforced
                )
            if not config.allow_webgl:
                flags += (
                    "--disable-webgl",
                    "--disable-webgl-image-chromium",
                    "--disable-webgl2",
                )
            if config.hide_canvas:
                flags += ("--fingerprinting-canvas-image-data-noise",)

        super(StealthySessionMixin, self).__generate_options__(flags)

    @staticmethod
    def _detect_cloudflare(page_content: str) -> str | None:
        """
        Detect the type of Cloudflare challenge present in the provided page content.

        This function analyzes the given page content to identify whether a specific
        type of Cloudflare challenge is present. It checks for three predefined
        challenge types: non-interactive, managed, and interactive. If a challenge
        type is detected, it returns the corresponding type as a string. If no
        challenge type is detected, it returns None.

        Args:
            page_content (str): The content of the page to analyze for Cloudflare
                challenge types.

        Returns:
            str: A string representing the detected Cloudflare challenge type
                ("non-interactive", "managed", "interactive", or "embedded" for an
                embedded Turnstile captcha), if found. Returns None if no challenge
                matches.
        """
        challenge_types = (
            "non-interactive",
            "managed",
            "interactive",
        )
        # Cloudflare challenge pages embed their type as `cType: '<type>'` in inline JS.
        for ctype in challenge_types:
            if f"cType: '{ctype}'" in page_content:
                return ctype

        # Check if turnstile captcha is embedded inside the page (Usually inside a closed Shadow iframe)
        selector = Selector(content=page_content)
        if selector.css('script[src*="challenges.cloudflare.com/turnstile/v"]'):
            return "embedded"

        return None