"""Tests for the CrawlerEngine class."""
import tempfile
from pathlib import Path
import pytest
from scrapling.spiders.engine import CrawlerEngine, _dump
from scrapling.spiders.request import Request
from scrapling.spiders.session import SessionManager
from scrapling.spiders.result import CrawlStats, ItemList
from scrapling.core._types import Any, Dict, Set, AsyncGenerator
# ---------------------------------------------------------------------------
# Mock helpers
# ---------------------------------------------------------------------------
class MockResponse:
"""Minimal Response stand-in."""
def __init__(self, status: int = 200, body: bytes = b"ok", url: str = "https://example.com"):
self.status = status
self.body = body
self.url = url
self.request: Any = None
self.meta: Dict[str, Any] = {}
def __str__(self) -> str:
return self.url
class MockSession:
"""Mock session that returns a canned response."""
def __init__(self, name: str = "mock", response: MockResponse | None = None):
self.name = name
self._is_alive = False
self._response = response or MockResponse()
self.fetch_calls: list[dict] = []
async def __aenter__(self):
self._is_alive = True
return self
async def __aexit__(self, *args):
self._is_alive = False
async def fetch(self, url: str, **kwargs):
self.fetch_calls.append({"url": url, **kwargs})
        return MockResponse(status=self._response.status, body=self._response.body, url=url)
class ErrorSession(MockSession):
"""Session that raises on fetch."""
def __init__(self, error: Exception | None = None):
super().__init__("error")
self._error = error or RuntimeError("fetch failed")
async def fetch(self, url: str, **kwargs):
raise self._error
class MockSpider:
"""Lightweight spider stub for engine tests."""
def __init__(
self,
*,
concurrent_requests: int = 4,
concurrent_requests_per_domain: int = 0,
download_delay: float = 0.0,
max_blocked_retries: int = 3,
allowed_domains: Set[str] | None = None,
fp_include_kwargs: bool = False,
fp_include_headers: bool = False,
fp_keep_fragments: bool = False,
is_blocked_fn=None,
on_scraped_item_fn=None,
retry_blocked_request_fn=None,
):
self.concurrent_requests = concurrent_requests
self.concurrent_requests_per_domain = concurrent_requests_per_domain
self.download_delay = download_delay
self.max_blocked_retries = max_blocked_retries
self.allowed_domains = allowed_domains or set()
self.fp_include_kwargs = fp_include_kwargs
self.fp_include_headers = fp_include_headers
self.fp_keep_fragments = fp_keep_fragments
self.name = "test_spider"
# Tracking lists
self.on_start_calls: list[dict] = []
self.on_close_calls: int = 0
self.on_error_calls: list[tuple[Request, Exception]] = []
self.scraped_items: list[dict] = []
self.blocked_responses: list = []
self.retry_requests: list = []
# Pluggable behaviour
self._is_blocked_fn = is_blocked_fn
self._on_scraped_item_fn = on_scraped_item_fn
self._retry_blocked_request_fn = retry_blocked_request_fn
# Log counter stub
self._log_counter = _LogCounterStub()
async def parse(self, response) -> AsyncGenerator[Dict[str, Any] | Request | None, None]:
yield {"url": str(response)}
async def on_start(self, resuming: bool = False) -> None:
self.on_start_calls.append({"resuming": resuming})
async def on_close(self) -> None:
self.on_close_calls += 1
async def on_error(self, request: Request, error: Exception) -> None:
self.on_error_calls.append((request, error))
async def on_scraped_item(self, item: Dict[str, Any]) -> Dict[str, Any] | None:
if self._on_scraped_item_fn:
return self._on_scraped_item_fn(item)
self.scraped_items.append(item)
return item
async def is_blocked(self, response) -> bool:
if self._is_blocked_fn:
return self._is_blocked_fn(response)
return False
async def retry_blocked_request(self, request: Request, response) -> Request:
self.retry_requests.append(request)
if self._retry_blocked_request_fn:
return self._retry_blocked_request_fn(request, response)
return request
async def start_requests(self) -> AsyncGenerator[Request, None]:
yield Request("https://example.com", sid="default")
class _LogCounterStub:
"""Stub for LogCounterHandler."""
def get_counts(self) -> Dict[str, int]:
return {"debug": 0, "info": 0, "warning": 0, "error": 0, "critical": 0}
def _make_engine(
spider: MockSpider | None = None,
session: MockSession | None = None,
crawldir: str | None = None,
interval: float = 300.0,
) -> CrawlerEngine:
"""Create a CrawlerEngine wired to mock objects."""
spider = spider or MockSpider()
sm = SessionManager()
sm.add("default", session or MockSession())
return CrawlerEngine(spider, sm, crawldir=crawldir, interval=interval)
# ---------------------------------------------------------------------------
# Tests: _dump helper
# ---------------------------------------------------------------------------
class TestDumpHelper:
def test_dump_returns_json_string(self):
result = _dump({"key": "value"})
assert '"key": "value"' in result
def test_dump_handles_nested(self):
result = _dump({"a": {"b": 1}})
assert '"a"' in result
assert '"b"' in result
# ---------------------------------------------------------------------------
# Tests: __init__
# ---------------------------------------------------------------------------
class TestCrawlerEngineInit:
def test_default_initialisation(self):
engine = _make_engine()
assert engine._running is False
assert engine._active_tasks == 0
assert engine._pause_requested is False
assert engine._force_stop is False
assert engine.paused is False
assert isinstance(engine.stats, CrawlStats)
assert isinstance(engine.items, ItemList)
def test_checkpoint_system_disabled_by_default(self):
engine = _make_engine()
assert engine._checkpoint_system_enabled is False
def test_checkpoint_system_enabled_with_crawldir(self):
with tempfile.TemporaryDirectory() as tmpdir:
engine = _make_engine(crawldir=tmpdir)
assert engine._checkpoint_system_enabled is True
def test_global_limiter_uses_concurrent_requests(self):
spider = MockSpider(concurrent_requests=8)
engine = _make_engine(spider=spider)
assert engine._global_limiter.total_tokens == 8
def test_allowed_domains_from_spider(self):
spider = MockSpider(allowed_domains={"example.com", "test.org"})
engine = _make_engine(spider=spider)
assert engine._allowed_domains == {"example.com", "test.org"}
# ---------------------------------------------------------------------------
# Tests: _is_domain_allowed
# ---------------------------------------------------------------------------
class TestIsDomainAllowed:
def test_all_allowed_when_empty(self):
engine = _make_engine()
request = Request("https://anything.com/page")
assert engine._is_domain_allowed(request) is True
def test_exact_domain_match(self):
spider = MockSpider(allowed_domains={"example.com"})
engine = _make_engine(spider=spider)
assert engine._is_domain_allowed(Request("https://example.com/page")) is True
assert engine._is_domain_allowed(Request("https://other.com/page")) is False
def test_subdomain_match(self):
spider = MockSpider(allowed_domains={"example.com"})
engine = _make_engine(spider=spider)
assert engine._is_domain_allowed(Request("https://sub.example.com/page")) is True
assert engine._is_domain_allowed(Request("https://deep.sub.example.com/x")) is True
def test_partial_name_not_matched(self):
spider = MockSpider(allowed_domains={"example.com"})
engine = _make_engine(spider=spider)
# "notexample.com" should NOT match "example.com"
assert engine._is_domain_allowed(Request("https://notexample.com/x")) is False
def test_multiple_allowed_domains(self):
spider = MockSpider(allowed_domains={"a.com", "b.org"})
engine = _make_engine(spider=spider)
assert engine._is_domain_allowed(Request("https://a.com/")) is True
assert engine._is_domain_allowed(Request("https://b.org/")) is True
assert engine._is_domain_allowed(Request("https://c.net/")) is False
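    def test_domain_as_infix_not_matched(self):
        # Sketch: the partial-name test above rules out naive substring
        # matching, so an allowed domain embedded mid-hostname (hypothetical
        # host "example.com.evil.net") should be rejected as well
        spider = MockSpider(allowed_domains={"example.com"})
        engine = _make_engine(spider=spider)
        assert engine._is_domain_allowed(Request("https://example.com.evil.net/x")) is False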
# ---------------------------------------------------------------------------
# Tests: _rate_limiter
# ---------------------------------------------------------------------------
class TestRateLimiter:
def test_returns_global_limiter_when_per_domain_disabled(self):
engine = _make_engine() # concurrent_requests_per_domain=0
limiter = engine._rate_limiter("example.com")
assert limiter is engine._global_limiter
def test_returns_per_domain_limiter_when_enabled(self):
spider = MockSpider(concurrent_requests_per_domain=2)
engine = _make_engine(spider=spider)
limiter = engine._rate_limiter("example.com")
assert limiter is not engine._global_limiter
assert limiter.total_tokens == 2
def test_same_domain_returns_same_limiter(self):
spider = MockSpider(concurrent_requests_per_domain=2)
engine = _make_engine(spider=spider)
l1 = engine._rate_limiter("example.com")
l2 = engine._rate_limiter("example.com")
assert l1 is l2
def test_different_domains_get_different_limiters(self):
spider = MockSpider(concurrent_requests_per_domain=2)
engine = _make_engine(spider=spider)
l1 = engine._rate_limiter("a.com")
l2 = engine._rate_limiter("b.com")
assert l1 is not l2
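    def test_all_domains_share_global_limiter_when_per_domain_disabled(self):
        # Corollary of the first test in this class: with per-domain
        # limiting off, every domain resolves to the same global limiter
        engine = _make_engine()  # concurrent_requests_per_domain=0
        assert engine._rate_limiter("a.com") is engine._rate_limiter("b.com")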
# ---------------------------------------------------------------------------
# Tests: _normalize_request
# ---------------------------------------------------------------------------
class TestNormalizeRequest:
def test_sets_default_sid_when_empty(self):
engine = _make_engine()
request = Request("https://example.com")
assert request.sid == ""
engine._normalize_request(request)
assert request.sid == "default"
def test_preserves_existing_sid(self):
engine = _make_engine()
request = Request("https://example.com", sid="custom")
engine._normalize_request(request)
assert request.sid == "custom"
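    def test_normalize_is_idempotent(self):
        # Follows from the two behaviours above: a second pass sees a
        # non-empty sid and must leave it untouched
        engine = _make_engine()
        request = Request("https://example.com")
        engine._normalize_request(request)
        engine._normalize_request(request)
        assert request.sid == "default"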
# ---------------------------------------------------------------------------
# Tests: _process_request
# ---------------------------------------------------------------------------
class TestProcessRequest:
@pytest.mark.asyncio
async def test_successful_fetch_updates_stats(self):
spider = MockSpider()
session = MockSession(response=MockResponse(status=200, body=b"hello"))
engine = _make_engine(spider=spider, session=session)
request = Request("https://example.com", sid="default")
await engine._process_request(request)
assert engine.stats.requests_count == 1
assert engine.stats.response_bytes == 5 # len(b"hello") from MockSession
assert "status_200" in engine.stats.response_status_count
@pytest.mark.asyncio
async def test_failed_fetch_increments_failed_count(self):
spider = MockSpider()
sm = SessionManager()
sm.add("default", ErrorSession())
engine = CrawlerEngine(spider, sm)
request = Request("https://example.com", sid="default")
await engine._process_request(request)
assert engine.stats.failed_requests_count == 1
assert len(spider.on_error_calls) == 1
@pytest.mark.asyncio
async def test_failed_fetch_does_not_increment_requests_count(self):
spider = MockSpider()
sm = SessionManager()
sm.add("default", ErrorSession())
engine = CrawlerEngine(spider, sm)
request = Request("https://example.com", sid="default")
await engine._process_request(request)
assert engine.stats.requests_count == 0
@pytest.mark.asyncio
async def test_blocked_response_triggers_retry(self):
spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=2)
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default")
await engine._process_request(request)
assert engine.stats.blocked_requests_count == 1
# A retry request should be enqueued
assert not engine.scheduler.is_empty
@pytest.mark.asyncio
async def test_blocked_response_max_retries_exceeded(self):
spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=2)
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default")
request._retry_count = 2 # Already at max
await engine._process_request(request)
assert engine.stats.blocked_requests_count == 1
# No retry enqueued
assert engine.scheduler.is_empty
@pytest.mark.asyncio
async def test_retry_request_has_dont_filter(self):
spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=3)
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default")
await engine._process_request(request)
retry = await engine.scheduler.dequeue()
assert retry.dont_filter is True
assert retry._retry_count == 1
@pytest.mark.asyncio
async def test_retry_clears_proxy_kwargs(self):
spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=3)
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default", proxy="http://proxy:8080")
await engine._process_request(request)
retry = await engine.scheduler.dequeue()
assert "proxy" not in retry._session_kwargs
assert "proxies" not in retry._session_kwargs
@pytest.mark.asyncio
async def test_callback_yielding_dict_increments_items(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default")
await engine._process_request(request)
assert engine.stats.items_scraped == 1
assert len(engine.items) == 1
@pytest.mark.asyncio
async def test_callback_yielding_request_enqueues(self):
async def callback(response) -> AsyncGenerator:
yield Request("https://example.com/page2", sid="default")
spider = MockSpider()
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default", callback=callback)
await engine._process_request(request)
assert not engine.scheduler.is_empty
@pytest.mark.asyncio
async def test_callback_yielding_offsite_request_filtered(self):
async def callback(response) -> AsyncGenerator:
yield Request("https://other.com/page", sid="default")
spider = MockSpider(allowed_domains={"example.com"})
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default", callback=callback)
await engine._process_request(request)
assert engine.stats.offsite_requests_count == 1
assert engine.scheduler.is_empty
@pytest.mark.asyncio
async def test_dropped_item_when_on_scraped_item_returns_none(self):
spider = MockSpider(on_scraped_item_fn=lambda item: None)
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default")
await engine._process_request(request)
assert engine.stats.items_dropped == 1
assert engine.stats.items_scraped == 0
assert len(engine.items) == 0
@pytest.mark.asyncio
async def test_callback_exception_calls_on_error(self):
        async def bad_callback(response) -> AsyncGenerator:
            raise ValueError("callback boom")
            yield  # unreachable, but required to make this an async generator
spider = MockSpider()
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default", callback=bad_callback)
await engine._process_request(request)
assert len(spider.on_error_calls) == 1
assert isinstance(spider.on_error_calls[0][1], ValueError)
@pytest.mark.asyncio
async def test_proxy_tracked_in_stats(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default", proxy="http://p:8080")
await engine._process_request(request)
assert "http://p:8080" in engine.stats.proxies
@pytest.mark.asyncio
async def test_proxies_dict_tracked_in_stats(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
proxies = {"http": "http://p:8080", "https": "https://p:8443"}
request = Request("https://example.com", sid="default", proxies=proxies)
await engine._process_request(request)
assert len(engine.stats.proxies) == 1
assert engine.stats.proxies[0] == proxies
@pytest.mark.asyncio
async def test_uses_parse_when_no_callback(self):
async def custom_parse(response) -> AsyncGenerator:
yield {"from": "custom_parse"}
spider = MockSpider()
spider.parse = custom_parse # type: ignore[assignment]
engine = _make_engine(spider=spider)
request = Request("https://example.com", sid="default")
# No callback set → should use spider.parse
await engine._process_request(request)
assert engine.stats.items_scraped == 1
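    @pytest.mark.asyncio
    async def test_callback_takes_precedence_over_parse(self):
        # Sketch assuming an explicit callback replaces spider.parse
        # entirely rather than running alongside it
        async def callback(response) -> AsyncGenerator:
            yield {"from": "callback"}
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default", callback=callback)
        await engine._process_request(request)
        assert engine.stats.items_scraped == 1
        assert spider.scraped_items == [{"from": "callback"}]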
# ---------------------------------------------------------------------------
# Tests: _task_wrapper
# ---------------------------------------------------------------------------
class TestTaskWrapper:
@pytest.mark.asyncio
async def test_decrements_active_tasks(self):
engine = _make_engine()
engine._active_tasks = 1
request = Request("https://example.com", sid="default")
await engine._task_wrapper(request)
assert engine._active_tasks == 0
@pytest.mark.asyncio
async def test_decrements_even_on_error(self):
spider = MockSpider()
sm = SessionManager()
sm.add("default", ErrorSession())
engine = CrawlerEngine(spider, sm)
engine._active_tasks = 1
request = Request("https://example.com", sid="default")
await engine._task_wrapper(request)
assert engine._active_tasks == 0
# ---------------------------------------------------------------------------
# Tests: request_pause
# ---------------------------------------------------------------------------
class TestRequestPause:
def test_first_call_sets_pause_requested(self):
engine = _make_engine()
engine.request_pause()
assert engine._pause_requested is True
assert engine._force_stop is False
def test_second_call_sets_force_stop(self):
engine = _make_engine()
engine.request_pause() # first
engine.request_pause() # second
assert engine._pause_requested is True
assert engine._force_stop is True
def test_third_call_after_force_stop_is_noop(self):
engine = _make_engine()
engine.request_pause()
engine.request_pause()
engine.request_pause() # should not raise
assert engine._force_stop is True
# ---------------------------------------------------------------------------
# Tests: checkpoint methods
# ---------------------------------------------------------------------------
class TestCheckpointMethods:
def test_is_checkpoint_time_false_when_disabled(self):
engine = _make_engine() # no crawldir
assert engine._is_checkpoint_time() is False
@pytest.mark.asyncio
async def test_save_and_restore_checkpoint(self):
with tempfile.TemporaryDirectory() as tmpdir:
spider = MockSpider()
engine = _make_engine(spider=spider, crawldir=tmpdir)
# Enqueue a request so snapshot has data
req = Request("https://example.com", sid="default")
engine._normalize_request(req)
await engine.scheduler.enqueue(req)
await engine._save_checkpoint()
# Verify checkpoint file exists
checkpoint_path = Path(tmpdir) / "checkpoint.pkl"
assert checkpoint_path.exists()
@pytest.mark.asyncio
async def test_restore_when_no_checkpoint_returns_false(self):
with tempfile.TemporaryDirectory() as tmpdir:
engine = _make_engine(crawldir=tmpdir)
result = await engine._restore_from_checkpoint()
assert result is False
@pytest.mark.asyncio
async def test_restore_from_checkpoint_raises_when_disabled(self):
engine = _make_engine() # no crawldir → checkpoint disabled
with pytest.raises(RuntimeError):
await engine._restore_from_checkpoint()
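    @pytest.mark.asyncio
    async def test_restore_after_save_returns_true(self):
        # Sketch: _restore_from_checkpoint returns False when no checkpoint
        # exists, so by symmetry (assumed) it should return True once
        # _save_checkpoint has written one to the same crawldir
        with tempfile.TemporaryDirectory() as tmpdir:
            engine = _make_engine(crawldir=tmpdir)
            req = Request("https://example.com", sid="default")
            engine._normalize_request(req)
            await engine.scheduler.enqueue(req)
            await engine._save_checkpoint()
            fresh = _make_engine(crawldir=tmpdir)
            assert await fresh._restore_from_checkpoint() is True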
# ---------------------------------------------------------------------------
# Tests: crawl
# ---------------------------------------------------------------------------
class TestCrawl:
@pytest.mark.asyncio
async def test_basic_crawl_returns_stats(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
stats = await engine.crawl()
assert isinstance(stats, CrawlStats)
assert stats.requests_count >= 1
assert stats.items_scraped >= 1
@pytest.mark.asyncio
async def test_crawl_calls_on_start_and_on_close(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
await engine.crawl()
assert len(spider.on_start_calls) == 1
assert spider.on_start_calls[0]["resuming"] is False
assert spider.on_close_calls == 1
@pytest.mark.asyncio
async def test_crawl_sets_stats_timing(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
stats = await engine.crawl()
assert stats.start_time > 0
assert stats.end_time > 0
assert stats.end_time >= stats.start_time
@pytest.mark.asyncio
async def test_crawl_sets_concurrency_stats(self):
spider = MockSpider(concurrent_requests=16, concurrent_requests_per_domain=4)
engine = _make_engine(spider=spider)
stats = await engine.crawl()
assert stats.concurrent_requests == 16
assert stats.concurrent_requests_per_domain == 4
@pytest.mark.asyncio
async def test_crawl_processes_multiple_start_urls(self):
spider = MockSpider()
urls = ["https://example.com/1", "https://example.com/2", "https://example.com/3"]
async def multi_start_requests() -> AsyncGenerator[Request, None]:
for url in urls:
yield Request(url, sid="default")
spider.start_requests = multi_start_requests # type: ignore[assignment]
engine = _make_engine(spider=spider)
stats = await engine.crawl()
assert stats.requests_count == 3
assert stats.items_scraped == 3
@pytest.mark.asyncio
async def test_crawl_follows_yielded_requests(self):
"""Test that requests yielded from callbacks are processed."""
call_count = 0
async def parse_with_follow(response) -> AsyncGenerator:
nonlocal call_count
call_count += 1
if call_count == 1:
yield Request("https://example.com/page2", sid="default")
yield {"page": str(response)}
spider = MockSpider()
spider.parse = parse_with_follow # type: ignore[assignment]
engine = _make_engine(spider=spider)
stats = await engine.crawl()
assert stats.requests_count == 2
assert stats.items_scraped == 2
@pytest.mark.asyncio
async def test_crawl_with_download_delay(self):
spider = MockSpider(download_delay=0.01)
engine = _make_engine(spider=spider)
stats = await engine.crawl()
assert stats.download_delay == 0.01
assert stats.requests_count >= 1
@pytest.mark.asyncio
async def test_crawl_filters_offsite_requests(self):
async def parse_offsite(response) -> AsyncGenerator:
yield Request("https://other-domain.com/page", sid="default")
yield {"url": str(response)}
spider = MockSpider(allowed_domains={"example.com"})
spider.parse = parse_offsite # type: ignore[assignment]
engine = _make_engine(spider=spider)
stats = await engine.crawl()
assert stats.offsite_requests_count == 1
assert stats.requests_count == 1 # Only the initial request
@pytest.mark.asyncio
async def test_crawl_cleans_up_checkpoint_on_completion(self):
with tempfile.TemporaryDirectory() as tmpdir:
spider = MockSpider()
engine = _make_engine(spider=spider, crawldir=tmpdir)
await engine.crawl()
checkpoint_path = Path(tmpdir) / "checkpoint.pkl"
assert not checkpoint_path.exists() # Cleaned up
@pytest.mark.asyncio
async def test_crawl_handles_fetch_error_gracefully(self):
spider = MockSpider()
sm = SessionManager()
sm.add("default", ErrorSession())
engine = CrawlerEngine(spider, sm)
stats = await engine.crawl()
assert stats.failed_requests_count == 1
assert len(spider.on_error_calls) == 1
@pytest.mark.asyncio
async def test_crawl_log_levels_populated(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
stats = await engine.crawl()
assert isinstance(stats.log_levels_counter, dict)
@pytest.mark.asyncio
async def test_crawl_resets_state_on_each_run(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
# Run first crawl
await engine.crawl()
assert engine.stats.requests_count >= 1
        # Run second crawl - stats and items are reset, so the single
        # start request is counted fresh rather than accumulating
        await engine.crawl()
        assert engine.stats.requests_count == 1
        assert engine.paused is False
# ---------------------------------------------------------------------------
# Tests: items property
# ---------------------------------------------------------------------------
class TestItemsProperty:
def test_items_returns_item_list(self):
engine = _make_engine()
assert isinstance(engine.items, ItemList)
def test_items_initially_empty(self):
engine = _make_engine()
assert len(engine.items) == 0
@pytest.mark.asyncio
async def test_items_populated_after_crawl(self):
engine = _make_engine()
await engine.crawl()
assert len(engine.items) >= 1
# ---------------------------------------------------------------------------
# Tests: streaming (__aiter__ / _stream)
# ---------------------------------------------------------------------------
class TestStreaming:
@pytest.mark.asyncio
async def test_stream_yields_items(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
items = []
async for item in engine:
items.append(item)
assert len(items) >= 1
assert isinstance(items[0], dict)
@pytest.mark.asyncio
async def test_stream_processes_follow_up_requests(self):
call_count = 0
async def parse_with_follow(response) -> AsyncGenerator:
nonlocal call_count
call_count += 1
if call_count == 1:
yield Request("https://example.com/page2", sid="default")
yield {"page": call_count}
spider = MockSpider()
spider.parse = parse_with_follow # type: ignore[assignment]
engine = _make_engine(spider=spider)
items = []
async for item in engine:
items.append(item)
assert len(items) == 2
@pytest.mark.asyncio
async def test_stream_items_not_stored_in_items_list(self):
"""When streaming, items go to the stream, not to engine._items."""
spider = MockSpider()
engine = _make_engine(spider=spider)
items = []
async for item in engine:
items.append(item)
# Items were sent through stream, not appended to _items
assert len(items) >= 1
assert len(engine.items) == 0
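    @pytest.mark.asyncio
    async def test_stream_still_updates_request_stats(self):
        # Sketch assuming streaming drives the same _process_request path,
        # so request counters advance even though items bypass the list
        engine = _make_engine()
        async for _ in engine:
            pass
        assert engine.stats.requests_count >= 1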
# ---------------------------------------------------------------------------
# Tests: pause during crawl
# ---------------------------------------------------------------------------
class TestPauseDuringCrawl:
@pytest.mark.asyncio
async def test_pause_stops_crawl_gracefully(self):
processed = 0
async def slow_parse(response) -> AsyncGenerator:
nonlocal processed
processed += 1
# Yield more requests to keep the crawl going
if processed <= 2:
yield Request(f"https://example.com/p{processed + 1}", sid="default")
yield {"n": processed}
spider = MockSpider()
spider.parse = slow_parse # type: ignore[assignment]
engine = _make_engine(spider=spider)
        # Request pause before starting - the engine should stop as soon as
        # active tasks complete instead of draining the scheduler
        engine._pause_requested = True
        await engine.crawl()
        # The crawl should have stopped gracefully
        assert engine._running is False
@pytest.mark.asyncio
async def test_pause_with_checkpoint_sets_paused(self):
with tempfile.TemporaryDirectory() as tmpdir:
parse_count = 0
async def parse_and_pause(response) -> AsyncGenerator:
nonlocal parse_count
parse_count += 1
# Request pause after first request, but yield follow-ups
if parse_count == 1:
engine.request_pause()
yield Request("https://example.com/p2", sid="default")
yield {"n": parse_count}
spider = MockSpider()
spider.parse = parse_and_pause # type: ignore[assignment]
engine = _make_engine(spider=spider, crawldir=tmpdir)
await engine.crawl()
assert engine.paused is True
@pytest.mark.asyncio
async def test_pause_without_checkpoint_does_not_set_paused(self):
spider = MockSpider()
engine = _make_engine(spider=spider)
engine._pause_requested = True
await engine.crawl()
assert engine.paused is False