import json
import pprint
from pathlib import Path

import anyio
from anyio import Path as AsyncPath
from anyio import create_task_group, CapacityLimiter, create_memory_object_stream, EndOfStream
from anyio.streams.memory import MemoryObjectSendStream

from scrapling.core.utils import log
from scrapling.spiders.request import Request
from scrapling.spiders.scheduler import Scheduler
from scrapling.spiders.session import SessionManager
from scrapling.spiders.result import CrawlStats, ItemList
from scrapling.spiders.checkpoint import CheckpointManager, CheckpointData
from scrapling.core._types import Dict, Union, Optional, TYPE_CHECKING, AsyncGenerator

if TYPE_CHECKING:
    from scrapling.spiders.spider import Spider


def _dump(obj: Dict) -> str:
    return json.dumps(obj, indent=4)


class CrawlerEngine:
"""Orchestrates the crawling process."""
def __init__(
self,
spider: "Spider",
session_manager: SessionManager,
crawldir: Optional[Union[str, Path, AsyncPath]] = None,
interval: float = 300.0,
):
self.spider = spider
self.session_manager = session_manager
self.scheduler = Scheduler(
include_kwargs=spider.fp_include_kwargs,
include_headers=spider.fp_include_headers,
keep_fragments=spider.fp_keep_fragments,
)
self.stats = CrawlStats()
self._global_limiter = CapacityLimiter(spider.concurrent_requests)
self._domain_limiters: dict[str, CapacityLimiter] = {}
self._allowed_domains: set[str] = spider.allowed_domains or set()
self._active_tasks: int = 0
self._running: bool = False
self._items: ItemList = ItemList()
        self._item_stream: Optional[MemoryObjectSendStream[dict]] = None
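        # Checkpointing is opt-in: passing a crawl directory enables it, and
        # ``interval`` (seconds) controls how often periodic saves happen.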
self._checkpoint_system_enabled = bool(crawldir)
self._checkpoint_manager = CheckpointManager(crawldir or "", interval)
self._last_checkpoint_time: float = 0.0
self._pause_requested: bool = False
self._force_stop: bool = False
        self.paused: bool = False

    def _is_domain_allowed(self, request: Request) -> bool:
"""Check if the request's domain is in allowed_domains."""
if not self._allowed_domains:
return True
domain = request.domain
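        # Exact match or subdomain suffix match: with "example.com" allowed,
        # "docs.example.com" passes but "notexample.com" does not.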
for allowed in self._allowed_domains:
if domain == allowed or domain.endswith("." + allowed):
return True
        return False

    def _rate_limiter(self, domain: str) -> CapacityLimiter:
"""Get or create a per-domain concurrency limiter if enabled, otherwise use the global limiter."""
if self.spider.concurrent_requests_per_domain:
if domain not in self._domain_limiters:
self._domain_limiters[domain] = CapacityLimiter(self.spider.concurrent_requests_per_domain)
return self._domain_limiters[domain]
        return self._global_limiter

    def _normalize_request(self, request: Request) -> None:
"""Normalize request fields before enqueueing.
Resolves empty sid to the session manager's default session ID.
This ensures consistent fingerprinting for requests using the same session.
"""
if not request.sid:
            request.sid = self.session_manager.default_session_id

    async def _process_request(self, request: Request) -> None:
"""Download and process a single request."""
async with self._rate_limiter(request.domain):
if self.spider.download_delay:
await anyio.sleep(self.spider.download_delay)
if request._session_kwargs.get("proxy"):
self.stats.proxies.append(request._session_kwargs["proxy"])
if request._session_kwargs.get("proxies"):
self.stats.proxies.append(dict(request._session_kwargs["proxies"]))
try:
response = await self.session_manager.fetch(request)
self.stats.increment_requests_count(request.sid or self.session_manager.default_session_id)
self.stats.increment_response_bytes(request.domain, len(response.body))
self.stats.increment_status(response.status)
except Exception as e:
self.stats.failed_requests_count += 1
await self.spider.on_error(request, e)
return
if await self.spider.is_blocked(response):
self.stats.blocked_requests_count += 1
if request._retry_count < self.spider.max_blocked_retries:
retry_request = request.copy()
retry_request._retry_count += 1
                retry_request.priority -= 1  # Decrease priority so the retry is not scheduled immediately
retry_request.dont_filter = True
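                # Drop any previously assigned proxy settings; the spider's
                # retry_blocked_request hook may supply fresh ones.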
retry_request._session_kwargs.pop("proxy", None)
retry_request._session_kwargs.pop("proxies", None)
new_request = await self.spider.retry_blocked_request(retry_request, response)
self._normalize_request(new_request)
await self.scheduler.enqueue(new_request)
log.info(
f"Scheduled blocked request for retry ({retry_request._retry_count}/{self.spider.max_blocked_retries}): {request.url}"
)
else:
log.warning(f"Max retries exceeded for blocked request: {request.url}")
return
callback = request.callback if request.callback else self.spider.parse
try:
async for result in callback(response):
if isinstance(result, Request):
if self._is_domain_allowed(result):
self._normalize_request(result)
await self.scheduler.enqueue(result)
else:
self.stats.offsite_requests_count += 1
log.debug(f"Filtered offsite request to: {result.url}")
elif isinstance(result, dict):
processed_result = await self.spider.on_scraped_item(result)
if processed_result:
self.stats.items_scraped += 1
log.debug(f"Scraped from {str(response)}\n{pprint.pformat(processed_result)}")
if self._item_stream:
await self._item_stream.send(processed_result)
else:
self._items.append(processed_result)
else:
self.stats.items_dropped += 1
log.warning(f"Dropped from {str(response)}\n{processed_result}")
elif result is not None:
log.error(f"Spider must return Request, dict or None, got '{type(result)}' in {request}")
except Exception as e:
msg = f"Spider error processing {request}:\n {e}"
log.error(msg, exc_info=e)
            await self.spider.on_error(request, e)

    async def _task_wrapper(self, request: Request) -> None:
"""Wrapper to track active task count."""
try:
await self._process_request(request)
finally:
            self._active_tasks -= 1

    def request_pause(self) -> None:
"""Request a graceful pause of the crawl.
First call: requests graceful pause (waits for active tasks).
Second call: forces immediate stop.
"""
if self._force_stop:
return # Already forcing stop
if self._pause_requested:
# Second Ctrl+C - force stop
self._force_stop = True
log.warning("Force stop requested, cancelling immediately...")
else:
self._pause_requested = True
log.info(
"Pause requested, waiting for in-flight requests to complete (press Ctrl+C again to force stop)..."
)
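    # Pause/resume sketch (hedged; assumes ``signal`` is imported and the engine's
    # crawl runs in a sibling task): wiring request_pause() to SIGINT makes the
    # first Ctrl+C pause gracefully and a second one force a stop:
    #
    #     with anyio.open_signal_receiver(signal.SIGINT) as signals:
    #         async for _ in signals:
    #             engine.request_pause()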
async def _save_checkpoint(self) -> None:
"""Save current state to checkpoint files."""
requests, seen = self.scheduler.snapshot()
data = CheckpointData(requests=requests, seen=seen)
await self._checkpoint_manager.save(data)
        self._last_checkpoint_time = anyio.current_time()

    def _is_checkpoint_time(self) -> bool:
"""Check if it's time for the periodic checkpoint."""
if not self._checkpoint_system_enabled:
return False
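        # An interval of 0 disables periodic checkpoints; state is still saved
        # once when the crawl is paused.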
if self._checkpoint_manager.interval == 0:
return False
current_time = anyio.current_time()
        return (current_time - self._last_checkpoint_time) >= self._checkpoint_manager.interval

    async def _restore_from_checkpoint(self) -> bool:
"""Attempt to restore state from checkpoint.
Returns True if successfully restored, False otherwise.
"""
        if not self._checkpoint_system_enabled:
            return False
data = await self._checkpoint_manager.load()
if data is None:
return False
self.scheduler.restore(data)
# Restore callbacks from spider after scheduler restore
for request in data.requests:
request._restore_callback(self.spider)
        return True

    async def crawl(self) -> CrawlStats:
"""Run the spider and return CrawlStats."""
self._running = True
self._items.clear()
self.paused = False
self._pause_requested = False
self._force_stop = False
self.stats = CrawlStats(start_time=anyio.current_time())
# Check for existing checkpoint
resuming = (await self._restore_from_checkpoint()) if self._checkpoint_system_enabled else False
self._last_checkpoint_time = anyio.current_time()
async with self.session_manager:
self.stats.concurrent_requests = self.spider.concurrent_requests
self.stats.concurrent_requests_per_domain = self.spider.concurrent_requests_per_domain
self.stats.download_delay = self.spider.download_delay
await self.spider.on_start(resuming=resuming)
try:
if not resuming:
async for request in self.spider.start_requests():
self._normalize_request(request)
await self.scheduler.enqueue(request)
else:
log.info("Resuming from checkpoint, skipping start_requests()")
# Process queue
async with create_task_group() as tg:
while self._running:
if self._pause_requested:
if self._active_tasks == 0 or self._force_stop:
if self._force_stop:
log.warning(f"Force stopping with {self._active_tasks} active tasks")
tg.cancel_scope.cancel()
# Only save checkpoint if checkpoint system is enabled
if self._checkpoint_system_enabled:
await self._save_checkpoint()
self.paused = True
log.info("Spider paused, checkpoint saved")
else:
log.info("Spider stopped gracefully")
self._running = False
break
# Wait briefly and check again
await anyio.sleep(0.05)
continue
if self._checkpoint_system_enabled and self._is_checkpoint_time():
await self._save_checkpoint()
if self.scheduler.is_empty:
# Empty queue + no active tasks = done
if self._active_tasks == 0:
self._running = False
log.debug("Spider idle")
break
# Brief wait for callbacks to enqueue new requests
await anyio.sleep(0.05)
continue
# Only spawn tasks up to concurrent_requests limit
# This prevents spawning thousands of waiting tasks
if self._active_tasks >= self.spider.concurrent_requests:
await anyio.sleep(0.01)
continue
request = await self.scheduler.dequeue()
self._active_tasks += 1
tg.start_soon(self._task_wrapper, request)
finally:
await self.spider.on_close()
# Clean up checkpoint files on successful completion (not paused)
if not self.paused and self._checkpoint_system_enabled:
await self._checkpoint_manager.cleanup()
self.stats.log_levels_counter = self.spider._log_counter.get_counts()
self.stats.end_time = anyio.current_time()
log.info(_dump(self.stats.to_dict()))
        return self.stats

    @property
def items(self) -> ItemList:
"""Access scraped items."""
        return self._items

    def __aiter__(self) -> AsyncGenerator[dict, None]:
        return self._stream()

    async def _stream(self) -> AsyncGenerator[dict, None]:
"""Async generator that runs crawl and yields items."""
send, recv = create_memory_object_stream[dict](100)
self._item_stream = send
        async def run():
            try:
                await self.crawl()
            finally:
                # Detach the stream before closing it so a later direct crawl()
                # call collects items in self._items instead of a closed stream.
                self._item_stream = None
                await send.aclose()
async with create_task_group() as tg:
tg.start_soon(run)
try:
async for item in recv:
yield item
except EndOfStream:
pass
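# Streaming usage sketch (hedged; ``engine`` built as in the class docstring and
# ``handle`` is an assumed user function):
#
#     async for item in engine:   # crawl() runs in the background task group
#         handle(item)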