import json
import pprint
from pathlib import Path

import anyio
from anyio import Path as AsyncPath
from anyio import create_task_group, CapacityLimiter, create_memory_object_stream, EndOfStream

from scrapling.core.utils import log
from scrapling.spiders.request import Request
from scrapling.spiders.scheduler import Scheduler
from scrapling.spiders.session import SessionManager
from scrapling.spiders.result import CrawlStats, ItemList
from scrapling.spiders.checkpoint import CheckpointManager, CheckpointData
from scrapling.core._types import Dict, Union, Optional, TYPE_CHECKING, Any, AsyncGenerator

if TYPE_CHECKING:
    from scrapling.spiders.spider import Spider

def _dump(obj: Dict) -> str:
    return json.dumps(obj, indent=4)

class CrawlerEngine:
    """Orchestrates the crawling process."""

    def __init__(
        self,
        spider: "Spider",
        session_manager: SessionManager,
        crawldir: Optional[Union[str, Path, AsyncPath]] = None,
        interval: float = 300.0,
    ):
        self.spider = spider
        self.session_manager = session_manager
        self.scheduler = Scheduler(
            include_kwargs=spider.fp_include_kwargs,
            include_headers=spider.fp_include_headers,
            keep_fragments=spider.fp_keep_fragments,
        )
        self.stats = CrawlStats()
        self._global_limiter = CapacityLimiter(spider.concurrent_requests)
        self._domain_limiters: dict[str, CapacityLimiter] = {}
        self._allowed_domains: set[str] = spider.allowed_domains or set()
        self._active_tasks: int = 0
        self._running: bool = False
        self._items: ItemList = ItemList()
        self._item_stream: Any = None
        self._checkpoint_system_enabled = bool(crawldir)
        self._checkpoint_manager = CheckpointManager(crawldir or "", interval)
        self._last_checkpoint_time: float = 0.0
        self._pause_requested: bool = False
        self._force_stop: bool = False
        self.paused: bool = False

    def _is_domain_allowed(self, request: Request) -> bool:
        """Check if the request's domain is in allowed_domains."""
        if not self._allowed_domains:
            return True

        domain = request.domain
        for allowed in self._allowed_domains:
            if domain == allowed or domain.endswith("." + allowed):
                return True

        return False
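
    # Illustrative behaviour of the check above (example values only): with
    # allowed_domains={"example.com"}, both "example.com" and
    # "shop.example.com" are accepted, while "notexample.com" is rejected
    # because it neither equals the allowed domain nor ends with ".example.com".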

    def _rate_limiter(self, domain: str) -> CapacityLimiter:
        """Get or create a per-domain concurrency limiter if enabled, otherwise use the global limiter."""
        if self.spider.concurrent_requests_per_domain:
            if domain not in self._domain_limiters:
                self._domain_limiters[domain] = CapacityLimiter(self.spider.concurrent_requests_per_domain)
            return self._domain_limiters[domain]

        return self._global_limiter

    def _normalize_request(self, request: Request) -> None:
        """Normalize request fields before enqueueing.

        Resolves an empty sid to the session manager's default session ID.
        This ensures consistent fingerprinting for requests using the same session.
        """
        if not request.sid:
            request.sid = self.session_manager.default_session_id

    async def _process_request(self, request: Request) -> None:
        """Download and process a single request."""
        async with self._rate_limiter(request.domain):
            if self.spider.download_delay:
                await anyio.sleep(self.spider.download_delay)

            if request._session_kwargs.get("proxy"):
                self.stats.proxies.append(request._session_kwargs["proxy"])
            if request._session_kwargs.get("proxies"):
                self.stats.proxies.append(dict(request._session_kwargs["proxies"]))

            try:
                response = await self.session_manager.fetch(request)
                self.stats.increment_requests_count(request.sid or self.session_manager.default_session_id)
                self.stats.increment_response_bytes(request.domain, len(response.body))
                self.stats.increment_status(response.status)
            except Exception as e:
                self.stats.failed_requests_count += 1
                await self.spider.on_error(request, e)
                return

            if await self.spider.is_blocked(response):
                self.stats.blocked_requests_count += 1
                if request._retry_count < self.spider.max_blocked_retries:
                    retry_request = request.copy()
                    retry_request._retry_count += 1
                    retry_request.priority -= 1  # Don't retry immediately
                    retry_request.dont_filter = True
                    retry_request._session_kwargs.pop("proxy", None)
                    retry_request._session_kwargs.pop("proxies", None)
                    new_request = await self.spider.retry_blocked_request(retry_request, response)
                    self._normalize_request(new_request)
                    await self.scheduler.enqueue(new_request)
                    log.info(
                        f"Scheduled blocked request for retry ({retry_request._retry_count}/{self.spider.max_blocked_retries}): {request.url}"
                    )
                else:
                    log.warning(f"Max retries exceeded for blocked request: {request.url}")
                return

            callback = request.callback if request.callback else self.spider.parse
            try:
                async for result in callback(response):
                    if isinstance(result, Request):
                        if self._is_domain_allowed(result):
                            self._normalize_request(result)
                            await self.scheduler.enqueue(result)
                        else:
                            self.stats.offsite_requests_count += 1
                            log.debug(f"Filtered offsite request to: {result.url}")
                    elif isinstance(result, dict):
                        processed_result = await self.spider.on_scraped_item(result)
                        if processed_result:
                            self.stats.items_scraped += 1
                            log.debug(f"Scraped from {str(response)}\n{pprint.pformat(processed_result)}")
                            if self._item_stream:
                                await self._item_stream.send(processed_result)
                            else:
                                self._items.append(processed_result)
                        else:
                            self.stats.items_dropped += 1
                            log.warning(f"Dropped from {str(response)}\n{processed_result}")
                    elif result is not None:
                        log.error(f"Spider must return Request, dict or None, got '{type(result)}' in {request}")
            except Exception as e:
                msg = f"Spider error processing {request}:\n {e}"
                log.error(msg, exc_info=e)
                await self.spider.on_error(request, e)

    async def _task_wrapper(self, request: Request) -> None:
        """Wrapper to track active task count."""
        try:
            await self._process_request(request)
        finally:
            self._active_tasks -= 1

    def request_pause(self) -> None:
        """Request a graceful pause of the crawl.

        First call: requests a graceful pause (waits for active tasks).
        Second call: forces an immediate stop.
        """
        if self._force_stop:
            return  # Already forcing stop

        if self._pause_requested:
            # Second Ctrl+C - force stop
            self._force_stop = True
            log.warning("Force stop requested, cancelling immediately...")
        else:
            self._pause_requested = True
            log.info(
                "Pause requested, waiting for in-flight requests to complete (press Ctrl+C again to force stop)..."
            )
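
    # Illustrative wiring only (not part of this class): the two-step pause
    # protocol above is normally driven by a signal handler owned by the code
    # that runs the engine. A minimal sketch with anyio, assuming the engine
    # instance is available as `engine`:
    #
    #     import signal
    #
    #     async def watch_for_sigint(engine: "CrawlerEngine") -> None:
    #         with anyio.open_signal_receiver(signal.SIGINT) as signals:
    #             async for _ in signals:
    #                 engine.request_pause()  # first call pauses, second forces stop
    #
    # How signals are actually hooked up is left to the caller; the names above
    # are assumptions for illustration.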

    async def _save_checkpoint(self) -> None:
        """Save current state to checkpoint files."""
        requests, seen = self.scheduler.snapshot()
        data = CheckpointData(requests=requests, seen=seen)
        await self._checkpoint_manager.save(data)
        self._last_checkpoint_time = anyio.current_time()

    def _is_checkpoint_time(self) -> bool:
        """Check if it's time for the periodic checkpoint."""
        if not self._checkpoint_system_enabled:
            return False
        if self._checkpoint_manager.interval == 0:
            return False

        current_time = anyio.current_time()
        return (current_time - self._last_checkpoint_time) >= self._checkpoint_manager.interval

    async def _restore_from_checkpoint(self) -> bool:
        """Attempt to restore state from a checkpoint.

        Returns True if successfully restored, False otherwise.
        """
        if not self._checkpoint_system_enabled:
            return False

        data = await self._checkpoint_manager.load()
        if data is None:
            return False

        self.scheduler.restore(data)
        # Restore callbacks from the spider after the scheduler restore
        for request in data.requests:
            request._restore_callback(self.spider)
        return True

    async def crawl(self) -> CrawlStats:
        """Run the spider and return CrawlStats."""
        self._running = True
        self._items.clear()
        self.paused = False
        self._pause_requested = False
        self._force_stop = False
        self.stats = CrawlStats(start_time=anyio.current_time())

        # Check for an existing checkpoint
        resuming = (await self._restore_from_checkpoint()) if self._checkpoint_system_enabled else False
        self._last_checkpoint_time = anyio.current_time()

        async with self.session_manager:
            self.stats.concurrent_requests = self.spider.concurrent_requests
            self.stats.concurrent_requests_per_domain = self.spider.concurrent_requests_per_domain
            self.stats.download_delay = self.spider.download_delay
            await self.spider.on_start(resuming=resuming)
            try:
                if not resuming:
                    async for request in self.spider.start_requests():
                        self._normalize_request(request)
                        await self.scheduler.enqueue(request)
                else:
                    log.info("Resuming from checkpoint, skipping start_requests()")

                # Process the queue
                async with create_task_group() as tg:
                    while self._running:
                        if self._pause_requested:
                            if self._active_tasks == 0 or self._force_stop:
                                if self._force_stop:
                                    log.warning(f"Force stopping with {self._active_tasks} active tasks")
                                    tg.cancel_scope.cancel()

                                # Only save a checkpoint if the checkpoint system is enabled
                                if self._checkpoint_system_enabled:
                                    await self._save_checkpoint()
                                    self.paused = True
                                    log.info("Spider paused, checkpoint saved")
                                else:
                                    log.info("Spider stopped gracefully")

                                self._running = False
                                break

                            # Wait briefly and check again
                            await anyio.sleep(0.05)
                            continue

                        if self._checkpoint_system_enabled and self._is_checkpoint_time():
                            await self._save_checkpoint()

                        if self.scheduler.is_empty:
                            # Empty queue + no active tasks = done
                            if self._active_tasks == 0:
                                self._running = False
                                log.debug("Spider idle")
                                break
                            # Brief wait for callbacks to enqueue new requests
                            await anyio.sleep(0.05)
                            continue

                        # Only spawn tasks up to the concurrent_requests limit.
                        # This prevents spawning thousands of waiting tasks.
                        if self._active_tasks >= self.spider.concurrent_requests:
                            await anyio.sleep(0.01)
                            continue

                        request = await self.scheduler.dequeue()
                        self._active_tasks += 1
                        tg.start_soon(self._task_wrapper, request)
            finally:
                await self.spider.on_close()

        # Clean up checkpoint files on successful completion (not paused)
        if not self.paused and self._checkpoint_system_enabled:
            await self._checkpoint_manager.cleanup()

        self.stats.log_levels_counter = self.spider._log_counter.get_counts()
        self.stats.end_time = anyio.current_time()
        log.info(_dump(self.stats.to_dict()))
        return self.stats

    def items(self) -> ItemList:
        """Access scraped items."""
        return self._items

    def __aiter__(self) -> AsyncGenerator[dict, None]:
        return self._stream()

    async def _stream(self) -> AsyncGenerator[dict, None]:
        """Async generator that runs crawl and yields items."""
        send, recv = create_memory_object_stream[dict](100)
        self._item_stream = send

        async def run():
            try:
                await self.crawl()
            finally:
                await send.aclose()

        async with create_task_group() as tg:
            tg.start_soon(run)
            try:
                async for item in recv:
                    yield item
            except EndOfStream:
                pass
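

# Usage sketch (illustrative, not part of the engine): the engine is normally
# driven by the Spider's own entry points, but running it directly looks
# roughly like this. The Spider subclass, its configuration, and the
# SessionManager construction are assumptions here, not something this module
# defines.
#
#     async def main() -> None:
#         spider = MySpider()                      # hypothetical Spider subclass
#         engine = CrawlerEngine(
#             spider,
#             session_manager,                     # a configured SessionManager
#             crawldir="./crawls/my-spider",       # enables pause/resume checkpoints
#             interval=300.0,                      # periodic checkpoint every 5 minutes
#         )
#
#         # Batch mode: run to completion, then inspect stats and items.
#         stats = await engine.crawl()
#         items = engine.items()
#
#         # Streaming mode: iterate items as they are scraped instead.
#         # async for item in engine:
#         #     print(item)
#
#     anyio.run(main)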