| """Tests for the Scheduler class.""" |
|
|
| import pytest |
|
|
| from scrapling.spiders.request import Request |
| from scrapling.spiders.scheduler import Scheduler |
| from scrapling.spiders.checkpoint import CheckpointData |
|
|
|
|
class TestSchedulerInit:
    """Checks on a freshly constructed Scheduler."""

    def test_scheduler_starts_empty(self):
        """A new scheduler reports zero length and an empty state."""
        fresh = Scheduler()

        # Both the length protocol and the is_empty flag must agree.
        assert len(fresh) == 0
        assert fresh.is_empty is True
|
|
|
|
class TestSchedulerEnqueue:
    """Checks on Scheduler.enqueue: acceptance, counting and duplicate handling."""

    @pytest.mark.asyncio
    async def test_enqueue_single_request(self):
        """One enqueued request is accepted and leaves the queue non-empty."""
        queue = Scheduler()
        only_request = Request("https://example.com")

        accepted = await queue.enqueue(only_request)

        assert accepted is True
        assert len(queue) == 1
        assert queue.is_empty is False

    @pytest.mark.asyncio
    async def test_enqueue_multiple_requests(self):
        """Five requests with distinct URLs are all counted."""
        queue = Scheduler()

        for page in range(5):
            target = f"https://example.com/{page}"
            await queue.enqueue(Request(target))

        assert len(queue) == 5

    @pytest.mark.asyncio
    async def test_enqueue_duplicate_filtered(self):
        """A second identical request (same URL and sid) is rejected."""
        queue = Scheduler()

        # Build both requests up front, then enqueue them in order.
        first, second = (Request("https://example.com", sid="s1") for _ in range(2))

        first_outcome = await queue.enqueue(first)
        second_outcome = await queue.enqueue(second)

        assert first_outcome is True
        assert second_outcome is False
        assert len(queue) == 1

    @pytest.mark.asyncio
    async def test_enqueue_duplicate_allowed_with_dont_filter(self):
        """A duplicate flagged with dont_filter=True is still accepted."""
        queue = Scheduler()

        plain = Request("https://example.com", sid="s1")
        forced = Request("https://example.com", sid="s1", dont_filter=True)

        plain_outcome = await queue.enqueue(plain)
        forced_outcome = await queue.enqueue(forced)

        assert plain_outcome is True
        assert forced_outcome is True
        assert len(queue) == 2

    @pytest.mark.asyncio
    async def test_enqueue_different_methods_not_duplicate(self):
        """The same URL requested with GET and with POST is enqueued twice."""
        queue = Scheduler()

        get_request, post_request = (
            Request("https://example.com", method=verb) for verb in ("GET", "POST")
        )

        get_outcome = await queue.enqueue(get_request)
        post_outcome = await queue.enqueue(post_request)

        assert get_outcome is True
        assert post_outcome is True
        assert len(queue) == 2
|
|
|
|
class TestSchedulerDequeue:
    """Checks on Scheduler.dequeue: returned item, ordering and length updates."""

    @pytest.mark.asyncio
    async def test_dequeue_returns_request(self):
        """The dequeued request carries the URL of the one that was enqueued."""
        queue = Scheduler()
        submitted = Request("https://example.com")

        await queue.enqueue(submitted)
        popped = await queue.dequeue()

        assert popped.url == submitted.url

    @pytest.mark.asyncio
    async def test_dequeue_respects_priority_order(self):
        """Requests come out in descending priority, regardless of insert order."""
        queue = Scheduler()

        # Deliberately inserted out of priority order: low, high, medium.
        submitted = [
            Request("https://example.com/low", priority=1),
            Request("https://example.com/high", priority=10),
            Request("https://example.com/medium", priority=5),
        ]
        for item in submitted:
            await queue.enqueue(item)

        # Each dequeue is checked immediately, highest priority first.
        expected_order = (
            "https://example.com/high",
            "https://example.com/medium",
            "https://example.com/low",
        )
        for expected_url in expected_order:
            popped = await queue.dequeue()
            assert popped.url == expected_url

    @pytest.mark.asyncio
    async def test_dequeue_fifo_for_same_priority(self):
        """Requests sharing a priority are dequeued in insertion order."""
        queue = Scheduler()

        for position in range(3):
            await queue.enqueue(Request(f"https://example.com/{position}", priority=5))

        # Drain all three before asserting anything.
        drained = [await queue.dequeue() for _ in range(3)]

        # Insertion index doubles as the expected URL suffix.
        for position, popped in enumerate(drained):
            assert popped.url == f"https://example.com/{position}"

    @pytest.mark.asyncio
    async def test_dequeue_updates_length(self):
        """Every dequeue lowers the reported length by one, down to empty."""
        queue = Scheduler()

        for suffix in ("1", "2"):
            await queue.enqueue(Request(f"https://example.com/{suffix}"))

        assert len(queue) == 2

        # After each dequeue the length should drop: 2 -> 1 -> 0.
        for remaining in (1, 0):
            await queue.dequeue()
            assert len(queue) == remaining

        assert queue.is_empty is True
|
|
|
|
class TestSchedulerSnapshot:
    """Checks on Scheduler.snapshot, which feeds checkpointing."""

    @pytest.mark.asyncio
    async def test_snapshot_empty_scheduler(self):
        """An untouched scheduler snapshots to an empty list and an empty set."""
        queue = Scheduler()

        pending, fingerprints = queue.snapshot()

        assert pending == []
        assert fingerprints == set()

    @pytest.mark.asyncio
    async def test_snapshot_captures_pending_requests(self):
        """Every pending request appears in the snapshot, highest priority first."""
        queue = Scheduler()

        seeds = (
            ("https://example.com/1", 5),
            ("https://example.com/2", 10),
            ("https://example.com/3", 1),
        )
        for url, weight in seeds:
            await queue.enqueue(Request(url, priority=weight))

        pending, fingerprints = queue.snapshot()

        assert len(pending) == 3

        # The snapshot is expected in priority order: 10, then 5, then 1.
        expected_urls = (
            "https://example.com/2",
            "https://example.com/1",
            "https://example.com/3",
        )
        for slot, expected_url in enumerate(expected_urls):
            assert pending[slot].url == expected_url

    @pytest.mark.asyncio
    async def test_snapshot_captures_seen_set(self):
        """The snapshot's seen set holds one fingerprint per enqueued request."""
        queue = Scheduler()

        for suffix in ("1", "2"):
            await queue.enqueue(Request(f"https://example.com/{suffix}"))

        _, fingerprints = queue.snapshot()

        assert len(fingerprints) == 2

        # Each fingerprint is expected to be a 20-byte bytes object.
        for fingerprint in fingerprints:
            assert isinstance(fingerprint, bytes)
            assert len(fingerprint) == 20

    @pytest.mark.asyncio
    async def test_snapshot_returns_copies(self):
        """Mutating a snapshot must not leak back into the scheduler."""
        queue = Scheduler()

        await queue.enqueue(Request("https://example.com"))

        pending, fingerprints = queue.snapshot()

        # Tamper with both returned containers.
        pending.append(Request("https://modified.com"))
        fingerprints.add(b"new_fingerprint_bytes")

        # A second snapshot should show none of that tampering.
        pending_again, fingerprints_again = queue.snapshot()

        assert len(pending_again) == 1
        assert b"new_fingerprint_bytes" not in fingerprints_again

    @pytest.mark.asyncio
    async def test_snapshot_excludes_dequeued_requests(self):
        """Dequeued requests leave the pending list but stay in the seen set."""
        queue = Scheduler()

        for suffix in ("1", "2", "3"):
            await queue.enqueue(Request(f"https://example.com/{suffix}"))

        # Take one request out before snapshotting.
        await queue.dequeue()

        pending, fingerprints = queue.snapshot()

        # Two requests are still waiting ...
        assert len(pending) == 2
        # ... while all three fingerprints are still recorded.
        assert len(fingerprints) == 3
|
|
|
|
class TestSchedulerRestore:
    """Checks on Scheduler.restore when fed CheckpointData."""

    @pytest.mark.asyncio
    async def test_restore_requests(self):
        """Restoring a checkpoint with two requests yields a queue of length two."""
        queue = Scheduler()

        saved_requests = [
            Request("https://example.com/1", priority=10),
            Request("https://example.com/2", priority=5),
        ]
        saved_fingerprints = {
            b"fp1_bytes_padded!",
            b"fp2_bytes_padded!",
            b"fp3_bytes_padded!",
        }

        checkpoint = CheckpointData(requests=saved_requests, seen=saved_fingerprints)

        queue.restore(checkpoint)

        assert len(queue) == 2

    @pytest.mark.asyncio
    async def test_restore_seen_set(self):
        """Fingerprints from the checkpoint are reported back by snapshot."""
        queue = Scheduler()

        fingerprints = (b"fp1_bytes_here_pad", b"fp2_bytes_here_pad")
        checkpoint = CheckpointData(
            requests=[],
            seen=set(fingerprints),
        )

        queue.restore(checkpoint)

        # Compare against a freshly built set, not the one handed to restore.
        _, restored_fingerprints = queue.snapshot()
        assert restored_fingerprints == set(fingerprints)

    @pytest.mark.asyncio
    async def test_restore_maintains_priority_order(self):
        """Restored requests are dequeued highest priority first."""
        queue = Scheduler()

        # Checkpoint content listed from high to low priority.
        saved_requests = [
            Request("https://example.com/high", priority=10),
            Request("https://example.com/low", priority=1),
        ]

        checkpoint = CheckpointData(requests=saved_requests, seen=set())
        queue.restore(checkpoint)

        # Each dequeue is verified right away, in the expected order.
        for expected_url in ("https://example.com/high", "https://example.com/low"):
            popped = await queue.dequeue()
            assert popped.url == expected_url

    @pytest.mark.asyncio
    async def test_restore_empty_checkpoint(self):
        """Restoring an empty checkpoint leaves the scheduler empty."""
        queue = Scheduler()

        checkpoint = CheckpointData(requests=[], seen=set())
        queue.restore(checkpoint)

        assert len(queue) == 0
        assert queue.is_empty is True
|
|
|
|
class TestSchedulerIntegration:
    """End-to-end checks combining enqueue, dequeue, snapshot and restore."""

    @pytest.mark.asyncio
    async def test_snapshot_and_restore_roundtrip(self):
        """A scheduler rebuilt from a snapshot dequeues like the original."""
        # Populate the source scheduler.
        source = Scheduler()

        seeds = (
            ("https://example.com/1", "s1", 10),
            ("https://example.com/2", "s1", 5),
            ("https://example.com/3", "s2", 7),
        )
        for url, session, weight in seeds:
            await source.enqueue(Request(url, sid=session, priority=weight))

        # Capture its state as checkpoint data.
        pending, fingerprints = source.snapshot()
        checkpoint = CheckpointData(requests=pending, seen=fingerprints)

        # Rebuild a second scheduler from that checkpoint.
        clone = Scheduler()
        clone.restore(checkpoint)

        # Both schedulers should hold the same number of requests.
        assert len(clone) == len(source)

        # Drain both in lockstep, one pair per seeded request.
        for _ in seeds:
            from_source = await source.dequeue()
            from_clone = await clone.dequeue()
            assert from_source.url == from_clone.url
            assert from_source.priority == from_clone.priority

    @pytest.mark.asyncio
    async def test_partial_processing_then_checkpoint(self):
        """A snapshot taken mid-run has fewer pending requests than fingerprints."""
        queue = Scheduler()

        # Five requests go in ...
        for page in range(5):
            await queue.enqueue(Request(f"https://example.com/{page}"))

        # ... and two are taken out again.
        for _ in range(2):
            await queue.dequeue()

        # Snapshot the partially processed state.
        pending, fingerprints = queue.snapshot()

        assert len(pending) == 3
        assert len(fingerprints) == 5

    @pytest.mark.asyncio
    async def test_deduplication_after_restore(self):
        """A restored scheduler still rejects a request it had already seen."""
        first_run = Scheduler()

        await first_run.enqueue(Request("https://example.com", sid="s1"))

        pending, fingerprints = first_run.snapshot()
        checkpoint = CheckpointData(requests=pending, seen=fingerprints)

        # Start over from the checkpoint in a brand-new scheduler.
        second_run = Scheduler()
        second_run.restore(checkpoint)

        # Re-submitting the same URL and sid should be filtered out.
        accepted = await second_run.enqueue(Request("https://example.com", sid="s1"))

        assert accepted is False
|
|