| """Tests for the CrawlerEngine class.""" | |
| import tempfile | |
| from pathlib import Path | |
| import anyio | |
| import pytest | |
| from scrapling.spiders.engine import CrawlerEngine, _dump | |
| from scrapling.spiders.request import Request | |
| from scrapling.spiders.session import SessionManager | |
| from scrapling.spiders.result import CrawlStats, ItemList | |
| from scrapling.spiders.checkpoint import CheckpointData | |
| from scrapling.core._types import Any, Dict, Set, AsyncGenerator | |

# ---------------------------------------------------------------------------
# Mock helpers
# ---------------------------------------------------------------------------


class MockResponse:
    """Minimal Response stand-in."""

    def __init__(self, status: int = 200, body: bytes = b"ok", url: str = "https://example.com"):
        self.status = status
        self.body = body
        self.url = url
        self.request: Any = None
        self.meta: Dict[str, Any] = {}

    def __str__(self) -> str:
        return self.url


class MockSession:
    """Mock session that returns a canned response."""

    def __init__(self, name: str = "mock", response: MockResponse | None = None):
        self.name = name
        self._is_alive = False
        self._response = response or MockResponse()
        self.fetch_calls: list[dict] = []

    async def __aenter__(self):
        self._is_alive = True
        return self

    async def __aexit__(self, *args):
        self._is_alive = False

    async def fetch(self, url: str, **kwargs):
        self.fetch_calls.append({"url": url, **kwargs})
        return MockResponse(status=self._response.status, body=self._response.body, url=url)


class ErrorSession(MockSession):
    """Session that raises on fetch."""

    def __init__(self, error: Exception | None = None):
        super().__init__("error")
        self._error = error or RuntimeError("fetch failed")

    async def fetch(self, url: str, **kwargs):
        raise self._error


class MockSpider:
    """Lightweight spider stub for engine tests."""

    def __init__(
        self,
        *,
        concurrent_requests: int = 4,
        concurrent_requests_per_domain: int = 0,
        download_delay: float = 0.0,
        max_blocked_retries: int = 3,
        allowed_domains: Set[str] | None = None,
        fp_include_kwargs: bool = False,
        fp_include_headers: bool = False,
        fp_keep_fragments: bool = False,
        is_blocked_fn=None,
        on_scraped_item_fn=None,
        retry_blocked_request_fn=None,
    ):
        self.concurrent_requests = concurrent_requests
        self.concurrent_requests_per_domain = concurrent_requests_per_domain
        self.download_delay = download_delay
        self.max_blocked_retries = max_blocked_retries
        self.allowed_domains = allowed_domains or set()
        self.fp_include_kwargs = fp_include_kwargs
        self.fp_include_headers = fp_include_headers
        self.fp_keep_fragments = fp_keep_fragments
        self.name = "test_spider"
        # Tracking lists
        self.on_start_calls: list[dict] = []
        self.on_close_calls: int = 0
        self.on_error_calls: list[tuple[Request, Exception]] = []
        self.scraped_items: list[dict] = []
        self.blocked_responses: list = []
        self.retry_requests: list = []
        # Pluggable behaviour
        self._is_blocked_fn = is_blocked_fn
        self._on_scraped_item_fn = on_scraped_item_fn
        self._retry_blocked_request_fn = retry_blocked_request_fn
        # Log counter stub
        self._log_counter = _LogCounterStub()

    async def parse(self, response) -> AsyncGenerator[Dict[str, Any] | Request | None, None]:
        yield {"url": str(response)}

    async def on_start(self, resuming: bool = False) -> None:
        self.on_start_calls.append({"resuming": resuming})

    async def on_close(self) -> None:
        self.on_close_calls += 1

    async def on_error(self, request: Request, error: Exception) -> None:
        self.on_error_calls.append((request, error))

    async def on_scraped_item(self, item: Dict[str, Any]) -> Dict[str, Any] | None:
        if self._on_scraped_item_fn:
            return self._on_scraped_item_fn(item)
        self.scraped_items.append(item)
        return item

    async def is_blocked(self, response) -> bool:
        if self._is_blocked_fn:
            return self._is_blocked_fn(response)
        return False

    async def retry_blocked_request(self, request: Request, response) -> Request:
        self.retry_requests.append(request)
        if self._retry_blocked_request_fn:
            return self._retry_blocked_request_fn(request, response)
        return request

    async def start_requests(self) -> AsyncGenerator[Request, None]:
        yield Request("https://example.com", sid="default")


class _LogCounterStub:
    """Stub for LogCounterHandler."""

    def get_counts(self) -> Dict[str, int]:
        return {"debug": 0, "info": 0, "warning": 0, "error": 0, "critical": 0}


def _make_engine(
    spider: MockSpider | None = None,
    session: MockSession | None = None,
    crawldir: str | None = None,
    interval: float = 300.0,
) -> CrawlerEngine:
    """Create a CrawlerEngine wired to mock objects."""
    spider = spider or MockSpider()
    sm = SessionManager()
    sm.add("default", session or MockSession())
    return CrawlerEngine(spider, sm, crawldir=crawldir, interval=interval)


# ---------------------------------------------------------------------------
# Tests: _dump helper
# ---------------------------------------------------------------------------


class TestDumpHelper:
    def test_dump_returns_json_string(self):
        result = _dump({"key": "value"})
        assert '"key": "value"' in result

    def test_dump_handles_nested(self):
        result = _dump({"a": {"b": 1}})
        assert '"a"' in result
        assert '"b"' in result


# ---------------------------------------------------------------------------
# Tests: __init__
# ---------------------------------------------------------------------------


class TestCrawlerEngineInit:
    def test_default_initialisation(self):
        engine = _make_engine()
        assert engine._running is False
        assert engine._active_tasks == 0
        assert engine._pause_requested is False
        assert engine._force_stop is False
        assert engine.paused is False
        assert isinstance(engine.stats, CrawlStats)
        assert isinstance(engine.items, ItemList)

    def test_checkpoint_system_disabled_by_default(self):
        engine = _make_engine()
        assert engine._checkpoint_system_enabled is False

    def test_checkpoint_system_enabled_with_crawldir(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            engine = _make_engine(crawldir=tmpdir)
            assert engine._checkpoint_system_enabled is True

    def test_global_limiter_uses_concurrent_requests(self):
        spider = MockSpider(concurrent_requests=8)
        engine = _make_engine(spider=spider)
        assert engine._global_limiter.total_tokens == 8

    def test_allowed_domains_from_spider(self):
        spider = MockSpider(allowed_domains={"example.com", "test.org"})
        engine = _make_engine(spider=spider)
        assert engine._allowed_domains == {"example.com", "test.org"}


# ---------------------------------------------------------------------------
# Tests: _is_domain_allowed
# ---------------------------------------------------------------------------


class TestIsDomainAllowed:
    def test_all_allowed_when_empty(self):
        engine = _make_engine()
        request = Request("https://anything.com/page")
        assert engine._is_domain_allowed(request) is True

    def test_exact_domain_match(self):
        spider = MockSpider(allowed_domains={"example.com"})
        engine = _make_engine(spider=spider)
        assert engine._is_domain_allowed(Request("https://example.com/page")) is True
        assert engine._is_domain_allowed(Request("https://other.com/page")) is False

    def test_subdomain_match(self):
        spider = MockSpider(allowed_domains={"example.com"})
        engine = _make_engine(spider=spider)
        assert engine._is_domain_allowed(Request("https://sub.example.com/page")) is True
        assert engine._is_domain_allowed(Request("https://deep.sub.example.com/x")) is True

    def test_partial_name_not_matched(self):
        spider = MockSpider(allowed_domains={"example.com"})
        engine = _make_engine(spider=spider)
        # "notexample.com" must NOT match "example.com"
        assert engine._is_domain_allowed(Request("https://notexample.com/x")) is False

    def test_multiple_allowed_domains(self):
        spider = MockSpider(allowed_domains={"a.com", "b.org"})
        engine = _make_engine(spider=spider)
        assert engine._is_domain_allowed(Request("https://a.com/")) is True
        assert engine._is_domain_allowed(Request("https://b.org/")) is True
        assert engine._is_domain_allowed(Request("https://c.net/")) is False


# ---------------------------------------------------------------------------
# Tests: _rate_limiter
# ---------------------------------------------------------------------------


class TestRateLimiter:
    def test_returns_global_limiter_when_per_domain_disabled(self):
        engine = _make_engine()  # concurrent_requests_per_domain=0
        limiter = engine._rate_limiter("example.com")
        assert limiter is engine._global_limiter

    def test_returns_per_domain_limiter_when_enabled(self):
        spider = MockSpider(concurrent_requests_per_domain=2)
        engine = _make_engine(spider=spider)
        limiter = engine._rate_limiter("example.com")
        assert limiter is not engine._global_limiter
        assert limiter.total_tokens == 2

    def test_same_domain_returns_same_limiter(self):
        spider = MockSpider(concurrent_requests_per_domain=2)
        engine = _make_engine(spider=spider)
        l1 = engine._rate_limiter("example.com")
        l2 = engine._rate_limiter("example.com")
        assert l1 is l2

    def test_different_domains_get_different_limiters(self):
        spider = MockSpider(concurrent_requests_per_domain=2)
        engine = _make_engine(spider=spider)
        l1 = engine._rate_limiter("a.com")
        l2 = engine._rate_limiter("b.com")
        assert l1 is not l2
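
    # A hedged follow-up, assuming the per-domain limiters are anyio
    # CapacityLimiter instances (suggested by the total_tokens attribute
    # asserted above); adapt the attribute names if they are a custom type.
    async def test_limiter_tracks_borrowed_tokens(self):
        spider = MockSpider(concurrent_requests_per_domain=2)
        engine = _make_engine(spider=spider)
        limiter = engine._rate_limiter("example.com")
        async with limiter:
            assert limiter.borrowed_tokens == 1
        assert limiter.borrowed_tokens == 0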


# ---------------------------------------------------------------------------
# Tests: _normalize_request
# ---------------------------------------------------------------------------


class TestNormalizeRequest:
    def test_sets_default_sid_when_empty(self):
        engine = _make_engine()
        request = Request("https://example.com")
        assert request.sid == ""
        engine._normalize_request(request)
        assert request.sid == "default"

    def test_preserves_existing_sid(self):
        engine = _make_engine()
        request = Request("https://example.com", sid="custom")
        engine._normalize_request(request)
        assert request.sid == "custom"
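
    # A hedged extra check: normalisation should be idempotent. This assumes
    # _normalize_request only fills in missing fields and never rewrites ones
    # that are already set.
    def test_normalize_is_idempotent(self):
        engine = _make_engine()
        request = Request("https://example.com")
        engine._normalize_request(request)
        engine._normalize_request(request)
        assert request.sid == "default"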


# ---------------------------------------------------------------------------
# Tests: _process_request
# ---------------------------------------------------------------------------


class TestProcessRequest:
    async def test_successful_fetch_updates_stats(self):
        spider = MockSpider()
        session = MockSession(response=MockResponse(status=200, body=b"hello"))
        engine = _make_engine(spider=spider, session=session)
        request = Request("https://example.com", sid="default")
        await engine._process_request(request)
        assert engine.stats.requests_count == 1
        assert engine.stats.response_bytes == 5  # len(b"hello") from MockSession
        assert "status_200" in engine.stats.response_status_count

    async def test_failed_fetch_increments_failed_count(self):
        spider = MockSpider()
        sm = SessionManager()
        sm.add("default", ErrorSession())
        engine = CrawlerEngine(spider, sm)
        request = Request("https://example.com", sid="default")
        await engine._process_request(request)
        assert engine.stats.failed_requests_count == 1
        assert len(spider.on_error_calls) == 1

    async def test_failed_fetch_does_not_increment_requests_count(self):
        spider = MockSpider()
        sm = SessionManager()
        sm.add("default", ErrorSession())
        engine = CrawlerEngine(spider, sm)
        request = Request("https://example.com", sid="default")
        await engine._process_request(request)
        assert engine.stats.requests_count == 0

    async def test_blocked_response_triggers_retry(self):
        spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=2)
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default")
        await engine._process_request(request)
        assert engine.stats.blocked_requests_count == 1
        # A retry request should be enqueued
        assert not engine.scheduler.is_empty

    async def test_blocked_response_max_retries_exceeded(self):
        spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=2)
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default")
        request._retry_count = 2  # Already at max
        await engine._process_request(request)
        assert engine.stats.blocked_requests_count == 1
        # No retry enqueued
        assert engine.scheduler.is_empty

    async def test_retry_request_has_dont_filter(self):
        spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=3)
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default")
        await engine._process_request(request)
        retry = await engine.scheduler.dequeue()
        assert retry.dont_filter is True
        assert retry._retry_count == 1

    async def test_retry_clears_proxy_kwargs(self):
        spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=3)
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default", proxy="http://proxy:8080")
        await engine._process_request(request)
        retry = await engine.scheduler.dequeue()
        assert "proxy" not in retry._session_kwargs
        assert "proxies" not in retry._session_kwargs

    async def test_callback_yielding_dict_increments_items(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default")
        await engine._process_request(request)
        assert engine.stats.items_scraped == 1
        assert len(engine.items) == 1

    async def test_callback_yielding_request_enqueues(self):
        async def callback(response) -> AsyncGenerator:
            yield Request("https://example.com/page2", sid="default")

        spider = MockSpider()
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default", callback=callback)
        await engine._process_request(request)
        assert not engine.scheduler.is_empty

    async def test_callback_yielding_offsite_request_filtered(self):
        async def callback(response) -> AsyncGenerator:
            yield Request("https://other.com/page", sid="default")

        spider = MockSpider(allowed_domains={"example.com"})
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default", callback=callback)
        await engine._process_request(request)
        assert engine.stats.offsite_requests_count == 1
        assert engine.scheduler.is_empty

    async def test_dropped_item_when_on_scraped_item_returns_none(self):
        spider = MockSpider(on_scraped_item_fn=lambda item: None)
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default")
        await engine._process_request(request)
        assert engine.stats.items_dropped == 1
        assert engine.stats.items_scraped == 0
        assert len(engine.items) == 0

    async def test_callback_exception_calls_on_error(self):
        async def bad_callback(response) -> AsyncGenerator:
            raise ValueError("callback boom")
            yield  # unreachable, but makes this an async generator

        spider = MockSpider()
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default", callback=bad_callback)
        await engine._process_request(request)
        assert len(spider.on_error_calls) == 1
        assert isinstance(spider.on_error_calls[0][1], ValueError)

    async def test_proxy_tracked_in_stats(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default", proxy="http://p:8080")
        await engine._process_request(request)
        assert "http://p:8080" in engine.stats.proxies

    async def test_proxies_dict_tracked_in_stats(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        proxies = {"http": "http://p:8080", "https": "https://p:8443"}
        request = Request("https://example.com", sid="default", proxies=proxies)
        await engine._process_request(request)
        assert len(engine.stats.proxies) == 1
        assert engine.stats.proxies[0] == proxies

    async def test_uses_parse_when_no_callback(self):
        async def custom_parse(response) -> AsyncGenerator:
            yield {"from": "custom_parse"}

        spider = MockSpider()
        spider.parse = custom_parse  # type: ignore[assignment]
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default")
        # No callback set, so the engine should fall back to spider.parse
        await engine._process_request(request)
        assert engine.stats.items_scraped == 1
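
    # A hedged extra check: the retry produced for a blocked response should
    # keep its session id, since MockSpider.retry_blocked_request returns the
    # original request. Assumes the engine enqueues the returned object as-is.
    async def test_retry_preserves_sid(self):
        spider = MockSpider(is_blocked_fn=lambda r: True, max_blocked_retries=3)
        engine = _make_engine(spider=spider)
        request = Request("https://example.com", sid="default")
        await engine._process_request(request)
        retry = await engine.scheduler.dequeue()
        assert retry.sid == "default"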


# ---------------------------------------------------------------------------
# Tests: _task_wrapper
# ---------------------------------------------------------------------------


class TestTaskWrapper:
    async def test_decrements_active_tasks(self):
        engine = _make_engine()
        engine._active_tasks = 1
        request = Request("https://example.com", sid="default")
        await engine._task_wrapper(request)
        assert engine._active_tasks == 0

    async def test_decrements_even_on_error(self):
        spider = MockSpider()
        sm = SessionManager()
        sm.add("default", ErrorSession())
        engine = CrawlerEngine(spider, sm)
        engine._active_tasks = 1
        request = Request("https://example.com", sid="default")
        await engine._task_wrapper(request)
        assert engine._active_tasks == 0


# ---------------------------------------------------------------------------
# Tests: request_pause
# ---------------------------------------------------------------------------


class TestRequestPause:
    def test_first_call_sets_pause_requested(self):
        engine = _make_engine()
        engine.request_pause()
        assert engine._pause_requested is True
        assert engine._force_stop is False

    def test_second_call_sets_force_stop(self):
        engine = _make_engine()
        engine.request_pause()  # first
        engine.request_pause()  # second
        assert engine._pause_requested is True
        assert engine._force_stop is True

    def test_third_call_after_force_stop_is_noop(self):
        engine = _make_engine()
        engine.request_pause()
        engine.request_pause()
        engine.request_pause()  # should not raise
        assert engine._force_stop is True


# ---------------------------------------------------------------------------
# Tests: checkpoint methods
# ---------------------------------------------------------------------------


class TestCheckpointMethods:
    def test_is_checkpoint_time_false_when_disabled(self):
        engine = _make_engine()  # no crawldir
        assert engine._is_checkpoint_time() is False

    async def test_save_checkpoint_creates_file(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            spider = MockSpider()
            engine = _make_engine(spider=spider, crawldir=tmpdir)
            # Enqueue a request so the snapshot has data
            req = Request("https://example.com", sid="default")
            engine._normalize_request(req)
            await engine.scheduler.enqueue(req)
            await engine._save_checkpoint()
            # Verify the checkpoint file exists
            checkpoint_path = Path(tmpdir) / "checkpoint.pkl"
            assert checkpoint_path.exists()

    async def test_restore_when_no_checkpoint_returns_false(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            engine = _make_engine(crawldir=tmpdir)
            result = await engine._restore_from_checkpoint()
            assert result is False

    async def test_restore_from_checkpoint_raises_when_disabled(self):
        engine = _make_engine()  # no crawldir, so checkpoints are disabled
        with pytest.raises(RuntimeError):
            await engine._restore_from_checkpoint()
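
    # A hedged round-trip sketch, assuming _restore_from_checkpoint returns
    # True when a checkpoint file exists (mirroring the False case above):
    # a fresh engine pointed at the same crawldir should restore the state
    # saved by the first one.
    async def test_restore_after_save_returns_true(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            engine = _make_engine(crawldir=tmpdir)
            req = Request("https://example.com", sid="default")
            engine._normalize_request(req)
            await engine.scheduler.enqueue(req)
            await engine._save_checkpoint()
            fresh = _make_engine(crawldir=tmpdir)
            assert await fresh._restore_from_checkpoint() is True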


# ---------------------------------------------------------------------------
# Tests: crawl
# ---------------------------------------------------------------------------


class TestCrawl:
    async def test_basic_crawl_returns_stats(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        stats = await engine.crawl()
        assert isinstance(stats, CrawlStats)
        assert stats.requests_count >= 1
        assert stats.items_scraped >= 1

    async def test_crawl_calls_on_start_and_on_close(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        await engine.crawl()
        assert len(spider.on_start_calls) == 1
        assert spider.on_start_calls[0]["resuming"] is False
        assert spider.on_close_calls == 1

    async def test_crawl_sets_stats_timing(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        stats = await engine.crawl()
        assert stats.start_time > 0
        assert stats.end_time > 0
        assert stats.end_time >= stats.start_time

    async def test_crawl_sets_concurrency_stats(self):
        spider = MockSpider(concurrent_requests=16, concurrent_requests_per_domain=4)
        engine = _make_engine(spider=spider)
        stats = await engine.crawl()
        assert stats.concurrent_requests == 16
        assert stats.concurrent_requests_per_domain == 4

    async def test_crawl_processes_multiple_start_urls(self):
        spider = MockSpider()
        urls = ["https://example.com/1", "https://example.com/2", "https://example.com/3"]

        async def multi_start_requests() -> AsyncGenerator[Request, None]:
            for url in urls:
                yield Request(url, sid="default")

        spider.start_requests = multi_start_requests  # type: ignore[assignment]
        engine = _make_engine(spider=spider)
        stats = await engine.crawl()
        assert stats.requests_count == 3
        assert stats.items_scraped == 3

    async def test_crawl_follows_yielded_requests(self):
        """Requests yielded from callbacks are processed as well."""
        call_count = 0

        async def parse_with_follow(response) -> AsyncGenerator:
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                yield Request("https://example.com/page2", sid="default")
            yield {"page": str(response)}

        spider = MockSpider()
        spider.parse = parse_with_follow  # type: ignore[assignment]
        engine = _make_engine(spider=spider)
        stats = await engine.crawl()
        assert stats.requests_count == 2
        assert stats.items_scraped == 2

    async def test_crawl_with_download_delay(self):
        spider = MockSpider(download_delay=0.01)
        engine = _make_engine(spider=spider)
        stats = await engine.crawl()
        assert stats.download_delay == 0.01
        assert stats.requests_count >= 1

    async def test_crawl_filters_offsite_requests(self):
        async def parse_offsite(response) -> AsyncGenerator:
            yield Request("https://other-domain.com/page", sid="default")
            yield {"url": str(response)}

        spider = MockSpider(allowed_domains={"example.com"})
        spider.parse = parse_offsite  # type: ignore[assignment]
        engine = _make_engine(spider=spider)
        stats = await engine.crawl()
        assert stats.offsite_requests_count == 1
        assert stats.requests_count == 1  # Only the initial request

    async def test_crawl_cleans_up_checkpoint_on_completion(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            spider = MockSpider()
            engine = _make_engine(spider=spider, crawldir=tmpdir)
            await engine.crawl()
            checkpoint_path = Path(tmpdir) / "checkpoint.pkl"
            assert not checkpoint_path.exists()  # Cleaned up

    async def test_crawl_handles_fetch_error_gracefully(self):
        spider = MockSpider()
        sm = SessionManager()
        sm.add("default", ErrorSession())
        engine = CrawlerEngine(spider, sm)
        stats = await engine.crawl()
        assert stats.failed_requests_count == 1
        assert len(spider.on_error_calls) == 1

    async def test_crawl_log_levels_populated(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        stats = await engine.crawl()
        assert isinstance(stats.log_levels_counter, dict)

    async def test_crawl_resets_state_on_each_run(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        # Run the first crawl
        await engine.crawl()
        assert engine.stats.requests_count >= 1
        # Run a second crawl; stats and items are reset rather than accumulated
        stats = await engine.crawl()
        assert stats.requests_count >= 1
        assert engine.paused is False


# ---------------------------------------------------------------------------
# Tests: items property
# ---------------------------------------------------------------------------


class TestItemsProperty:
    def test_items_returns_item_list(self):
        engine = _make_engine()
        assert isinstance(engine.items, ItemList)

    def test_items_initially_empty(self):
        engine = _make_engine()
        assert len(engine.items) == 0

    async def test_items_populated_after_crawl(self):
        engine = _make_engine()
        await engine.crawl()
        assert len(engine.items) >= 1


# ---------------------------------------------------------------------------
# Tests: streaming (__aiter__ / _stream)
# ---------------------------------------------------------------------------


class TestStreaming:
    async def test_stream_yields_items(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        items = []
        async for item in engine:
            items.append(item)
        assert len(items) >= 1
        assert isinstance(items[0], dict)

    async def test_stream_processes_follow_up_requests(self):
        call_count = 0

        async def parse_with_follow(response) -> AsyncGenerator:
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                yield Request("https://example.com/page2", sid="default")
            yield {"page": call_count}

        spider = MockSpider()
        spider.parse = parse_with_follow  # type: ignore[assignment]
        engine = _make_engine(spider=spider)
        items = []
        async for item in engine:
            items.append(item)
        assert len(items) == 2

    async def test_stream_items_not_stored_in_items_list(self):
        """When streaming, items go to the stream, not to engine._items."""
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        items = []
        async for item in engine:
            items.append(item)
        # Items were sent through the stream, not appended to _items
        assert len(items) >= 1
        assert len(engine.items) == 0
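
    # A hedged extra check, assuming the streaming path runs requests through
    # the same bookkeeping as crawl() even though items bypass the items list:
    # counters such as requests_count should still be populated afterwards.
    async def test_stream_still_updates_stats(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        async for _ in engine:
            pass
        assert engine.stats.requests_count >= 1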


# ---------------------------------------------------------------------------
# Tests: pause during crawl
# ---------------------------------------------------------------------------


class TestPauseDuringCrawl:
    async def test_pause_stops_crawl_gracefully(self):
        processed = 0

        async def slow_parse(response) -> AsyncGenerator:
            nonlocal processed
            processed += 1
            # Yield more requests to keep the crawl going
            if processed <= 2:
                yield Request(f"https://example.com/p{processed + 1}", sid="default")
            yield {"n": processed}

        spider = MockSpider()
        spider.parse = slow_parse  # type: ignore[assignment]
        engine = _make_engine(spider=spider)
        # Request a pause immediately; the engine stops as soon as the
        # active tasks complete.
        engine._pause_requested = True
        await engine.crawl()
        # Should stop without processing everything
        assert engine._running is False

    async def test_pause_with_checkpoint_sets_paused(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            parse_count = 0

            async def parse_and_pause(response) -> AsyncGenerator:
                nonlocal parse_count
                parse_count += 1
                # Request a pause after the first response, but still yield follow-ups
                if parse_count == 1:
                    engine.request_pause()
                    yield Request("https://example.com/p2", sid="default")
                yield {"n": parse_count}

            spider = MockSpider()
            spider.parse = parse_and_pause  # type: ignore[assignment]
            engine = _make_engine(spider=spider, crawldir=tmpdir)
            await engine.crawl()
            assert engine.paused is True

    async def test_pause_without_checkpoint_does_not_set_paused(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        engine._pause_requested = True
        await engine.crawl()
        assert engine.paused is False
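
    # A hedged escalation sketch: a second request_pause() call sets the
    # force-stop flag (see TestRequestPause above); crawl() is assumed to
    # still return cleanly in that case with the running flag cleared.
    async def test_force_stop_still_returns_cleanly(self):
        spider = MockSpider()
        engine = _make_engine(spider=spider)
        engine.request_pause()
        engine.request_pause()  # escalates to a force stop
        await engine.crawl()
        assert engine._running is False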