import json
import pprint
from pathlib import Path

import anyio
from anyio import Path as AsyncPath
from anyio import create_task_group, CapacityLimiter, create_memory_object_stream, EndOfStream

from scrapling.core.utils import log
from scrapling.spiders.request import Request
from scrapling.spiders.scheduler import Scheduler
from scrapling.spiders.session import SessionManager
from scrapling.spiders.result import CrawlStats, ItemList
from scrapling.spiders.checkpoint import CheckpointManager, CheckpointData
from scrapling.core._types import Dict, Union, Optional, TYPE_CHECKING, Any, AsyncGenerator

if TYPE_CHECKING:
    from scrapling.spiders.spider import Spider


def _dump(obj: Dict) -> str:
    return json.dumps(obj, indent=4)

class CrawlerEngine:
    """Orchestrates the crawling process."""

    def __init__(
        self,
        spider: "Spider",
        session_manager: SessionManager,
        crawldir: Optional[Union[str, Path, AsyncPath]] = None,
        interval: float = 300.0,
    ):
        self.spider = spider
        self.session_manager = session_manager
        self.scheduler = Scheduler(
            include_kwargs=spider.fp_include_kwargs,
            include_headers=spider.fp_include_headers,
            keep_fragments=spider.fp_keep_fragments,
        )
        self.stats = CrawlStats()

        self._global_limiter = CapacityLimiter(spider.concurrent_requests)
        self._domain_limiters: dict[str, CapacityLimiter] = {}
        self._allowed_domains: set[str] = spider.allowed_domains or set()

        self._active_tasks: int = 0
        self._running: bool = False
        self._items: ItemList = ItemList()
        self._item_stream: Any = None

        self._checkpoint_system_enabled = bool(crawldir)
        self._checkpoint_manager = CheckpointManager(crawldir or "", interval)
        self._last_checkpoint_time: float = 0.0
        self._pause_requested: bool = False
        self._force_stop: bool = False
        self.paused: bool = False

    def _is_domain_allowed(self, request: Request) -> bool:
        """Check if the request's domain is in allowed_domains."""
        if not self._allowed_domains:
            return True

        domain = request.domain
        for allowed in self._allowed_domains:
            if domain == allowed or domain.endswith("." + allowed):
                return True
        return False

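    # Offsite filtering in brief: a request passes if its domain equals an allowed
    # domain or is a subdomain of one. A small illustration (values are made up):
    #
    #     allowed_domains = {"example.com"}
    #     "example.com"        -> allowed (exact match)
    #     "shop.example.com"   -> allowed (ends with ".example.com")
    #     "notexample.com"     -> filtered (no dot-separated suffix match)
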
    def _rate_limiter(self, domain: str) -> CapacityLimiter:
        """Get or create a per-domain concurrency limiter if enabled, otherwise use the global limiter."""
        if self.spider.concurrent_requests_per_domain:
            if domain not in self._domain_limiters:
                self._domain_limiters[domain] = CapacityLimiter(self.spider.concurrent_requests_per_domain)
            return self._domain_limiters[domain]
        return self._global_limiter

    def _normalize_request(self, request: Request) -> None:
        """Normalize request fields before enqueueing.

        Resolves empty sid to the session manager's default session ID.
        This ensures consistent fingerprinting for requests using the same session.
        """
        if not request.sid:
            request.sid = self.session_manager.default_session_id

    async def _process_request(self, request: Request) -> None:
        """Download and process a single request."""
        async with self._rate_limiter(request.domain):
            if self.spider.download_delay:
                await anyio.sleep(self.spider.download_delay)

            if request._session_kwargs.get("proxy"):
                self.stats.proxies.append(request._session_kwargs["proxy"])
            if request._session_kwargs.get("proxies"):
                self.stats.proxies.append(dict(request._session_kwargs["proxies"]))
            try:
                response = await self.session_manager.fetch(request)
                self.stats.increment_requests_count(request.sid or self.session_manager.default_session_id)
                self.stats.increment_response_bytes(request.domain, len(response.body))
                self.stats.increment_status(response.status)

            except Exception as e:
                self.stats.failed_requests_count += 1
                await self.spider.on_error(request, e)
                return

            if await self.spider.is_blocked(response):
                self.stats.blocked_requests_count += 1
                if request._retry_count < self.spider.max_blocked_retries:
                    retry_request = request.copy()
                    retry_request._retry_count += 1
                    retry_request.priority -= 1
                    retry_request.dont_filter = True
                    retry_request._session_kwargs.pop("proxy", None)
                    retry_request._session_kwargs.pop("proxies", None)

                    new_request = await self.spider.retry_blocked_request(retry_request, response)
                    self._normalize_request(new_request)
                    await self.scheduler.enqueue(new_request)
                    log.info(
                        f"Scheduled blocked request for retry ({retry_request._retry_count}/{self.spider.max_blocked_retries}): {request.url}"
                    )
                else:
                    log.warning(f"Max retries exceeded for blocked request: {request.url}")
                return

            callback = request.callback or self.spider.parse
            try:
                async for result in callback(response):
                    if isinstance(result, Request):
                        if self._is_domain_allowed(result):
                            self._normalize_request(result)
                            await self.scheduler.enqueue(result)
                        else:
                            self.stats.offsite_requests_count += 1
                            log.debug(f"Filtered offsite request to: {result.url}")
                    elif isinstance(result, dict):
                        processed_result = await self.spider.on_scraped_item(result)
                        if processed_result:
                            self.stats.items_scraped += 1
                            log.debug(f"Scraped from {str(response)}\n{pprint.pformat(processed_result)}")
                            if self._item_stream:
                                await self._item_stream.send(processed_result)
                            else:
                                self._items.append(processed_result)
                        else:
                            self.stats.items_dropped += 1
                            log.warning(f"Dropped from {str(response)}\n{pprint.pformat(result)}")
                    elif result is not None:
                        log.error(f"Spider callbacks must yield Request, dict, or None, got '{type(result)}' in {request}")
            except Exception as e:
                msg = f"Spider error processing {request}:\n {e}"
                log.error(msg, exc_info=e)
                await self.spider.on_error(request, e)

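    # The loop above consumes the spider callback as an async generator; a callback
    # is expected to yield Request objects (scheduled for crawling), dicts (treated
    # as scraped items), or None. A hedged sketch of such a callback, assuming
    # Request accepts `url` and `callback` keyword arguments and `extract_links`
    # is a hypothetical helper:
    #
    #     async def parse(self, response):
    #         yield {"url": response.url, "status": response.status}
    #         for next_url in extract_links(response):
    #             yield Request(url=next_url, callback=self.parse)
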
    async def _task_wrapper(self, request: Request) -> None:
        """Wrapper to track active task count."""
        try:
            await self._process_request(request)
        finally:
            self._active_tasks -= 1

    def request_pause(self) -> None:
        """Request a graceful pause of the crawl.

        First call: requests a graceful pause (waits for active tasks to finish).
        Second call: forces an immediate stop.
        """
        if self._force_stop:
            return

        if self._pause_requested:
            # A second request escalates to an immediate, forced stop
            self._force_stop = True
            log.warning("Force stop requested, cancelling immediately...")
        else:
            self._pause_requested = True
            log.info(
                "Pause requested, waiting for in-flight requests to complete (press Ctrl+C again to force stop)..."
            )

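    # A minimal sketch of hooking this to Ctrl+C (illustrative only; the actual
    # signal wiring lives outside this class, and `engine` is an assumed
    # CrawlerEngine instance):
    #
    #     import signal
    #     signal.signal(signal.SIGINT, lambda signum, frame: engine.request_pause())
    #
    # Pressing Ctrl+C once drains in-flight requests and saves a checkpoint (when a
    # crawldir was given); pressing it a second time cancels immediately.
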
    async def _save_checkpoint(self) -> None:
        """Save current state to checkpoint files."""
        requests, seen = self.scheduler.snapshot()
        data = CheckpointData(requests=requests, seen=seen)
        await self._checkpoint_manager.save(data)
        self._last_checkpoint_time = anyio.current_time()

    def _is_checkpoint_time(self) -> bool:
        """Check if it's time for the periodic checkpoint."""
        if not self._checkpoint_system_enabled:
            return False

        if self._checkpoint_manager.interval == 0:
            return False

        current_time = anyio.current_time()
        return (current_time - self._last_checkpoint_time) >= self._checkpoint_manager.interval

    async def _restore_from_checkpoint(self) -> bool:
        """Attempt to restore state from a checkpoint.

        Returns True if successfully restored, False otherwise.
        """
        if not self._checkpoint_system_enabled:
            return False

        data = await self._checkpoint_manager.load()
        if data is None:
            return False

        self.scheduler.restore(data)

        # Re-bind request callbacks to the live spider instance
        for request in data.requests:
            request._restore_callback(self.spider)

        return True

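    # Pause/resume in short: request_pause() lets in-flight tasks finish, crawl()
    # then calls _save_checkpoint() to persist the scheduler's pending requests and
    # seen-request fingerprints, and a later crawl() with the same crawldir restores
    # them here and skips start_requests(). Illustrative resume call (MySpider and
    # the SessionManager arguments are assumptions):
    #
    #     engine = CrawlerEngine(MySpider(), SessionManager(), crawldir="crawls/run1")
    #     stats = await engine.crawl()   # picks up the saved queue if a checkpoint exists
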
    async def crawl(self) -> CrawlStats:
        """Run the spider and return CrawlStats."""
        self._running = True
        self._items.clear()
        self.paused = False
        self._pause_requested = False
        self._force_stop = False
        self.stats = CrawlStats(start_time=anyio.current_time())

        # Try to resume from a previous checkpoint before generating start requests
        resuming = (await self._restore_from_checkpoint()) if self._checkpoint_system_enabled else False
        self._last_checkpoint_time = anyio.current_time()

        async with self.session_manager:
            self.stats.concurrent_requests = self.spider.concurrent_requests
            self.stats.concurrent_requests_per_domain = self.spider.concurrent_requests_per_domain
            self.stats.download_delay = self.spider.download_delay
            await self.spider.on_start(resuming=resuming)

            try:
                if not resuming:
                    async for request in self.spider.start_requests():
                        self._normalize_request(request)
                        await self.scheduler.enqueue(request)
                else:
                    log.info("Resuming from checkpoint, skipping start_requests()")

                # Main scheduling loop: dequeue requests and spawn worker tasks
                async with create_task_group() as tg:
                    while self._running:
                        if self._pause_requested:
                            if self._active_tasks == 0 or self._force_stop:
                                if self._force_stop:
                                    log.warning(f"Force stopping with {self._active_tasks} active tasks")
                                    tg.cancel_scope.cancel()

                                # Persist state so the crawl can be resumed later
                                if self._checkpoint_system_enabled:
                                    await self._save_checkpoint()
                                    self.paused = True
                                    log.info("Spider paused, checkpoint saved")
                                else:
                                    log.info("Spider stopped gracefully")

                                self._running = False
                                break

                            # Wait for in-flight requests to drain before pausing
                            await anyio.sleep(0.05)
                            continue

                        if self._checkpoint_system_enabled and self._is_checkpoint_time():
                            await self._save_checkpoint()

                        if self.scheduler.is_empty:
                            # Nothing queued and nothing in flight: the crawl is finished
                            if self._active_tasks == 0:
                                self._running = False
                                log.debug("Spider idle")
                                break

                            # Queue is empty but running tasks may still enqueue new requests
                            await anyio.sleep(0.05)
                            continue

                        # Respect the global concurrency limit before spawning another task
                        if self._active_tasks >= self.spider.concurrent_requests:
                            await anyio.sleep(0.01)
                            continue

                        request = await self.scheduler.dequeue()
                        self._active_tasks += 1
                        tg.start_soon(self._task_wrapper, request)

            finally:
                await self.spider.on_close()
                # Remove checkpoint files after a completed (non-paused) run
                if not self.paused and self._checkpoint_system_enabled:
                    await self._checkpoint_manager.cleanup()

        self.stats.log_levels_counter = self.spider._log_counter.get_counts()
        self.stats.end_time = anyio.current_time()
        log.info(_dump(self.stats.to_dict()))
        return self.stats

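    # End-to-end usage sketch (MySpider stands in for any Spider subclass, and the
    # SessionManager is assumed to be constructed however the rest of the package
    # expects; both names are illustrative):
    #
    #     async def main():
    #         engine = CrawlerEngine(MySpider(), SessionManager(), crawldir="crawls/run1")
    #         stats = await engine.crawl()   # returns once idle, paused, or force-stopped
    #         print(stats.items_scraped, len(engine.items))
    #
    #     anyio.run(main)
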
    @property
    def items(self) -> ItemList:
        """Access scraped items."""
        return self._items

    def __aiter__(self) -> AsyncGenerator[dict, None]:
        return self._stream()

    async def _stream(self) -> AsyncGenerator[dict, None]:
        """Run the crawl in a background task and yield items as they are scraped."""
        send, recv = create_memory_object_stream[dict](100)
        self._item_stream = send

        async def run():
            try:
                await self.crawl()
            finally:
                await send.aclose()

        async with create_task_group() as tg:
            tg.start_soon(run)
            try:
                async for item in recv:
                    yield item
            except EndOfStream:
                pass
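

# Streaming usage sketch: iterating the engine runs the crawl in the background and
# yields each scraped item as soon as the spider produces it, instead of buffering
# everything in `engine.items` (`MySpider` and `store_item` are illustrative names):
#
#     async def consume():
#         engine = CrawlerEngine(MySpider(), SessionManager())
#         async for item in engine:
#             await store_item(item)   # hypothetical async sink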