import pickle
from pathlib import Path
from dataclasses import dataclass, field

import anyio
from anyio import Path as AsyncPath

from scrapling.core.utils import log
from scrapling.core._types import Set, List, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from scrapling.spiders.request import Request


@dataclass
class CheckpointData:
    """Container for checkpoint state."""

    # Requests still pending when the checkpoint was taken.
    requests: List["Request"] = field(default_factory=list)
    # Fingerprints of already-seen URLs (deduplication state).
    seen: Set[bytes] = field(default_factory=set)


class CheckpointManager:
    """Manages saving and loading checkpoint state to/from disk."""

    CHECKPOINT_FILE = "checkpoint.pkl"

    def __init__(self, crawldir: str | Path | AsyncPath, interval: float = 300.0):
        """Initialize the manager.

        :param crawldir: Directory that holds the checkpoint file.
        :param interval: Seconds between periodic checkpoints (>= 0).
        :raises TypeError: If ``interval`` is not an int or float.
        :raises ValueError: If ``interval`` is negative.
        """
        # Validate before assigning any state so a bad interval never leaves
        # a partially-initialized instance behind.
        if not isinstance(interval, (int, float)):
            raise TypeError("Checkpoints interval must be integer or float.")
        if interval < 0:
            raise ValueError("Checkpoints interval must be equal or greater than 0.")

        self.crawldir = AsyncPath(crawldir)
        self._checkpoint_path = self.crawldir / self.CHECKPOINT_FILE
        self.interval = interval

    async def has_checkpoint(self) -> bool:
        """Check if a checkpoint exists."""
        return await self._checkpoint_path.exists()

    async def save(self, data: CheckpointData) -> None:
        """Save checkpoint data to disk atomically.

        Serializes to a temp file first, then atomically moves it over the
        real checkpoint, so a crash mid-write never corrupts an existing
        checkpoint.

        :param data: The checkpoint state to persist.
        :raises Exception: Re-raises any serialization/IO failure after
            logging and removing the partial temp file.
        """
        await self.crawldir.mkdir(parents=True, exist_ok=True)
        temp_path = self._checkpoint_path.with_suffix(".tmp")
        try:
            serialized = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
            async with await anyio.open_file(temp_path, "wb") as f:
                await f.write(serialized)
            # `replace` (not `rename`): `rename` raises FileExistsError on
            # Windows when the destination already exists, which would make
            # every checkpoint after the first one fail. `replace` overwrites
            # atomically on all platforms.
            await temp_path.replace(self._checkpoint_path)
            log.info(f"Checkpoint saved: {len(data.requests)} requests, {len(data.seen)} seen URLs")
        except Exception as e:
            # Best-effort removal of the partial temp file; `missing_ok`
            # avoids the race between an existence check and the unlink.
            await temp_path.unlink(missing_ok=True)
            log.error(f"Failed to save checkpoint: {e}")
            raise

    async def load(self) -> Optional[CheckpointData]:
        """Load checkpoint data from disk.

        Returns None if no checkpoint exists or if loading fails.
        """
        if not await self.has_checkpoint():
            return None
        try:
            async with await anyio.open_file(self._checkpoint_path, "rb") as f:
                content = await f.read()
            # SECURITY NOTE: pickle deserialization executes arbitrary code.
            # This is acceptable only because the checkpoint file is written
            # by this process; never load checkpoints from untrusted sources.
            data: CheckpointData = pickle.loads(content)
            log.info(f"Checkpoint loaded: {len(data.requests)} requests, {len(data.seen)} seen URLs")
            return data
        except Exception as e:
            # A corrupt/unreadable checkpoint should not abort the crawl —
            # fall back to starting fresh.
            log.error(f"Failed to load checkpoint (starting fresh): {e}")
            return None

    async def cleanup(self) -> None:
        """Delete checkpoint file after successful completion."""
        try:
            if await self._checkpoint_path.exists():
                await self._checkpoint_path.unlink()
                log.debug("Checkpoint file cleaned up")
        except Exception as e:
            # Cleanup is best-effort; a stale checkpoint file is harmless.
            log.warning(f"Failed to cleanup checkpoint file: {e}")