Karim shoair committed on
Commit 1721cbb · 1 Parent(s): 3147d70

feat(spiders): Add pause/resume system for crawls

scrapling/spiders/checkpoint.py ADDED
@@ -0,0 +1,90 @@
+ import pickle
+ from pathlib import Path
+ from dataclasses import dataclass, field
+
+ import anyio
+ from anyio import Path as AsyncPath
+
+ from scrapling.core.utils import log
+ from scrapling.core._types import Set, List, Optional, TYPE_CHECKING
+
+ if TYPE_CHECKING:
+     from scrapling.spiders.request import Request
+
+
+ @dataclass
+ class CheckpointData:
+     """Container for checkpoint state."""
+
+     requests: List["Request"] = field(default_factory=list)
+     seen: Set[str] = field(default_factory=set)
+
+
+ class CheckpointManager:
+     """Manages saving and loading checkpoint state to/from disk."""
+
+     CHECKPOINT_FILE = "checkpoint.pkl"
+
+     def __init__(self, crawldir: str | Path | AsyncPath, interval: float = 300.0):
+         if not isinstance(interval, (int, float)):
+             raise TypeError("Checkpoints interval must be an integer or a float.")
+         if interval < 0:
+             raise ValueError("Checkpoints interval must not be negative (0 disables periodic checkpoints).")
+         self.crawldir = AsyncPath(crawldir)
+         self._checkpoint_path = self.crawldir / self.CHECKPOINT_FILE
+         self.interval = interval
+
+     async def has_checkpoint(self) -> bool:
+         """Check if a checkpoint exists."""
+         return await self._checkpoint_path.exists()
+
+     async def save(self, data: CheckpointData) -> None:
+         """Save checkpoint data to disk atomically."""
+         await self.crawldir.mkdir(parents=True, exist_ok=True)
+
+         temp_path = self._checkpoint_path.with_suffix(".tmp")
+
+         try:
+             serialized = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
+             async with await anyio.open_file(temp_path, "wb") as f:
+                 await f.write(serialized)
+
+             await temp_path.rename(self._checkpoint_path)
+
+             log.info(f"Checkpoint saved: {len(data.requests)} requests, {len(data.seen)} seen URLs")
+         except Exception as e:
+             # Clean up temp file if it exists
+             if await temp_path.exists():
+                 await temp_path.unlink()
+             log.error(f"Failed to save checkpoint: {e}")
+             raise
+
+     async def load(self) -> Optional[CheckpointData]:
+         """Load checkpoint data from disk.
+
+         Returns None if no checkpoint exists or if loading fails.
+         """
+         if not await self.has_checkpoint():
+             return None
+
+         try:
+             async with await anyio.open_file(self._checkpoint_path, "rb") as f:
+                 content = await f.read()
+             data: CheckpointData = pickle.loads(content)
+
+             log.info(f"Checkpoint loaded: {len(data.requests)} requests, {len(data.seen)} seen URLs")
+             return data
+
+         except Exception as e:
+             log.error(f"Failed to load checkpoint (starting fresh): {e}")
+             return None
+
+     async def cleanup(self) -> None:
+         """Delete checkpoint file after successful completion."""
+         try:
+             if await self._checkpoint_path.exists():
+                 await self._checkpoint_path.unlink()
+                 log.debug("Checkpoint file cleaned up")
+         except Exception as e:
+             log.warning(f"Failed to cleanup checkpoint file: {e}")
scrapling/spiders/engine.py CHANGED
@@ -1,14 +1,18 @@
  import json
+ import pprint
+ from pathlib import Path
 
  import anyio
+ from anyio import Path as AsyncPath
  from anyio import create_task_group, CapacityLimiter, create_memory_object_stream, EndOfStream
 
  from scrapling.core.utils import log
  from scrapling.spiders.request import Request
- from scrapling.spiders.result import CrawlStats, ItemList
  from scrapling.spiders.scheduler import Scheduler
  from scrapling.spiders.session import SessionManager
- from scrapling.core._types import Dict, TYPE_CHECKING, Any, AsyncGenerator
+ from scrapling.spiders.result import CrawlStats, ItemList
+ from scrapling.spiders.checkpoint import CheckpointManager, CheckpointData
+ from scrapling.core._types import Dict, Union, Optional, TYPE_CHECKING, Any, AsyncGenerator
 
  if TYPE_CHECKING:
      from scrapling.spiders.spider import Spider
@@ -21,10 +25,16 @@ def _dump(obj: Dict) -> str:
  class CrawlerEngine:
      """Orchestrates the crawling process."""
 
-     def __init__(self, spider: "Spider", session_manager: SessionManager, scheduler: Scheduler | None = None):
+     def __init__(
+         self,
+         spider: "Spider",
+         session_manager: SessionManager,
+         crawldir: Optional[Union[str, Path, AsyncPath]] = None,
+         interval: float = 300.0,
+     ):
          self.spider = spider
          self.session_manager = session_manager
-         self.scheduler = scheduler or Scheduler()
+         self.scheduler = Scheduler()
          self.stats = CrawlStats()
 
          self._global_limiter = CapacityLimiter(spider.concurrent_requests)
@@ -36,6 +46,12 @@ class CrawlerEngine:
          self._items: ItemList = ItemList()
          self._item_stream: Any = None
 
+         self._checkpoint_system_enabled = bool(crawldir)
+         self._checkpoint_manager = CheckpointManager(crawldir or "", interval)
+         self._last_checkpoint_time: float = 0.0
+         self._pause_requested: bool = False
+         self.paused: bool = False
+
      def _is_domain_allowed(self, request: Request) -> bool:
          """Check if the request's domain is in allowed_domains."""
          if not self._allowed_domains:
@@ -105,7 +121,7 @@ class CrawlerEngine:
                  processed_result = await self.spider.on_scraped_item(result)
                  if processed_result:
                      self.stats.items_scraped += 1
-                     log.debug(f"Scraped from {str(response)}\n{processed_result}")
+                     log.debug(f"Scraped from {str(response)}\n{pprint.pformat(processed_result)}")
                      if self._item_stream:
                          await self._item_stream.send(processed_result)
                      else:
@@ -116,6 +132,8 @@ class CrawlerEngine:
              elif result is not None:
                  log.error(f"Spider must return Request, dict or None, got '{type(result)}' in {request}")
          except Exception as e:
+             msg = f"Spider error processing {request}:\n {e}"
+             log.error(msg, exc_info=e)
              await self.spider.on_error(request, e)
 
      async def _task_wrapper(self, request: Request) -> None:
@@ -125,25 +143,95 @@ class CrawlerEngine:
          finally:
              self._active_tasks -= 1
 
+     def request_pause(self) -> None:
+         """Request a graceful pause of the crawl."""
+         if not self._pause_requested:
+             self._pause_requested = True
+             log.info("Pause requested, waiting for in-flight requests to complete...")
+
+     async def _save_checkpoint(self) -> None:
+         """Save current state to checkpoint files."""
+         requests, seen = self.scheduler.snapshot()
+         data = CheckpointData(requests=requests, seen=seen)
+         await self._checkpoint_manager.save(data)
+         self._last_checkpoint_time = anyio.current_time()
+
+     def _is_checkpoint_time(self) -> bool:
+         """Check if it's time for the periodic checkpoint."""
+         if not self._checkpoint_system_enabled:
+             return False
+
+         if self._checkpoint_manager.interval == 0:
+             return False
+
+         current_time = anyio.current_time()
+         return (current_time - self._last_checkpoint_time) >= self._checkpoint_manager.interval
+
+     async def _restore_from_checkpoint(self) -> bool:
+         """Attempt to restore state from checkpoint.
+
+         Returns True if successfully restored, False otherwise.
+         """
+         if not self._checkpoint_system_enabled:
+             return False
+
+         data = await self._checkpoint_manager.load()
+         if data is None:
+             return False
+
+         self.scheduler.restore(data)
+
+         # Restore callbacks from spider after scheduler restore
+         for request in data.requests:
+             request._restore_callback(self.spider)
+
+         return True
+
      async def crawl(self) -> CrawlStats:
          """Run the spider and return CrawlStats."""
          self._running = True
          self._items.clear()
+         self.paused = False
+         self._pause_requested = False
          self.stats = CrawlStats(start_time=anyio.current_time())
 
+         # Check for existing checkpoint
+         resuming = (await self._restore_from_checkpoint()) if self._checkpoint_system_enabled else False
+         self._last_checkpoint_time = anyio.current_time()
+
          async with self.session_manager:
              self.stats.concurrent_requests = self.spider.concurrent_requests
             self.stats.concurrent_requests_per_domain = self.spider.concurrent_requests_per_domain
              self.stats.download_delay = self.spider.download_delay
-             await self.spider.on_start()
+             await self.spider.on_start(resuming=resuming)
 
              try:
-                 async for request in self.spider.start_requests():
-                     await self.scheduler.enqueue(request)
+                 if not resuming:
+                     async for request in self.spider.start_requests():
+                         await self.scheduler.enqueue(request)
+                 else:
+                     log.info("Resuming from checkpoint, skipping start_requests()")
 
                  # Process queue
                  async with create_task_group() as tg:
                      while self._running:
+                         # Check for pause request
+                         if self._checkpoint_system_enabled:
+                             if self._pause_requested:
+                                 # Wait for active tasks to complete
+                                 if self._active_tasks == 0:
+                                     await self._save_checkpoint()
+                                     self.paused = True
+                                     self._running = False
+                                     log.info("Spider paused, checkpoint saved")
+                                     break
+                                 # Wait briefly and check again
+                                 await anyio.sleep(0.05)
+                                 continue
+
+                             if self._is_checkpoint_time():
+                                 await self._save_checkpoint()
+
                          if self.scheduler.is_empty:
                              # Empty queue + no active tasks = done
                              if self._active_tasks == 0:
@@ -161,6 +249,9 @@ class CrawlerEngine:
 
              finally:
                  await self.spider.on_close()
+                 # Clean up checkpoint files on successful completion (not paused)
+                 if not self.paused and self._checkpoint_system_enabled:
+                     await self._checkpoint_manager.cleanup()
 
              self.stats.log_levels_counter = self.spider._log_counter.get_counts()
              self.stats.end_time = anyio.current_time()
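
The pause handshake in crawl() is a plain flag poll: once request_pause() flips the flag, the loop stops dequeuing, waits for _active_tasks to drain to zero, saves a checkpoint, and exits. A self-contained sketch of that pattern (a toy loop with simulated pacing, not the engine itself):

import anyio

class MiniLoop:
    """Toy version of the engine's pause handshake."""

    def __init__(self) -> None:
        self.pause_requested = False
        self.active_tasks = 0
        self.queue = ["a", "b", "c", "d"]

    async def run(self) -> None:
        async with anyio.create_task_group() as tg:
            tg.start_soon(self._request_pause_soon)
            while True:
                if self.pause_requested:
                    if self.active_tasks == 0:
                        print("drained; checkpoint would be saved here")
                        break
                    await anyio.sleep(0.05)  # let in-flight work finish
                    continue
                if not self.queue:
                    break
                self.active_tasks += 1
                tg.start_soon(self._work, self.queue.pop(0))
                await anyio.sleep(0.02)  # simulated pacing between dequeues

    async def _work(self, item: str) -> None:
        await anyio.sleep(0.1)
        self.active_tasks -= 1

    async def _request_pause_soon(self) -> None:
        await anyio.sleep(0.01)
        self.pause_requested = True

anyio.run(MiniLoop().run)

Items still in the queue when the loop breaks are exactly what the real engine snapshots for the next run.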
scrapling/spiders/request.py CHANGED
@@ -1,7 +1,10 @@
  from urllib.parse import urlparse
 
  from scrapling.engines.toolbelt.custom import Response
- from scrapling.core._types import Any, AsyncGenerator, Callable, Dict, Union
+ from scrapling.core._types import Any, AsyncGenerator, Callable, Dict, Union, TYPE_CHECKING
+
+ if TYPE_CHECKING:
+     from scrapling.spiders.spider import Spider
 
 
  class Request:
@@ -72,3 +75,26 @@ class Request:
          if not isinstance(other, Request):
              return NotImplemented
          return self._fp == other._fp
+
+     def __getstate__(self) -> dict[str, Any]:
+         """Prepare state for pickling - store callback as name string for pickle compatibility."""
+         state = self.__dict__.copy()
+         state["_callback_name"] = getattr(self.callback, "__name__", None) if self.callback is not None else None
+         state["callback"] = None  # Don't pickle the actual callable
+         return state
+
+     def __setstate__(self, state: dict[str, Any]) -> None:
+         """Restore state from pickle - callback restored later via _restore_callback()."""
+         self._callback_name: str | None = state.pop("_callback_name", None)
+         self.__dict__.update(state)
+
+     def _restore_callback(self, spider: "Spider") -> None:
+         """Restore callback from spider after unpickling.
+
+         :param spider: Spider instance to look up callback method on
+         """
+         if hasattr(self, "_callback_name") and self._callback_name:
+             self.callback = getattr(spider, self._callback_name, None) or spider.parse
+             del self._callback_name
+         elif hasattr(self, "_callback_name"):
+             del self._callback_name
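
Bound methods don't pickle, which is why __getstate__ above stores only the callback's name and _restore_callback rebinds it on the live spider after loading. The same pattern in isolation, with toy classes standing in for Request and Spider:

import pickle

class Job:
    def __init__(self, url: str, callback=None) -> None:
        self.url = url
        self.callback = callback

    def __getstate__(self) -> dict:
        state = self.__dict__.copy()
        # Store the method name instead of the bound method
        state["_callback_name"] = getattr(self.callback, "__name__", None)
        state["callback"] = None
        return state

    def __setstate__(self, state: dict) -> None:
        self._callback_name = state.pop("_callback_name", None)
        self.__dict__.update(state)

    def rebind(self, owner) -> None:
        # Look the method up again on the new owner instance
        if self._callback_name:
            self.callback = getattr(owner, self._callback_name)

class MySpider:
    def parse_page(self, response) -> None: ...

spider = MySpider()
job = Job("https://example.com", callback=spider.parse_page)
restored: Job = pickle.loads(pickle.dumps(job))
restored.rebind(spider)
assert restored.callback.__name__ == "parse_page"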
scrapling/spiders/result.py CHANGED
@@ -112,6 +112,12 @@ class CrawlResult:
 
      stats: CrawlStats
      items: ItemList
+     paused: bool = False
+
+     @property
+     def completed(self) -> bool:
+         """True if the crawl completed normally (not paused)."""
+         return not self.paused
 
      def __len__(self) -> int:
          return len(self.items)
scrapling/spiders/scheduler.py CHANGED
@@ -3,6 +3,10 @@ from itertools import count
 
  from scrapling.core.utils import log
  from scrapling.spiders.request import Request
+ from scrapling.core._types import List, Set, Tuple, TYPE_CHECKING
+
+ if TYPE_CHECKING:
+     from scrapling.spiders.checkpoint import CheckpointData
 
 
  class Scheduler:
@@ -17,6 +21,8 @@ class Scheduler:
          self._queue: asyncio.PriorityQueue[tuple[int, int, Request]] = asyncio.PriorityQueue()
          self._seen: set[str] = set()
          self._counter = count()
+         # Mirror dict for snapshot without draining queue
+         self._pending: dict[int, tuple[int, int, Request]] = {}
 
      async def enqueue(self, request: Request) -> bool:
          """Add a request to the queue."""
@@ -29,12 +35,16 @@ class Scheduler:
          self._seen.add(fingerprint)
 
          # Negative priority so higher priority = dequeued first
-         await self._queue.put((-request.priority, next(self._counter), request))
+         counter = next(self._counter)
+         item = (-request.priority, counter, request)
+         self._pending[counter] = item
+         await self._queue.put(item)
          return True
 
      async def dequeue(self) -> Request:
          """Get the next request to process."""
-         _, _, request = await self._queue.get()
+         _, counter, request = await self._queue.get()
+         self._pending.pop(counter, None)
          return request
 
      def __len__(self) -> int:
@@ -43,3 +53,25 @@ class Scheduler:
      @property
      def is_empty(self) -> bool:
          return self._queue.empty()
+
+     def snapshot(self) -> Tuple[List[Request], Set[str]]:
+         """Create a snapshot of the current state for checkpoints."""
+         sorted_items = sorted(self._pending.values(), key=lambda x: (x[0], x[1]))  # Maintain queue order
+         requests = [item[2] for item in sorted_items]
+         return requests, self._seen.copy()
+
+     def restore(self, data: "CheckpointData") -> None:
+         """Restore scheduler state from checkpoint data.
+
+         :param data: CheckpointData containing requests and seen set
+         """
+         self._seen = data.seen.copy()
+
+         # Restore pending requests in order (they're already sorted by priority)
+         for request in data.requests:
+             counter = next(self._counter)
+             item = (-request.priority, counter, request)
+             self._pending[counter] = item
+             self._queue.put_nowait(item)
+
+         log.info(f"Scheduler restored: {len(data.requests)} requests, {len(data.seen)} seen")
scrapling/spiders/spider.py CHANGED
@@ -1,15 +1,17 @@
+ import signal
  import logging
  from pathlib import Path
  from abc import ABC, abstractmethod
 
  import anyio
+ from anyio import Path as AsyncPath
 
  from scrapling.spiders.request import Request
  from scrapling.spiders.engine import CrawlerEngine
  from scrapling.spiders.session import SessionManager
  from scrapling.core.utils import set_logger, reset_logger
  from scrapling.spiders.result import CrawlResult, CrawlStats
- from scrapling.core._types import Set, Any, Dict, Optional, TYPE_CHECKING, AsyncGenerator
+ from scrapling.core._types import Set, Any, Dict, Optional, Union, TYPE_CHECKING, AsyncGenerator
 
  BLOCKED_CODES = {401, 403, 407, 429, 444, 500, 502, 503, 504}
  if TYPE_CHECKING:
@@ -82,7 +84,12 @@ class Spider(ABC):
      logging_date_format: str = "%Y-%m-%d %H:%M:%S"
      log_file: Optional[str] = None
 
-     def __init__(self):
+     def __init__(self, crawldir: Optional[Union[str, Path, AsyncPath]] = None, interval: float = 300.0):
+         """Initialize the spider.
+
+         :param crawldir: Directory for checkpoint files. If provided, enables pause/resume.
+         :param interval: Seconds between periodic checkpoint saves (default 5 minutes).
+         """
          if self.name is None:
              raise ValueError(f"{self.__class__.__name__} must have a name.")
 
@@ -109,8 +116,12 @@ class Spider(ABC):
          file_handler.setFormatter(formatter)
          self.logger.addHandler(file_handler)
 
+         self.crawldir: Optional[Path] = Path(crawldir) if crawldir else None
+         self._interval = interval
+         self._engine: Optional[CrawlerEngine] = None
+         self._original_sigint_handler: Any = None
+
          self._session_manager = SessionManager()
-         self._stream_engine: CrawlerEngine | None = None
 
          try:
              self.configure_sessions(self._session_manager)
@@ -143,11 +154,17 @@ class Spider(ABC):
      async def parse(self, response: "Response") -> AsyncGenerator[Dict[str, Any] | Request | None, None]:
          """Default callback for processing responses"""
          raise NotImplementedError(f"{self.__class__.__name__} must implement parse() method")
-         yield  # Make this a generator
+         yield  # Make this a generator for type checkers
 
-     async def on_start(self) -> None:
-         """Called before crawling starts. Override for setup logic."""
-         self.logger.debug("Starting spider")
+     async def on_start(self, resuming: bool = False) -> None:
+         """Called before crawling starts. Override for setup logic.
+
+         :param resuming: True when the spider is resuming from a checkpoint; exposed for user setup logic.
+         """
+         if resuming:
+             self.logger.debug("Resuming spider from checkpoint")
+         else:
+             self.logger.debug("Starting spider")
 
      async def on_close(self) -> None:
          """Called after crawling finishes. Override for cleanup logic."""
@@ -159,7 +176,7 @@ class Spider(ABC):
 
          Override for custom error handling.
          """
-         self.logger.error(error, exc_info=error)
+         pass
 
      async def on_scraped_item(self, item: Dict[str, Any]) -> Dict[str, Any] | None:
          """A hook to be overridden by users to do some processing on scraped items, return `None` to drop the item silently."""
@@ -193,13 +210,45 @@ class Spider(ABC):
 
          manager.add("default", FetcherSession())
 
+     def pause(self):
+         """Pause the crawling process if the checkpoint system is enabled."""
+         if self._engine:
+             self._engine.request_pause()
+         else:
+             raise RuntimeError("Spider doesn't have an active crawl to pause; no crawl engine started!")
+
+     def _setup_signal_handler(self) -> None:
+         """Set up SIGINT handler for graceful pause."""
+
+         def handler(_signum: int, _frame: Any) -> None:
+             if self._engine:
+                 self._engine.request_pause()
+             else:
+                 # No engine yet, just raise KeyboardInterrupt
+                 raise KeyboardInterrupt
+
+         try:
+             self._original_sigint_handler = signal.signal(signal.SIGINT, handler)
+         except ValueError:
+             # Not in the main thread, so a handler can't be installed
+             self._original_sigint_handler = None
+
+     def _restore_signal_handler(self) -> None:
+         """Restore original SIGINT handler."""
+         if self._original_sigint_handler is not None:
+             try:
+                 signal.signal(signal.SIGINT, self._original_sigint_handler)
+             except ValueError:
+                 pass
+
      async def __run(self) -> CrawlResult:
          token = set_logger(self.logger)
          try:
-             engine = CrawlerEngine(self, session_manager=self._session_manager)
-             stats = await engine.crawl()
-             return CrawlResult(stats=stats, items=engine.items)
+             self._engine = CrawlerEngine(self, self._session_manager, self.crawldir, self._interval)
+             stats = await self._engine.crawl()
+             paused = self._engine.paused
+             return CrawlResult(stats=stats, items=self._engine.items, paused=paused)
          finally:
+             self._engine = None
              reset_logger(token)
              # Close any file handlers to release file resources.
              if self.log_file:
@@ -213,27 +262,41 @@
          This is the main entry point for running a spider.
          Handles async execution internally via anyio.
 
+         If crawldir is set, pressing Ctrl+C will pause the spider and save a checkpoint.
+         Running the spider again with the same crawldir will resume from the checkpoint.
+
          :param use_uvloop: Whether to use the faster uvloop/winloop event loop implementation, if available.
          :param backend_options: Asyncio backend options to be used with `anyio.run`
          """
          backend_options = backend_options or {}
          if use_uvloop:
              backend_options.update({"use_uvloop": True})
-         return anyio.run(self.__run, backend="asyncio", backend_options=backend_options)
+
+         # Set up SIGINT handler for graceful pause (only if crawldir is set)
+         if self.crawldir:
+             self._setup_signal_handler()
+
+         try:
+             return anyio.run(self.__run, backend="asyncio", backend_options=backend_options)
+         finally:
+             if self.crawldir:
+                 self._restore_signal_handler()
 
      async def stream(self) -> AsyncGenerator[Dict[str, Any], None]:
          """Stream items as they're scraped. Ideal for long-running spiders or building applications on top of the spiders.
 
          Must be called from an async context. Yields items one by one as they are scraped.
          Access `spider.stats` during iteration for real-time statistics.
+
+         Note: SIGINT handling for pause/resume is not available in stream mode.
          """
          token = set_logger(self.logger)
          try:
-             self._stream_engine = CrawlerEngine(self, self._session_manager)
-             async for item in self._stream_engine:
+             self._engine = CrawlerEngine(self, self._session_manager, self.crawldir, self._interval)
+             async for item in self._engine:
                  yield item
          finally:
-             self._stream_engine = None
+             self._engine = None
              reset_logger(token)
              if self.log_file:
                  for handler in self.logger.handlers:
@@ -243,6 +306,6 @@
      @property
      def stats(self) -> CrawlStats:
          """Access current crawl stats (works during streaming)."""
-         if self._stream_engine:
-             return self._stream_engine.stats
+         if self._engine:
+             return self._engine.stats
          raise RuntimeError("No active crawl. Use this property inside `async for item in spider.stream():`")