import hashlib
from io import BytesIO
from functools import cached_property
from urllib.parse import urlparse, urlencode

import orjson
from w3lib.url import canonicalize_url

from scrapling.engines.toolbelt.custom import Response
from scrapling.core._types import Any, AsyncGenerator, Callable, Dict, Optional, Union, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
    from scrapling.spiders.spider import Spider


def _convert_to_bytes(value: str | bytes) -> bytes:
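    """Return ``value`` as UTF-8 encoded bytes; bytes pass through unchanged, other types raise TypeError."""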
    if isinstance(value, bytes):
        return value
    if not isinstance(value, str):
        raise TypeError(f"Can't convert {type(value).__name__} to bytes")
    return value.encode(encoding="utf-8", errors="ignore")


class Request:
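    """A single request scheduled by a spider.

    Carries the target URL, scheduling priority, parse callback, per-request
    ``meta``, retry count, and any extra session keyword arguments, and caches
    the deduplication fingerprint computed by ``update_fingerprint()``.
    """
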
    def __init__(
        self,
        url: str,
        sid: str = "",
        callback: Callable[[Response], AsyncGenerator[Union[Dict[str, Any], "Request", None], None]] | None = None,
        priority: int = 0,
        dont_filter: bool = False,
        meta: dict[str, Any] | None = None,
        _retry_count: int = 0,
        **kwargs: Any,
    ) -> None:
        self.url: str = url
        self.sid: str = sid
        self.callback = callback
        self.priority: int = priority
        self.dont_filter: bool = dont_filter
        self.meta: dict[str, Any] = meta if meta else {}
        self._retry_count: int = _retry_count
        self._session_kwargs = kwargs if kwargs else {}
        self._fp: Optional[bytes] = None  # Cached fingerprint, set lazily by update_fingerprint()

    def copy(self) -> "Request":
        """Create a copy of this request."""
        return Request(
            url=self.url,
            sid=self.sid,
            callback=self.callback,
            priority=self.priority,
            dont_filter=self.dont_filter,
            meta=self.meta.copy(),
            _retry_count=self._retry_count,
            **self._session_kwargs,
        )

    @cached_property
    def domain(self) -> str:
        """Network location (host and optional port) of the request URL."""
        return urlparse(self.url).netloc

    def update_fingerprint(
        self,
        include_kwargs: bool = False,
        include_headers: bool = False,
        keep_fragments: bool = False,
    ) -> bytes:
        """Generate a unique fingerprint for deduplication.

        Caches the result in self._fp after first computation.
        """
        if self._fp is not None:
            return self._fp

        # Normalize the request body to bytes so it hashes consistently
        post_data = self._session_kwargs.get("data", {})
        body = b""
        if post_data:
            if isinstance(post_data, dict | list | tuple):
                body = urlencode(post_data).encode()
            elif isinstance(post_data, str):
                body = post_data.encode()
            elif isinstance(post_data, BytesIO):
                body = post_data.getvalue()
            elif isinstance(post_data, bytes):
                body = post_data
        else:
            # No form data; fall back to the JSON payload, if any
            post_data = self._session_kwargs.get("json", {})
            body = orjson.dumps(post_data) if post_data else b""

        data: Dict[str, str | Tuple] = {
            "sid": self.sid,
            "body": body.hex(),
            "method": self._session_kwargs.get("method", "GET"),
            "url": canonicalize_url(self.url, keep_fragments=keep_fragments),
        }
        if include_kwargs:
            # Include the names (not the values) of extra session kwargs, excluding the body arguments
            kwargs = (key.lower() for key in self._session_kwargs.keys() if key.lower() not in ("data", "json"))
            data["kwargs"] = "".join(set(_convert_to_bytes(key).hex() for key in kwargs))
        if include_headers:
            headers = self._session_kwargs.get("headers") or self._session_kwargs.get("extra_headers") or {}
            processed_headers = {}
            # Some header normalization: lowercase then hex-encode both names and values
            for key, value in headers.items():
                processed_headers[_convert_to_bytes(key.lower()).hex()] = _convert_to_bytes(value.lower()).hex()
            data["headers"] = tuple(processed_headers.items())

        fp = hashlib.sha1(orjson.dumps(data, option=orjson.OPT_SORT_KEYS), usedforsecurity=False).digest()
        self._fp = fp
        return fp

    def __repr__(self) -> str:
        callback_name = getattr(self.callback, "__name__", None) or "None"
        return f"<Request({self.url}) priority={self.priority} callback={callback_name}>"

    def __str__(self) -> str:
        return self.url

    def __lt__(self, other: object) -> bool:
        """Compare requests by priority"""
        if not isinstance(other, Request):
            return NotImplemented
        return self.priority < other.priority

    def __gt__(self, other: object) -> bool:
        """Compare requests by priority"""
        if not isinstance(other, Request):
            return NotImplemented
        return self.priority > other.priority

    def __eq__(self, other: object) -> bool:
        """Requests are equal if they have the same fingerprint."""
        if not isinstance(other, Request):
            return NotImplemented
        if self._fp is None or other._fp is None:
            raise RuntimeError("Cannot compare requests before generating their fingerprints!")
        return self._fp == other._fp

    def __getstate__(self) -> dict[str, Any]:
        """Prepare state for pickling - store callback as name string for pickle compatibility."""
        state = self.__dict__.copy()
        state["_callback_name"] = getattr(self.callback, "__name__", None) if self.callback is not None else None
        state["callback"] = None  # Don't pickle the actual callable
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore state from pickle - callback restored later via _restore_callback()."""
        self._callback_name: str | None = state.pop("_callback_name", None)
        self.__dict__.update(state)

    def _restore_callback(self, spider: "Spider") -> None:
        """Restore callback from spider after unpickling.

        :param spider: Spider instance to look up callback method on
        """
        if hasattr(self, "_callback_name") and self._callback_name:
            self.callback = getattr(spider, self._callback_name, None) or spider.parse
            del self._callback_name
        elif hasattr(self, "_callback_name"):
            del self._callback_name
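
# Minimal usage sketch (illustrative only; spider/scheduler wiring is assumed and
# not part of this module):
#
#     req = Request("https://example.com/items?page=2", priority=5, data={"q": "books"})
#     fp = req.update_fingerprint(include_headers=True)
#     seen = {fp}
#     dup = req.copy()
#     dup.update_fingerprint(include_headers=True) in seen  # -> True, identical fingerprint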